From f4a5fca95c5e5641c05053ef05f07f241b0ab9c6 Mon Sep 17 00:00:00 2001 From: Nicolas Chaulet Date: Wed, 26 Aug 2020 13:40:13 -0400 Subject: [PATCH 1/9] [Ingest Manager] Shared policy action --- .../common/types/models/agent.ts | 10 ++- .../server/saved_objects/index.ts | 3 + .../server/services/agent_policy.ts | 4 + .../server/services/agent_policy_update.ts | 27 +++++- .../server/services/agents/acks.ts | 57 ++++++------ .../server/services/agents/actions.ts | 18 ++++ .../agents/checkin/state_new_actions.ts | 87 ++++++++----------- .../server/services/agents/saved_objects.ts | 1 + 8 files changed, 122 insertions(+), 85 deletions(-) diff --git a/x-pack/plugins/ingest_manager/common/types/models/agent.ts b/x-pack/plugins/ingest_manager/common/types/models/agent.ts index 2b8a306577e7d..96e407d8c09f4 100644 --- a/x-pack/plugins/ingest_manager/common/types/models/agent.ts +++ b/x-pack/plugins/ingest_manager/common/types/models/agent.ts @@ -30,8 +30,11 @@ export interface NewAgentAction { export interface AgentAction extends NewAgentAction { id: string; - agent_id: string; + agent_id?: string; + policy_id?: string; + policy_revision?: number; created_at: string; + ack_data?: any; } export interface AgentActionSOAttributes { @@ -39,8 +42,11 @@ export interface AgentActionSOAttributes { sent_at?: string; timestamp?: string; created_at: string; - agent_id: string; + agent_id?: string; + policy_id?: string; + policy_revision?: number; data?: string; + ack_data?: string; } export interface NewAgentEvent { diff --git a/x-pack/plugins/ingest_manager/server/saved_objects/index.ts b/x-pack/plugins/ingest_manager/server/saved_objects/index.ts index 1bbe3b71bf919..cf03b2c63648a 100644 --- a/x-pack/plugins/ingest_manager/server/saved_objects/index.ts +++ b/x-pack/plugins/ingest_manager/server/saved_objects/index.ts @@ -94,8 +94,11 @@ const savedObjectTypes: { [key: string]: SavedObjectsType } = { mappings: { properties: { agent_id: { type: 'keyword' }, + policy_id: { type: 'keyword' }, + policy_revision: { type: 'integer' }, type: { type: 'keyword' }, data: { type: 'binary' }, + ack_data: { type: 'text' }, sent_at: { type: 'date' }, created_at: { type: 'date' }, }, diff --git a/x-pack/plugins/ingest_manager/server/services/agent_policy.ts b/x-pack/plugins/ingest_manager/server/services/agent_policy.ts index 21bc7b021e83a..ea8505a0bafe6 100644 --- a/x-pack/plugins/ingest_manager/server/services/agent_policy.ts +++ b/x-pack/plugins/ingest_manager/server/services/agent_policy.ts @@ -66,6 +66,10 @@ class AgentPolicyService { updated_by: user ? user.username : 'system', }); + if (options.bumpRevision) { + await this.triggerAgentPolicyUpdatedEvent(soClient, 'updated', id); + } + return (await this.get(soClient, id)) as AgentPolicy; } diff --git a/x-pack/plugins/ingest_manager/server/services/agent_policy_update.ts b/x-pack/plugins/ingest_manager/server/services/agent_policy_update.ts index 3c743dd957f62..43a9ca7074e5f 100644 --- a/x-pack/plugins/ingest_manager/server/services/agent_policy_update.ts +++ b/x-pack/plugins/ingest_manager/server/services/agent_policy_update.ts @@ -6,8 +6,9 @@ import { SavedObjectsClientContract } from 'src/core/server'; import { generateEnrollmentAPIKey, deleteEnrollmentApiKeyForAgentPolicyId } from './api_keys'; -import { unenrollForAgentPolicyId } from './agents'; +import { unenrollForAgentPolicyId, createAgentAction } from './agents'; import { outputService } from './output'; +import { agentPolicyService } from './agent_policy'; export async function agentPolicyUpdateEventHandler( soClient: SavedObjectsClientContract, @@ -26,6 +27,30 @@ export async function agentPolicyUpdateEventHandler( }); } + if (action === 'updated') { + const policy = await agentPolicyService.getFullAgentPolicy(soClient, agentPolicyId); + if (!policy) { + return; + } + const packages = policy.inputs.reduce((acc, input) => { + const packageName = input.meta?.package?.name; + if (packageName && acc.indexOf(packageName) < 0) { + return [packageName, ...acc]; + } + return acc; + }, []); + + await createAgentAction(soClient, { + type: 'CONFIG_CHANGE', + data: { config: policy } as any, + ack_data: { packages }, + created_at: new Date().toISOString(), + sent_at: undefined, + policy_id: policy.id, + policy_revision: policy.revision, + }); + } + if (action === 'deleted') { await unenrollForAgentPolicyId(soClient, agentPolicyId); await deleteEnrollmentApiKeyForAgentPolicyId(soClient, agentPolicyId); diff --git a/x-pack/plugins/ingest_manager/server/services/agents/acks.ts b/x-pack/plugins/ingest_manager/server/services/agents/acks.ts index 87572ce405ee7..2ae6ccb9c94a0 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/acks.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/acks.ts @@ -18,7 +18,6 @@ import { AgentEventSOAttributes, AgentSOAttributes, AgentActionSOAttributes, - FullAgentPolicy, } from '../../types'; import { AGENT_EVENT_SAVED_OBJECT_TYPE, @@ -55,8 +54,12 @@ export async function acknowledgeAgentActions( throw error; } + const agentActionsIds = []; for (const action of actions) { - if (action.agent_id !== agent.id) { + if (action.agent_id) { + agentActionsIds.push(action); + } + if (action.agent_id && action.agent_id !== agent.id) { throw Boom.badRequest(`${action.id} not found`); } } @@ -70,50 +73,46 @@ export async function acknowledgeAgentActions( await forceUnenrollAgent(soClient, agent.id); } - const agentPolicy = getLatestAgentPolicyIfUpdated(agent, actions); + const configChangeAction = getLatestConfigChangePolicyActionIfUpdated(agent, actions); await soClient.bulkUpdate([ - ...(agentPolicy ? [buildUpdateAgentPolicy(agent.id, agentPolicy)] : []), + ...(configChangeAction + ? [ + { + type: AGENT_SAVED_OBJECT_TYPE, + id: agent.id, + attributes: { + policy_revision: configChangeAction.policy_revision, + packages: configChangeAction?.ack_data?.packages, + }, + }, + ] + : []), ...buildUpdateAgentActionSentAt(actionIds), ]); return actions; } -function getLatestAgentPolicyIfUpdated(agent: Agent, actions: AgentAction[]) { - return actions.reduce((acc, action) => { +function getLatestConfigChangePolicyActionIfUpdated(agent: Agent, actions: AgentAction[]) { + return actions.reduce((acc, action) => { if (action.type !== 'CONFIG_CHANGE') { return acc; } - const data = action.data || {}; - - if (data?.config?.id !== agent.policy_id) { + if (action.policy_id !== agent.policy_id) { return acc; } - const currentRevision = (acc && acc.revision) || agent.policy_revision || 0; - - return data?.config?.revision > currentRevision ? data?.config : acc; - }, null); -} + if (!acc) { + return action; + } -function buildUpdateAgentPolicy(agentId: string, agentPolicy: FullAgentPolicy) { - const packages = agentPolicy.inputs.reduce((acc, input) => { - const packageName = input.meta?.package?.name; - if (packageName && acc.indexOf(packageName) < 0) { - return [packageName, ...acc]; + if (action.policy_revision && action.policy_revision > (acc.policy_revision || 0)) { + return action; } - return acc; - }, []); - return { - type: AGENT_SAVED_OBJECT_TYPE, - id: agentId, - attributes: { - policy_revision: agentPolicy.revision, - packages, - }, - }; + return acc; + }, null); } function buildUpdateAgentActionSentAt( diff --git a/x-pack/plugins/ingest_manager/server/services/agents/actions.ts b/x-pack/plugins/ingest_manager/server/services/agents/actions.ts index 8d1b320c89ae6..7be9ef589e73c 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/actions.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/actions.ts @@ -17,6 +17,7 @@ export async function createAgentAction( const so = await soClient.create(AGENT_ACTION_SAVED_OBJECT_TYPE, { ...newAgentAction, data: newAgentAction.data ? JSON.stringify(newAgentAction.data) : undefined, + ack_data: newAgentAction.ack_data ? JSON.stringify(newAgentAction.ack_data) : undefined, }); const agentAction = savedObjectToAgentAction(so); @@ -86,6 +87,23 @@ export async function getNewActionsSince(soClient: SavedObjectsClientContract, t return res.saved_objects.map(savedObjectToAgentAction); } +export async function getLatestConfigChangeAction( + soClient: SavedObjectsClientContract, + policyId: string +) { + const res = await soClient.find({ + type: AGENT_ACTION_SAVED_OBJECT_TYPE, + search: policyId, + searchFields: ['policy_id'], + sortField: 'created_at', + sortOrder: 'DESC', + }); + + if (res.saved_objects[0]) { + return savedObjectToAgentAction(res.saved_objects[0]); + } +} + export interface ActionsService { getAgent: (soClient: SavedObjectsClientContract, agentId: string) => Promise; diff --git a/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts b/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts index e10d013fe84a9..3c508a6277ed3 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts @@ -16,14 +16,7 @@ import { take, } from 'rxjs/operators'; import { SavedObjectsClientContract, KibanaRequest } from 'src/core/server'; -import { - Agent, - AgentAction, - AgentSOAttributes, - AgentPolicy, - FullAgentPolicy, -} from '../../../types'; -import { agentPolicyService } from '../../agent_policy'; +import { Agent, AgentAction, AgentSOAttributes } from '../../../types'; import * as APIKeysService from '../../api_keys'; import { AGENT_SAVED_OBJECT_TYPE, @@ -31,7 +24,7 @@ import { AGENT_POLICY_ROLLOUT_RATE_LIMIT_INTERVAL_MS, AGENT_POLICY_ROLLOUT_RATE_LIMIT_REQUEST_PER_INTERVAL, } from '../../../constants'; -import { createAgentAction, getNewActionsSince } from '../actions'; +import { getNewActionsSince, getLatestConfigChangeAction, getAgentActionByIds } from '../actions'; import { appContextService } from '../../app_context'; import { toPromiseAbortable, AbortError, createRateLimiter } from './rxjs_utils'; @@ -54,20 +47,6 @@ function getInternalUserSOClient() { return appContextService.getInternalUserSOClient(fakeRequest); } -function createAgentPolicySharedObservable(agentPolicyId: string) { - const internalSOClient = getInternalUserSOClient(); - return timer(0, AGENT_UPDATE_ACTIONS_INTERVAL_MS).pipe( - switchMap(() => - from(agentPolicyService.get(internalSOClient, agentPolicyId) as Promise) - ), - distinctUntilKeyChanged('revision'), - switchMap((data) => - from(agentPolicyService.getFullAgentPolicy(internalSOClient, agentPolicyId)) - ), - shareReplay({ refCount: true, bufferSize: 1 }) - ); -} - function createNewActionsSharedObservable(): Observable { return timer(0, AGENT_UPDATE_ACTIONS_INTERVAL_MS).pipe( switchMap(() => { @@ -79,6 +58,17 @@ function createNewActionsSharedObservable(): Observable { ); } +function createAgentPolicyActionSharedObservable(agentPolicyId: string) { + const internalSOClient = getInternalUserSOClient(); + return timer(0, AGENT_UPDATE_ACTIONS_INTERVAL_MS).pipe( + switchMap(() => from(getLatestConfigChangeAction(internalSOClient, agentPolicyId))), + filter((data): data is AgentAction => data !== undefined), + distinctUntilKeyChanged('id'), + switchMap((data) => from(getAgentActionByIds(internalSOClient, [data.id]).then((r) => r[0]))), + shareReplay({ refCount: true, bufferSize: 1 }) + ); +} + async function getOrCreateAgentDefaultOutputAPIKey( soClient: SavedObjectsClientContract, agent: Agent @@ -102,47 +92,31 @@ async function getOrCreateAgentDefaultOutputAPIKey( return outputAPIKey.key; } -function shouldCreateAgentPolicyAction(agent: Agent, agentPolicy: FullAgentPolicy | null): boolean { - if (!agentPolicy || !agentPolicy.revision) { - return false; - } - const isAgentPolicyOutdated = - !agent.policy_revision || agent.policy_revision < agentPolicy.revision; - if (!isAgentPolicyOutdated) { - return false; - } - - return true; -} - -async function createAgentActionFromAgentPolicy( +async function createAgentActionFromPolicyAction( soClient: SavedObjectsClientContract, agent: Agent, - policy: FullAgentPolicy | null + policyAction: AgentAction ) { - // Deep clone !not supporting Date, and undefined value. - const newAgentPolicy = JSON.parse(JSON.stringify(policy)); + // Faster than clone + const newAgentAction: AgentAction = JSON.parse(JSON.stringify(policyAction)); + + delete newAgentAction.policy_id; + delete newAgentAction.policy_revision; // Mutate the policy to set the api token for this agent - newAgentPolicy.outputs.default.api_key = await getOrCreateAgentDefaultOutputAPIKey( + newAgentAction.data.config.outputs.default.api_key = await getOrCreateAgentDefaultOutputAPIKey( soClient, agent ); - const policyChangeAction = await createAgentAction(soClient, { - agent_id: agent.id, - type: 'CONFIG_CHANGE', - data: { config: newAgentPolicy } as any, - created_at: new Date().toISOString(), - sent_at: undefined, - }); + newAgentAction.agent_id = agent.id; - return [policyChangeAction]; + return [newAgentAction]; } export function agentCheckinStateNewActionsFactory() { // Shared Observables - const agentPolicies$ = new Map>(); + const agentPolicies$ = new Map>(); const newActions$ = createNewActionsSharedObservable(); // Rx operators const rateLimiter = createRateLimiter( @@ -162,7 +136,7 @@ export function agentCheckinStateNewActionsFactory() { } const agentPolicyId = agent.policy_id; if (!agentPolicies$.has(agentPolicyId)) { - agentPolicies$.set(agentPolicyId, createAgentPolicySharedObservable(agentPolicyId)); + agentPolicies$.set(agentPolicyId, createAgentPolicyActionSharedObservable(agentPolicyId)); } const agentPolicy$ = agentPolicies$.get(agentPolicyId); if (!agentPolicy$) { @@ -171,9 +145,16 @@ export function agentCheckinStateNewActionsFactory() { const stream$ = agentPolicy$.pipe( timeout(appContextService.getConfig()?.fleet.pollingRequestTimeout || 0), - filter((agentPolicy) => shouldCreateAgentPolicyAction(agent, agentPolicy)), + filter( + (action) => + agent.policy_id !== undefined && + action.policy_revision !== undefined && + action.policy_id !== undefined && + action.policy_id === agent.policy_id && + (!agent.policy_revision || action.policy_revision > agent.policy_revision) + ), rateLimiter(), - mergeMap((agentPolicy) => createAgentActionFromAgentPolicy(soClient, agent, agentPolicy)), + mergeMap((policyAction) => createAgentActionFromPolicyAction(soClient, agent, policyAction)), merge(newActions$), mergeMap(async (data) => { if (!data) { diff --git a/x-pack/plugins/ingest_manager/server/services/agents/saved_objects.ts b/x-pack/plugins/ingest_manager/server/services/agents/saved_objects.ts index 2ab5cc8139f69..836a193ccb14a 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/saved_objects.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/saved_objects.ts @@ -40,5 +40,6 @@ export function savedObjectToAgentAction(so: SavedObject Date: Thu, 27 Aug 2020 09:04:36 -0400 Subject: [PATCH 2/9] Fix ack --- .../plugins/ingest_manager/server/services/agents/acks.ts | 2 +- .../ingest_manager/server/services/agents/actions.ts | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/x-pack/plugins/ingest_manager/server/services/agents/acks.ts b/x-pack/plugins/ingest_manager/server/services/agents/acks.ts index 2ae6ccb9c94a0..718d15b667f22 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/acks.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/acks.ts @@ -46,7 +46,7 @@ export async function acknowledgeAgentActions( let actions; try { - actions = await getAgentActionByIds(soClient, actionIds); + actions = await getAgentActionByIds(soClient, actionIds, false); } catch (error) { if (Boom.isBoom(error) && error.output.statusCode === 404) { throw Boom.badRequest(`One or more actions cannot be found`); diff --git a/x-pack/plugins/ingest_manager/server/services/agents/actions.ts b/x-pack/plugins/ingest_manager/server/services/agents/actions.ts index 7be9ef589e73c..8462d541f3b30 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/actions.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/actions.ts @@ -52,7 +52,8 @@ export async function getAgentActionsForCheckin( export async function getAgentActionByIds( soClient: SavedObjectsClientContract, - actionIds: string[] + actionIds: string[], + decryptData: boolean = true ) { const actions = ( await soClient.bulkGet( @@ -63,6 +64,10 @@ export async function getAgentActionByIds( ) ).saved_objects.map(savedObjectToAgentAction); + if (!decryptData) { + return actions; + } + return Promise.all( actions.map(async (action) => { // Get decrypted actions From 2d552ac87d09da59c99121ba9bd0dcd6795dac55 Mon Sep 17 00:00:00 2001 From: Nicolas Chaulet Date: Mon, 31 Aug 2020 09:27:30 -0400 Subject: [PATCH 3/9] Refacto types --- .../common/types/models/agent.ts | 41 ++- .../common/types/rest_spec/agent.ts | 2 +- .../server/routes/agent/actions_handlers.ts | 3 +- .../server/services/agent_policy_update.ts | 7 +- .../server/services/agents/acks.test.ts | 241 ++---------------- .../server/services/agents/acks.ts | 77 ++++-- .../server/services/agents/actions.test.ts | 8 +- .../server/services/agents/actions.ts | 92 ++++++- .../agents/checkin/state_new_actions.ts | 42 +-- .../server/services/agents/saved_objects.ts | 40 ++- .../ingest_manager/server/types/index.tsx | 3 + .../server/types/models/agent.ts | 7 +- 12 files changed, 269 insertions(+), 294 deletions(-) diff --git a/x-pack/plugins/ingest_manager/common/types/models/agent.ts b/x-pack/plugins/ingest_manager/common/types/models/agent.ts index 96e407d8c09f4..d494507c79a78 100644 --- a/x-pack/plugins/ingest_manager/common/types/models/agent.ts +++ b/x-pack/plugins/ingest_manager/common/types/models/agent.ts @@ -21,34 +21,57 @@ export type AgentStatus = | 'unenrolling' | 'degraded'; -export type AgentActionType = 'CONFIG_CHANGE' | 'DATA_DUMP' | 'RESUME' | 'PAUSE' | 'UNENROLL'; +export type AgentActionType = 'CONFIG_CHANGE' | 'UNENROLL'; + export interface NewAgentAction { type: AgentActionType; data?: any; sent_at?: string; } -export interface AgentAction extends NewAgentAction { +export interface AgentAction { + type: AgentActionType; + data?: any; + sent_at?: string; id: string; - agent_id?: string; - policy_id?: string; - policy_revision?: number; + agent_id: string; + created_at: string; + ack_data?: any; +} + +export interface AgentPolicyAction { + id: string; + type: AgentActionType; + data?: any; + policy_id: string; + policy_revision: number; created_at: string; ack_data?: any; } -export interface AgentActionSOAttributes { +interface CommonAgentActionSOAttributes { type: AgentActionType; sent_at?: string; timestamp?: string; created_at: string; - agent_id?: string; - policy_id?: string; - policy_revision?: number; data?: string; ack_data?: string; } +export type BaseAgentActionSOAttributes = CommonAgentActionSOAttributes & { + agent_id?: string; + policy_id?: string; + policy_revision?: number; +}; + +export type AgentActionSOAttributes = CommonAgentActionSOAttributes & { + agent_id: string; +}; +export type AgentPolicyActionSOAttributes = CommonAgentActionSOAttributes & { + policy_id: string; + policy_revision: number; +}; + export interface NewAgentEvent { type: 'STATE' | 'ERROR' | 'ACTION_RESULT' | 'ACTION'; subtype: // State diff --git a/x-pack/plugins/ingest_manager/common/types/rest_spec/agent.ts b/x-pack/plugins/ingest_manager/common/types/rest_spec/agent.ts index 4a50938049a7b..cac171d76d00f 100644 --- a/x-pack/plugins/ingest_manager/common/types/rest_spec/agent.ts +++ b/x-pack/plugins/ingest_manager/common/types/rest_spec/agent.ts @@ -7,11 +7,11 @@ import { Agent, AgentAction, + NewAgentAction, NewAgentEvent, AgentEvent, AgentStatus, AgentType, - NewAgentAction, } from '../models'; export interface GetAgentsRequest { diff --git a/x-pack/plugins/ingest_manager/server/routes/agent/actions_handlers.ts b/x-pack/plugins/ingest_manager/server/routes/agent/actions_handlers.ts index 81893b1e78338..2f6db5f1d9bcc 100644 --- a/x-pack/plugins/ingest_manager/server/routes/agent/actions_handlers.ts +++ b/x-pack/plugins/ingest_manager/server/routes/agent/actions_handlers.ts @@ -10,7 +10,6 @@ import { RequestHandler } from 'kibana/server'; import { TypeOf } from '@kbn/config-schema'; import { PostNewAgentActionRequestSchema } from '../../types/rest_spec'; import { ActionsService } from '../../services/agents'; -import { NewAgentAction } from '../../../common/types/models'; import { PostNewAgentActionResponse } from '../../../common/types/rest_spec'; export const postNewAgentActionHandlerBuilder = function ( @@ -26,7 +25,7 @@ export const postNewAgentActionHandlerBuilder = function ( const agent = await actionsService.getAgent(soClient, request.params.agentId); - const newAgentAction = request.body.action as NewAgentAction; + const newAgentAction = request.body.action; const savedAgentAction = await actionsService.createAgentAction(soClient, { created_at: new Date().toISOString(), diff --git a/x-pack/plugins/ingest_manager/server/services/agent_policy_update.ts b/x-pack/plugins/ingest_manager/server/services/agent_policy_update.ts index 43a9ca7074e5f..8a8f60b52e3e1 100644 --- a/x-pack/plugins/ingest_manager/server/services/agent_policy_update.ts +++ b/x-pack/plugins/ingest_manager/server/services/agent_policy_update.ts @@ -6,7 +6,7 @@ import { SavedObjectsClientContract } from 'src/core/server'; import { generateEnrollmentAPIKey, deleteEnrollmentApiKeyForAgentPolicyId } from './api_keys'; -import { unenrollForAgentPolicyId, createAgentAction } from './agents'; +import { unenrollForAgentPolicyId, createAgentPolicyAction } from './agents'; import { outputService } from './output'; import { agentPolicyService } from './agent_policy'; @@ -29,7 +29,7 @@ export async function agentPolicyUpdateEventHandler( if (action === 'updated') { const policy = await agentPolicyService.getFullAgentPolicy(soClient, agentPolicyId); - if (!policy) { + if (!policy || !policy.revision) { return; } const packages = policy.inputs.reduce((acc, input) => { @@ -40,12 +40,11 @@ export async function agentPolicyUpdateEventHandler( return acc; }, []); - await createAgentAction(soClient, { + await createAgentPolicyAction(soClient, { type: 'CONFIG_CHANGE', data: { config: policy } as any, ack_data: { packages }, created_at: new Date().toISOString(), - sent_at: undefined, policy_id: policy.id, policy_revision: policy.revision, }); diff --git a/x-pack/plugins/ingest_manager/server/services/agents/acks.test.ts b/x-pack/plugins/ingest_manager/server/services/agents/acks.test.ts index 80fdc305d0ba7..21e71031d0886 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/acks.test.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/acks.test.ts @@ -6,45 +6,19 @@ import Boom from 'boom'; import { SavedObjectsBulkResponse } from 'kibana/server'; import { savedObjectsClientMock } from 'src/core/server/mocks'; -import { encryptedSavedObjectsMock } from '../../../../../plugins/encrypted_saved_objects/server/mocks'; import { Agent, - AgentAction, AgentActionSOAttributes, + BaseAgentActionSOAttributes, AgentEvent, } from '../../../common/types/models'; import { AGENT_TYPE_PERMANENT, AGENT_ACTION_SAVED_OBJECT_TYPE } from '../../../common/constants'; import { acknowledgeAgentActions } from './acks'; -import { appContextService } from '../app_context'; -import { IngestManagerAppContext } from '../../plugin'; describe('test agent acks services', () => { it('should succeed on valid and matched actions', async () => { const mockSavedObjectsClient = savedObjectsClientMock.create(); - const mockStartEncryptedSOPlugin = encryptedSavedObjectsMock.createStart(); - appContextService.start(({ - encryptedSavedObjectsStart: mockStartEncryptedSOPlugin, - } as unknown) as IngestManagerAppContext); - - const [ - { value: mockStartEncryptedSOClient }, - ] = mockStartEncryptedSOPlugin.getClient.mock.results; - - mockStartEncryptedSOClient.getDecryptedAsInternalUser.mockReturnValue( - Promise.resolve({ - id: 'action1', - references: [], - type: AGENT_ACTION_SAVED_OBJECT_TYPE, - attributes: { - type: 'CONFIG_CHANGE', - agent_id: 'id', - sent_at: '2020-03-14T19:45:02.620Z', - timestamp: '2019-01-04T14:32:03.36764-05:00', - created_at: '2020-03-14T19:45:02.620Z', - }, - }) - ); mockSavedObjectsClient.bulkGet.mockReturnValue( Promise.resolve({ @@ -65,7 +39,7 @@ describe('test agent acks services', () => { } as SavedObjectsBulkResponse) ); - const agentActions = await acknowledgeAgentActions( + await acknowledgeAgentActions( mockSavedObjectsClient, ({ id: 'id', @@ -81,107 +55,20 @@ describe('test agent acks services', () => { } as AgentEvent, ] ); - expect(agentActions).toEqual([ - ({ - type: 'CONFIG_CHANGE', - id: 'action1', - agent_id: 'id', - sent_at: '2020-03-14T19:45:02.620Z', - timestamp: '2019-01-04T14:32:03.36764-05:00', - created_at: '2020-03-14T19:45:02.620Z', - } as unknown) as AgentAction, - ]); }); it('should update config field on the agent if a policy change is acknowledged', async () => { const mockSavedObjectsClient = savedObjectsClientMock.create(); - const mockStartEncryptedSOPlugin = encryptedSavedObjectsMock.createStart(); - appContextService.start(({ - encryptedSavedObjectsStart: mockStartEncryptedSOPlugin, - } as unknown) as IngestManagerAppContext); - const [ - { value: mockStartEncryptedSOClient }, - ] = mockStartEncryptedSOPlugin.getClient.mock.results; - - mockStartEncryptedSOClient.getDecryptedAsInternalUser.mockReturnValue( - Promise.resolve({ - id: 'action1', - references: [], - type: AGENT_ACTION_SAVED_OBJECT_TYPE, - attributes: { - type: 'CONFIG_CHANGE', - agent_id: 'id', - sent_at: '2020-03-14T19:45:02.620Z', - timestamp: '2019-01-04T14:32:03.36764-05:00', - created_at: '2020-03-14T19:45:02.620Z', - data: JSON.stringify({ - config: { - id: 'policy1', - revision: 4, - settings: { - monitoring: { - enabled: true, - use_output: 'default', - logs: true, - metrics: true, - }, - }, - outputs: { - default: { - type: 'elasticsearch', - hosts: ['http://localhost:9200'], - }, - }, - inputs: [ - { - id: 'f2293360-b57c-11ea-8bd3-7bd51e425399', - name: 'system-1', - type: 'logs', - use_output: 'default', - meta: { - package: { - name: 'system', - version: '0.3.0', - }, - }, - dataset: { - namespace: 'default', - }, - streams: [ - { - id: 'logs-system.syslog', - dataset: { - name: 'system.syslog', - }, - paths: ['/var/log/messages*', '/var/log/syslog*'], - exclude_files: ['.gz$'], - multiline: { - pattern: '^\\s', - match: 'after', - }, - processors: [ - { - add_locale: null, - }, - { - add_fields: { - target: '', - fields: { - 'ecs.version': '1.5.0', - }, - }, - }, - ], - }, - ], - }, - ], - }, - }), - }, - }) - ); + const actionAttributes = { + type: 'CONFIG_CHANGE', + policy_id: 'policy1', + policy_revision: 4, + sent_at: '2020-03-14T19:45:02.620Z', + timestamp: '2019-01-04T14:32:03.36764-05:00', + created_at: '2020-03-14T19:45:02.620Z', + ack_data: JSON.stringify({ packages: ['system'] }), + }; mockSavedObjectsClient.bulkGet.mockReturnValue( Promise.resolve({ @@ -190,16 +77,10 @@ describe('test agent acks services', () => { id: 'action1', references: [], type: AGENT_ACTION_SAVED_OBJECT_TYPE, - attributes: { - type: 'CONFIG_CHANGE', - agent_id: 'id', - sent_at: '2020-03-14T19:45:02.620Z', - timestamp: '2019-01-04T14:32:03.36764-05:00', - created_at: '2020-03-14T19:45:02.620Z', - }, + attributes: actionAttributes, }, ], - } as SavedObjectsBulkResponse) + } as SavedObjectsBulkResponse) ); await acknowledgeAgentActions( @@ -220,7 +101,7 @@ describe('test agent acks services', () => { ] ); expect(mockSavedObjectsClient.bulkUpdate).toBeCalled(); - expect(mockSavedObjectsClient.bulkUpdate.mock.calls[0][0]).toHaveLength(2); + expect(mockSavedObjectsClient.bulkUpdate.mock.calls[0][0]).toHaveLength(1); expect(mockSavedObjectsClient.bulkUpdate.mock.calls[0][0][0]).toMatchInlineSnapshot(` Object { "attributes": Object { @@ -237,93 +118,6 @@ describe('test agent acks services', () => { it('should not update config field on the agent if a policy change for an old revision is acknowledged', async () => { const mockSavedObjectsClient = savedObjectsClientMock.create(); - const mockStartEncryptedSOPlugin = encryptedSavedObjectsMock.createStart(); - appContextService.start(({ - encryptedSavedObjectsStart: mockStartEncryptedSOPlugin, - } as unknown) as IngestManagerAppContext); - - const [ - { value: mockStartEncryptedSOClient }, - ] = mockStartEncryptedSOPlugin.getClient.mock.results; - - mockStartEncryptedSOClient.getDecryptedAsInternalUser.mockReturnValue( - Promise.resolve({ - id: 'action1', - references: [], - type: AGENT_ACTION_SAVED_OBJECT_TYPE, - attributes: { - type: 'CONFIG_CHANGE', - agent_id: 'id', - sent_at: '2020-03-14T19:45:02.620Z', - timestamp: '2019-01-04T14:32:03.36764-05:00', - created_at: '2020-03-14T19:45:02.620Z', - data: JSON.stringify({ - config: { - id: 'policy1', - revision: 4, - settings: { - monitoring: { - enabled: true, - use_output: 'default', - logs: true, - metrics: true, - }, - }, - outputs: { - default: { - type: 'elasticsearch', - hosts: ['http://localhost:9200'], - }, - }, - inputs: [ - { - id: 'f2293360-b57c-11ea-8bd3-7bd51e425399', - name: 'system-1', - type: 'logs', - use_output: 'default', - meta: { - package: { - name: 'system', - version: '0.3.0', - }, - }, - dataset: { - namespace: 'default', - }, - streams: [ - { - id: 'logs-system.syslog', - dataset: { - name: 'system.syslog', - }, - paths: ['/var/log/messages*', '/var/log/syslog*'], - exclude_files: ['.gz$'], - multiline: { - pattern: '^\\s', - match: 'after', - }, - processors: [ - { - add_locale: null, - }, - { - add_fields: { - target: '', - fields: { - 'ecs.version': '1.5.0', - }, - }, - }, - ], - }, - ], - }, - ], - }, - }), - }, - }) - ); mockSavedObjectsClient.bulkGet.mockReturnValue( Promise.resolve({ @@ -334,14 +128,15 @@ describe('test agent acks services', () => { type: AGENT_ACTION_SAVED_OBJECT_TYPE, attributes: { type: 'CONFIG_CHANGE', - agent_id: 'id', sent_at: '2020-03-14T19:45:02.620Z', timestamp: '2019-01-04T14:32:03.36764-05:00', created_at: '2020-03-14T19:45:02.620Z', + policy_id: 'policy1', + policy_revision: 99, }, }, ], - } as SavedObjectsBulkResponse) + } as SavedObjectsBulkResponse) ); await acknowledgeAgentActions( @@ -363,7 +158,7 @@ describe('test agent acks services', () => { ] ); expect(mockSavedObjectsClient.bulkUpdate).toBeCalled(); - expect(mockSavedObjectsClient.bulkUpdate.mock.calls[0][0]).toHaveLength(1); + expect(mockSavedObjectsClient.bulkUpdate.mock.calls[0][0]).toHaveLength(0); }); it('should fail for actions that cannot be found on agent actions list', async () => { diff --git a/x-pack/plugins/ingest_manager/server/services/agents/acks.ts b/x-pack/plugins/ingest_manager/server/services/agents/acks.ts index 718d15b667f22..d29dfcec7ef30 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/acks.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/acks.ts @@ -11,9 +11,11 @@ import { SavedObjectsClientContract, } from 'src/core/server'; import Boom from 'boom'; +import LRU from 'lru-cache'; import { Agent, AgentAction, + AgentPolicyAction, AgentEvent, AgentEventSOAttributes, AgentSOAttributes, @@ -29,11 +31,20 @@ import { forceUnenrollAgent } from './unenroll'; const ALLOWED_ACKNOWLEDGEMENT_TYPE: string[] = ['ACTION_RESULT']; +const actionCache = new LRU({ + max: 20, + maxAge: 10 * 60 * 1000, // 10 minutes +}); + export async function acknowledgeAgentActions( soClient: SavedObjectsClientContract, agent: Agent, agentEvents: AgentEvent[] ): Promise { + if (agentEvents.length === 0) { + return []; + } + for (const agentEvent of agentEvents) { if (!isAllowedType(agentEvent.type)) { throw Boom.badRequest(`${agentEvent.type} not allowed for acknowledgment only ACTION_RESULT`); @@ -44,9 +55,9 @@ export async function acknowledgeAgentActions( .map((event) => event.action_id) .filter((actionId) => actionId !== undefined) as string[]; - let actions; + let actions: AgentAction[]; try { - actions = await getAgentActionByIds(soClient, actionIds, false); + actions = await fetchActionsUsingCache(soClient, actionIds); } catch (error) { if (Boom.isBoom(error) && error.output.statusCode === 404) { throw Boom.badRequest(`One or more actions cannot be found`); @@ -54,20 +65,16 @@ export async function acknowledgeAgentActions( throw error; } - const agentActionsIds = []; + const agentActionsIds: string[] = []; for (const action of actions) { if (action.agent_id) { - agentActionsIds.push(action); + agentActionsIds.push(action.id); } if (action.agent_id && action.agent_id !== agent.id) { throw Boom.badRequest(`${action.id} not found`); } } - if (actions.length === 0) { - return []; - } - const isAgentUnenrolled = actions.some((action) => action.type === 'UNENROLL'); if (isAgentUnenrolled) { await forceUnenrollAgent(soClient, agent.id); @@ -88,26 +95,56 @@ export async function acknowledgeAgentActions( }, ] : []), - ...buildUpdateAgentActionSentAt(actionIds), + ...buildUpdateAgentActionSentAt(agentActionsIds), ]); return actions; } -function getLatestConfigChangePolicyActionIfUpdated(agent: Agent, actions: AgentAction[]) { - return actions.reduce((acc, action) => { - if (action.type !== 'CONFIG_CHANGE') { - return acc; - } - if (action.policy_id !== agent.policy_id) { - return acc; - } - - if (!acc) { +async function fetchActionsUsingCache( + soClient: SavedObjectsClientContract, + actionIds: string[] +): Promise { + const missingActionIds: string[] = []; + const actions = actionIds + .map((actionId) => { + const action = actionCache.get(actionId); + if (!action) { + missingActionIds.push(actionId); + } return action; + }) + .filter((action): action is AgentAction => action !== undefined); + + if (missingActionIds.length === 0) { + return actions; + } + + const freshActions = await getAgentActionByIds(soClient, actionIds, false); + freshActions.forEach((action) => actionCache.set(action.id, action)); + + return [...freshActions, ...actions]; +} + +function isAgentPolicyAction(action: AgentAction | AgentPolicyAction): action is AgentPolicyAction { + return (action as AgentPolicyAction).policy_id !== undefined; +} + +function getLatestConfigChangePolicyActionIfUpdated( + agent: Agent, + actions: Array +): AgentPolicyAction | null { + return actions.reduce((acc, action) => { + if ( + !isAgentPolicyAction(action) || + action.type !== 'CONFIG_CHANGE' || + action.policy_id !== agent.policy_id || + (acc?.policy_revision ?? 0) < (agent.policy_revision || 0) + ) { + return acc; } - if (action.policy_revision && action.policy_revision > (acc.policy_revision || 0)) { + if (action.policy_revision > (acc?.policy_revision ?? 0)) { return action; } diff --git a/x-pack/plugins/ingest_manager/server/services/agents/actions.test.ts b/x-pack/plugins/ingest_manager/server/services/agents/actions.test.ts index c739007952389..bcb3fc7fdc7bd 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/actions.test.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/actions.test.ts @@ -22,7 +22,13 @@ describe('test agent actions services', () => { }; mockSavedObjectsClient.create.mockReturnValue( Promise.resolve({ - attributes: {}, + attributes: { + agent_id: 'agentid', + type: 'CONFIG_CHANGE', + data: JSON.stringify({ content: 'data' }), + sent_at: '2020-03-14T19:45:02.620Z', + created_at: '2020-03-14T19:45:02.620Z', + }, } as SavedObject) ); await createAgentAction(mockSavedObjectsClient, newAgentAction); diff --git a/x-pack/plugins/ingest_manager/server/services/agents/actions.ts b/x-pack/plugins/ingest_manager/server/services/agents/actions.ts index 8462d541f3b30..121dc2cddc766 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/actions.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/actions.ts @@ -4,8 +4,15 @@ * you may not use this file except in compliance with the Elastic License. */ -import { SavedObjectsClientContract } from 'kibana/server'; -import { Agent, AgentAction, AgentActionSOAttributes } from '../../../common/types/models'; +import { SavedObjectsClientContract, SavedObject } from 'kibana/server'; +import { + Agent, + AgentAction, + AgentPolicyAction, + BaseAgentActionSOAttributes, + AgentActionSOAttributes, + AgentPolicyActionSOAttributes, +} from '../../../common/types/models'; import { AGENT_ACTION_SAVED_OBJECT_TYPE } from '../../../common/constants'; import { savedObjectToAgentAction } from './saved_objects'; import { appContextService } from '../app_context'; @@ -14,13 +21,37 @@ export async function createAgentAction( soClient: SavedObjectsClientContract, newAgentAction: Omit ): Promise { - const so = await soClient.create(AGENT_ACTION_SAVED_OBJECT_TYPE, { + return createAction(soClient, newAgentAction); +} + +export function createAgentPolicyAction( + soClient: SavedObjectsClientContract, + newAgentAction: Omit +): Promise { + return createAction(soClient, newAgentAction); +} +async function createAction( + soClient: SavedObjectsClientContract, + newAgentAction: Omit +): Promise; +async function createAction( + soClient: SavedObjectsClientContract, + newAgentAction: Omit +): Promise; +async function createAction( + soClient: SavedObjectsClientContract, + newAgentAction: Omit | Omit +): Promise { + const so = await soClient.create(AGENT_ACTION_SAVED_OBJECT_TYPE, { ...newAgentAction, data: newAgentAction.data ? JSON.stringify(newAgentAction.data) : undefined, ack_data: newAgentAction.ack_data ? JSON.stringify(newAgentAction.ack_data) : undefined, }); - const agentAction = savedObjectToAgentAction(so); + const agentAction = + so.attributes.agent_id !== undefined + ? savedObjectToAgentAction(so as SavedObject) + : savedObjectToAgentAction(so as SavedObject); agentAction.data = newAgentAction.data; return agentAction; @@ -30,7 +61,7 @@ export async function getAgentActionsForCheckin( soClient: SavedObjectsClientContract, agentId: string ): Promise { - const res = await soClient.find({ + const res = await soClient.find({ type: AGENT_ACTION_SAVED_OBJECT_TYPE, filter: `not ${AGENT_ACTION_SAVED_OBJECT_TYPE}.attributes.sent_at: * and ${AGENT_ACTION_SAVED_OBJECT_TYPE}.attributes.agent_id:${agentId}`, }); @@ -62,7 +93,7 @@ export async function getAgentActionByIds( type: AGENT_ACTION_SAVED_OBJECT_TYPE, })) ) - ).saved_objects.map(savedObjectToAgentAction); + ).saved_objects.map((action) => savedObjectToAgentAction(action)); if (!decryptData) { return actions; @@ -83,20 +114,61 @@ export async function getAgentActionByIds( ); } +export async function getAgentPolicyActionByIds( + soClient: SavedObjectsClientContract, + actionIds: string[], + decryptData: boolean = true +) { + const actions = ( + await soClient.bulkGet( + actionIds.map((actionId) => ({ + id: actionId, + type: AGENT_ACTION_SAVED_OBJECT_TYPE, + })) + ) + ).saved_objects.map((action) => savedObjectToAgentAction(action)); + + if (!decryptData) { + return actions; + } + + return Promise.all( + actions.map(async (action) => { + // Get decrypted actions + return savedObjectToAgentAction( + await appContextService + .getEncryptedSavedObjects() + .getDecryptedAsInternalUser( + AGENT_ACTION_SAVED_OBJECT_TYPE, + action.id + ) + ); + }) + ); +} + +function isAgentActionSavedObject( + so: SavedObject +): so is SavedObject { + return so.attributes.agent_id !== undefined; +} + export async function getNewActionsSince(soClient: SavedObjectsClientContract, timestamp: string) { - const res = await soClient.find({ + const res = await soClient.find({ type: AGENT_ACTION_SAVED_OBJECT_TYPE, filter: `not ${AGENT_ACTION_SAVED_OBJECT_TYPE}.attributes.sent_at: * AND ${AGENT_ACTION_SAVED_OBJECT_TYPE}.attributes.created_at >= "${timestamp}"`, }); - return res.saved_objects.map(savedObjectToAgentAction); + return res.saved_objects + .filter(isAgentActionSavedObject) + .map((so) => savedObjectToAgentAction(so as SavedObject)); } export async function getLatestConfigChangeAction( soClient: SavedObjectsClientContract, policyId: string ) { - const res = await soClient.find({ + const res = await soClient.find({ type: AGENT_ACTION_SAVED_OBJECT_TYPE, search: policyId, searchFields: ['policy_id'], @@ -114,6 +186,6 @@ export interface ActionsService { createAgentAction: ( soClient: SavedObjectsClientContract, - newAgentAction: AgentActionSOAttributes + newAgentAction: Omit ) => Promise; } diff --git a/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts b/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts index 3c508a6277ed3..ae8eef3a6daf4 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts @@ -5,6 +5,7 @@ */ import { timer, from, Observable, TimeoutError } from 'rxjs'; +import { omit } from 'lodash'; import { shareReplay, distinctUntilKeyChanged, @@ -16,7 +17,7 @@ import { take, } from 'rxjs/operators'; import { SavedObjectsClientContract, KibanaRequest } from 'src/core/server'; -import { Agent, AgentAction, AgentSOAttributes } from '../../../types'; +import { Agent, AgentAction, AgentPolicyAction, AgentSOAttributes } from '../../../types'; import * as APIKeysService from '../../api_keys'; import { AGENT_SAVED_OBJECT_TYPE, @@ -24,7 +25,11 @@ import { AGENT_POLICY_ROLLOUT_RATE_LIMIT_INTERVAL_MS, AGENT_POLICY_ROLLOUT_RATE_LIMIT_REQUEST_PER_INTERVAL, } from '../../../constants'; -import { getNewActionsSince, getLatestConfigChangeAction, getAgentActionByIds } from '../actions'; +import { + getNewActionsSince, + getLatestConfigChangeAction, + getAgentPolicyActionByIds, +} from '../actions'; import { appContextService } from '../../app_context'; import { toPromiseAbortable, AbortError, createRateLimiter } from './rxjs_utils'; @@ -48,10 +53,10 @@ function getInternalUserSOClient() { } function createNewActionsSharedObservable(): Observable { + const internalSOClient = getInternalUserSOClient(); + return timer(0, AGENT_UPDATE_ACTIONS_INTERVAL_MS).pipe( switchMap(() => { - const internalSOClient = getInternalUserSOClient(); - return from(getNewActionsSince(internalSOClient, new Date().toISOString())); }), shareReplay({ refCount: true, bufferSize: 1 }) @@ -60,11 +65,14 @@ function createNewActionsSharedObservable(): Observable { function createAgentPolicyActionSharedObservable(agentPolicyId: string) { const internalSOClient = getInternalUserSOClient(); + return timer(0, AGENT_UPDATE_ACTIONS_INTERVAL_MS).pipe( switchMap(() => from(getLatestConfigChangeAction(internalSOClient, agentPolicyId))), - filter((data): data is AgentAction => data !== undefined), + filter((data): data is AgentPolicyAction => data !== undefined), distinctUntilKeyChanged('id'), - switchMap((data) => from(getAgentActionByIds(internalSOClient, [data.id]).then((r) => r[0]))), + switchMap((data) => + from(getAgentPolicyActionByIds(internalSOClient, [data.id]).then((r) => r[0])) + ), shareReplay({ refCount: true, bufferSize: 1 }) ); } @@ -95,13 +103,19 @@ async function getOrCreateAgentDefaultOutputAPIKey( async function createAgentActionFromPolicyAction( soClient: SavedObjectsClientContract, agent: Agent, - policyAction: AgentAction + policyAction: AgentPolicyAction ) { - // Faster than clone - const newAgentAction: AgentAction = JSON.parse(JSON.stringify(policyAction)); - - delete newAgentAction.policy_id; - delete newAgentAction.policy_revision; + const newAgentAction: AgentAction = Object.assign( + omit( + // Faster than clone + JSON.parse(JSON.stringify(policyAction)) as AgentPolicyAction, + 'policy_id', + 'policy_revision' + ), + { + agent_id: agent.id, + } + ); // Mutate the policy to set the api token for this agent newAgentAction.data.config.outputs.default.api_key = await getOrCreateAgentDefaultOutputAPIKey( @@ -109,14 +123,12 @@ async function createAgentActionFromPolicyAction( agent ); - newAgentAction.agent_id = agent.id; - return [newAgentAction]; } export function agentCheckinStateNewActionsFactory() { // Shared Observables - const agentPolicies$ = new Map>(); + const agentPolicies$ = new Map>(); const newActions$ = createNewActionsSharedObservable(); // Rx operators const rateLimiter = createRateLimiter( diff --git a/x-pack/plugins/ingest_manager/server/services/agents/saved_objects.ts b/x-pack/plugins/ingest_manager/server/services/agents/saved_objects.ts index 836a193ccb14a..36d4de20b7253 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/saved_objects.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/saved_objects.ts @@ -6,7 +6,15 @@ import Boom from 'boom'; import { SavedObject } from 'src/core/server'; -import { Agent, AgentSOAttributes, AgentAction, AgentActionSOAttributes } from '../../types'; +import { + Agent, + AgentSOAttributes, + AgentAction, + AgentPolicyAction, + AgentActionSOAttributes, + AgentPolicyActionSOAttributes, + BaseAgentActionSOAttributes, +} from '../../types'; export function savedObjectToAgent(so: SavedObject): Agent { if (so.error) { @@ -27,7 +35,13 @@ export function savedObjectToAgent(so: SavedObject): Agent { }; } -export function savedObjectToAgentAction(so: SavedObject): AgentAction { +export function savedObjectToAgentAction(so: SavedObject): AgentAction; +export function savedObjectToAgentAction( + so: SavedObject +): AgentPolicyAction; +export function savedObjectToAgentAction( + so: SavedObject +): AgentAction | AgentPolicyAction { if (so.error) { if (so.error.statusCode === 404) { throw Boom.notFound(so.error.message); @@ -36,9 +50,29 @@ export function savedObjectToAgentAction(so: SavedObject Date: Mon, 31 Aug 2020 15:37:35 -0400 Subject: [PATCH 4/9] Create config change action during setup --- .../server/services/agent_policy.ts | 28 ++++++++++++++++++- .../server/services/agent_policy_update.ts | 24 ++-------------- .../server/services/agents/acks.test.ts | 16 +++++------ .../ingest_manager/server/services/setup.ts | 6 ++++ 4 files changed, 44 insertions(+), 30 deletions(-) diff --git a/x-pack/plugins/ingest_manager/server/services/agent_policy.ts b/x-pack/plugins/ingest_manager/server/services/agent_policy.ts index ea8505a0bafe6..f36150695b917 100644 --- a/x-pack/plugins/ingest_manager/server/services/agent_policy.ts +++ b/x-pack/plugins/ingest_manager/server/services/agent_policy.ts @@ -21,7 +21,7 @@ import { ListWithKuery, } from '../types'; import { DeleteAgentPolicyResponse, storedPackagePoliciesToAgentInputs } from '../../common'; -import { listAgents } from './agents'; +import { createAgentPolicyAction, listAgents } from './agents'; import { packagePolicyService } from './package_policy'; import { outputService } from './output'; import { agentPolicyUpdateEventHandler } from './agent_policy_update'; @@ -368,6 +368,32 @@ class AgentPolicyService { }; } + public async createFleetPolicyChangeAction( + soClient: SavedObjectsClientContract, + agentPolicyId: string + ) { + const policy = await agentPolicyService.getFullAgentPolicy(soClient, agentPolicyId); + if (!policy || !policy.revision) { + return; + } + const packages = policy.inputs.reduce((acc, input) => { + const packageName = input.meta?.package?.name; + if (packageName && acc.indexOf(packageName) < 0) { + return [packageName, ...acc]; + } + return acc; + }, []); + + await createAgentPolicyAction(soClient, { + type: 'CONFIG_CHANGE', + data: { config: policy } as any, + ack_data: { packages }, + created_at: new Date().toISOString(), + policy_id: policy.id, + policy_revision: policy.revision, + }); + } + public async getFullAgentPolicy( soClient: SavedObjectsClientContract, id: string, diff --git a/x-pack/plugins/ingest_manager/server/services/agent_policy_update.ts b/x-pack/plugins/ingest_manager/server/services/agent_policy_update.ts index 8a8f60b52e3e1..bd46c050d4098 100644 --- a/x-pack/plugins/ingest_manager/server/services/agent_policy_update.ts +++ b/x-pack/plugins/ingest_manager/server/services/agent_policy_update.ts @@ -6,7 +6,7 @@ import { SavedObjectsClientContract } from 'src/core/server'; import { generateEnrollmentAPIKey, deleteEnrollmentApiKeyForAgentPolicyId } from './api_keys'; -import { unenrollForAgentPolicyId, createAgentPolicyAction } from './agents'; +import { unenrollForAgentPolicyId } from './agents'; import { outputService } from './output'; import { agentPolicyService } from './agent_policy'; @@ -25,29 +25,11 @@ export async function agentPolicyUpdateEventHandler( await generateEnrollmentAPIKey(soClient, { agentPolicyId, }); + await agentPolicyService.createFleetPolicyChangeAction(soClient, agentPolicyId); } if (action === 'updated') { - const policy = await agentPolicyService.getFullAgentPolicy(soClient, agentPolicyId); - if (!policy || !policy.revision) { - return; - } - const packages = policy.inputs.reduce((acc, input) => { - const packageName = input.meta?.package?.name; - if (packageName && acc.indexOf(packageName) < 0) { - return [packageName, ...acc]; - } - return acc; - }, []); - - await createAgentPolicyAction(soClient, { - type: 'CONFIG_CHANGE', - data: { config: policy } as any, - ack_data: { packages }, - created_at: new Date().toISOString(), - policy_id: policy.id, - policy_revision: policy.revision, - }); + await agentPolicyService.createFleetPolicyChangeAction(soClient, agentPolicyId); } if (action === 'deleted') { diff --git a/x-pack/plugins/ingest_manager/server/services/agents/acks.test.ts b/x-pack/plugins/ingest_manager/server/services/agents/acks.test.ts index 21e71031d0886..866aa587b8a56 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/acks.test.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/acks.test.ts @@ -74,7 +74,7 @@ describe('test agent acks services', () => { Promise.resolve({ saved_objects: [ { - id: 'action1', + id: 'action2', references: [], type: AGENT_ACTION_SAVED_OBJECT_TYPE, attributes: actionAttributes, @@ -95,7 +95,7 @@ describe('test agent acks services', () => { type: 'ACTION_RESULT', subtype: 'CONFIG', timestamp: '2019-01-04T14:32:03.36764-05:00', - action_id: 'action1', + action_id: 'action2', agent_id: 'id', } as AgentEvent, ] @@ -123,7 +123,7 @@ describe('test agent acks services', () => { Promise.resolve({ saved_objects: [ { - id: 'action1', + id: 'action3', references: [], type: AGENT_ACTION_SAVED_OBJECT_TYPE, attributes: { @@ -152,7 +152,7 @@ describe('test agent acks services', () => { type: 'ACTION_RESULT', subtype: 'CONFIG', timestamp: '2019-01-04T14:32:03.36764-05:00', - action_id: 'action1', + action_id: 'action3', agent_id: 'id', } as AgentEvent, ] @@ -167,7 +167,7 @@ describe('test agent acks services', () => { Promise.resolve({ saved_objects: [ { - id: 'action1', + id: 'action4', error: { message: 'Not found', statusCode: 404, @@ -189,7 +189,7 @@ describe('test agent acks services', () => { type: 'ACTION_RESULT', subtype: 'CONFIG', timestamp: '2019-01-04T14:32:03.36764-05:00', - action_id: 'action2', + action_id: 'action4', agent_id: 'id', } as unknown) as AgentEvent, ] @@ -207,7 +207,7 @@ describe('test agent acks services', () => { Promise.resolve({ saved_objects: [ { - id: 'action1', + id: 'action5', references: [], type: AGENT_ACTION_SAVED_OBJECT_TYPE, attributes: { @@ -234,7 +234,7 @@ describe('test agent acks services', () => { type: 'ACTION', subtype: 'FAILED', timestamp: '2019-01-04T14:32:03.36764-05:00', - action_id: 'action1', + action_id: 'action5', agent_id: 'id', } as unknown) as AgentEvent, ] diff --git a/x-pack/plugins/ingest_manager/server/services/setup.ts b/x-pack/plugins/ingest_manager/server/services/setup.ts index fd5d94a71d672..ba1d252c2729f 100644 --- a/x-pack/plugins/ingest_manager/server/services/setup.ts +++ b/x-pack/plugins/ingest_manager/server/services/setup.ts @@ -190,6 +190,12 @@ export async function setupFleet( }); }) ); + + await Promise.all( + agentPolicies.map((agentPolicy) => + agentPolicyService.createFleetPolicyChangeAction(soClient, agentPolicy.id) + ) + ); } function generateRandomPassword() { From 218832e248eed5121e0301d7aff1c047d9e67016 Mon Sep 17 00:00:00 2001 From: Nicolas Chaulet Date: Thu, 3 Sep 2020 09:54:35 -0400 Subject: [PATCH 5/9] Fix tests --- .../apis/fleet/agents/actions.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/x-pack/test/ingest_manager_api_integration/apis/fleet/agents/actions.ts b/x-pack/test/ingest_manager_api_integration/apis/fleet/agents/actions.ts index 2b4bb335dfc5c..68e02933f5650 100644 --- a/x-pack/test/ingest_manager_api_integration/apis/fleet/agents/actions.ts +++ b/x-pack/test/ingest_manager_api_integration/apis/fleet/agents/actions.ts @@ -28,13 +28,12 @@ export default function (providerContext: FtrProviderContext) { action: { type: 'CONFIG_CHANGE', data: { data: 'action_data' }, - sent_at: '2020-03-18T19:45:02.620Z', }, }) .expect(200); + expect(apiResponse.item.type).to.eql('CONFIG_CHANGE'); expect(apiResponse.item.data).to.eql({ data: 'action_data' }); - expect(apiResponse.item.sent_at).to.be('2020-03-18T19:45:02.620Z'); }); it('should return a 400 when request does not have type information', async () => { From 4f733975f014bc5c3bc215edcc9e11e5073dc46c Mon Sep 17 00:00:00 2001 From: Nicolas Chaulet Date: Tue, 8 Sep 2020 10:15:21 -0400 Subject: [PATCH 6/9] Fix tests --- .../apis/agent_policy/agent_policy.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/x-pack/test/ingest_manager_api_integration/apis/agent_policy/agent_policy.ts b/x-pack/test/ingest_manager_api_integration/apis/agent_policy/agent_policy.ts index 410b0fe093002..18310ed4065a3 100644 --- a/x-pack/test/ingest_manager_api_integration/apis/agent_policy/agent_policy.ts +++ b/x-pack/test/ingest_manager_api_integration/apis/agent_policy/agent_policy.ts @@ -51,6 +51,9 @@ export default function ({ getService }: FtrProviderContext) { before(async () => { await esArchiver.loadIfNeeded('fleet/agents'); }); + before(async () => { + await supertest.post(`/api/ingest_manager/setup`).set('kbn-xsrf', 'xxx').send(); + }); after(async () => { await esArchiver.unload('fleet/agents'); }); From fe9d42a8fa6a05a1d241c2b3fa4566c50adb9b83 Mon Sep 17 00:00:00 2001 From: Nicolas Chaulet Date: Tue, 8 Sep 2020 14:10:58 -0400 Subject: [PATCH 7/9] Fix tests --- .../apis/agent_policy/agent_policy.ts | 3 --- 1 file changed, 3 deletions(-) diff --git a/x-pack/test/ingest_manager_api_integration/apis/agent_policy/agent_policy.ts b/x-pack/test/ingest_manager_api_integration/apis/agent_policy/agent_policy.ts index 18310ed4065a3..410b0fe093002 100644 --- a/x-pack/test/ingest_manager_api_integration/apis/agent_policy/agent_policy.ts +++ b/x-pack/test/ingest_manager_api_integration/apis/agent_policy/agent_policy.ts @@ -51,9 +51,6 @@ export default function ({ getService }: FtrProviderContext) { before(async () => { await esArchiver.loadIfNeeded('fleet/agents'); }); - before(async () => { - await supertest.post(`/api/ingest_manager/setup`).set('kbn-xsrf', 'xxx').send(); - }); after(async () => { await esArchiver.unload('fleet/agents'); }); From 664dc85d95ba25a0e5a04dd8750010c94e3e0701 Mon Sep 17 00:00:00 2001 From: Nicolas Chaulet Date: Tue, 8 Sep 2020 17:53:58 -0400 Subject: [PATCH 8/9] Fix tests --- .../ingest_manager/server/services/agent_policy_update.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/x-pack/plugins/ingest_manager/server/services/agent_policy_update.ts b/x-pack/plugins/ingest_manager/server/services/agent_policy_update.ts index bd46c050d4098..ff20e25e5bf0d 100644 --- a/x-pack/plugins/ingest_manager/server/services/agent_policy_update.ts +++ b/x-pack/plugins/ingest_manager/server/services/agent_policy_update.ts @@ -16,8 +16,9 @@ export async function agentPolicyUpdateEventHandler( agentPolicyId: string ) { const adminUser = await outputService.getAdminUser(soClient); - // If no admin user fleet is not enabled just skip this hook - if (!adminUser) { + const outputId = await outputService.getDefaultOutputId(soClient); + // If no admin user and no default output fleet is not enabled just skip this hook + if (!adminUser || !outputId) { return; } From ad6a42cf10ca7ea556a93efd25b463f826e9b51a Mon Sep 17 00:00:00 2001 From: Nicolas Chaulet Date: Sun, 13 Sep 2020 18:55:20 -0400 Subject: [PATCH 9/9] Fix after review --- .../common/types/models/agent.ts | 12 +++---- .../server/services/agent_policy.ts | 2 +- .../server/services/agents/actions.ts | 33 ++++++++++--------- .../agents/checkin/state_new_actions.ts | 2 +- .../server/services/agents/saved_objects.ts | 16 +++++++-- 5 files changed, 38 insertions(+), 27 deletions(-) diff --git a/x-pack/plugins/ingest_manager/common/types/models/agent.ts b/x-pack/plugins/ingest_manager/common/types/models/agent.ts index d494507c79a78..a204373fe2e56 100644 --- a/x-pack/plugins/ingest_manager/common/types/models/agent.ts +++ b/x-pack/plugins/ingest_manager/common/types/models/agent.ts @@ -29,7 +29,7 @@ export interface NewAgentAction { sent_at?: string; } -export interface AgentAction { +export interface AgentAction extends NewAgentAction { type: AgentActionType; data?: any; sent_at?: string; @@ -39,7 +39,7 @@ export interface AgentAction { ack_data?: any; } -export interface AgentPolicyAction { +export interface AgentPolicyAction extends NewAgentAction { id: string; type: AgentActionType; data?: any; @@ -58,12 +58,6 @@ interface CommonAgentActionSOAttributes { ack_data?: string; } -export type BaseAgentActionSOAttributes = CommonAgentActionSOAttributes & { - agent_id?: string; - policy_id?: string; - policy_revision?: number; -}; - export type AgentActionSOAttributes = CommonAgentActionSOAttributes & { agent_id: string; }; @@ -72,6 +66,8 @@ export type AgentPolicyActionSOAttributes = CommonAgentActionSOAttributes & { policy_revision: number; }; +export type BaseAgentActionSOAttributes = AgentActionSOAttributes | AgentPolicyActionSOAttributes; + export interface NewAgentEvent { type: 'STATE' | 'ERROR' | 'ACTION_RESULT' | 'ACTION'; subtype: // State diff --git a/x-pack/plugins/ingest_manager/server/services/agent_policy.ts b/x-pack/plugins/ingest_manager/server/services/agent_policy.ts index 4142cf5ce6ca0..938cfb4351630 100644 --- a/x-pack/plugins/ingest_manager/server/services/agent_policy.ts +++ b/x-pack/plugins/ingest_manager/server/services/agent_policy.ts @@ -398,7 +398,7 @@ class AgentPolicyService { const packages = policy.inputs.reduce((acc, input) => { const packageName = input.meta?.package?.name; if (packageName && acc.indexOf(packageName) < 0) { - return [packageName, ...acc]; + acc.push(packageName); } return acc; }, []); diff --git a/x-pack/plugins/ingest_manager/server/services/agents/actions.ts b/x-pack/plugins/ingest_manager/server/services/agents/actions.ts index d2d1601a8ede8..8519714334986 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/actions.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/actions.ts @@ -4,7 +4,7 @@ * you may not use this file except in compliance with the Elastic License. */ -import { SavedObjectsClientContract, SavedObject } from 'kibana/server'; +import { SavedObjectsClientContract } from 'kibana/server'; import { Agent, AgentAction, @@ -14,7 +14,11 @@ import { AgentPolicyActionSOAttributes, } from '../../../common/types/models'; import { AGENT_ACTION_SAVED_OBJECT_TYPE } from '../../../common/constants'; -import { savedObjectToAgentAction } from './saved_objects'; +import { + isAgentActionSavedObject, + isPolicyActionSavedObject, + savedObjectToAgentAction, +} from './saved_objects'; import { appContextService } from '../app_context'; import { nodeTypes } from '../../../../../../src/plugins/data/common'; @@ -49,13 +53,18 @@ async function createAction( ack_data: newAgentAction.ack_data ? JSON.stringify(newAgentAction.ack_data) : undefined, }); - const agentAction = - so.attributes.agent_id !== undefined - ? savedObjectToAgentAction(so as SavedObject) - : savedObjectToAgentAction(so as SavedObject); - agentAction.data = newAgentAction.data; + if (isAgentActionSavedObject(so)) { + const agentAction = savedObjectToAgentAction(so); + agentAction.data = newAgentAction.data; + + return agentAction; + } else if (isPolicyActionSavedObject(so)) { + const agentAction = savedObjectToAgentAction(so); + agentAction.data = newAgentAction.data; - return agentAction; + return agentAction; + } + throw new Error('Invalid action'); } export async function getAgentActionsForCheckin( @@ -163,12 +172,6 @@ export async function getAgentPolicyActionByIds( ); } -function isAgentActionSavedObject( - so: SavedObject -): so is SavedObject { - return so.attributes.agent_id !== undefined; -} - export async function getNewActionsSince(soClient: SavedObjectsClientContract, timestamp: string) { const filter = nodeTypes.function.buildNode('and', [ nodeTypes.function.buildNode( @@ -194,7 +197,7 @@ export async function getNewActionsSince(soClient: SavedObjectsClientContract, t return res.saved_objects .filter(isAgentActionSavedObject) - .map((so) => savedObjectToAgentAction(so as SavedObject)); + .map((so) => savedObjectToAgentAction(so)); } export async function getLatestConfigChangeAction( diff --git a/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts b/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts index b5891a86ed270..8f586420c3ecb 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts @@ -175,7 +175,7 @@ export function agentCheckinStateNewActionsFactory() { if (!data) { return; } - const newActions = data.filter((action) => action.agent_id); + const newActions = data.filter((action) => action.agent_id === agent.id); if (newActions.length === 0) { return; } diff --git a/x-pack/plugins/ingest_manager/server/services/agents/saved_objects.ts b/x-pack/plugins/ingest_manager/server/services/agents/saved_objects.ts index 36d4de20b7253..3ae664c086da9 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/saved_objects.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/saved_objects.ts @@ -51,7 +51,7 @@ export function savedObjectToAgentAction( } // If it's an AgentPolicyAction - if (so.attributes.policy_id !== undefined && so.attributes.policy_revision !== undefined) { + if (isPolicyActionSavedObject(so)) { return { id: so.id, type: so.attributes.type, @@ -63,7 +63,7 @@ export function savedObjectToAgentAction( }; } - if (so.attributes.agent_id === undefined) { + if (!isAgentActionSavedObject(so)) { throw new Error(`Malformed saved object AgentAction ${so.id}`); } @@ -77,3 +77,15 @@ export function savedObjectToAgentAction( ack_data: so.attributes.ack_data ? JSON.parse(so.attributes.ack_data) : undefined, }; } + +export function isAgentActionSavedObject( + so: SavedObject +): so is SavedObject { + return (so.attributes as AgentActionSOAttributes).agent_id !== undefined; +} + +export function isPolicyActionSavedObject( + so: SavedObject +): so is SavedObject { + return (so.attributes as AgentPolicyActionSOAttributes).policy_id !== undefined; +}