[1chat] MCP Server that can expose 1chat tools (#222231)

## Summary

This PR introduces the **1chat MCP server** in Kibana, exposed at the
experimental `/api/mcp` endpoint behind a feature flag. It allows
external MCP clients (e.g. Claude Desktop, Cursor, OpenAI Agents) to
connect and use tools registered in the 1chat registry.

### MCP server
- Implements a **stateless** MCP server following the MCP spec
(Streamable HTTP transport).
- Supports **API key** and **basic auth** for authentication.
- Works with clients via:
  - **Streamable HTTP** with auth header 
  - **STDIO** transport using `mcp-remote` proxy
- Endpoint under a feature flag `xpack.onechat.mcpServer.enabled`
- 1chat tools are scoped to the caller’s permissions, as determined by
the auth header.

### Other changes
- Implemented `KibanaMcpHttpTransport` (mcp http transport layer adapted
to Kibana Core primitives) + tests

### Local testing

Set ui setting: `onechat:mcpServer:enabled` to true

E.g. add this to Claude Desktop:

```
{
  "mcpServers": {
    "elastic": {
      "command": "npx",
      "args": [
        "mcp-remote",
        "https://{kbn}/api/mcp",
        "--header",
        "Authorization: ApiKey ${API_KEY}"
      ],
      "env": {
        "API_KEY": "..."
      }
    },
  }
}

```
### Enable feature via API

```
POST kbn:/internal/kibana/settings/onechat:mcpServer:enabled
{"value": true}
```

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
This commit is contained in:
Jedr Blaszyk 2025-06-11 15:16:08 +02:00 committed by GitHub
parent f99ac4e87a
commit 8c6fc9e21c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 1196 additions and 12 deletions

View file

@ -1096,7 +1096,7 @@
"@mapbox/mapbox-gl-rtl-text": "0.2.3",
"@mapbox/mapbox-gl-supported": "2.0.1",
"@mapbox/vector-tile": "1.3.1",
"@modelcontextprotocol/sdk": "^1.6.0",
"@modelcontextprotocol/sdk": "^1.12.1",
"@n8n/json-schema-to-zod": "^1.1.0",
"@openfeature/core": "^1.8.0",
"@openfeature/launchdarkly-client-provider": "^0.3.2",

View file

@ -128,7 +128,7 @@ run(
...(flagsReader.boolean('verbose') ? ['--verbose'] : []),
],
env: {
NODE_OPTIONS: '--max-old-space-size=8192',
NODE_OPTIONS: '--max-old-space-size=10240',
},
cwd: REPO_ROOT,
wait: true,

View file

@ -499,6 +499,10 @@ export const stackManagementSchema: MakeSchemaFrom<UsageStats> = {
_meta: { description: 'Non-default value of setting.' },
},
},
'onechat:mcpServer:enabled': {
type: 'boolean',
_meta: { description: 'Non-default value of setting.' },
},
'banners:placement': {
type: 'keyword',
_meta: { description: 'Non-default value of setting.' },

View file

@ -49,6 +49,7 @@ export interface UsageStats {
'observability:newLogsOverview': boolean;
'observability:aiAssistantSimulatedFunctionCalling': boolean;
'observability:aiAssistantSearchConnectorIndexPattern': string;
'onechat:mcpServer:enabled': boolean;
'visualization:heatmap:maxBuckets': number;
'visualization:useLegacyTimeAxis': boolean;
'visualization:regionmap:showWarnings': boolean;

View file

@ -11307,6 +11307,12 @@
}
}
},
"onechat:mcpServer:enabled": {
"type": "boolean",
"_meta": {
"description": "Non-default value of setting."
}
},
"banners:placement": {
"type": "keyword",
"_meta": {

View file

@ -17,7 +17,9 @@ The onechat plugin exposes APIs to interact with onechat primitives.
The main primitives are:
- tools
- [tools](#tools)
Additionally, the plugin implements [MCP server](#mcp-server) that exposes onechat tools.
## Tools
@ -219,3 +221,38 @@ try {
}
}
```
## MCP Server
The MCP server provides a standardized interface for external MCP clients to access onechat tools.
### Running with Claude Desktop
To enable the MCP server, add the following to your Kibana config:
```yaml
uiSettings.overrides:
onechat:mcpServer:enabled: true
```
Configure Claude Desktop by adding this to its configuration:
```json
{
"mcpServers": {
"elastic": {
"command": "npx",
"args": [
"mcp-remote",
"http://localhost:5601/api/mcp",
"--header",
"Authorization: ApiKey ${API_KEY}"
],
"env": {
"API_KEY": "..."
}
}
}
}
```

View file

@ -0,0 +1,11 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
/**
* UI Setting ID for enabling / disabling the MCP server
*/
export const ONECHAT_MCP_SERVER_UI_SETTING_ID = 'onechat:mcpServer:enabled';

View file

@ -7,6 +7,8 @@
import type { Logger } from '@kbn/logging';
import type { CoreSetup, CoreStart, Plugin, PluginInitializerContext } from '@kbn/core/server';
import { i18n } from '@kbn/i18n';
import { schema } from '@kbn/config-schema';
import type { OnechatConfig } from './config';
import type {
OnechatPluginSetup,
@ -17,6 +19,7 @@ import type {
import { registerRoutes } from './routes';
import { ServiceManager } from './services';
import { registerFeatures } from './features';
import { ONECHAT_MCP_SERVER_UI_SETTING_ID } from '../common/constants';
export class OnechatPlugin
implements
@ -47,6 +50,21 @@ export class OnechatPlugin
registerFeatures({ features: pluginsSetup.features });
coreSetup.uiSettings.register({
[ONECHAT_MCP_SERVER_UI_SETTING_ID]: {
description: i18n.translate('onechat.uiSettings.mcpServer.description', {
defaultMessage: 'Enables MCP server with access to tools.',
}),
name: i18n.translate('onechat.uiSettings.mcpServer.name', {
defaultMessage: 'MCP Server',
}),
schema: schema.boolean(),
value: false,
readonly: true,
readonlyMode: 'ui',
},
});
const router = coreSetup.http.createRouter();
registerRoutes({
router,

View file

@ -10,10 +10,12 @@ import { registerToolsRoutes } from './tools';
import { registerAgentRoutes } from './agents';
import { registerChatRoutes } from './chat';
import { registerConversationRoutes } from './conversations';
import { registerMCPRoutes } from './mcp';
export const registerRoutes = (dependencies: RouteDependencies) => {
registerToolsRoutes(dependencies);
registerAgentRoutes(dependencies);
registerChatRoutes(dependencies);
registerConversationRoutes(dependencies);
registerMCPRoutes(dependencies);
};

View file

@ -0,0 +1,216 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { ErrorCode } from '@modelcontextprotocol/sdk/types.js';
import { schema } from '@kbn/config-schema';
import { apiPrivileges } from '../../common/features';
import type { RouteDependencies } from './types';
import { getHandlerWrapper } from './wrap_handler';
import { KibanaMcpHttpTransport } from '../utils/kibana_mcp_http_transport';
import { ONECHAT_MCP_SERVER_UI_SETTING_ID } from '../../common/constants';
const TECHNICAL_PREVIEW_WARNING =
'Elastic MCP Server is in technical preview and may be changed or removed in a future release. Elastic will work to fix any issues, but features in technical preview are not subject to the support SLA of official GA features.';
const MCP_SERVER_NAME = 'elastic-mcp-server';
const MCP_SERVER_VERSION = '0.0.1';
const MCP_SERVER_PATH = '/api/mcp';
export function registerMCPRoutes({ router, getInternalServices, logger }: RouteDependencies) {
const wrapHandler = getHandlerWrapper({ logger });
router.versioned
.post({
path: MCP_SERVER_PATH,
security: {
authz: { requiredPrivileges: [apiPrivileges.readOnechat] },
},
access: 'public',
summary: 'MCP server',
description: TECHNICAL_PREVIEW_WARNING,
options: {
tags: ['mcp'],
xsrfRequired: false,
availability: {
stability: 'experimental',
},
},
})
.addVersion(
{
version: '2023-10-31',
validate: {
request: { body: schema.object({}, { unknowns: 'allow' }) },
},
},
wrapHandler(async (ctx, request, response) => {
let transport: KibanaMcpHttpTransport | undefined;
let server: McpServer | undefined;
const { uiSettings } = await ctx.core;
const enabled = await uiSettings.client.get(ONECHAT_MCP_SERVER_UI_SETTING_ID);
if (!enabled) {
return response.notFound();
}
try {
transport = new KibanaMcpHttpTransport({ sessionIdGenerator: undefined, logger });
// Instantiate new MCP server upon every request, no session persistence
server = new McpServer({
name: MCP_SERVER_NAME,
version: MCP_SERVER_VERSION,
});
const { tools: toolService } = getInternalServices();
const registry = toolService.registry.asScopedPublicRegistry({ request });
const tools = await registry.list({});
// Expose tools scoped to the request
for (const tool of tools) {
server.tool(
tool.id,
tool.description,
tool.schema.shape,
async (args: { [x: string]: any }) => {
const toolResult = await tool.execute({ toolParams: args });
return {
content: [{ type: 'text' as const, text: JSON.stringify(toolResult) }],
};
}
);
}
request.events.aborted$.subscribe(async () => {
await transport?.close().catch((error) => {
logger.error('MCP Server: Error closing transport', { error });
});
await server?.close().catch((error) => {
logger.error('MCP Server: Error closing server', { error });
});
});
await server.connect(transport);
return await transport.handleRequest(request, response);
} catch (error) {
logger.error('MCP Server: Error handling request', { error });
try {
await transport?.close();
} catch (closeError) {
logger.error('MCP Server: Error closing transport during error handling', {
error: closeError,
});
}
if (server) {
try {
await server.close();
} catch (closeError) {
logger.error('MCP Server: Error closing server during error handling', {
error: closeError,
});
}
}
logger.error('MCP Server: Error handling request', { error });
return response.customError({
statusCode: 500,
body: {
message: `Internal server error: ${error}`,
attributes: {
code: ErrorCode.InternalError,
},
},
});
}
})
);
router.versioned
.get({
path: MCP_SERVER_PATH,
security: {
authz: { requiredPrivileges: [apiPrivileges.readOnechat] },
},
access: 'public',
summary: 'MCP server',
description: TECHNICAL_PREVIEW_WARNING,
options: {
tags: ['mcp'],
availability: {
stability: 'experimental',
},
},
})
.addVersion(
{
version: '2023-10-31',
validate: false,
},
wrapHandler(async (ctx, _, response) => {
const { uiSettings } = await ctx.core;
const enabled = await uiSettings.client.get(ONECHAT_MCP_SERVER_UI_SETTING_ID);
if (!enabled) {
return response.notFound();
}
return response.customError({
statusCode: 405,
body: {
message: 'Method not allowed',
attributes: {
code: ErrorCode.ConnectionClosed,
},
},
});
})
);
router.versioned
.delete({
path: MCP_SERVER_PATH,
security: {
authz: { requiredPrivileges: [apiPrivileges.readOnechat] },
},
access: 'public',
summary: 'MCP server',
description: TECHNICAL_PREVIEW_WARNING,
options: {
tags: ['mcp'],
xsrfRequired: false,
availability: {
stability: 'experimental',
},
},
})
.addVersion(
{
version: '2023-10-31',
validate: false,
},
wrapHandler(async (ctx, _, response) => {
const { uiSettings } = await ctx.core;
const enabled = await uiSettings.client.get(ONECHAT_MCP_SERVER_UI_SETTING_ID);
if (!enabled) {
return response.notFound();
}
return response.customError({
statusCode: 405,
body: {
message: 'Method not allowed',
attributes: {
code: ErrorCode.ConnectionClosed,
},
},
});
})
);
}

View file

@ -0,0 +1,517 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { KibanaMcpHttpTransport } from './kibana_mcp_http_transport';
import type { KibanaRequest, KibanaResponseFactory, RouteMethod } from '@kbn/core-http-server';
import { httpServerMock } from '@kbn/core/server/mocks';
import { loggingSystemMock } from '@kbn/core-logging-server-mocks';
import { CallToolResult, JSONRPCMessage } from '@modelcontextprotocol/sdk/types.js';
import { randomUUID } from 'node:crypto';
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { z } from '@kbn/zod';
const mockLoggerFactory = loggingSystemMock.create();
const mockLogger = mockLoggerFactory.get('mock logger');
interface TestServerConfig {
sessionIdGenerator: (() => string) | undefined;
}
/**
* Creates a minimal response factory for testing that returns proper response objects
*/
function createMinimalResponseFactory(): KibanaResponseFactory {
return {
ok: (options?: { body?: any; headers?: Record<string, string> }) => ({
status: 200,
payload: options?.body,
options: {
headers: options?.headers || {},
},
}),
accepted: (options?: { body?: any; headers?: Record<string, string> }) => ({
status: 202,
payload: options?.body,
options: {
headers: options?.headers || {},
},
}),
badRequest: (options?: { body?: any; headers?: Record<string, string> }) => ({
status: 400,
payload: options?.body,
options: {
headers: options?.headers || {},
},
}),
customError: (options: {
statusCode: number;
body?: any;
headers?: Record<string, string>;
}) => ({
status: options.statusCode,
payload: options.body,
options: {
headers: options?.headers || {},
},
}),
} as unknown as KibanaResponseFactory;
}
async function createTestServer(
config: TestServerConfig = { sessionIdGenerator: () => randomUUID() }
): Promise<{
transport: KibanaMcpHttpTransport;
mcpServer: McpServer;
}> {
const mcpServer = new McpServer(
{ name: 'test-server', version: '1.0.0' },
{ capabilities: { logging: {} } }
);
mcpServer.tool(
'greet',
'A simple greeting tool',
{ name: z.string().describe('Name to greet') },
async ({ name }): Promise<CallToolResult> => {
return { content: [{ type: 'text', text: `Hello, ${name}!` }] };
}
);
const transport = new KibanaMcpHttpTransport({
sessionIdGenerator: config.sessionIdGenerator,
logger: mockLogger,
});
await mcpServer.connect(transport);
return { transport, mcpServer };
}
const createMockKibanaRequest = (
body?: any,
headers: Record<string, string> = {},
method: RouteMethod = 'post'
): jest.Mocked<KibanaRequest> =>
httpServerMock.createKibanaRequest({
headers,
body,
method,
path: '/test',
});
const TEST_MESSAGES = {
initialize: {
jsonrpc: '2.0',
method: 'initialize',
params: {
clientInfo: { name: 'test-client', version: '1.0' },
protocolVersion: '2025-03-26',
capabilities: {},
},
id: 'init-1',
} as JSONRPCMessage,
toolsList: {
jsonrpc: '2.0',
method: 'tools/list',
params: {},
id: 'tools-1',
} as JSONRPCMessage,
};
describe('KibanaMcpHttpTransport', () => {
let transport: KibanaMcpHttpTransport;
let responseFactory: KibanaResponseFactory;
beforeEach(async () => {
jest.clearAllMocks();
const { transport: newTransport } = await createTestServer();
transport = newTransport;
responseFactory = createMinimalResponseFactory();
});
afterEach(async () => {
await transport.close();
});
describe('Initialization', () => {
it('should reject starting transport twice', async () => {
await expect(transport.start()).rejects.toThrow('Transport already started');
await transport.close();
});
it('should initialize server and generate session ID', async () => {
const request = createMockKibanaRequest(TEST_MESSAGES.initialize, {
'content-type': 'application/json',
accept: 'application/json',
});
const response = await transport.handleRequest(request, responseFactory);
expect(response.status).toBe(200);
const headers = response.options.headers as Record<string, string>;
expect(headers?.['mcp-session-id']).toBeDefined();
expect(headers?.['Content-Type']).toBe('application/json');
});
it('should reject second initialization request', async () => {
// First initialization
const firstRequest = createMockKibanaRequest(TEST_MESSAGES.initialize, {
'content-type': 'application/json',
accept: 'application/json',
});
await transport.handleRequest(firstRequest, responseFactory);
// Second initialization
const secondRequest = createMockKibanaRequest(
{ ...TEST_MESSAGES.initialize, id: 'second-init' },
{ 'content-type': 'application/json', accept: 'application/json' }
);
const response = await transport.handleRequest(secondRequest, responseFactory);
expect(response.status).toBe(400);
const errorData = JSON.parse(response.payload as string);
expect(errorData).toMatchObject({
jsonrpc: '2.0',
error: {
code: -32600,
message: expect.stringMatching(/Server already initialized/),
},
});
});
it('should reject batch initialize request', async () => {
const batchInitMessages = [
TEST_MESSAGES.initialize,
{ ...TEST_MESSAGES.initialize, id: 'init-2' },
];
const request = createMockKibanaRequest(batchInitMessages, {
'content-type': 'application/json',
accept: 'application/json',
});
const response = await transport.handleRequest(request, responseFactory);
expect(response.status).toBe(400);
const errorData = JSON.parse(response.payload as string);
expect(errorData).toMatchObject({
jsonrpc: '2.0',
error: {
code: -32600,
message: expect.stringMatching(/Only one initialization request is allowed/),
},
});
});
});
describe('Request Handling', () => {
beforeEach(async () => {
// Initialize transport to get session ID
const initRequest = createMockKibanaRequest(TEST_MESSAGES.initialize, {
'content-type': 'application/json',
accept: 'application/json',
});
await transport.handleRequest(initRequest, responseFactory);
});
it('should handle POST requests with valid session', async () => {
const request = createMockKibanaRequest(TEST_MESSAGES.toolsList, {
'content-type': 'application/json',
accept: 'application/json',
});
const response = await transport.handleRequest(request, responseFactory);
expect(response.status).toBe(200);
const headers = response.options.headers as Record<string, string | string[]>;
expect(headers?.['Content-Type']).toBe('application/json');
const responseData = JSON.parse(response.payload as string);
expect(responseData).toMatchObject({
jsonrpc: '2.0',
result: {
tools: [
{
name: 'greet',
description: 'A simple greeting tool',
inputSchema: {
type: 'object',
properties: {
name: {
type: 'string',
description: 'Name to greet',
},
},
required: ['name'],
additionalProperties: false,
$schema: 'http://json-schema.org/draft-07/schema#',
},
},
],
},
id: 'tools-1',
});
});
it('should handle notifications with 202 response', async () => {
const request = createMockKibanaRequest(
{ jsonrpc: '2.0', method: 'notification/progress', params: { progress: 1 } },
{
'content-type': 'application/json',
accept: 'application/json',
}
);
const response = await transport.handleRequest(request, responseFactory);
expect(response.status).toBe(202);
});
it('should handle batch notifications with 202 response', async () => {
const batchNotifications = [
{ jsonrpc: '2.0', method: 'notification/progress', params: { progress: 1 } },
{ jsonrpc: '2.0', method: 'notification/progress', params: { progress: 2 } },
];
const request = createMockKibanaRequest(batchNotifications, {
'content-type': 'application/json',
accept: 'application/json',
});
const response = await transport.handleRequest(request, responseFactory);
expect(response.status).toBe(202);
});
it('should handle batch requests with multiple responses', async () => {
const batchRequests = [
{ jsonrpc: '2.0', method: 'tools/list', params: {}, id: 'req-1' },
{ jsonrpc: '2.0', method: 'ping', params: {}, id: 'req-2' },
];
const request = createMockKibanaRequest(batchRequests, {
'content-type': 'application/json',
accept: 'application/json',
});
const response = await transport.handleRequest(request, responseFactory);
expect(response.status).toBe(200);
const responseData = JSON.parse(response.payload as string);
expect(Array.isArray(responseData)).toBe(true);
expect(responseData).toHaveLength(2);
});
});
describe('Error Handling', () => {
it('should reject requests without Accept header containing application/json', async () => {
const request = createMockKibanaRequest(TEST_MESSAGES.initialize, {
'content-type': 'application/json',
accept: 'text/html',
});
const response = await transport.handleRequest(request, responseFactory);
expect(response.status).toBe(406);
const errorData = JSON.parse(response.payload as string);
expect(errorData).toMatchObject({
jsonrpc: '2.0',
error: {
code: -32000,
message: expect.stringMatching(/Client must accept application\/json/),
},
});
});
it('should reject requests with invalid Content-Type', async () => {
const request = createMockKibanaRequest(TEST_MESSAGES.initialize, {
'content-type': 'text/plain',
accept: 'application/json',
});
const response = await transport.handleRequest(request, responseFactory);
expect(response.status).toBe(415);
const errorData = JSON.parse(response.payload as string);
expect(errorData).toMatchObject({
jsonrpc: '2.0',
error: {
code: -32000,
message: expect.stringMatching(/Content-Type must be application\/json/),
},
});
});
it('should handle unsupported HTTP methods', async () => {
const request = createMockKibanaRequest(undefined, {}, 'get');
const response = await transport.handleRequest(request, responseFactory);
expect(response.status).toBe(405);
const errorData = JSON.parse(response.payload as string);
expect(errorData).toMatchObject({
jsonrpc: '2.0',
error: {
code: -32600,
message: 'Method not allowed.',
},
});
});
it('should handle invalid JSON data', async () => {
// Create request with invalid JSON structure that will fail parsing
const request = createMockKibanaRequest(
'invalid json', // This will cause JSON parsing to fail
{ 'content-type': 'application/json', accept: 'application/json' }
);
const response = await transport.handleRequest(request, responseFactory);
expect(response.status).toBe(400);
const errorData = JSON.parse(response.payload as string);
expect(errorData).toMatchObject({
jsonrpc: '2.0',
error: {
code: -32700,
message: 'Parse error',
},
});
});
it('should handle invalid JSON-RPC messages', async () => {
const invalidMessage = { method: 'test', params: {} }; // missing jsonrpc and id
const request = createMockKibanaRequest(invalidMessage, {
'content-type': 'application/json',
accept: 'application/json',
});
const response = await transport.handleRequest(request, responseFactory);
expect(response.status).toBe(400);
});
});
describe('Session Management', () => {
it('should include session ID in responses after initialization', async () => {
const initRequest = createMockKibanaRequest(TEST_MESSAGES.initialize, {
'content-type': 'application/json',
accept: 'application/json',
});
const initResponse = await transport.handleRequest(initRequest, responseFactory);
const initHeaders = initResponse.options.headers as Record<string, string | string[]>;
const sessionId = initHeaders?.['mcp-session-id'] as string;
expect(sessionId).toBeDefined();
const toolsRequest = createMockKibanaRequest(TEST_MESSAGES.toolsList, {
'content-type': 'application/json',
accept: 'application/json',
});
const toolsResponse = await transport.handleRequest(toolsRequest, responseFactory);
expect(toolsResponse.status).toBe(200);
const toolsHeaders = toolsResponse.options.headers as Record<string, string | string[]>;
expect(toolsHeaders?.['mcp-session-id']).toBe(sessionId);
});
});
describe('Message Sending', () => {
beforeEach(async () => {
const initRequest = createMockKibanaRequest(TEST_MESSAGES.initialize, {
'content-type': 'application/json',
accept: 'application/json',
});
await transport.handleRequest(initRequest, responseFactory);
});
it('should send responses for requests', async () => {
const request = createMockKibanaRequest(TEST_MESSAGES.toolsList, {
'content-type': 'application/json',
accept: 'application/json',
});
const response = await transport.handleRequest(request, responseFactory);
expect(response.status).toBe(200);
const responseData = JSON.parse(response.payload as string);
expect(responseData).toMatchObject({
jsonrpc: '2.0',
result: {
tools: [
{
name: 'greet',
description: 'A simple greeting tool',
inputSchema: {
type: 'object',
properties: {
name: {
type: 'string',
description: 'Name to greet',
},
},
required: ['name'],
additionalProperties: false,
$schema: 'http://json-schema.org/draft-07/schema#',
},
},
],
},
id: 'tools-1',
});
});
});
describe('Connection Management', () => {
it('should clean up resources on close', async () => {
const initRequest = createMockKibanaRequest(TEST_MESSAGES.initialize, {
'content-type': 'application/json',
accept: 'application/json',
});
await transport.handleRequest(initRequest, responseFactory);
const onCloseSpy = jest.fn();
transport.onclose = onCloseSpy;
await transport.close();
expect(onCloseSpy).toHaveBeenCalled();
expect((transport as any)._initialized).toBe(false);
expect((transport as any).sessionId).toBeUndefined();
});
});
describe('Error Handling in send()', () => {
it('should call onerror callback when send() fails', async () => {
const onErrorSpy = jest.fn();
transport.onerror = onErrorSpy;
// Try to send a response without establishing a connection
const response: JSONRPCMessage = {
jsonrpc: '2.0',
result: {},
id: 'non-existent',
};
await transport.send(response);
expect(onErrorSpy).toHaveBeenCalled();
expect(mockLogger.error).toHaveBeenCalled();
});
});
});

View file

@ -0,0 +1,368 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { Transport } from '@modelcontextprotocol/sdk/shared/transport.js';
import type { JSONRPCMessage, RequestId } from '@modelcontextprotocol/sdk/types.js';
import { ErrorCode } from '@modelcontextprotocol/sdk/types.js';
import type { KibanaResponseFactory, KibanaRequest } from '@kbn/core-http-server';
import {
isJSONRPCRequest,
isJSONRPCResponse,
isJSONRPCError,
JSONRPCMessageSchema,
isInitializeRequest,
} from '@modelcontextprotocol/sdk/types.js';
import { Logger } from '@kbn/logging';
import { AuthInfo } from '@modelcontextprotocol/sdk/server/auth/types';
import { randomUUID } from 'node:crypto';
import type { IKibanaResponse } from '@kbn/core/server';
/**
* Server transport for Streamable HTTP: this implements the MCP Streamable HTTP transport specification.
* It supports direct HTTP responses. It doesn't support SSE streaming.
*
* It is compatible with Kibana's HTTP request/response abstractions. Implementation is adapted from:
* https://github.com/modelcontextprotocol/typescript-sdk/blob/main/src/server/streamableHttp.ts
*
* In stateful mode:
* - Session ID is generated and included in response headers
* - Session ID is always included in initialization responses
* - Requests with invalid session IDs are rejected with 404 Not Found
* - Non-initialization requests without a session ID are rejected with 400 Bad Request
* - State is maintained in-memory (connections, message history)
*
* In stateless mode:
* - No Session ID is included in any responses
* - No session validation is performed
*/
export class KibanaMcpHttpTransport implements Transport {
// when sessionId is not set (undefined), it means the transport is in stateless mode
private sessionIdGenerator: (() => string) | undefined;
private _started: boolean = false;
private _streamMapping: Map<string, KibanaResponseFactory> = new Map();
private _requestToStreamMapping: Map<RequestId, string> = new Map();
private _requestResponseMap: Map<RequestId, JSONRPCMessage> = new Map();
private _initialized: boolean = false;
private _logger: Logger;
private _responseCallbacks: Map<string, (response: IKibanaResponse) => void> = new Map();
sessionId?: string | undefined;
onclose?: () => void;
onerror?: (error: Error) => void;
onmessage?: (message: JSONRPCMessage, extra?: { authInfo?: AuthInfo }) => void;
constructor(options: { sessionIdGenerator?: () => string; logger: Logger }) {
this.sessionIdGenerator = options.sessionIdGenerator;
this._logger = options.logger;
}
/**
* Registers a callback for when a response is ready for a specific stream
*/
private registerResponseCallback(
streamId: string,
callback: (response: IKibanaResponse) => void
) {
this._responseCallbacks.set(streamId, callback);
}
/**
* Handles an incoming HTTP request, whether GET or POST
*/
async handleRequest(req: KibanaRequest, res: KibanaResponseFactory): Promise<IKibanaResponse> {
if (req.route.method === 'post') {
return await this.handlePostRequest(req, res);
} else {
return await this.handleUnsupportedRequest(res);
}
}
/**
* Starts the transport. This is required by the Transport interface but is a no-op
* for the Streamable HTTP transport as connections are managed per-request.
*/
async start(): Promise<void> {
if (this._started) {
throw new Error('Transport already started');
}
this._started = true;
}
/**
* Closes the transport and cleans up resources
*/
async close(): Promise<void> {
this._streamMapping.clear();
// Clear any pending responses
this._requestResponseMap.clear();
this._requestToStreamMapping.clear();
this._responseCallbacks.clear();
// Clear session state
this.sessionId = undefined;
this._initialized = false;
this.onclose?.();
}
/**
* Handles POST requests containing JSON-RPC messages compatible with MCP specification
*/
async handlePostRequest(
request: KibanaRequest<unknown, unknown, unknown>,
responseFactory: KibanaResponseFactory
): Promise<IKibanaResponse> {
try {
this._logger.debug('Processing POST request');
// Validate the Accept header
const acceptHeader = request.headers.accept;
// The client MUST include an Accept header, listing application/json as supported content type.
if (!acceptHeader?.includes('application/json')) {
this._logger.warn('Request rejected: Invalid Accept header');
return responseFactory.customError({
statusCode: 406,
body: JSON.stringify({
jsonrpc: '2.0',
error: {
code: ErrorCode.ConnectionClosed,
message: 'Not Acceptable: Client must accept application/json',
},
id: null,
}),
});
}
const ct = request.headers['content-type'];
if (!ct || !ct.includes('application/json')) {
this._logger.warn('Request rejected: Invalid Content-Type');
return responseFactory.customError({
statusCode: 415,
body: JSON.stringify({
jsonrpc: '2.0',
error: {
code: ErrorCode.ConnectionClosed,
message: 'Unsupported Media Type: Content-Type must be application/json',
},
id: null,
}),
});
}
const rawMessage = request.body;
let messages: JSONRPCMessage[];
// handle batch and single messages
if (Array.isArray(rawMessage)) {
this._logger.debug(`Processing batch request with ${rawMessage.length} messages`);
messages = rawMessage.map((msg) => JSONRPCMessageSchema.parse(msg));
} else {
this._logger.debug('Processing single message request');
messages = [JSONRPCMessageSchema.parse(rawMessage)];
}
// Check if this is an initialization request
const isInitializationRequest = messages.some(isInitializeRequest);
if (isInitializationRequest) {
if (this._initialized && this.sessionId !== undefined) {
this._logger.warn('Initialization request rejected - server already initialized');
return responseFactory.badRequest({
body: JSON.stringify({
jsonrpc: '2.0',
error: {
code: ErrorCode.InvalidRequest,
message: 'Invalid Request: Server already initialized',
},
id: null,
}),
});
}
if (messages.length > 1) {
this._logger.warn('Initialization request rejected - multiple messages not allowed');
return responseFactory.badRequest({
body: JSON.stringify({
jsonrpc: '2.0',
error: {
code: ErrorCode.InvalidRequest,
message: 'Invalid Request: Only one initialization request is allowed',
},
id: null,
}),
});
}
this.sessionId = this.sessionIdGenerator?.();
this._initialized = true;
this._logger.debug(`Session initialized: ${this.sessionId}`);
}
// check if it contains requests
const hasRequests = messages.some(isJSONRPCRequest);
if (!hasRequests) {
this._logger.debug('Processing notifications/responses');
// if it only contains notifications or responses, return 202
const response = responseFactory.accepted();
// handle each message
for (const message of messages) {
this.onmessage?.(message);
}
return response;
} else {
const streamId = randomUUID();
this._logger.debug(`Processing requests with stream ID: ${streamId}`);
// Create a promise that will resolve when the response is ready
const responsePromise = new Promise<IKibanaResponse>((resolve) => {
this.registerResponseCallback(streamId, resolve);
});
// Store the response factory for this request to send messages back through this connection
for (const message of messages) {
if (isJSONRPCRequest(message)) {
this._logger.debug(`Mapping request ${message.id} to stream ${streamId}`);
this._streamMapping.set(streamId, responseFactory);
this._requestToStreamMapping.set(message.id, streamId);
}
}
// Set up close handler for client disconnects
request.events.aborted$.subscribe(() => {
this._logger.debug(`Stream ${streamId} aborted by client`);
this._streamMapping.delete(streamId);
this._responseCallbacks.delete(streamId);
});
// handle each message
for (const message of messages) {
this.onmessage?.(message);
}
// Wait for the response to be ready
return await responsePromise;
}
} catch (error) {
this._logger.error(`Request processing error: ${error}`);
this.onerror?.(error as Error);
return responseFactory.badRequest({
body: JSON.stringify({
jsonrpc: '2.0',
error: {
code: ErrorCode.ParseError,
message: 'Parse error',
data: String(error),
},
id: null,
}),
});
}
}
/**
* Sends a JSON-RPC message
*/
async send(message: JSONRPCMessage, options?: { relatedRequestId?: RequestId }): Promise<void> {
try {
this._logger.debug('Processing outgoing message');
let requestId = options?.relatedRequestId;
if (isJSONRPCResponse(message) || isJSONRPCError(message)) {
requestId = message.id;
}
if (requestId === undefined) {
this._logger.debug('Processing notification message');
// todo: handle notifications
return;
}
// Get the response for this request
const streamId = this._requestToStreamMapping.get(requestId);
const responseFactory = this._streamMapping.get(streamId!);
if (!streamId) {
const error = `No connection established for request ID: ${String(requestId)}`;
this._logger.error(error);
throw new Error(error);
}
if (isJSONRPCResponse(message) || isJSONRPCError(message)) {
this._logger.debug(`Processing response for request ${String(requestId)}`);
this._requestResponseMap.set(requestId, message);
const relatedIds = Array.from(this._requestToStreamMapping.entries())
.filter(([_, sid]) => this._streamMapping.get(sid) === responseFactory)
.map(([id]) => id);
// Check if we have responses for all requests using this connection
const allResponsesReady = relatedIds.every((id) => this._requestResponseMap.has(id));
if (allResponsesReady) {
if (!responseFactory) {
const error = `No connection established for request ID: ${String(requestId)}`;
this._logger.error(error);
throw new Error(error);
}
// All responses ready, send as JSON
const headers: Record<string, string> = {
'Content-Type': 'application/json',
};
if (this.sessionId !== undefined) {
headers['mcp-session-id'] = this.sessionId;
}
const responses = relatedIds.map((id) => this._requestResponseMap.get(id)!);
const responseBody =
responses.length === 1 ? JSON.stringify(responses[0]) : JSON.stringify(responses);
const response = responseFactory.ok({
headers,
body: responseBody,
});
// Trigger the response callback
const callback = this._responseCallbacks.get(streamId);
if (callback) {
callback(response);
this._responseCallbacks.delete(streamId);
}
// Clean up
for (const id of relatedIds) {
this._requestResponseMap.delete(id);
this._requestToStreamMapping.delete(id);
}
this._streamMapping.delete(streamId);
this._logger.debug('Request mappings cleaned up');
}
}
} catch (error) {
this._logger.error(`Error sending message: ${error}`);
this.onerror?.(error as Error);
}
}
/**
* Handles unsupported requests (GET, DELETE, PUT, PATCH, etc.)
*/
private async handleUnsupportedRequest(res: KibanaResponseFactory): Promise<IKibanaResponse> {
return res.customError({
statusCode: 405,
body: JSON.stringify({
jsonrpc: '2.0',
error: {
code: ErrorCode.InvalidRequest,
message: 'Method not allowed.',
},
id: null,
}),
});
}
}

View file

@ -34,6 +34,8 @@
"@kbn/sse-utils-client",
"@kbn/sse-utils-server",
"@kbn/core-http-browser",
"@kbn/features-plugin"
"@kbn/features-plugin",
"@kbn/core-logging-server-mocks",
"@kbn/i18n"
]
}

View file

@ -8686,17 +8686,19 @@
dependencies:
"@types/mdx" "^2.0.0"
"@modelcontextprotocol/sdk@^1.6.0":
version "1.6.0"
resolved "https://registry.yarnpkg.com/@modelcontextprotocol/sdk/-/sdk-1.6.0.tgz#1d1849c9b36c0e494cf77398579dbd7d46c1ed34"
integrity sha512-585s8g+jzuGBomzgzDeP5l8gEyiSs+KhoAHbA2ZZ24Zgm83IZsyCLl/fmWhPHbfYsuLG8NE6SWGZA5ZBql8jSw==
"@modelcontextprotocol/sdk@^1.12.1":
version "1.12.1"
resolved "https://registry.yarnpkg.com/@modelcontextprotocol/sdk/-/sdk-1.12.1.tgz#f77503f0263b33cb1e5b81a6ff0c322393cabd37"
integrity sha512-KG1CZhZfWg+u8pxeM/mByJDScJSrjjxLc8fwQqbsS8xCjBmQfMNEBTotYdNanKekepnfRI85GtgQlctLFpcYPw==
dependencies:
ajv "^6.12.6"
content-type "^1.0.5"
cors "^2.8.5"
cross-spawn "^7.0.5"
eventsource "^3.0.2"
express "^5.0.1"
express-rate-limit "^7.5.0"
pkce-challenge "^4.1.0"
pkce-challenge "^5.0.0"
raw-body "^3.0.0"
zod "^3.23.8"
zod-to-json-schema "^3.24.1"
@ -13452,7 +13454,7 @@ ajv-keywords@^5.1.0:
dependencies:
fast-deep-equal "^3.1.3"
ajv@^6.12.2, ajv@^6.12.4, ajv@^6.12.5:
ajv@^6.12.2, ajv@^6.12.4, ajv@^6.12.5, ajv@^6.12.6:
version "6.12.6"
resolved "https://registry.yarnpkg.com/ajv/-/ajv-6.12.6.tgz#baf5a62e802b07d977034586f8c3baf5adf26df4"
integrity sha512-j3fVLgvTo527anyYyJOGTYJbG+vnnQYvE0m5mmkc1TK+nxAppkCLMIL0aZ4dblVCNoGShhm+kzE4ZUykBoMg4g==
@ -16054,7 +16056,7 @@ cross-fetch@^3.1.5:
dependencies:
node-fetch "^2.7.0"
cross-spawn@^7.0.0, cross-spawn@^7.0.2, cross-spawn@^7.0.3:
cross-spawn@^7.0.0, cross-spawn@^7.0.2, cross-spawn@^7.0.3, cross-spawn@^7.0.5:
version "7.0.6"
resolved "https://registry.yarnpkg.com/cross-spawn/-/cross-spawn-7.0.6.tgz#8a58fe78f00dcd70c370451759dfbfaf03e8ee9f"
integrity sha512-uV2QOWP2nWzsy2aMp8aRibhi9dlzF5Hgh5SHaB9OiTGEyDTiJJyx0uy51QXdyWbtAHNua4XJzUKca3OzKUd3vA==
@ -26051,7 +26053,7 @@ pixelmatch@5.3.0, pixelmatch@^5.3.0:
dependencies:
pngjs "^6.0.0"
pkce-challenge@3.1.0, pkce-challenge@^4.1.0:
pkce-challenge@3.1.0, pkce-challenge@^5.0.0:
version "3.1.0"
resolved "https://registry.yarnpkg.com/pkce-challenge/-/pkce-challenge-3.1.0.tgz#c974ee934e62c501f09da817964e75db201ee8bf"
integrity sha512-bQ/0XPZZ7eX+cdAkd61uYWpfMhakH3NeteUF1R8GNa+LMqX8QFAkbCLqq+AYAns1/ueACBu/BMWhrlKGrdvGZg==