From 9e84dc709dd0755fe1626dc6f17edc1eec621124 Mon Sep 17 00:00:00 2001 From: Bharat Pasupula Date: Fri, 13 Sep 2024 10:18:50 +0200 Subject: [PATCH 1/4] Add unstructured log handling --- .../server/graphs/kv/validate.ts | 2 +- .../server/graphs/log_type_detection/graph.ts | 12 +- .../server/graphs/unstructured/constants.ts | 27 +++++ .../server/graphs/unstructured/error.ts | 32 +++++ .../server/graphs/unstructured/graph.ts | 112 ++++++++++++++++++ .../server/graphs/unstructured/index.ts | 7 ++ .../server/graphs/unstructured/prompts.ts | 105 ++++++++++++++++ .../server/graphs/unstructured/types.ts | 31 +++++ .../server/graphs/unstructured/unstuctured.ts | 32 +++++ .../server/graphs/unstructured/validate.ts | 41 +++++++ .../server/routes/analyze_logs_routes.ts | 2 +- .../server/templates/processors/grok.yml.njk | 3 +- .../integration_assistant/server/types.ts | 13 ++ .../server/util/processors.ts | 4 +- 14 files changed, 413 insertions(+), 10 deletions(-) create mode 100644 x-pack/plugins/integration_assistant/server/graphs/unstructured/constants.ts create mode 100644 x-pack/plugins/integration_assistant/server/graphs/unstructured/error.ts create mode 100644 x-pack/plugins/integration_assistant/server/graphs/unstructured/graph.ts create mode 100644 x-pack/plugins/integration_assistant/server/graphs/unstructured/index.ts create mode 100644 x-pack/plugins/integration_assistant/server/graphs/unstructured/prompts.ts create mode 100644 x-pack/plugins/integration_assistant/server/graphs/unstructured/types.ts create mode 100644 x-pack/plugins/integration_assistant/server/graphs/unstructured/unstuctured.ts create mode 100644 x-pack/plugins/integration_assistant/server/graphs/unstructured/validate.ts diff --git a/x-pack/plugins/integration_assistant/server/graphs/kv/validate.ts b/x-pack/plugins/integration_assistant/server/graphs/kv/validate.ts index 0bca2ac3fd5e4..6b22cf399a906 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/kv/validate.ts +++ 
b/x-pack/plugins/integration_assistant/server/graphs/kv/validate.ts @@ -65,7 +65,7 @@ export async function handleHeaderValidate({ client, }: HandleKVNodeParams): Promise> { const grokPattern = state.grokPattern; - const grokProcessor = createGrokProcessor(grokPattern); + const grokProcessor = createGrokProcessor([grokPattern]); const pipeline = { processors: grokProcessor, on_failure: [onFailure] }; const { pipelineResults, errors } = (await testPipeline(state.logSamples, pipeline, client)) as { diff --git a/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/graph.ts b/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/graph.ts index 5ef894bf64a20..b1cdecd39fe69 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/graph.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/graph.ts @@ -14,6 +14,7 @@ import { ESProcessorItem, SamplesFormat } from '../../../common'; import { getKVGraph } from '../kv/graph'; import { LogDetectionGraphParams, LogDetectionBaseNodeParams } from './types'; import { LogFormat } from '../../constants'; +import { getUnstructuredGraph } from '../unstructured/graph'; const graphState: StateGraphArgs['channels'] = { lastExecutedChain: { @@ -90,9 +91,9 @@ function logFormatRouter({ state }: LogDetectionBaseNodeParams): string { if (state.samplesFormat.name === LogFormat.STRUCTURED) { return 'structured'; } - // if (state.samplesFormat === LogFormat.UNSTRUCTURED) { - // return 'unstructured'; - // } + if (state.samplesFormat.name === LogFormat.UNSTRUCTURED) { + return 'unstructured'; + } // if (state.samplesFormat === LogFormat.CSV) { // return 'csv'; // } @@ -109,18 +110,19 @@ export async function getLogFormatDetectionGraph({ model, client }: LogDetection handleLogFormatDetection({ state, model }) ) .addNode('handleKVGraph', await getKVGraph({ model, client })) - // .addNode('handleUnstructuredGraph', (state: LogFormatDetectionState) => 
getCompiledUnstructuredGraph({state, model})) + .addNode('handleUnstructuredGraph', await getUnstructuredGraph({ model, client })) // .addNode('handleCsvGraph', (state: LogFormatDetectionState) => getCompiledCsvGraph({state, model})) .addEdge(START, 'modelInput') .addEdge('modelInput', 'handleLogFormatDetection') .addEdge('handleKVGraph', 'modelOutput') + .addEdge('handleUnstructuredGraph', 'modelOutput') .addEdge('modelOutput', END) .addConditionalEdges( 'handleLogFormatDetection', (state: LogFormatDetectionState) => logFormatRouter({ state }), { structured: 'handleKVGraph', - // unstructured: 'handleUnstructuredGraph', + unstructured: 'handleUnstructuredGraph', // csv: 'handleCsvGraph', unsupported: 'modelOutput', } diff --git a/x-pack/plugins/integration_assistant/server/graphs/unstructured/constants.ts b/x-pack/plugins/integration_assistant/server/graphs/unstructured/constants.ts new file mode 100644 index 0000000000000..b0e36de9be85d --- /dev/null +++ b/x-pack/plugins/integration_assistant/server/graphs/unstructured/constants.ts @@ -0,0 +1,27 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */
+
+// Example answer that is JSON-serialised and interpolated as {ex_answer} into GROK_MAIN_PROMPT.
+// NOTE(review): 'RFC2454' does not look like a syslog RFC, and the regex contains no
+// backslashes (bare d / s / w) — confirm both are intentional for an illustrative example.
+export const GROK_EXAMPLE_ANSWER = {
+  rfc: 'RFC2454',
+  regex:
+    '/(?:(d{4}[-]d{2}[-]d{2}[T]d{2}[:]d{2}[:]d{2}(?:.d{1,6})?(?:[+-]d{2}[:]d{2}|Z)?)|-)s(?:([w][wd.@-]*)|-)s(.*)$/',
+  grok_patterns: ['%{WORD:key1}:%{WORD:value1};%{WORD:key2}:%{WORD:value2}:%{GREEDYDATA:message}'],
+};
+
+// Example answer that is JSON-serialised and interpolated as {ex_answer} into GROK_ERROR_PROMPT.
+export const GROK_ERROR_EXAMPLE_ANSWER = {
+  grok_patterns: [
+    '%{TIMESTAMP:timestamp}:%{WORD:value1};%{WORD:key2}:%{WORD:value2}:%{GREEDYDATA:message}',
+  ],
+};
+
+// on_failure handler attached to the validation pipeline: appends a description of the
+// failing processor to error.message.
+// NOTE(review): the {% raw %} / {% endraw %} markers are Nunjucks syntax; confirm this value
+// is rendered through Nunjucks before it reaches Elasticsearch, otherwise the markers end up
+// in the message literally.
+export const onFailure = {
+  append: {
+    field: 'error.message',
+    value:
+      '{% raw %}Processor {{{_ingest.on_failure_processor_type}}} with tag {{{_ingest.on_failure_processor_tag}}} in pipeline {{{_ingest.on_failure_pipeline}}} failed with message: {{{_ingest.on_failure_message}}}{% endraw %}',
+  },
+};
diff --git a/x-pack/plugins/integration_assistant/server/graphs/unstructured/error.ts b/x-pack/plugins/integration_assistant/server/graphs/unstructured/error.ts
new file mode 100644
index 0000000000000..ec891d3aaa8f3
--- /dev/null
+++ b/x-pack/plugins/integration_assistant/server/graphs/unstructured/error.ts
@@ -0,0 +1,32 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+import { JsonOutputParser } from '@langchain/core/output_parsers';
+import type { UnstructuredLogState } from '../../types';
+import type { GrokResult, HandleUnstructuredNodeParams } from './types';
+import { GROK_ERROR_PROMPT } from './prompts';
+import { GROK_ERROR_EXAMPLE_ANSWER } from './constants';
+
+/**
+ * Graph node that asks the model to repair the current grok patterns.
+ *
+ * `state.grokPatterns` and `state.errors` are JSON-serialised, sent through
+ * GROK_ERROR_PROMPT, and the reply is parsed as JSON.
+ *
+ * @param params.state Current graph state; `grokPatterns` and `errors` are read.
+ * @param params.model Chat model the prompt is piped into.
+ * @returns State update carrying the `grok_patterns` array taken from the reply.
+ */
+export async function handleUnstructuredError({
+  state,
+  model,
+}: HandleUnstructuredNodeParams): Promise<Partial<UnstructuredLogState>> {
+  const grokErrorGraph = GROK_ERROR_PROMPT.pipe(model).pipe(new JsonOutputParser());
+
+  // Cast to GrokResult, as handleUnstructured does, so `grok_patterns` is typed as string[].
+  const pattern = (await grokErrorGraph.invoke({
+    current_pattern: JSON.stringify(state.grokPatterns, null, 2),
+    errors: JSON.stringify(state.errors, null, 2),
+    ex_answer: JSON.stringify(GROK_ERROR_EXAMPLE_ANSWER, null, 2),
+  })) as GrokResult;
+
+  return {
+    grokPatterns: pattern.grok_patterns,
+    lastExecutedChain: 'unstructured_error',
+  };
+}
diff --git a/x-pack/plugins/integration_assistant/server/graphs/unstructured/graph.ts b/x-pack/plugins/integration_assistant/server/graphs/unstructured/graph.ts
new file mode 100644
index 0000000000000..75f4e9c787575
--- /dev/null
+++ b/x-pack/plugins/integration_assistant/server/graphs/unstructured/graph.ts
@@ -0,0 +1,112 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+import type { StateGraphArgs } from '@langchain/langgraph';
+import { StateGraph, END, START } from '@langchain/langgraph';
+import type { UnstructuredLogState } from '../../types';
+import { handleUnstructured } from './unstuctured';
+import type { UnstructuredGraphParams, UnstructuredBaseNodeParams } from './types';
+import { handleUnstructuredError } from './error';
+import { handleUnstructuredValidate } from './validate';
+
+/**
+ * Channel definitions for the unstructured graph state.
+ * Every reducer keeps the previous value unless a node returns a new one (`y ?? x`).
+ */
+const graphState: StateGraphArgs<UnstructuredLogState>['channels'] = {
+  lastExecutedChain: {
+    value: (x: string, y?: string) => y ?? x,
+    default: () => '',
+  },
+  packageName: {
+    value: (x: string, y?: string) => y ?? x,
+    default: () => '',
+  },
+  dataStreamName: {
+    value: (x: string, y?: string) => y ?? x,
+    default: () => '',
+  },
+  logSamples: {
+    value: (x: string[], y?: string[]) => y ?? x,
+    default: () => [],
+  },
+  grokPatterns: {
+    value: (x: string[], y?: string[]) => y ?? x,
+    default: () => [],
+  },
+  jsonSamples: {
+    value: (x: string[], y?: string[]) => y ?? x,
+    default: () => [],
+  },
+  finalized: {
+    value: (x: boolean, y?: boolean) => y ?? x,
+    default: () => false,
+  },
+  errors: {
+    value: (x: object, y?: object) => y ?? x,
+    default: () => [],
+  },
+  additionalProcessors: {
+    value: (x: object[], y?: object[]) => y ?? x,
+    default: () => [],
+  },
+  ecsVersion: {
+    value: (x: string, y?: string) => y ?? x,
+    default: () => '',
+  },
+};
+
+/** Entry node: marks the run as not finalized. The incoming state is not read. */
+function modelInput({ state }: UnstructuredBaseNodeParams): Partial<UnstructuredLogState> {
+  return {
+    finalized: false,
+    lastExecutedChain: 'modelInput',
+  };
+}
+
+/** Exit node: marks the run as finalized and passes `additionalProcessors` through. */
+function modelOutput({ state }: UnstructuredBaseNodeParams): Partial<UnstructuredLogState> {
+  return {
+    finalized: true,
+    additionalProcessors: state.additionalProcessors,
+    lastExecutedChain: 'modelOutput',
+  };
+}
+
+/**
+ * Chooses the node that follows validation.
+ *
+ * @returns 'modelOutput' when `state.errors` has no keys, otherwise 'handleUnstructuredError'.
+ */
+function validationRouter({ state }: UnstructuredBaseNodeParams): string {
+  if (Object.keys(state.errors).length === 0) {
+    return 'modelOutput';
+  }
+  return 'handleUnstructuredError';
+}
+
+/**
+ * Builds and compiles the unstructured-log graph:
+ * modelInput -> handleUnstructured -> handleUnstructuredValidate -> (modelOutput | handleUnstructuredError),
+ * with handleUnstructuredError looping back to handleUnstructuredValidate.
+ *
+ * NOTE(review): the error -> validate cycle has no retry counter of its own; presumably it is
+ * bounded only by the graph runtime's recursion limit — confirm that is acceptable.
+ *
+ * @param params.model Chat model handed to every LLM-backed node.
+ * @param params.client Scoped cluster client handed to the nodes.
+ * @returns The compiled graph.
+ */
+export async function getUnstructuredGraph({ model, client }: UnstructuredGraphParams) {
+  const workflow = new StateGraph({
+    channels: graphState,
+  })
+    .addNode('modelInput', (state: UnstructuredLogState) => modelInput({ state }))
+    .addNode('modelOutput', (state: UnstructuredLogState) => modelOutput({ state }))
+    .addNode('handleUnstructuredError', (state: UnstructuredLogState) =>
+      handleUnstructuredError({ state, model, client })
+    )
+    .addNode('handleUnstructured', (state: UnstructuredLogState) =>
+      handleUnstructured({ state, model, client })
+    )
+    .addNode('handleUnstructuredValidate', (state: UnstructuredLogState) =>
+      handleUnstructuredValidate({ state, model, client })
+    )
+    .addEdge(START, 'modelInput')
+    .addEdge('modelInput', 'handleUnstructured')
+    .addEdge('handleUnstructured', 'handleUnstructuredValidate')
+    .addConditionalEdges(
+      'handleUnstructuredValidate',
+      (state: UnstructuredLogState) => validationRouter({ state }),
+      {
+        handleUnstructuredError: 'handleUnstructuredError',
+        modelOutput: 'modelOutput',
+      }
+    )
+    .addEdge('handleUnstructuredError', 'handleUnstructuredValidate')
+    .addEdge('modelOutput', END);
+
+  return workflow.compile();
+}
diff --git a/x-pack/plugins/integration_assistant/server/graphs/unstructured/index.ts b/x-pack/plugins/integration_assistant/server/graphs/unstructured/index.ts
new file
mode 100644
index 0000000000000..8fa7bb99744ed
--- /dev/null
+++ b/x-pack/plugins/integration_assistant/server/graphs/unstructured/index.ts
@@ -0,0 +1,7 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+export { getUnstructuredGraph } from './graph';
diff --git a/x-pack/plugins/integration_assistant/server/graphs/unstructured/prompts.ts b/x-pack/plugins/integration_assistant/server/graphs/unstructured/prompts.ts
new file mode 100644
index 0000000000000..5cf5c67135d53
--- /dev/null
+++ b/x-pack/plugins/integration_assistant/server/graphs/unstructured/prompts.ts
@@ -0,0 +1,105 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+import { ChatPromptTemplate } from '@langchain/core/prompts';
+
+// Prompt used to derive the initial grok patterns.
+// Template variables: {samples} (log sample text) and {ex_answer} (example JSON answer).
+// NOTE(review): the guideline says "see example response above" although the example follows
+// it, and several lines inside the template are empty where delimiter markers would be
+// expected — confirm nothing was lost from the template text.
+export const GROK_MAIN_PROMPT = ChatPromptTemplate.fromMessages([
+  [
+    'system',
+    `You are an expert in Syslogs and identifying the headers and structured body in syslog messages. Here is some context for you to reference for your task, read it carefully as you will get questions about it later:
+
+
+  {samples}
+
+  `,
+  ],
+  [
+    'human',
+    `Looking at the multiple syslog samples provided in the context, You are tasked with identifying the appropriate regex and Grok pattern for a set of syslog samples.
+    Your goal is to accurately extract key components such as timestamps, hostnames, priority levels, process names, events, VLAN information, MAC addresses, IP addresses, STP roles, port statuses, messages and more.
+
+    Follow these steps to help improve the grok patterns and apply it step by step:
+    1. Familiarize yourself with various syslog message formats.
+    2. PRI (Priority Level): Encoded in angle brackets, e.g., <134>, indicating the facility and severity.
+    3. Timestamp: Use \`SYSLOGTIMESTAMP\` for RFC 3164 timestamps (e.g., Aug 10 16:34:02). Use \`TIMESTAMP_ISO8601\` for ISO 8601 (RFC 5424) timestamps. For epoch time, use \`NUMBER\`.
+    4. If the timestamp could not be categorized into a predefined format, extract the date time fields separately and combine them with the format identified in the grok pattern.
+    5. Make sure to identify the timezone component in the timestamp.
+    6. Hostname/IP Address: The system or device that generated the message, which could be an IP address or fully qualified domain name
+    7. Process Name and PID: Often included with brackets, such as sshd[1234].
+    8. VLAN information: Usually in the format of VLAN: 1234.
+    9. MAC Address: The network interface MAC address.
+    10. Port number: The port number on the device.
+    11. Look for status codes ,interface ,log type, source ,User action, destination, protocol, etc.
+    12. message: This is the free-form message text that varies widely across log entries.
+
+
+    You ALWAYS follow these guidelines when writing your response:
+
+    - Make sure to map the remaining message part to \'message\' in grok pattern.
+    - Do not respond with anything except the processor as a JSON object enclosed with 3 backticks (\`), see example response above. Use strict JSON response format.
+
+
+    You are required to provide the output in the following example response format:
+
+
+    A: Please find the JSON object below:
+    \`\`\`json
+    {ex_answer}
+    \`\`\`
+    `,
+  ],
+  ['ai', 'Please find the JSON object below:'],
+]);
+
+// Prompt used to repair grok patterns after a failed validation.
+// Template variables: {current_pattern}, {errors} and {ex_answer}.
+// NOTE(review): same wording caveat as the main prompt ("see example response above").
+export const GROK_ERROR_PROMPT = ChatPromptTemplate.fromMessages([
+  [
+    'system',
+    `You are an expert in Syslogs and identifying the headers and structured body in syslog messages. Here is some context for you to reference for your task, read it carefully as you will get questions about it later:
+
+
+{current_pattern}
+
+`,
+  ],
+  [
+    'human',
+    `Please go through each error below, carefully review the provided current grok pattern, and resolve the most likely cause to the supplied error by returning an updated version of the current_pattern.
+
+
+{errors}
+
+
+Follow these steps to help improve the grok patterns and apply it step by step:
+    1. Familiarize yourself with various syslog message formats.
+    2. PRI (Priority Level): Encoded in angle brackets, e.g., <134>, indicating the facility and severity.
+    3. Timestamp: Use \`SYSLOGTIMESTAMP\` for RFC 3164 timestamps (e.g., Aug 10 16:34:02). Use \`TIMESTAMP_ISO8601\` for ISO 8601 (RFC 5424) timestamps. For epoch time, use \`NUMBER\`.
+    4. If the timestamp could not be categorized into a predefined format, extract the date time fields separately and combine them with the format identified in the grok pattern.
+    5. Make sure to identify the timezone component in the timestamp.
+    6. Hostname/IP Address: The system or device that generated the message, which could be an IP address or fully qualified domain name
+    7. Process Name and PID: Often included with brackets, such as sshd[1234].
+    8. VLAN information: Usually in the format of VLAN: 1234.
+    9. MAC Address: The network interface MAC address.
+    10. Port number: The port number on the device.
+    11. Look for status codes ,interface ,log type, source ,User action, destination, protocol, etc.
+    12. message: This is the free-form message text that varies widely across log entries.
+
+    You ALWAYS follow these guidelines when writing your response:
+
+    - Make sure to map the remaining message part to \'message\' in grok pattern.
+    - Do not respond with anything except the processor as a JSON object enclosed with 3 backticks (\`), see example response above. Use strict JSON response format.
+
+
+    You are required to provide the output in the following example response format:
+
+
+    A: Please find the JSON object below:
+    \`\`\`json
+    {ex_answer}
+    \`\`\`
+    `,
+  ],
+  ['ai', 'Please find the JSON object below:'],
+]);
diff --git a/x-pack/plugins/integration_assistant/server/graphs/unstructured/types.ts b/x-pack/plugins/integration_assistant/server/graphs/unstructured/types.ts
new file mode 100644
index 0000000000000..218d3856cb661
--- /dev/null
+++ b/x-pack/plugins/integration_assistant/server/graphs/unstructured/types.ts
@@ -0,0 +1,31 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+import type { IScopedClusterClient } from '@kbn/core-elasticsearch-server';
+import type { UnstructuredLogState, ChatModels } from '../../types';
+
+/** Parameters for nodes that only need the graph state. */
+export interface UnstructuredBaseNodeParams {
+  state: UnstructuredLogState;
+}
+
+/** Parameters for nodes that additionally need the chat model. */
+export interface UnstructuredNodeParams extends UnstructuredBaseNodeParams {
+  model: ChatModels;
+}
+
+/** Dependencies required to build the unstructured graph. */
+export interface UnstructuredGraphParams {
+  client: IScopedClusterClient;
+  model: ChatModels;
+}
+
+/** Parameters for nodes that need state, chat model and cluster client. */
+export interface HandleUnstructuredNodeParams extends UnstructuredNodeParams {
+  client: IScopedClusterClient;
+}
+
+/** Shape the node code casts parsed model replies and pipeline results to. */
+export interface GrokResult {
+  [key: string]: unknown;
+  grok_patterns: string[];
+  message: string;
+}
diff --git a/x-pack/plugins/integration_assistant/server/graphs/unstructured/unstuctured.ts b/x-pack/plugins/integration_assistant/server/graphs/unstructured/unstuctured.ts
new file mode 100644
index 0000000000000..42186e796275f
--- /dev/null
+++ b/x-pack/plugins/integration_assistant/server/graphs/unstructured/unstuctured.ts
@@ -0,0 +1,32 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements.
Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+import { JsonOutputParser } from '@langchain/core/output_parsers';
+import type { UnstructuredLogState } from '../../types';
+import { GROK_MAIN_PROMPT } from './prompts';
+import type { GrokResult, HandleUnstructuredNodeParams } from './types';
+import { GROK_EXAMPLE_ANSWER } from './constants';
+
+/**
+ * Graph node that asks the model for the initial grok patterns.
+ *
+ * The first entry of `state.logSamples` is sent through GROK_MAIN_PROMPT together with the
+ * JSON-serialised GROK_EXAMPLE_ANSWER, and the reply is parsed as JSON.
+ *
+ * @param params.state Current graph state; only `logSamples` is read.
+ * @param params.model Chat model the prompt is piped into.
+ * @returns State update carrying the `grok_patterns` array taken from the reply.
+ */
+export async function handleUnstructured({
+  state,
+  model,
+}: HandleUnstructuredNodeParams): Promise<Partial<UnstructuredLogState>> {
+  const grokMainGraph = GROK_MAIN_PROMPT.pipe(model).pipe(new JsonOutputParser());
+
+  // Only the first log sample is sent to the model.
+  // NOTE(review): the prompt text talks about "multiple syslog samples" — confirm that
+  // passing a single sample is intended.
+  const samples = state.logSamples;
+
+  const pattern = (await grokMainGraph.invoke({
+    samples: samples[0],
+    ex_answer: JSON.stringify(GROK_EXAMPLE_ANSWER, null, 2),
+  })) as GrokResult;
+
+  return {
+    grokPatterns: pattern.grok_patterns,
+    lastExecutedChain: 'handleUnstructured',
+  };
+}
diff --git a/x-pack/plugins/integration_assistant/server/graphs/unstructured/validate.ts b/x-pack/plugins/integration_assistant/server/graphs/unstructured/validate.ts
new file mode 100644
index 0000000000000..3e2d77ef71d95
--- /dev/null
+++ b/x-pack/plugins/integration_assistant/server/graphs/unstructured/validate.ts
@@ -0,0 +1,41 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+import type { UnstructuredLogState } from '../../types';
+import type { GrokResult, HandleUnstructuredNodeParams } from './types';
+import { testPipeline } from '../../util';
+import { onFailure } from './constants';
+import { createGrokProcessor } from '../../util/processors';
+
+/**
+ * Graph node that validates the current grok patterns against the log samples.
+ *
+ * A grok processor is built from `state.grokPatterns` and run over `state.logSamples`
+ * through `testPipeline`.
+ *
+ * @param params.state Current graph state; `grokPatterns`, `logSamples` and
+ *   `additionalProcessors` are read.
+ * @param params.client Cluster client forwarded to `testPipeline`.
+ * @returns When the pipeline reports errors, a state update carrying only those errors.
+ *   Otherwise the JSON-serialised pipeline results, `additionalProcessors` extended with the
+ *   grok processor, and an empty error list.
+ */
+export async function handleUnstructuredValidate({
+  state,
+  client,
+}: HandleUnstructuredNodeParams): Promise<Partial<UnstructuredLogState>> {
+  const grokProcessor = createGrokProcessor(state.grokPatterns);
+  const pipeline = { processors: grokProcessor, on_failure: [onFailure] };
+
+  const { pipelineResults, errors } = (await testPipeline(state.logSamples, pipeline, client)) as {
+    pipelineResults: GrokResult[];
+    errors: object[];
+  };
+
+  if (errors.length > 0) {
+    return { errors, lastExecutedChain: 'unstructured_validate' };
+  }
+
+  const jsonSamples: string[] = pipelineResults.map((entry) => JSON.stringify(entry));
+  // Build a new array instead of push()ing: the incoming state object belongs to the caller
+  // and must not be mutated in place.
+  const additionalProcessors = [...state.additionalProcessors, grokProcessor[0]];
+
+  return {
+    jsonSamples,
+    additionalProcessors,
+    errors: [],
+    lastExecutedChain: 'unstructured_validate',
+  };
+}
diff --git a/x-pack/plugins/integration_assistant/server/routes/analyze_logs_routes.ts b/x-pack/plugins/integration_assistant/server/routes/analyze_logs_routes.ts
index c0a81193a465b..29a68c4395a7c 100644
--- a/x-pack/plugins/integration_assistant/server/routes/analyze_logs_routes.ts
+++ b/x-pack/plugins/integration_assistant/server/routes/analyze_logs_routes.ts
@@ -81,7 +81,7 @@ export function registerAnalyzeLogsRoutes(
           const graph = await getLogFormatDetectionGraph({ model, client });
           const graphResults = await graph.invoke(logFormatParameters, options);
           const graphLogFormat = graphResults.results.samplesFormat.name;
-          if (graphLogFormat === 'unsupported') {
+          if (graphLogFormat === 'unsupported' || graphLogFormat === 'csv') {
             return res.customError({
               statusCode: 501,
               body: { message: `Unsupported log samples format` },
diff
--git a/x-pack/plugins/integration_assistant/server/templates/processors/grok.yml.njk b/x-pack/plugins/integration_assistant/server/templates/processors/grok.yml.njk
index 53ce913df0515..9b0456b134e34 100644
--- a/x-pack/plugins/integration_assistant/server/templates/processors/grok.yml.njk
+++ b/x-pack/plugins/integration_assistant/server/templates/processors/grok.yml.njk
@@ -1,5 +1,6 @@
 - grok:
     field: message
     patterns:
-      - '{{ grokPattern }}'
+      {% for grokPattern in grokPatterns %}
+      - '{{ grokPattern | replace("'", "''") }}'{% endfor %}
     tag: 'grok_header_pattern'
diff --git a/x-pack/plugins/integration_assistant/server/types.ts b/x-pack/plugins/integration_assistant/server/types.ts
index 0fb68b4e04572..454370a02c366 100644
--- a/x-pack/plugins/integration_assistant/server/types.ts
+++ b/x-pack/plugins/integration_assistant/server/types.ts
@@ -122,6 +122,22 @@ export interface KVState {
   ecsVersion: string;
 }
 
+/**
+ * State passed between the nodes of the unstructured (grok) log graph.
+ */
+export interface UnstructuredLogState {
+  lastExecutedChain: string;
+  packageName: string;
+  dataStreamName: string;
+  grokPatterns: string[];
+  logSamples: string[];
+  jsonSamples: string[];
+  finalized: boolean;
+  errors: object;
+  additionalProcessors: object[];
+  ecsVersion: string;
+}
+
 export interface RelatedState {
   rawSamples: string[];
   samples: string[];
diff --git a/x-pack/plugins/integration_assistant/server/util/processors.ts b/x-pack/plugins/integration_assistant/server/util/processors.ts
index 12200f9d32db9..b2e6b1683482a 100644
--- a/x-pack/plugins/integration_assistant/server/util/processors.ts
+++ b/x-pack/plugins/integration_assistant/server/util/processors.ts
@@ -50,13 +50,13 @@ function createAppendProcessors(processors: SimplifiedProcessors): ESProcessorIt
 
 // The kv graph returns a simplified grok processor for header
-// This function takes in the grok pattern string and creates the grok processor
-export function createGrokProcessor(grokPattern: string): ESProcessorItem {
+// This function takes in the list of grok patterns and creates the grok processor
+export function createGrokProcessor(grokPatterns: string[]): ESProcessorItem {
   const templatesPath = joinPath(__dirname, '../templates/processors');
   const env = new Environment(new FileSystemLoader(templatesPath), {
     autoescape: false,
   });
   const template = env.getTemplate('grok.yml.njk');
-  const renderedTemplate = template.render({ grokPattern });
+  const renderedTemplate = template.render({ grokPatterns });
   const grokProcessor = safeLoad(renderedTemplate) as ESProcessorItem;
   return grokProcessor;
 }
From b5ac402ecf0a03ff086367aca3d0f56c3f1bba18 Mon Sep 17 00:00:00 2001
From: Bharat Pasupula
Date: Mon, 16 Sep 2024 10:30:21 +0200
Subject: [PATCH 2/4] add tests

---
 .../__jest__/fixtures/unstructured.ts | 30 ++++++++
 .../server/graphs/unstructured/errors.test.ts | 40 +++++++++++
 .../server/graphs/unstructured/graph.test.ts | 39 ++++++++++
 .../server/graphs/unstructured/graph.ts | 2 +-
 .../graphs/unstructured/unstructured.test.ts | 40 +++++++++++
 .../{unstuctured.ts => unstructured.ts} | 0
 .../graphs/unstructured/validate.test.ts | 71 +++++++++++++++++++
 7 files changed, 221 insertions(+), 1 deletion(-)
 create mode 100644 x-pack/plugins/integration_assistant/__jest__/fixtures/unstructured.ts
 create mode 100644 x-pack/plugins/integration_assistant/server/graphs/unstructured/errors.test.ts
 create mode 100644 x-pack/plugins/integration_assistant/server/graphs/unstructured/graph.test.ts
 create mode 100644 x-pack/plugins/integration_assistant/server/graphs/unstructured/unstructured.test.ts
 rename x-pack/plugins/integration_assistant/server/graphs/unstructured/{unstuctured.ts => unstructured.ts} (100%)
 create mode 100644 x-pack/plugins/integration_assistant/server/graphs/unstructured/validate.test.ts

diff --git a/x-pack/plugins/integration_assistant/__jest__/fixtures/unstructured.ts b/x-pack/plugins/integration_assistant/__jest__/fixtures/unstructured.ts
new file mode 100644
index 0000000000000..32c32350c13de
--- /dev/null
+++ b/x-pack/plugins/integration_assistant/__jest__/fixtures/unstructured.ts
@@ -0,0 +1,30 @@
+/*
+ * Copyright Elasticsearch B.V.
and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+// Graph state fixture for the unstructured node tests.
+// NOTE(review): test files use this object by reference, so any in-place mutation of its
+// arrays would leak between tests — confirm whether a factory or copy is wanted.
+export const unstructuredLogState = {
+  lastExecutedChain: 'testchain',
+  packageName: 'testPackage',
+  dataStreamName: 'testDatastream',
+  grokPatterns: ['%{GREEDYDATA:message}'],
+  logSamples: ['dummy data'],
+  jsonSamples: ['{"message":"dummy data"}'],
+  finalized: false,
+  ecsVersion: 'testVersion',
+  errors: { test: 'testerror' },
+  additionalProcessors: [],
+};
+
+// Canned model reply; the tests serialise it with JSON.stringify and use it as the FakeLLM response.
+export const unstructuredLogResponse = {
+  grok_patterns: [
+    '####<%{MONTH} %{MONTHDAY}, %{YEAR} %{TIME} (?:AM|PM) %{WORD:timezone}> <%{WORD:log_level}> <%{WORD:component}> <%{DATA:hostname}> <%{DATA:server_name}> <%{DATA:thread_info}> <%{DATA:user}> <%{DATA:empty_field}> <%{DATA:empty_field2}> <%{NUMBER:timestamp}> <%{DATA:message_id}> <%{GREEDYDATA:message}>',
+  ],
+};
+
+// Pipeline test result with one document and no errors.
+export const testPipelineValidResult: { pipelineResults: object[]; errors: object[] } = {
+  pipelineResults: [{ key: 'value', anotherKey: 'anotherValue' }],
+  errors: [],
+};
diff --git a/x-pack/plugins/integration_assistant/server/graphs/unstructured/errors.test.ts b/x-pack/plugins/integration_assistant/server/graphs/unstructured/errors.test.ts
new file mode 100644
index 0000000000000..21a6cd5a5e3f9
--- /dev/null
+++ b/x-pack/plugins/integration_assistant/server/graphs/unstructured/errors.test.ts
@@ -0,0 +1,40 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+import { FakeLLM } from '@langchain/core/utils/testing';
+import { handleUnstructuredError } from './error';
+import type { UnstructuredLogState } from '../../types';
+import {
+  unstructuredLogState,
+  unstructuredLogResponse,
+} from '../../../__jest__/fixtures/unstructured';
+import {
+  ActionsClientChatOpenAI,
+  ActionsClientSimpleChatModel,
+} from '@kbn/langchain/server/language_models';
+import { IScopedClusterClient } from '@kbn/core-elasticsearch-server';
+
+// Fake model that always answers with the canned grok response fixture.
+const fakeModel = new FakeLLM({
+  response: JSON.stringify(unstructuredLogResponse, null, 2),
+}) as unknown as ActionsClientChatOpenAI | ActionsClientSimpleChatModel;
+
+const initialState: UnstructuredLogState = unstructuredLogState;
+
+describe('Testing unstructured error handling node', () => {
+  // Minimal cluster client stub; only ingest.simulate is provided.
+  const esClient = {
+    asCurrentUser: {
+      ingest: {
+        simulate: jest.fn(),
+      },
+    },
+  } as unknown as IScopedClusterClient;
+
+  it('handleUnstructuredError()', async () => {
+    const { grokPatterns, lastExecutedChain } = await handleUnstructuredError({
+      state: initialState,
+      model: fakeModel,
+      client: esClient,
+    });
+    expect(grokPatterns).toStrictEqual(unstructuredLogResponse.grok_patterns);
+    expect(lastExecutedChain).toBe('unstructured_error');
+  });
+});
diff --git a/x-pack/plugins/integration_assistant/server/graphs/unstructured/graph.test.ts b/x-pack/plugins/integration_assistant/server/graphs/unstructured/graph.test.ts
new file mode 100644
index 0000000000000..60a9bdc4329de
--- /dev/null
+++ b/x-pack/plugins/integration_assistant/server/graphs/unstructured/graph.test.ts
@@ -0,0 +1,39 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */ + +import { + ActionsClientChatOpenAI, + ActionsClientSimpleChatModel, +} from '@kbn/langchain/server/language_models'; +import { FakeLLM } from '@langchain/core/utils/testing'; +import { getUnstructuredGraph } from './graph'; +import { IScopedClusterClient } from '@kbn/core-elasticsearch-server'; + +const model = new FakeLLM({ + response: '{"log_type": "structured"}', +}) as unknown as ActionsClientChatOpenAI | ActionsClientSimpleChatModel; + +describe('UnstructuredGraph', () => { + const client = { + asCurrentUser: { + ingest: { + simulate: jest.fn(), + }, + }, + } as unknown as IScopedClusterClient; + describe('Compiling and Running', () => { + it('Ensures that the graph compiles', async () => { + // When getUnstructuredGraph runs, langgraph compiles the graph it will error if the graph has any issues. + // Common issues for example detecting a node has no next step, or there is a infinite loop between them. + try { + await getUnstructuredGraph({ model, client }); + } catch (error) { + fail(`getUnstructuredGraph threw an error: ${error}`); + } + }); + }); +}); diff --git a/x-pack/plugins/integration_assistant/server/graphs/unstructured/graph.ts b/x-pack/plugins/integration_assistant/server/graphs/unstructured/graph.ts index 75f4e9c787575..6048404728bfb 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/unstructured/graph.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/unstructured/graph.ts @@ -8,7 +8,7 @@ import type { StateGraphArgs } from '@langchain/langgraph'; import { StateGraph, END, START } from '@langchain/langgraph'; import type { UnstructuredLogState } from '../../types'; -import { handleUnstructured } from './unstuctured'; +import { handleUnstructured } from './unstructured'; import type { UnstructuredGraphParams, UnstructuredBaseNodeParams } from './types'; import { handleUnstructuredError } from './error'; import { handleUnstructuredValidate } from './validate'; diff --git 
a/x-pack/plugins/integration_assistant/server/graphs/unstructured/unstructured.test.ts b/x-pack/plugins/integration_assistant/server/graphs/unstructured/unstructured.test.ts new file mode 100644 index 0000000000000..11d7107be13c0 --- /dev/null +++ b/x-pack/plugins/integration_assistant/server/graphs/unstructured/unstructured.test.ts @@ -0,0 +1,40 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { FakeLLM } from '@langchain/core/utils/testing'; +import { handleUnstructured } from './unstructured'; +import type { UnstructuredLogState } from '../../types'; +import { + unstructuredLogState, + unstructuredLogResponse, +} from '../../../__jest__/fixtures/unstructured'; +import { + ActionsClientChatOpenAI, + ActionsClientSimpleChatModel, +} from '@kbn/langchain/server/language_models'; +import { IScopedClusterClient } from '@kbn/core-elasticsearch-server'; + +const model = new FakeLLM({ + response: JSON.stringify(unstructuredLogResponse, null, 2), +}) as unknown as ActionsClientChatOpenAI | ActionsClientSimpleChatModel; + +const state: UnstructuredLogState = unstructuredLogState; + +describe('Testing unstructured log handling node', () => { + const client = { + asCurrentUser: { + ingest: { + simulate: jest.fn(), + }, + }, + } as unknown as IScopedClusterClient; + it('handleUnstructured()', async () => { + const response = await handleUnstructured({ state, model, client }); + expect(response.grokPatterns).toStrictEqual(unstructuredLogResponse.grok_patterns); + expect(response.lastExecutedChain).toBe('handleUnstructured'); + }); +}); diff --git a/x-pack/plugins/integration_assistant/server/graphs/unstructured/unstuctured.ts b/x-pack/plugins/integration_assistant/server/graphs/unstructured/unstructured.ts similarity index 100% rename from 
x-pack/plugins/integration_assistant/server/graphs/unstructured/unstuctured.ts rename to x-pack/plugins/integration_assistant/server/graphs/unstructured/unstructured.ts diff --git a/x-pack/plugins/integration_assistant/server/graphs/unstructured/validate.test.ts b/x-pack/plugins/integration_assistant/server/graphs/unstructured/validate.test.ts new file mode 100644 index 0000000000000..2794f6e69f9ac --- /dev/null +++ b/x-pack/plugins/integration_assistant/server/graphs/unstructured/validate.test.ts @@ -0,0 +1,71 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { FakeLLM } from '@langchain/core/utils/testing'; +import { handleUnstructuredValidate } from './validate'; +import type { UnstructuredLogState } from '../../types'; +import { + unstructuredLogState, + unstructuredLogResponse, +} from '../../../__jest__/fixtures/unstructured'; +import { + ActionsClientChatOpenAI, + ActionsClientSimpleChatModel, +} from '@kbn/langchain/server/language_models'; +import { IScopedClusterClient } from '@kbn/core-elasticsearch-server'; + +const model = new FakeLLM({ + response: JSON.stringify(unstructuredLogResponse, null, 2), +}) as unknown as ActionsClientChatOpenAI | ActionsClientSimpleChatModel; + +const state: UnstructuredLogState = unstructuredLogState; + +describe('Testing unstructured validation without errors', () => { + const client = { + asCurrentUser: { + ingest: { + simulate: jest + .fn() + .mockReturnValue({ docs: [{ doc: { _source: { message: 'dummy data' } } }] }), + }, + }, + } as unknown as IScopedClusterClient; + + it('handleUnstructuredValidate() without errors', async () => { + const response = await handleUnstructuredValidate({ state, model, client }); + expect(response.jsonSamples).toStrictEqual(unstructuredLogState.jsonSamples); + 
expect(response.additionalProcessors).toStrictEqual([ + { + grok: { + field: 'message', + patterns: unstructuredLogState.grokPatterns, + tag: 'grok_header_pattern', + }, + }, + ]); + expect(response.errors).toStrictEqual([]); + expect(response.lastExecutedChain).toBe('unstructured_validate'); + }); +}); + +describe('Testing unstructured validation errors', () => { + const client = { + asCurrentUser: { + ingest: { + simulate: jest + .fn() + .mockReturnValue({ docs: [{ doc: { _source: { error: 'some error' } } }] }), + }, + }, + } as unknown as IScopedClusterClient; + + it('handleUnstructuredValidate() errors', async () => { + const response = await handleUnstructuredValidate({ state, model, client }); + expect(response.errors).toStrictEqual(['some error']); + expect(response.lastExecutedChain).toBe('unstructured_validate'); + }); +}); From 6766b54be176c66197740c5aaf02834664af1f5b Mon Sep 17 00:00:00 2001 From: Bharat Pasupula Date: Mon, 16 Sep 2024 10:37:38 +0200 Subject: [PATCH 3/4] remove unnecessary const --- .../integration_assistant/__jest__/fixtures/unstructured.ts | 5 ----- 1 file changed, 5 deletions(-) diff --git a/x-pack/plugins/integration_assistant/__jest__/fixtures/unstructured.ts b/x-pack/plugins/integration_assistant/__jest__/fixtures/unstructured.ts index 32c32350c13de..113ef4d37c073 100644 --- a/x-pack/plugins/integration_assistant/__jest__/fixtures/unstructured.ts +++ b/x-pack/plugins/integration_assistant/__jest__/fixtures/unstructured.ts @@ -23,8 +23,3 @@ export const unstructuredLogResponse = { '####<%{MONTH} %{MONTHDAY}, %{YEAR} %{TIME} (?:AM|PM) %{WORD:timezone}> <%{WORD:log_level}> <%{WORD:component}> <%{DATA:hostname}> <%{DATA:server_name}> <%{DATA:thread_info}> <%{DATA:user}> <%{DATA:empty_field}> <%{DATA:empty_field2}> <%{NUMBER:timestamp}> <%{DATA:message_id}> <%{GREEDYDATA:message}>', ], }; - -export const testPipelineValidResult: { pipelineResults: object[]; errors: object[] } = { - pipelineResults: [{ key: 'value', anotherKey: 
'anotherValue' }], - errors: [], -}; From a4dce802c2e17c4632f339d20308a228d464860c Mon Sep 17 00:00:00 2001 From: Bharat Pasupula Date: Mon, 16 Sep 2024 13:17:51 +0200 Subject: [PATCH 4/4] change to camelcase --- .../plugins/integration_assistant/server/graphs/kv/error.ts | 2 +- .../integration_assistant/server/graphs/kv/header.test.ts | 2 +- .../plugins/integration_assistant/server/graphs/kv/header.ts | 2 +- .../integration_assistant/server/graphs/kv/validate.ts | 4 ++-- .../integration_assistant/server/graphs/unstructured/error.ts | 2 +- .../server/graphs/unstructured/errors.test.ts | 2 +- .../server/graphs/unstructured/validate.test.ts | 4 ++-- .../server/graphs/unstructured/validate.ts | 4 ++-- 8 files changed, 11 insertions(+), 11 deletions(-) diff --git a/x-pack/plugins/integration_assistant/server/graphs/kv/error.ts b/x-pack/plugins/integration_assistant/server/graphs/kv/error.ts index b6d64ee4f615d..b1b7c12a68d5a 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/kv/error.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/kv/error.ts @@ -33,7 +33,7 @@ export async function handleKVError({ return { kvProcessor, - lastExecutedChain: 'kv_error', + lastExecutedChain: 'kvError', }; } diff --git a/x-pack/plugins/integration_assistant/server/graphs/kv/header.test.ts b/x-pack/plugins/integration_assistant/server/graphs/kv/header.test.ts index 7991484024713..353384361d2da 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/kv/header.test.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/kv/header.test.ts @@ -43,6 +43,6 @@ describe('Testing kv header', () => { expect(response.grokPattern).toStrictEqual( '<%{NUMBER:priority}>%{NUMBER:version} %{GREEDYDATA:message}' ); - expect(response.lastExecutedChain).toBe('kv_header'); + expect(response.lastExecutedChain).toBe('kvHeader'); }); }); diff --git a/x-pack/plugins/integration_assistant/server/graphs/kv/header.ts b/x-pack/plugins/integration_assistant/server/graphs/kv/header.ts 
index 473eae1516112..36d8968ab9e67 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/kv/header.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/kv/header.ts @@ -26,6 +26,6 @@ export async function handleHeader({ return { grokPattern: pattern.grok_pattern, - lastExecutedChain: 'kv_header', + lastExecutedChain: 'kvHeader', }; } diff --git a/x-pack/plugins/integration_assistant/server/graphs/kv/validate.ts b/x-pack/plugins/integration_assistant/server/graphs/kv/validate.ts index 6b22cf399a906..b0601de74aa5e 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/kv/validate.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/kv/validate.ts @@ -41,7 +41,7 @@ export async function handleKVValidate({ )) as { pipelineResults: KVResult[]; errors: object[] }; if (errors.length > 0) { - return { errors, lastExecutedChain: 'kv_validate' }; + return { errors, lastExecutedChain: 'kvValidate' }; } // Converts JSON Object into a string and parses it as a array of JSON strings @@ -56,7 +56,7 @@ export async function handleKVValidate({ jsonSamples, additionalProcessors, errors: [], - lastExecutedChain: 'kv_validate', + lastExecutedChain: 'kvValidate', }; } diff --git a/x-pack/plugins/integration_assistant/server/graphs/unstructured/error.ts b/x-pack/plugins/integration_assistant/server/graphs/unstructured/error.ts index ec891d3aaa8f3..d002dd19d5439 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/unstructured/error.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/unstructured/error.ts @@ -27,6 +27,6 @@ export async function handleUnstructuredError({ return { grokPatterns: pattern.grok_patterns, - lastExecutedChain: 'unstructured_error', + lastExecutedChain: 'unstructuredError', }; } diff --git a/x-pack/plugins/integration_assistant/server/graphs/unstructured/errors.test.ts b/x-pack/plugins/integration_assistant/server/graphs/unstructured/errors.test.ts index 21a6cd5a5e3f9..212b4b6255be2 100644 --- 
a/x-pack/plugins/integration_assistant/server/graphs/unstructured/errors.test.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/unstructured/errors.test.ts @@ -35,6 +35,6 @@ describe('Testing unstructured error handling node', () => { it('handleUnstructuredError()', async () => { const response = await handleUnstructuredError({ state, model, client }); expect(response.grokPatterns).toStrictEqual(unstructuredLogResponse.grok_patterns); - expect(response.lastExecutedChain).toBe('unstructured_error'); + expect(response.lastExecutedChain).toBe('unstructuredError'); }); }); diff --git a/x-pack/plugins/integration_assistant/server/graphs/unstructured/validate.test.ts b/x-pack/plugins/integration_assistant/server/graphs/unstructured/validate.test.ts index 2794f6e69f9ac..493834e3220f9 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/unstructured/validate.test.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/unstructured/validate.test.ts @@ -48,7 +48,7 @@ describe('Testing unstructured validation without errors', () => { }, ]); expect(response.errors).toStrictEqual([]); - expect(response.lastExecutedChain).toBe('unstructured_validate'); + expect(response.lastExecutedChain).toBe('unstructuredValidate'); }); }); @@ -66,6 +66,6 @@ describe('Testing unstructured validation errors', () => { it('handleUnstructuredValidate() errors', async () => { const response = await handleUnstructuredValidate({ state, model, client }); expect(response.errors).toStrictEqual(['some error']); - expect(response.lastExecutedChain).toBe('unstructured_validate'); + expect(response.lastExecutedChain).toBe('unstructuredValidate'); }); }); diff --git a/x-pack/plugins/integration_assistant/server/graphs/unstructured/validate.ts b/x-pack/plugins/integration_assistant/server/graphs/unstructured/validate.ts index 3e2d77ef71d95..043e38be0983f 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/unstructured/validate.ts +++ 
b/x-pack/plugins/integration_assistant/server/graphs/unstructured/validate.ts @@ -25,7 +25,7 @@ export async function handleUnstructuredValidate({ }; if (errors.length > 0) { - return { errors, lastExecutedChain: 'unstructured_validate' }; + return { errors, lastExecutedChain: 'unstructuredValidate' }; } const jsonSamples: string[] = pipelineResults.map((entry) => JSON.stringify(entry)); @@ -36,6 +36,6 @@ export async function handleUnstructuredValidate({ jsonSamples, additionalProcessors, errors: [], - lastExecutedChain: 'unstructured_validate', + lastExecutedChain: 'unstructuredValidate', }; }