diff --git a/package.json b/package.json index 9e45f28a354a..55596f5eba00 100644 --- a/package.json +++ b/package.json @@ -1096,7 +1096,7 @@ "@mapbox/mapbox-gl-rtl-text": "0.2.3", "@mapbox/mapbox-gl-supported": "2.0.1", "@mapbox/vector-tile": "1.3.1", - "@modelcontextprotocol/sdk": "^1.6.0", + "@modelcontextprotocol/sdk": "^1.12.1", "@n8n/json-schema-to-zod": "^1.1.0", "@openfeature/core": "^1.8.0", "@openfeature/launchdarkly-client-provider": "^0.3.2", diff --git a/packages/kbn-ts-type-check-cli/run_type_check_cli.ts b/packages/kbn-ts-type-check-cli/run_type_check_cli.ts index 15d8759dd203..5ff77e77aa72 100644 --- a/packages/kbn-ts-type-check-cli/run_type_check_cli.ts +++ b/packages/kbn-ts-type-check-cli/run_type_check_cli.ts @@ -128,7 +128,7 @@ run( ...(flagsReader.boolean('verbose') ? ['--verbose'] : []), ], env: { - NODE_OPTIONS: '--max-old-space-size=8192', + NODE_OPTIONS: '--max-old-space-size=10240', }, cwd: REPO_ROOT, wait: true, diff --git a/src/platform/plugins/private/kibana_usage_collection/server/collectors/management/schema.ts b/src/platform/plugins/private/kibana_usage_collection/server/collectors/management/schema.ts index b930c375464c..38fc0bc1f2e2 100644 --- a/src/platform/plugins/private/kibana_usage_collection/server/collectors/management/schema.ts +++ b/src/platform/plugins/private/kibana_usage_collection/server/collectors/management/schema.ts @@ -499,6 +499,10 @@ export const stackManagementSchema: MakeSchemaFrom = { _meta: { description: 'Non-default value of setting.' }, }, }, + 'onechat:mcpServer:enabled': { + type: 'boolean', + _meta: { description: 'Non-default value of setting.' }, + }, 'banners:placement': { type: 'keyword', _meta: { description: 'Non-default value of setting.' }, diff --git a/src/platform/plugins/private/kibana_usage_collection/server/collectors/management/types.ts b/src/platform/plugins/private/kibana_usage_collection/server/collectors/management/types.ts index 1884c8eb2566..88cd565a8df7 100644 --- a/src/platform/plugins/private/kibana_usage_collection/server/collectors/management/types.ts +++ b/src/platform/plugins/private/kibana_usage_collection/server/collectors/management/types.ts @@ -49,6 +49,7 @@ export interface UsageStats { 'observability:newLogsOverview': boolean; 'observability:aiAssistantSimulatedFunctionCalling': boolean; 'observability:aiAssistantSearchConnectorIndexPattern': string; + 'onechat:mcpServer:enabled': boolean; 'visualization:heatmap:maxBuckets': number; 'visualization:useLegacyTimeAxis': boolean; 'visualization:regionmap:showWarnings': boolean; diff --git a/src/platform/plugins/shared/telemetry/schema/oss_platform.json b/src/platform/plugins/shared/telemetry/schema/oss_platform.json index 749e010f22d6..e3a51e0f692a 100644 --- a/src/platform/plugins/shared/telemetry/schema/oss_platform.json +++ b/src/platform/plugins/shared/telemetry/schema/oss_platform.json @@ -11307,6 +11307,12 @@ } } }, + "onechat:mcpServer:enabled": { + "type": "boolean", + "_meta": { + "description": "Non-default value of setting." + } + }, "banners:placement": { "type": "keyword", "_meta": { diff --git a/x-pack/platform/plugins/shared/onechat/README.md b/x-pack/platform/plugins/shared/onechat/README.md index 53fc3c671b19..9206a2a3f5a8 100644 --- a/x-pack/platform/plugins/shared/onechat/README.md +++ b/x-pack/platform/plugins/shared/onechat/README.md @@ -17,7 +17,9 @@ The onechat plugin exposes APIs to interact with onechat primitives. The main primitives are: -- tools +- [tools](#tools) + +Additionally, the plugin implements [MCP server](#mcp-server) that exposes onechat tools. ## Tools @@ -219,3 +221,38 @@ try { } } ``` + +## MCP Server + +The MCP server provides a standardized interface for external MCP clients to access onechat tools. + + +### Running with Claude Desktop + +To enable the MCP server, add the following to your Kibana config: + +```yaml +uiSettings.overrides: + onechat:mcpServer:enabled: true +``` + +Configure Claude Desktop by adding this to its configuration: + +```json +{ + "mcpServers": { + "elastic": { + "command": "npx", + "args": [ + "mcp-remote", + "http://localhost:5601/api/mcp", + "--header", + "Authorization: ApiKey ${API_KEY}" + ], + "env": { + "API_KEY": "..." + } + } + } +} +``` diff --git a/x-pack/platform/plugins/shared/onechat/common/constants.ts b/x-pack/platform/plugins/shared/onechat/common/constants.ts new file mode 100644 index 000000000000..af62660944f1 --- /dev/null +++ b/x-pack/platform/plugins/shared/onechat/common/constants.ts @@ -0,0 +1,11 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +/** + * UI Setting ID for enabling / disabling the MCP server + */ +export const ONECHAT_MCP_SERVER_UI_SETTING_ID = 'onechat:mcpServer:enabled'; diff --git a/x-pack/platform/plugins/shared/onechat/server/plugin.ts b/x-pack/platform/plugins/shared/onechat/server/plugin.ts index a72629bb7f28..f4e7587aea81 100644 --- a/x-pack/platform/plugins/shared/onechat/server/plugin.ts +++ b/x-pack/platform/plugins/shared/onechat/server/plugin.ts @@ -7,6 +7,8 @@ import type { Logger } from '@kbn/logging'; import type { CoreSetup, CoreStart, Plugin, PluginInitializerContext } from '@kbn/core/server'; +import { i18n } from '@kbn/i18n'; +import { schema } from '@kbn/config-schema'; import type { OnechatConfig } from './config'; import type { OnechatPluginSetup, @@ -17,6 +19,7 @@ import type { import { registerRoutes } from './routes'; import { ServiceManager } from './services'; import { registerFeatures } from './features'; +import { ONECHAT_MCP_SERVER_UI_SETTING_ID } from '../common/constants'; export class OnechatPlugin implements @@ -47,6 +50,21 @@ export class OnechatPlugin registerFeatures({ features: pluginsSetup.features }); + coreSetup.uiSettings.register({ + [ONECHAT_MCP_SERVER_UI_SETTING_ID]: { + description: i18n.translate('onechat.uiSettings.mcpServer.description', { + defaultMessage: 'Enables MCP server with access to tools.', + }), + name: i18n.translate('onechat.uiSettings.mcpServer.name', { + defaultMessage: 'MCP Server', + }), + schema: schema.boolean(), + value: false, + readonly: true, + readonlyMode: 'ui', + }, + }); + const router = coreSetup.http.createRouter(); registerRoutes({ router, diff --git a/x-pack/platform/plugins/shared/onechat/server/routes/index.ts b/x-pack/platform/plugins/shared/onechat/server/routes/index.ts index 382cc6f691b7..b583a950a14f 100644 --- a/x-pack/platform/plugins/shared/onechat/server/routes/index.ts +++ b/x-pack/platform/plugins/shared/onechat/server/routes/index.ts @@ -10,10 +10,12 @@ import { registerToolsRoutes } from './tools'; import { registerAgentRoutes } from './agents'; import { registerChatRoutes } from './chat'; import { registerConversationRoutes } from './conversations'; +import { registerMCPRoutes } from './mcp'; export const registerRoutes = (dependencies: RouteDependencies) => { registerToolsRoutes(dependencies); registerAgentRoutes(dependencies); registerChatRoutes(dependencies); registerConversationRoutes(dependencies); + registerMCPRoutes(dependencies); }; diff --git a/x-pack/platform/plugins/shared/onechat/server/routes/mcp.ts b/x-pack/platform/plugins/shared/onechat/server/routes/mcp.ts new file mode 100644 index 000000000000..783a2ef1aaeb --- /dev/null +++ b/x-pack/platform/plugins/shared/onechat/server/routes/mcp.ts @@ -0,0 +1,216 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'; +import { ErrorCode } from '@modelcontextprotocol/sdk/types.js'; +import { schema } from '@kbn/config-schema'; +import { apiPrivileges } from '../../common/features'; +import type { RouteDependencies } from './types'; +import { getHandlerWrapper } from './wrap_handler'; +import { KibanaMcpHttpTransport } from '../utils/kibana_mcp_http_transport'; +import { ONECHAT_MCP_SERVER_UI_SETTING_ID } from '../../common/constants'; + +const TECHNICAL_PREVIEW_WARNING = + 'Elastic MCP Server is in technical preview and may be changed or removed in a future release. Elastic will work to fix any issues, but features in technical preview are not subject to the support SLA of official GA features.'; + +const MCP_SERVER_NAME = 'elastic-mcp-server'; +const MCP_SERVER_VERSION = '0.0.1'; +const MCP_SERVER_PATH = '/api/mcp'; + +export function registerMCPRoutes({ router, getInternalServices, logger }: RouteDependencies) { + const wrapHandler = getHandlerWrapper({ logger }); + + router.versioned + .post({ + path: MCP_SERVER_PATH, + security: { + authz: { requiredPrivileges: [apiPrivileges.readOnechat] }, + }, + access: 'public', + summary: 'MCP server', + description: TECHNICAL_PREVIEW_WARNING, + options: { + tags: ['mcp'], + xsrfRequired: false, + availability: { + stability: 'experimental', + }, + }, + }) + .addVersion( + { + version: '2023-10-31', + validate: { + request: { body: schema.object({}, { unknowns: 'allow' }) }, + }, + }, + wrapHandler(async (ctx, request, response) => { + let transport: KibanaMcpHttpTransport | undefined; + let server: McpServer | undefined; + + const { uiSettings } = await ctx.core; + const enabled = await uiSettings.client.get(ONECHAT_MCP_SERVER_UI_SETTING_ID); + + if (!enabled) { + return response.notFound(); + } + + try { + transport = new KibanaMcpHttpTransport({ sessionIdGenerator: undefined, logger }); + + // Instantiate new MCP server upon every request, no session persistence + server = new McpServer({ + name: MCP_SERVER_NAME, + version: MCP_SERVER_VERSION, + }); + + const { tools: toolService } = getInternalServices(); + + const registry = toolService.registry.asScopedPublicRegistry({ request }); + const tools = await registry.list({}); + + // Expose tools scoped to the request + for (const tool of tools) { + server.tool( + tool.id, + tool.description, + tool.schema.shape, + async (args: { [x: string]: any }) => { + const toolResult = await tool.execute({ toolParams: args }); + return { + content: [{ type: 'text' as const, text: JSON.stringify(toolResult) }], + }; + } + ); + } + + request.events.aborted$.subscribe(async () => { + await transport?.close().catch((error) => { + logger.error('MCP Server: Error closing transport', { error }); + }); + await server?.close().catch((error) => { + logger.error('MCP Server: Error closing server', { error }); + }); + }); + + await server.connect(transport); + + return await transport.handleRequest(request, response); + } catch (error) { + logger.error('MCP Server: Error handling request', { error }); + try { + await transport?.close(); + } catch (closeError) { + logger.error('MCP Server: Error closing transport during error handling', { + error: closeError, + }); + } + if (server) { + try { + await server.close(); + } catch (closeError) { + logger.error('MCP Server: Error closing server during error handling', { + error: closeError, + }); + } + } + + logger.error('MCP Server: Error handling request', { error }); + return response.customError({ + statusCode: 500, + body: { + message: `Internal server error: ${error}`, + attributes: { + code: ErrorCode.InternalError, + }, + }, + }); + } + }) + ); + + router.versioned + .get({ + path: MCP_SERVER_PATH, + security: { + authz: { requiredPrivileges: [apiPrivileges.readOnechat] }, + }, + access: 'public', + summary: 'MCP server', + description: TECHNICAL_PREVIEW_WARNING, + options: { + tags: ['mcp'], + availability: { + stability: 'experimental', + }, + }, + }) + .addVersion( + { + version: '2023-10-31', + validate: false, + }, + wrapHandler(async (ctx, _, response) => { + const { uiSettings } = await ctx.core; + const enabled = await uiSettings.client.get(ONECHAT_MCP_SERVER_UI_SETTING_ID); + + if (!enabled) { + return response.notFound(); + } + return response.customError({ + statusCode: 405, + body: { + message: 'Method not allowed', + attributes: { + code: ErrorCode.ConnectionClosed, + }, + }, + }); + }) + ); + + router.versioned + .delete({ + path: MCP_SERVER_PATH, + security: { + authz: { requiredPrivileges: [apiPrivileges.readOnechat] }, + }, + access: 'public', + summary: 'MCP server', + description: TECHNICAL_PREVIEW_WARNING, + options: { + tags: ['mcp'], + xsrfRequired: false, + availability: { + stability: 'experimental', + }, + }, + }) + .addVersion( + { + version: '2023-10-31', + validate: false, + }, + wrapHandler(async (ctx, _, response) => { + const { uiSettings } = await ctx.core; + const enabled = await uiSettings.client.get(ONECHAT_MCP_SERVER_UI_SETTING_ID); + + if (!enabled) { + return response.notFound(); + } + return response.customError({ + statusCode: 405, + body: { + message: 'Method not allowed', + attributes: { + code: ErrorCode.ConnectionClosed, + }, + }, + }); + }) + ); +} diff --git a/x-pack/platform/plugins/shared/onechat/server/utils/kibana_mcp_http_transport.test.ts b/x-pack/platform/plugins/shared/onechat/server/utils/kibana_mcp_http_transport.test.ts new file mode 100644 index 000000000000..5a5d8e5ae5c5 --- /dev/null +++ b/x-pack/platform/plugins/shared/onechat/server/utils/kibana_mcp_http_transport.test.ts @@ -0,0 +1,517 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { KibanaMcpHttpTransport } from './kibana_mcp_http_transport'; +import type { KibanaRequest, KibanaResponseFactory, RouteMethod } from '@kbn/core-http-server'; +import { httpServerMock } from '@kbn/core/server/mocks'; +import { loggingSystemMock } from '@kbn/core-logging-server-mocks'; +import { CallToolResult, JSONRPCMessage } from '@modelcontextprotocol/sdk/types.js'; +import { randomUUID } from 'node:crypto'; +import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'; +import { z } from '@kbn/zod'; + +const mockLoggerFactory = loggingSystemMock.create(); +const mockLogger = mockLoggerFactory.get('mock logger'); + +interface TestServerConfig { + sessionIdGenerator: (() => string) | undefined; +} + +/** + * Creates a minimal response factory for testing that returns proper response objects + */ +function createMinimalResponseFactory(): KibanaResponseFactory { + return { + ok: (options?: { body?: any; headers?: Record }) => ({ + status: 200, + payload: options?.body, + options: { + headers: options?.headers || {}, + }, + }), + accepted: (options?: { body?: any; headers?: Record }) => ({ + status: 202, + payload: options?.body, + options: { + headers: options?.headers || {}, + }, + }), + badRequest: (options?: { body?: any; headers?: Record }) => ({ + status: 400, + payload: options?.body, + options: { + headers: options?.headers || {}, + }, + }), + customError: (options: { + statusCode: number; + body?: any; + headers?: Record; + }) => ({ + status: options.statusCode, + payload: options.body, + options: { + headers: options?.headers || {}, + }, + }), + } as unknown as KibanaResponseFactory; +} + +async function createTestServer( + config: TestServerConfig = { sessionIdGenerator: () => randomUUID() } +): Promise<{ + transport: KibanaMcpHttpTransport; + mcpServer: McpServer; +}> { + const mcpServer = new McpServer( + { name: 'test-server', version: '1.0.0' }, + { capabilities: { logging: {} } } + ); + + mcpServer.tool( + 'greet', + 'A simple greeting tool', + { name: z.string().describe('Name to greet') }, + async ({ name }): Promise => { + return { content: [{ type: 'text', text: `Hello, ${name}!` }] }; + } + ); + + const transport = new KibanaMcpHttpTransport({ + sessionIdGenerator: config.sessionIdGenerator, + logger: mockLogger, + }); + + await mcpServer.connect(transport); + + return { transport, mcpServer }; +} + +const createMockKibanaRequest = ( + body?: any, + headers: Record = {}, + method: RouteMethod = 'post' +): jest.Mocked => + httpServerMock.createKibanaRequest({ + headers, + body, + method, + path: '/test', + }); + +const TEST_MESSAGES = { + initialize: { + jsonrpc: '2.0', + method: 'initialize', + params: { + clientInfo: { name: 'test-client', version: '1.0' }, + protocolVersion: '2025-03-26', + capabilities: {}, + }, + id: 'init-1', + } as JSONRPCMessage, + + toolsList: { + jsonrpc: '2.0', + method: 'tools/list', + params: {}, + id: 'tools-1', + } as JSONRPCMessage, +}; + +describe('KibanaMcpHttpTransport', () => { + let transport: KibanaMcpHttpTransport; + let responseFactory: KibanaResponseFactory; + + beforeEach(async () => { + jest.clearAllMocks(); + const { transport: newTransport } = await createTestServer(); + transport = newTransport; + responseFactory = createMinimalResponseFactory(); + }); + + afterEach(async () => { + await transport.close(); + }); + + describe('Initialization', () => { + it('should reject starting transport twice', async () => { + await expect(transport.start()).rejects.toThrow('Transport already started'); + await transport.close(); + }); + + it('should initialize server and generate session ID', async () => { + const request = createMockKibanaRequest(TEST_MESSAGES.initialize, { + 'content-type': 'application/json', + accept: 'application/json', + }); + + const response = await transport.handleRequest(request, responseFactory); + + expect(response.status).toBe(200); + const headers = response.options.headers as Record; + expect(headers?.['mcp-session-id']).toBeDefined(); + expect(headers?.['Content-Type']).toBe('application/json'); + }); + + it('should reject second initialization request', async () => { + // First initialization + const firstRequest = createMockKibanaRequest(TEST_MESSAGES.initialize, { + 'content-type': 'application/json', + accept: 'application/json', + }); + + await transport.handleRequest(firstRequest, responseFactory); + + // Second initialization + const secondRequest = createMockKibanaRequest( + { ...TEST_MESSAGES.initialize, id: 'second-init' }, + { 'content-type': 'application/json', accept: 'application/json' } + ); + + const response = await transport.handleRequest(secondRequest, responseFactory); + + expect(response.status).toBe(400); + const errorData = JSON.parse(response.payload as string); + expect(errorData).toMatchObject({ + jsonrpc: '2.0', + error: { + code: -32600, + message: expect.stringMatching(/Server already initialized/), + }, + }); + }); + + it('should reject batch initialize request', async () => { + const batchInitMessages = [ + TEST_MESSAGES.initialize, + { ...TEST_MESSAGES.initialize, id: 'init-2' }, + ]; + + const request = createMockKibanaRequest(batchInitMessages, { + 'content-type': 'application/json', + accept: 'application/json', + }); + + const response = await transport.handleRequest(request, responseFactory); + + expect(response.status).toBe(400); + const errorData = JSON.parse(response.payload as string); + expect(errorData).toMatchObject({ + jsonrpc: '2.0', + error: { + code: -32600, + message: expect.stringMatching(/Only one initialization request is allowed/), + }, + }); + }); + }); + + describe('Request Handling', () => { + beforeEach(async () => { + // Initialize transport to get session ID + const initRequest = createMockKibanaRequest(TEST_MESSAGES.initialize, { + 'content-type': 'application/json', + accept: 'application/json', + }); + + await transport.handleRequest(initRequest, responseFactory); + }); + + it('should handle POST requests with valid session', async () => { + const request = createMockKibanaRequest(TEST_MESSAGES.toolsList, { + 'content-type': 'application/json', + accept: 'application/json', + }); + + const response = await transport.handleRequest(request, responseFactory); + + expect(response.status).toBe(200); + const headers = response.options.headers as Record; + expect(headers?.['Content-Type']).toBe('application/json'); + + const responseData = JSON.parse(response.payload as string); + + expect(responseData).toMatchObject({ + jsonrpc: '2.0', + result: { + tools: [ + { + name: 'greet', + description: 'A simple greeting tool', + inputSchema: { + type: 'object', + properties: { + name: { + type: 'string', + description: 'Name to greet', + }, + }, + required: ['name'], + additionalProperties: false, + $schema: 'http://json-schema.org/draft-07/schema#', + }, + }, + ], + }, + id: 'tools-1', + }); + }); + + it('should handle notifications with 202 response', async () => { + const request = createMockKibanaRequest( + { jsonrpc: '2.0', method: 'notification/progress', params: { progress: 1 } }, + { + 'content-type': 'application/json', + accept: 'application/json', + } + ); + + const response = await transport.handleRequest(request, responseFactory); + + expect(response.status).toBe(202); + }); + + it('should handle batch notifications with 202 response', async () => { + const batchNotifications = [ + { jsonrpc: '2.0', method: 'notification/progress', params: { progress: 1 } }, + { jsonrpc: '2.0', method: 'notification/progress', params: { progress: 2 } }, + ]; + + const request = createMockKibanaRequest(batchNotifications, { + 'content-type': 'application/json', + accept: 'application/json', + }); + + const response = await transport.handleRequest(request, responseFactory); + + expect(response.status).toBe(202); + }); + + it('should handle batch requests with multiple responses', async () => { + const batchRequests = [ + { jsonrpc: '2.0', method: 'tools/list', params: {}, id: 'req-1' }, + { jsonrpc: '2.0', method: 'ping', params: {}, id: 'req-2' }, + ]; + + const request = createMockKibanaRequest(batchRequests, { + 'content-type': 'application/json', + accept: 'application/json', + }); + + const response = await transport.handleRequest(request, responseFactory); + + expect(response.status).toBe(200); + + const responseData = JSON.parse(response.payload as string); + expect(Array.isArray(responseData)).toBe(true); + expect(responseData).toHaveLength(2); + }); + }); + + describe('Error Handling', () => { + it('should reject requests without Accept header containing application/json', async () => { + const request = createMockKibanaRequest(TEST_MESSAGES.initialize, { + 'content-type': 'application/json', + accept: 'text/html', + }); + + const response = await transport.handleRequest(request, responseFactory); + + expect(response.status).toBe(406); + const errorData = JSON.parse(response.payload as string); + expect(errorData).toMatchObject({ + jsonrpc: '2.0', + error: { + code: -32000, + message: expect.stringMatching(/Client must accept application\/json/), + }, + }); + }); + + it('should reject requests with invalid Content-Type', async () => { + const request = createMockKibanaRequest(TEST_MESSAGES.initialize, { + 'content-type': 'text/plain', + accept: 'application/json', + }); + + const response = await transport.handleRequest(request, responseFactory); + + expect(response.status).toBe(415); + const errorData = JSON.parse(response.payload as string); + expect(errorData).toMatchObject({ + jsonrpc: '2.0', + error: { + code: -32000, + message: expect.stringMatching(/Content-Type must be application\/json/), + }, + }); + }); + + it('should handle unsupported HTTP methods', async () => { + const request = createMockKibanaRequest(undefined, {}, 'get'); + + const response = await transport.handleRequest(request, responseFactory); + + expect(response.status).toBe(405); + const errorData = JSON.parse(response.payload as string); + expect(errorData).toMatchObject({ + jsonrpc: '2.0', + error: { + code: -32600, + message: 'Method not allowed.', + }, + }); + }); + + it('should handle invalid JSON data', async () => { + // Create request with invalid JSON structure that will fail parsing + const request = createMockKibanaRequest( + 'invalid json', // This will cause JSON parsing to fail + { 'content-type': 'application/json', accept: 'application/json' } + ); + + const response = await transport.handleRequest(request, responseFactory); + + expect(response.status).toBe(400); + const errorData = JSON.parse(response.payload as string); + expect(errorData).toMatchObject({ + jsonrpc: '2.0', + error: { + code: -32700, + message: 'Parse error', + }, + }); + }); + + it('should handle invalid JSON-RPC messages', async () => { + const invalidMessage = { method: 'test', params: {} }; // missing jsonrpc and id + + const request = createMockKibanaRequest(invalidMessage, { + 'content-type': 'application/json', + accept: 'application/json', + }); + + const response = await transport.handleRequest(request, responseFactory); + + expect(response.status).toBe(400); + }); + }); + + describe('Session Management', () => { + it('should include session ID in responses after initialization', async () => { + const initRequest = createMockKibanaRequest(TEST_MESSAGES.initialize, { + 'content-type': 'application/json', + accept: 'application/json', + }); + + const initResponse = await transport.handleRequest(initRequest, responseFactory); + const initHeaders = initResponse.options.headers as Record; + const sessionId = initHeaders?.['mcp-session-id'] as string; + + expect(sessionId).toBeDefined(); + + const toolsRequest = createMockKibanaRequest(TEST_MESSAGES.toolsList, { + 'content-type': 'application/json', + accept: 'application/json', + }); + + const toolsResponse = await transport.handleRequest(toolsRequest, responseFactory); + + expect(toolsResponse.status).toBe(200); + const toolsHeaders = toolsResponse.options.headers as Record; + expect(toolsHeaders?.['mcp-session-id']).toBe(sessionId); + }); + }); + + describe('Message Sending', () => { + beforeEach(async () => { + const initRequest = createMockKibanaRequest(TEST_MESSAGES.initialize, { + 'content-type': 'application/json', + accept: 'application/json', + }); + + await transport.handleRequest(initRequest, responseFactory); + }); + + it('should send responses for requests', async () => { + const request = createMockKibanaRequest(TEST_MESSAGES.toolsList, { + 'content-type': 'application/json', + accept: 'application/json', + }); + + const response = await transport.handleRequest(request, responseFactory); + + expect(response.status).toBe(200); + + const responseData = JSON.parse(response.payload as string); + expect(responseData).toMatchObject({ + jsonrpc: '2.0', + result: { + tools: [ + { + name: 'greet', + description: 'A simple greeting tool', + inputSchema: { + type: 'object', + properties: { + name: { + type: 'string', + description: 'Name to greet', + }, + }, + required: ['name'], + additionalProperties: false, + $schema: 'http://json-schema.org/draft-07/schema#', + }, + }, + ], + }, + id: 'tools-1', + }); + }); + }); + + describe('Connection Management', () => { + it('should clean up resources on close', async () => { + const initRequest = createMockKibanaRequest(TEST_MESSAGES.initialize, { + 'content-type': 'application/json', + accept: 'application/json', + }); + + await transport.handleRequest(initRequest, responseFactory); + + const onCloseSpy = jest.fn(); + transport.onclose = onCloseSpy; + + await transport.close(); + + expect(onCloseSpy).toHaveBeenCalled(); + expect((transport as any)._initialized).toBe(false); + expect((transport as any).sessionId).toBeUndefined(); + }); + }); + + describe('Error Handling in send()', () => { + it('should call onerror callback when send() fails', async () => { + const onErrorSpy = jest.fn(); + transport.onerror = onErrorSpy; + + // Try to send a response without establishing a connection + const response: JSONRPCMessage = { + jsonrpc: '2.0', + result: {}, + id: 'non-existent', + }; + + await transport.send(response); + + expect(onErrorSpy).toHaveBeenCalled(); + expect(mockLogger.error).toHaveBeenCalled(); + }); + }); +}); diff --git a/x-pack/platform/plugins/shared/onechat/server/utils/kibana_mcp_http_transport.ts b/x-pack/platform/plugins/shared/onechat/server/utils/kibana_mcp_http_transport.ts new file mode 100644 index 000000000000..69e98c17d457 --- /dev/null +++ b/x-pack/platform/plugins/shared/onechat/server/utils/kibana_mcp_http_transport.ts @@ -0,0 +1,368 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { Transport } from '@modelcontextprotocol/sdk/shared/transport.js'; +import type { JSONRPCMessage, RequestId } from '@modelcontextprotocol/sdk/types.js'; +import { ErrorCode } from '@modelcontextprotocol/sdk/types.js'; +import type { KibanaResponseFactory, KibanaRequest } from '@kbn/core-http-server'; +import { + isJSONRPCRequest, + isJSONRPCResponse, + isJSONRPCError, + JSONRPCMessageSchema, + isInitializeRequest, +} from '@modelcontextprotocol/sdk/types.js'; +import { Logger } from '@kbn/logging'; +import { AuthInfo } from '@modelcontextprotocol/sdk/server/auth/types'; +import { randomUUID } from 'node:crypto'; +import type { IKibanaResponse } from '@kbn/core/server'; + +/** + * Server transport for Streamable HTTP: this implements the MCP Streamable HTTP transport specification. + * It supports direct HTTP responses. It doesn't support SSE streaming. + * + * It is compatible with Kibana's HTTP request/response abstractions. Implementation is adapted from: + * https://github.com/modelcontextprotocol/typescript-sdk/blob/main/src/server/streamableHttp.ts + * + * In stateful mode: + * - Session ID is generated and included in response headers + * - Session ID is always included in initialization responses + * - Requests with invalid session IDs are rejected with 404 Not Found + * - Non-initialization requests without a session ID are rejected with 400 Bad Request + * - State is maintained in-memory (connections, message history) + * + * In stateless mode: + * - No Session ID is included in any responses + * - No session validation is performed + */ +export class KibanaMcpHttpTransport implements Transport { + // when sessionId is not set (undefined), it means the transport is in stateless mode + private sessionIdGenerator: (() => string) | undefined; + private _started: boolean = false; + private _streamMapping: Map = new Map(); + private _requestToStreamMapping: Map = new Map(); + private _requestResponseMap: Map = new Map(); + private _initialized: boolean = false; + private _logger: Logger; + private _responseCallbacks: Map void> = new Map(); + + sessionId?: string | undefined; + onclose?: () => void; + onerror?: (error: Error) => void; + onmessage?: (message: JSONRPCMessage, extra?: { authInfo?: AuthInfo }) => void; + + constructor(options: { sessionIdGenerator?: () => string; logger: Logger }) { + this.sessionIdGenerator = options.sessionIdGenerator; + this._logger = options.logger; + } + + /** + * Registers a callback for when a response is ready for a specific stream + */ + private registerResponseCallback( + streamId: string, + callback: (response: IKibanaResponse) => void + ) { + this._responseCallbacks.set(streamId, callback); + } + + /** + * Handles an incoming HTTP request, whether GET or POST + */ + async handleRequest(req: KibanaRequest, res: KibanaResponseFactory): Promise { + if (req.route.method === 'post') { + return await this.handlePostRequest(req, res); + } else { + return await this.handleUnsupportedRequest(res); + } + } + + /** + * Starts the transport. This is required by the Transport interface but is a no-op + * for the Streamable HTTP transport as connections are managed per-request. + */ + async start(): Promise { + if (this._started) { + throw new Error('Transport already started'); + } + this._started = true; + } + + /** + * Closes the transport and cleans up resources + */ + async close(): Promise { + this._streamMapping.clear(); + + // Clear any pending responses + this._requestResponseMap.clear(); + this._requestToStreamMapping.clear(); + this._responseCallbacks.clear(); + + // Clear session state + this.sessionId = undefined; + this._initialized = false; + this.onclose?.(); + } + + /** + * Handles POST requests containing JSON-RPC messages compatible with MCP specification + */ + async handlePostRequest( + request: KibanaRequest, + responseFactory: KibanaResponseFactory + ): Promise { + try { + this._logger.debug('Processing POST request'); + + // Validate the Accept header + const acceptHeader = request.headers.accept; + + // The client MUST include an Accept header, listing application/json as supported content type. + if (!acceptHeader?.includes('application/json')) { + this._logger.warn('Request rejected: Invalid Accept header'); + return responseFactory.customError({ + statusCode: 406, + body: JSON.stringify({ + jsonrpc: '2.0', + error: { + code: ErrorCode.ConnectionClosed, + message: 'Not Acceptable: Client must accept application/json', + }, + id: null, + }), + }); + } + + const ct = request.headers['content-type']; + + if (!ct || !ct.includes('application/json')) { + this._logger.warn('Request rejected: Invalid Content-Type'); + return responseFactory.customError({ + statusCode: 415, + body: JSON.stringify({ + jsonrpc: '2.0', + error: { + code: ErrorCode.ConnectionClosed, + message: 'Unsupported Media Type: Content-Type must be application/json', + }, + id: null, + }), + }); + } + + const rawMessage = request.body; + let messages: JSONRPCMessage[]; + + // handle batch and single messages + if (Array.isArray(rawMessage)) { + this._logger.debug(`Processing batch request with ${rawMessage.length} messages`); + messages = rawMessage.map((msg) => JSONRPCMessageSchema.parse(msg)); + } else { + this._logger.debug('Processing single message request'); + messages = [JSONRPCMessageSchema.parse(rawMessage)]; + } + + // Check if this is an initialization request + const isInitializationRequest = messages.some(isInitializeRequest); + + if (isInitializationRequest) { + if (this._initialized && this.sessionId !== undefined) { + this._logger.warn('Initialization request rejected - server already initialized'); + return responseFactory.badRequest({ + body: JSON.stringify({ + jsonrpc: '2.0', + error: { + code: ErrorCode.InvalidRequest, + message: 'Invalid Request: Server already initialized', + }, + id: null, + }), + }); + } + if (messages.length > 1) { + this._logger.warn('Initialization request rejected - multiple messages not allowed'); + return responseFactory.badRequest({ + body: JSON.stringify({ + jsonrpc: '2.0', + error: { + code: ErrorCode.InvalidRequest, + message: 'Invalid Request: Only one initialization request is allowed', + }, + id: null, + }), + }); + } + this.sessionId = this.sessionIdGenerator?.(); + this._initialized = true; + this._logger.debug(`Session initialized: ${this.sessionId}`); + } + + // check if it contains requests + const hasRequests = messages.some(isJSONRPCRequest); + + if (!hasRequests) { + this._logger.debug('Processing notifications/responses'); + // if it only contains notifications or responses, return 202 + const response = responseFactory.accepted(); + + // handle each message + for (const message of messages) { + this.onmessage?.(message); + } + return response; + } else { + const streamId = randomUUID(); + this._logger.debug(`Processing requests with stream ID: ${streamId}`); + + // Create a promise that will resolve when the response is ready + const responsePromise = new Promise((resolve) => { + this.registerResponseCallback(streamId, resolve); + }); + + // Store the response factory for this request to send messages back through this connection + for (const message of messages) { + if (isJSONRPCRequest(message)) { + this._logger.debug(`Mapping request ${message.id} to stream ${streamId}`); + this._streamMapping.set(streamId, responseFactory); + this._requestToStreamMapping.set(message.id, streamId); + } + } + + // Set up close handler for client disconnects + request.events.aborted$.subscribe(() => { + this._logger.debug(`Stream ${streamId} aborted by client`); + this._streamMapping.delete(streamId); + this._responseCallbacks.delete(streamId); + }); + + // handle each message + for (const message of messages) { + this.onmessage?.(message); + } + + // Wait for the response to be ready + return await responsePromise; + } + } catch (error) { + this._logger.error(`Request processing error: ${error}`); + this.onerror?.(error as Error); + return responseFactory.badRequest({ + body: JSON.stringify({ + jsonrpc: '2.0', + error: { + code: ErrorCode.ParseError, + message: 'Parse error', + data: String(error), + }, + id: null, + }), + }); + } + } + + /** + * Sends a JSON-RPC message + */ + async send(message: JSONRPCMessage, options?: { relatedRequestId?: RequestId }): Promise { + try { + this._logger.debug('Processing outgoing message'); + + let requestId = options?.relatedRequestId; + if (isJSONRPCResponse(message) || isJSONRPCError(message)) { + requestId = message.id; + } + + if (requestId === undefined) { + this._logger.debug('Processing notification message'); + // todo: handle notifications + return; + } + + // Get the response for this request + const streamId = this._requestToStreamMapping.get(requestId); + const responseFactory = this._streamMapping.get(streamId!); + + if (!streamId) { + const error = `No connection established for request ID: ${String(requestId)}`; + this._logger.error(error); + throw new Error(error); + } + + if (isJSONRPCResponse(message) || isJSONRPCError(message)) { + this._logger.debug(`Processing response for request ${String(requestId)}`); + this._requestResponseMap.set(requestId, message); + const relatedIds = Array.from(this._requestToStreamMapping.entries()) + .filter(([_, sid]) => this._streamMapping.get(sid) === responseFactory) + .map(([id]) => id); + + // Check if we have responses for all requests using this connection + const allResponsesReady = relatedIds.every((id) => this._requestResponseMap.has(id)); + + if (allResponsesReady) { + if (!responseFactory) { + const error = `No connection established for request ID: ${String(requestId)}`; + this._logger.error(error); + throw new Error(error); + } + + // All responses ready, send as JSON + const headers: Record = { + 'Content-Type': 'application/json', + }; + if (this.sessionId !== undefined) { + headers['mcp-session-id'] = this.sessionId; + } + + const responses = relatedIds.map((id) => this._requestResponseMap.get(id)!); + const responseBody = + responses.length === 1 ? JSON.stringify(responses[0]) : JSON.stringify(responses); + + const response = responseFactory.ok({ + headers, + body: responseBody, + }); + + // Trigger the response callback + const callback = this._responseCallbacks.get(streamId); + if (callback) { + callback(response); + this._responseCallbacks.delete(streamId); + } + + // Clean up + for (const id of relatedIds) { + this._requestResponseMap.delete(id); + this._requestToStreamMapping.delete(id); + } + this._streamMapping.delete(streamId); + this._logger.debug('Request mappings cleaned up'); + } + } + } catch (error) { + this._logger.error(`Error sending message: ${error}`); + this.onerror?.(error as Error); + } + } + + /** + * Handles unsupported requests (GET, DELETE, PUT, PATCH, etc.) + */ + private async handleUnsupportedRequest(res: KibanaResponseFactory): Promise { + return res.customError({ + statusCode: 405, + body: JSON.stringify({ + jsonrpc: '2.0', + error: { + code: ErrorCode.InvalidRequest, + message: 'Method not allowed.', + }, + id: null, + }), + }); + } +} diff --git a/x-pack/platform/plugins/shared/onechat/tsconfig.json b/x-pack/platform/plugins/shared/onechat/tsconfig.json index 789b515dc3c1..e6707a8b0145 100644 --- a/x-pack/platform/plugins/shared/onechat/tsconfig.json +++ b/x-pack/platform/plugins/shared/onechat/tsconfig.json @@ -34,6 +34,8 @@ "@kbn/sse-utils-client", "@kbn/sse-utils-server", "@kbn/core-http-browser", - "@kbn/features-plugin" + "@kbn/features-plugin", + "@kbn/core-logging-server-mocks", + "@kbn/i18n" ] } diff --git a/yarn.lock b/yarn.lock index 801125c7d36e..5ed658b210a8 100644 --- a/yarn.lock +++ b/yarn.lock @@ -8686,17 +8686,19 @@ dependencies: "@types/mdx" "^2.0.0" -"@modelcontextprotocol/sdk@^1.6.0": - version "1.6.0" - resolved "https://registry.yarnpkg.com/@modelcontextprotocol/sdk/-/sdk-1.6.0.tgz#1d1849c9b36c0e494cf77398579dbd7d46c1ed34" - integrity sha512-585s8g+jzuGBomzgzDeP5l8gEyiSs+KhoAHbA2ZZ24Zgm83IZsyCLl/fmWhPHbfYsuLG8NE6SWGZA5ZBql8jSw== +"@modelcontextprotocol/sdk@^1.12.1": + version "1.12.1" + resolved "https://registry.yarnpkg.com/@modelcontextprotocol/sdk/-/sdk-1.12.1.tgz#f77503f0263b33cb1e5b81a6ff0c322393cabd37" + integrity sha512-KG1CZhZfWg+u8pxeM/mByJDScJSrjjxLc8fwQqbsS8xCjBmQfMNEBTotYdNanKekepnfRI85GtgQlctLFpcYPw== dependencies: + ajv "^6.12.6" content-type "^1.0.5" cors "^2.8.5" + cross-spawn "^7.0.5" eventsource "^3.0.2" express "^5.0.1" express-rate-limit "^7.5.0" - pkce-challenge "^4.1.0" + pkce-challenge "^5.0.0" raw-body "^3.0.0" zod "^3.23.8" zod-to-json-schema "^3.24.1" @@ -13452,7 +13454,7 @@ ajv-keywords@^5.1.0: dependencies: fast-deep-equal "^3.1.3" -ajv@^6.12.2, ajv@^6.12.4, ajv@^6.12.5: +ajv@^6.12.2, ajv@^6.12.4, ajv@^6.12.5, ajv@^6.12.6: version "6.12.6" resolved "https://registry.yarnpkg.com/ajv/-/ajv-6.12.6.tgz#baf5a62e802b07d977034586f8c3baf5adf26df4" integrity sha512-j3fVLgvTo527anyYyJOGTYJbG+vnnQYvE0m5mmkc1TK+nxAppkCLMIL0aZ4dblVCNoGShhm+kzE4ZUykBoMg4g== @@ -16054,7 +16056,7 @@ cross-fetch@^3.1.5: dependencies: node-fetch "^2.7.0" -cross-spawn@^7.0.0, cross-spawn@^7.0.2, cross-spawn@^7.0.3: +cross-spawn@^7.0.0, cross-spawn@^7.0.2, cross-spawn@^7.0.3, cross-spawn@^7.0.5: version "7.0.6" resolved "https://registry.yarnpkg.com/cross-spawn/-/cross-spawn-7.0.6.tgz#8a58fe78f00dcd70c370451759dfbfaf03e8ee9f" integrity sha512-uV2QOWP2nWzsy2aMp8aRibhi9dlzF5Hgh5SHaB9OiTGEyDTiJJyx0uy51QXdyWbtAHNua4XJzUKca3OzKUd3vA== @@ -26051,7 +26053,7 @@ pixelmatch@5.3.0, pixelmatch@^5.3.0: dependencies: pngjs "^6.0.0" -pkce-challenge@3.1.0, pkce-challenge@^4.1.0: +pkce-challenge@3.1.0, pkce-challenge@^5.0.0: version "3.1.0" resolved "https://registry.yarnpkg.com/pkce-challenge/-/pkce-challenge-3.1.0.tgz#c974ee934e62c501f09da817964e75db201ee8bf" integrity sha512-bQ/0XPZZ7eX+cdAkd61uYWpfMhakH3NeteUF1R8GNa+LMqX8QFAkbCLqq+AYAns1/ueACBu/BMWhrlKGrdvGZg==