🌊 Migration on read: Fix missing cases (#221101)

Follow-up of https://github.com/elastic/kibana/pull/220878

The previous PR didn't fix all places where we load a stream definition and then assert its schema. This PR adds the same migrate-on-read logic to the remaining places and extends the tests to hit them.

Now all code paths should be covered.
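
Concretely, "the same logic" means replacing the direct assert on the stored document with a call to a shared helper that migrates the document first. A condensed before/after view (the full change is in the diff below):

```ts
// Before: call sites asserted the raw _source directly, which throws for
// definitions written by older Kibana versions.
const source = response._source!;
Streams.all.Definition.asserts(source);

// After: every call site goes through one helper that runs migrateOnRead
// before asserting the schema.
const streamDefinition = this.getStreamDefinitionFromSource(response._source);
```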

To test, log in as `system_indices_superuser:changeme` and update a definition document to something invalid, for example with a `null` description:
```
POST .kibana_streams-000001/_update/logs
{
  "doc": {
    "name": "logs",
    "description": null,
    "ingest": {
      "lifecycle": {
        "dsl": {}
      },
      "processing": [],
      "wired": {
        "routing": [],
        "fields": {
          "@timestamp": {
            "type": "date"
          },
          "message": {
            "type": "match_only_text"
          },
          "host.name": {
            "type": "keyword"
          },
          "log.level": {
            "type": "keyword"
          },
          "stream.name": {
            "type": "system"
          }
        }
      }
    }
  }
}
```
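
After writing the broken document, reading the stream back through the public Streams API should still work, because the definition is now migrated on read before the schema assertion. A minimal verification sketch (hypothetical, not part of the PR), assuming Node 18+ for the built-in `fetch`, a local Kibana at `http://localhost:5601`, and the same credentials:

```ts
// Hypothetical check: fetch the stream whose stored doc was just made invalid.
// Host, port, and the version header below are assumptions about a local setup.
async function checkStream() {
  const auth = Buffer.from('system_indices_superuser:changeme').toString('base64');
  const response = await fetch('http://localhost:5601/api/streams/logs', {
    headers: {
      Authorization: `Basic ${auth}`,
      // Version used by the Streams API in the tests below; may be optional.
      'elastic-api-version': '2023-10-31',
    },
  });
  // With migration on read in place this should be a 200 with a valid definition,
  // instead of a server error caused by the failing schema assertion.
  console.log(response.status, await response.json());
}

checkStream().catch(console.error);
```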

In a follow-up I will move this logic into the storage adapter, so it's harder to introduce similar problems in the future, and also add tests for the current state of queries and asset links.
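
For illustration, that follow-up could be a thin wrapper around the storage client that applies migrate-on-read plus the schema assertion on every read, so individual call sites can no longer skip it. This is only a sketch under assumed names (`StorageClientLike` and `MigratingStorageClient` are not real Kibana types):

```ts
// Illustrative sketch only: a storage client decorator that always migrates and
// validates documents on read. The interface below is an assumption, not the
// actual Kibana storage adapter API.
interface StorageClientLike<T> {
  get(args: { id: string }): Promise<{ _source?: T }>;
}

class MigratingStorageClient<T> implements StorageClientLike<T> {
  constructor(
    private readonly inner: StorageClientLike<T>,
    private readonly migrate: (doc: unknown) => T,
    private readonly assertValid: (doc: T) => void
  ) {}

  async get(args: { id: string }): Promise<{ _source?: T }> {
    const response = await this.inner.get(args);
    if (response._source === undefined) {
      return response;
    }
    const migrated = this.migrate(response._source);
    this.assertValid(migrated);
    return { ...response, _source: migrated };
  }
}
```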
Commit 972cc70fc9 (parent e8494adf77)
Joe Reuter, 2025-05-21 14:51:34 +02:00, committed by GitHub
2 changed files with 49 additions and 18 deletions

```diff
@@ -325,6 +325,15 @@ export class StreamsClient {
     throw streamDefinition;
   }
 
+  private getStreamDefinitionFromSource(source: Streams.all.Definition | undefined) {
+    if (!source) {
+      throw new DefinitionNotFoundError(`Cannot find stream definition`);
+    }
+    const migratedSource = migrateOnRead(source);
+    Streams.all.Definition.asserts(migratedSource);
+    return migratedSource;
+  }
+
   /**
    * Returns a stream definition for the given name:
    * - if a wired stream definition exists
@@ -340,9 +349,7 @@ export class StreamsClient {
     try {
       const response = await this.dependencies.storageClient.get({ id: name });
-      const streamDefinition = migrateOnRead(response._source!);
-      Streams.all.Definition.asserts(streamDefinition);
+      const streamDefinition = this.getStreamDefinitionFromSource(response._source);
 
       if (Streams.ingest.all.Definition.is(streamDefinition)) {
         const privileges = await checkAccess({
@@ -373,9 +380,7 @@ export class StreamsClient {
   private async getStoredStreamDefinition(name: string): Promise<Streams.all.Definition> {
     return await Promise.all([
       this.dependencies.storageClient.get({ id: name }).then((response) => {
-        const source = response._source!;
-        Streams.all.Definition.asserts(source);
-        return source;
+        return this.getStreamDefinitionFromSource(response._source);
       }),
       checkAccess({ name, scopedClusterClient: this.dependencies.scopedClusterClient }).then(
         (privileges) => {
@@ -566,11 +571,9 @@ export class StreamsClient {
       query,
     });
 
-    const streams = streamsSearchResponse.hits.hits.flatMap((hit) => {
-      const source = hit._source!;
-      Streams.all.Definition.asserts(source);
-      return source;
-    });
+    const streams = streamsSearchResponse.hits.hits.flatMap((hit) =>
+      this.getStreamDefinitionFromSource(hit._source)
+    );
 
     const privileges = await checkAccessBulk({
       names: streams
```

```diff
@@ -77,13 +77,6 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
     before(async () => {
       apiClient = await createStreamsRepositoryAdminClient(roleScopedSupertest);
       await enableStreams(apiClient);
-    });
-
-    after(async () => {
-      await disableStreams(apiClient);
-    });
-
-    it('should read and return existing orphaned classic stream', async () => {
       await esClient.index({
         index: '.kibana_streams-000001',
         id: TEST_STREAM_NAME,
@@ -92,6 +85,13 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
       // Refresh the index to make the document searchable
       await esClient.indices.refresh({ index: '.kibana_streams-000001' });
     });
+
+    after(async () => {
+      await disableStreams(apiClient);
+    });
+
+    it('should read and return existing orphaned classic stream', async () => {
       const getResponse = await apiClient.fetch('GET /api/streams/{name} 2023-10-31', {
         params: {
           path: { name: TEST_STREAM_NAME },
@@ -100,6 +100,20 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
       expect(getResponse.status).to.eql(200);
       expect(getResponse.body.stream).to.eql(expectedStreamsResponse);
+
+      const listResponse = await apiClient.fetch('GET /api/streams 2023-10-31');
+      expect(listResponse.status).to.eql(200);
+      expect(listResponse.body.streams).to.have.length(2); // logs stream + classic stream
+
+      const dashboardResponse = await apiClient.fetch(
+        'GET /api/streams/{name}/dashboards 2023-10-31',
+        {
+          params: {
+            path: { name: TEST_STREAM_NAME },
+          },
+        }
+      );
+      expect(dashboardResponse.status).to.eql(200);
     });
 
     it('should read and return existing regular classic stream', async () => {
@@ -116,6 +130,20 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
       expect(getResponse.status).to.eql(200);
       expect(getResponse.body.stream).to.eql(expectedStreamsResponse);
+
+      const listResponse = await apiClient.fetch('GET /api/streams 2023-10-31');
+      expect(listResponse.status).to.eql(200);
+      expect(listResponse.body.streams).to.have.length(2); // logs stream + classic stream
+
+      const dashboardResponse = await apiClient.fetch(
+        'GET /api/streams/{name}/dashboards 2023-10-31',
+        {
+          params: {
+            path: { name: TEST_STREAM_NAME },
+          },
+        }
+      );
+      expect(dashboardResponse.status).to.eql(200);
     });
   });
 }
```