[SSE] Fix EventSource streams (#213151)

## Summary

Resolves #212919

We noticed that setting the header `'Content-Type':
'text/event-stream',` didn't work as the browser's native EventSource
implementation.

```JS
      return res.ok({
        headers: {
          'Content-Type': 'text/event-stream',
          'Cache-Control': 'no-cache',
        },
        body: observableIntoEventSourceStream(events$ as unknown as Observable<ServerSentEvent>, {
          signal: abortController.signal,
          logger,
        }),
      });
```

The reason, apparently, is that we need to flush the compressor's buffer
negotiated in the HTTP request.

### How to test it:

Run Kibana with examples `yarn start --no-base-path --run-examples
--http2` and open the SSE example app in Kibana. You should see a clock
updating every second in the UI (the clock is coming from the server).

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Alejandro Fernández Haro 2025-03-11 11:33:11 +01:00 committed by GitHub
parent c348bd1df5
commit bdf7823c54
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
18 changed files with 389 additions and 12 deletions

1
.github/CODEOWNERS vendored
View file

@ -34,6 +34,7 @@ examples/routing_example @elastic/kibana-core
examples/screenshot_mode_example @elastic/appex-sharedux
examples/search_examples @elastic/kibana-data-discovery
examples/share_examples @elastic/appex-sharedux
examples/sse_example @elastic/kibana-core
examples/state_containers_examples @elastic/appex-sharedux
examples/ui_action_examples @elastic/appex-sharedux
examples/ui_actions_explorer @elastic/appex-sharedux

3
examples/sse_example/README.md Executable file
View file

@ -0,0 +1,3 @@
# SSE Example
This plugin's goal is to demonstrate how to implement Server-Sent Events.

View file

@ -0,0 +1,11 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
export const PLUGIN_ID = 'sseExample';
export const PLUGIN_NAME = 'SSE Example';

View file

@ -0,0 +1,13 @@
{
"type": "plugin",
"id": "@kbn/sse-example-plugin",
"owner": "@elastic/kibana-core",
"description": "Plugin that shows how to implement Server-Sent Events.",
"plugin": {
"id": "sseExample",
"server": true,
"browser": true,
"requiredPlugins": ["developerExamples"],
"optionalPlugins": []
}
}

View file

@ -0,0 +1,26 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import React from 'react';
import ReactDOM from 'react-dom';
import { AppMountParameters, CoreStart } from '@kbn/core/public';
import { KibanaRenderContextProvider } from '@kbn/react-kibana-context-render';
import { SseExampleApp } from './components/app';
export const renderApp = (coreStart: CoreStart, { element }: AppMountParameters) => {
const { http } = coreStart;
ReactDOM.render(
<KibanaRenderContextProvider {...coreStart}>
<SseExampleApp http={http} />
</KibanaRenderContextProvider>,
element
);
return () => ReactDOM.unmountComponentAtNode(element);
};

View file

@ -0,0 +1,50 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import React, { useMemo } from 'react';
import { EuiPageTemplate, EuiTitle } from '@elastic/eui';
import type { CoreStart } from '@kbn/core/public';
import useObservable from 'react-use/lib/useObservable';
import { defer } from 'rxjs';
import { httpResponseIntoObservable } from '@kbn/sse-utils-client';
import { PLUGIN_NAME } from '../../common';
interface FeatureFlagsExampleAppDeps {
http: CoreStart['http'];
}
export const SseExampleApp = ({ http }: FeatureFlagsExampleAppDeps) => {
const sseResponse$ = useMemo(() => {
return defer(() =>
http.get('/internal/sse_examples/clock', {
asResponse: true,
rawResponse: true,
version: '1',
})
).pipe(httpResponseIntoObservable<{ message: string; type: 'clock' }>());
}, [http]);
const sseResponse = useObservable(sseResponse$, { message: 'Initial value', type: 'clock' });
return (
<>
<EuiPageTemplate>
<EuiPageTemplate.Header>
<EuiTitle size="l">
<h1>{PLUGIN_NAME}</h1>
</EuiTitle>
</EuiPageTemplate.Header>
<EuiPageTemplate.Section>
<h2>{sseResponse.message}</h2>
</EuiPageTemplate.Section>
</EuiPageTemplate>
</>
);
};

View file

@ -0,0 +1,14 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { SseExamplePlugin } from './plugin';
export function plugin() {
return new SseExamplePlugin();
}

View file

@ -0,0 +1,40 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { AppMountParameters, CoreSetup, CoreStart, Plugin } from '@kbn/core/public';
import { AppPluginSetupDependencies } from './types';
import { PLUGIN_ID, PLUGIN_NAME } from '../common';
export class SseExamplePlugin implements Plugin {
public setup(core: CoreSetup, deps: AppPluginSetupDependencies) {
// Register an application into the side navigation menu
core.application.register({
id: PLUGIN_ID,
title: PLUGIN_NAME,
async mount(params: AppMountParameters) {
// Load application bundle
const { renderApp } = await import('./application');
// Get start services as specified in kibana.json
const [coreStart] = await core.getStartServices();
// Render the application
return renderApp(coreStart, params);
},
});
deps.developerExamples.register({
appId: PLUGIN_ID,
title: PLUGIN_NAME,
description: 'Plugin that shows how to make use of the feature flags core service.',
});
}
public start(core: CoreStart) {}
public stop() {}
}

View file

@ -0,0 +1,14 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import type { DeveloperExamplesSetup } from '@kbn/developer-examples-plugin/public';
export interface AppPluginSetupDependencies {
developerExamples: DeveloperExamplesSetup;
}

View file

@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import type { PluginInitializerContext } from '@kbn/core-plugins-server';
export async function plugin(initializerContext: PluginInitializerContext) {
const { SseExamplePlugin } = await import('./plugin');
return new SseExamplePlugin(initializerContext);
}

View file

@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import type { PluginInitializerContext, CoreSetup, Plugin, Logger } from '@kbn/core/server';
import { defineRoutes } from './routes';
export class SseExamplePlugin implements Plugin {
private readonly logger: Logger;
constructor(initializerContext: PluginInitializerContext) {
this.logger = initializerContext.logger.get();
}
public setup(core: CoreSetup) {
const router = core.http.createRouter();
// Register server side APIs
defineRoutes(router, this.logger);
}
public start() {}
public stop() {}
}

View file

@ -0,0 +1,47 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import type { IRouter, Logger } from '@kbn/core/server';
import { Observable, defer, map, timer } from 'rxjs';
import { observableIntoEventSourceStream } from '@kbn/sse-utils-server';
import { ServerSentEvent } from '@kbn/sse-utils/src/events';
export function defineRoutes(router: IRouter, logger: Logger) {
router.versioned
.get({
path: '/internal/sse_examples/clock',
access: 'internal',
})
.addVersion(
{
version: '1',
validate: false,
},
async (context, request, response) => {
const abortController = new AbortController();
request.events.aborted$.subscribe(() => {
abortController.abort();
});
const events$: Observable<ServerSentEvent> = defer(() => timer(0, 1000)).pipe(
map(() => ({ type: 'clock', message: `Hi! It's ${new Date().toLocaleTimeString()}!` }))
);
return response.ok({
headers: {
'Content-Type': 'text/event-stream',
},
body: observableIntoEventSourceStream(events$, {
signal: abortController.signal,
logger,
}),
});
}
);
}

View file

@ -0,0 +1,24 @@
{
"extends": "../../tsconfig.base.json",
"compilerOptions": {
"outDir": "target/types"
},
"include": [
"index.ts",
"common/**/*.ts",
"public/**/*.ts",
"public/**/*.tsx",
"server/**/*.ts",
"../../typings/**/*"
],
"exclude": ["target/**/*"],
"kbn_references": [
"@kbn/core",
"@kbn/core-plugins-server",
"@kbn/developer-examples-plugin",
"@kbn/react-kibana-context-render",
"@kbn/sse-utils-client",
"@kbn/sse-utils-server",
"@kbn/sse-utils",
]
}

View file

@ -925,6 +925,7 @@
"@kbn/sort-predicates": "link:src/platform/packages/shared/kbn-sort-predicates",
"@kbn/spaces-plugin": "link:x-pack/platform/plugins/shared/spaces",
"@kbn/spaces-test-plugin": "link:x-pack/test/spaces_api_integration/common/plugins/spaces_test_plugin",
"@kbn/sse-example-plugin": "link:examples/sse_example",
"@kbn/sse-utils": "link:src/platform/packages/shared/kbn-sse-utils",
"@kbn/sse-utils-client": "link:src/platform/packages/shared/kbn-sse-utils-client",
"@kbn/sse-utils-server": "link:src/platform/packages/shared/kbn-sse-utils-server",

View file

@ -23,10 +23,11 @@ describe('observableIntoEventSourceStream', () => {
let controller: AbortController;
let stream: PassThrough;
let stream: PassThrough & { flush: () => void };
let source$: Subject<ServerSentEvent>;
let data: string[];
let streamFlushSpy: jest.SpyInstance;
beforeEach(() => {
jest.useFakeTimers();
@ -43,6 +44,7 @@ describe('observableIntoEventSourceStream', () => {
stream.on('data', (chunk) => {
data.push(chunk.toString());
});
streamFlushSpy = jest.spyOn(stream, 'flush');
});
afterEach(() => {
@ -56,6 +58,35 @@ describe('observableIntoEventSourceStream', () => {
jest.runAllTimers();
expect(data).toEqual(['event: data\ndata: {"data":{"foo":"bar"}}\n\n']);
expect(streamFlushSpy).toHaveBeenCalledTimes(1);
});
it('multiple writes only call flush twice', () => {
source$.next({ type: ServerSentEventType.data, data: { foo: 'bar-1' } });
source$.next({ type: ServerSentEventType.data, data: { foo: 'bar-2' } });
source$.next({ type: ServerSentEventType.data, data: { foo: 'bar-3' } });
source$.next({ type: ServerSentEventType.data, data: { foo: 'bar-4' } });
source$.complete();
expect(streamFlushSpy).toHaveBeenCalledTimes(1); // on the first message
jest.advanceTimersByTime(50); // Advance half the throttling time
expect(streamFlushSpy).toHaveBeenCalledTimes(1); // still the first flush only
jest.advanceTimersByTime(50); // Advance throttling time
expect(streamFlushSpy).toHaveBeenCalledTimes(2); // on the first message, and after the throttling time
jest.runAllTimers();
expect(data).toEqual([
'event: data\ndata: {"data":{"foo":"bar-1"}}\n\n',
'event: data\ndata: {"data":{"foo":"bar-2"}}\n\n',
'event: data\ndata: {"data":{"foo":"bar-3"}}\n\n',
'event: data\ndata: {"data":{"foo":"bar-4"}}\n\n',
]);
expect(streamFlushSpy).toHaveBeenCalledTimes(2);
});
it('handles SSE errors', () => {
@ -87,6 +118,7 @@ describe('observableIntoEventSourceStream', () => {
},
})}\n\n`,
]);
expect(streamFlushSpy).toHaveBeenCalledTimes(1);
});
it('handles SSE errors with metadata', () => {
@ -122,6 +154,7 @@ describe('observableIntoEventSourceStream', () => {
},
})}\n\n`,
]);
expect(streamFlushSpy).toHaveBeenCalledTimes(1);
});
it('handles non-SSE errors', () => {
@ -144,10 +177,12 @@ describe('observableIntoEventSourceStream', () => {
it('should send keep-alive comments every 10 seconds', () => {
jest.advanceTimersByTime(10000);
expect(data).toContain(': keep-alive');
expect(data.filter((d) => d === ': keep-alive\n')).toHaveLength(1);
expect(streamFlushSpy).toHaveBeenCalledTimes(1);
jest.advanceTimersByTime(10000);
expect(data.filter((d) => d === ': keep-alive')).toHaveLength(2);
expect(data.filter((d) => d === ': keep-alive\n')).toHaveLength(2);
expect(streamFlushSpy).toHaveBeenCalledTimes(2);
});
describe('without fake timers', () => {
@ -166,6 +201,7 @@ describe('observableIntoEventSourceStream', () => {
await new Promise((resolve) => process.nextTick(resolve));
expect(endSpy).toHaveBeenCalled();
expect(streamFlushSpy).toHaveBeenCalledTimes(0);
});
it('should end stream when signal is aborted', async () => {
@ -189,6 +225,7 @@ describe('observableIntoEventSourceStream', () => {
expect(data).toEqual([
`event: data\ndata: ${JSON.stringify({ data: { initial: 'data' } })}\n\n`,
]);
expect(streamFlushSpy).toHaveBeenCalledTimes(1);
});
afterEach(() => {

View file

@ -14,17 +14,36 @@ import {
ServerSentEventErrorCode,
} from '@kbn/sse-utils/src/errors';
import { ServerSentEvent, ServerSentEventType } from '@kbn/sse-utils/src/events';
import { catchError, map, Observable, of } from 'rxjs';
import { catchError, map, Observable, of, Subject, throttleTime } from 'rxjs';
import { PassThrough } from 'stream';
import type { Zlib } from 'zlib';
class ResponseStream extends PassThrough {
private _compressor?: Zlib;
setCompressor(compressor: Zlib) {
this._compressor = compressor;
}
flush() {
this._compressor?.flush();
}
}
export function observableIntoEventSourceStream(
source$: Observable<ServerSentEvent>,
{
logger,
signal,
flushThrottleMs = 100,
}: {
logger: Pick<Logger, 'debug' | 'error'>;
signal: AbortSignal;
/**
* The minimum time in milliseconds between flushes of the stream.
* This is to avoid flushing too often if the source emits events in quick succession.
*
* @default 100
*/
flushThrottleMs?: number;
}
) {
const withSerializedErrors$ = source$.pipe(
@ -54,36 +73,51 @@ export function observableIntoEventSourceStream(
}),
map((event) => {
const { type, ...rest } = event;
return `event: ${type}\ndata: ${JSON.stringify(rest)}\n\n`;
return createLine({ event: type, data: rest });
})
);
const stream = new PassThrough();
const stream = new ResponseStream();
const flush$ = new Subject<void>();
flush$
// Using `leading: true` and `trailing: true` to avoid holding the flushing for too long,
// but still avoid flushing too often (it will emit at the beginning of the throttling process, and at the end).
.pipe(throttleTime(flushThrottleMs, void 0, { leading: true, trailing: true }))
.subscribe(() => stream.flush());
const intervalId = setInterval(() => {
// `:` denotes a comment - this is to keep the connection open
// it will be ignored by the SSE parser on the client
stream.write(': keep-alive');
stream.write(': keep-alive\n');
flush$.next();
}, 10000);
const subscription = withSerializedErrors$.subscribe({
next: (line) => {
stream.write(line);
// Make sure to flush the written lines to emit them immediately (instead of waiting for buffer to fill)
flush$.next();
},
complete: () => {
flush$.complete();
stream.end();
clearTimeout(intervalId);
},
error: (error) => {
clearTimeout(intervalId);
stream.write(
`event:error\ndata: ${JSON.stringify({
error: {
code: ServerSentEventErrorCode.internalError,
message: error.message,
createLine({
event: 'error',
data: {
error: {
code: ServerSentEventErrorCode.internalError,
message: error.message,
},
},
})}\n\n`
})
);
flush$.complete();
// No need to flush because we're ending the stream anyway
stream.end();
},
});
@ -95,3 +129,13 @@ export function observableIntoEventSourceStream(
return stream;
}
function createLine({ event, data }: { event: string; data: unknown }) {
return [
`event: ${event}`,
`data: ${JSON.stringify(data)}`,
// We could also include `id` and `retry` if we see fit in the future.
// https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#fields
`\n`,
].join(`\n`);
}

View file

@ -1870,6 +1870,8 @@
"@kbn/spaces-plugin/*": ["x-pack/platform/plugins/shared/spaces/*"],
"@kbn/spaces-test-plugin": ["x-pack/test/spaces_api_integration/common/plugins/spaces_test_plugin"],
"@kbn/spaces-test-plugin/*": ["x-pack/test/spaces_api_integration/common/plugins/spaces_test_plugin/*"],
"@kbn/sse-example-plugin": ["examples/sse_example"],
"@kbn/sse-example-plugin/*": ["examples/sse_example/*"],
"@kbn/sse-utils": ["src/platform/packages/shared/kbn-sse-utils"],
"@kbn/sse-utils/*": ["src/platform/packages/shared/kbn-sse-utils/*"],
"@kbn/sse-utils-client": ["src/platform/packages/shared/kbn-sse-utils-client"],

View file

@ -7587,6 +7587,10 @@
version "0.0.0"
uid ""
"@kbn/sse-example-plugin@link:examples/sse_example":
version "0.0.0"
uid ""
"@kbn/sse-utils-client@link:src/platform/packages/shared/kbn-sse-utils-client":
version "0.0.0"
uid ""