Skip to content

Commit

Permalink
fix: AgentMessageProcessedEvent payload
Browse files Browse the repository at this point in the history
Signed-off-by: Sai Ranjit Tummalapalli <sairanjit.tummalapalli@ayanworks.com>
  • Loading branch information
sairanjit committed Sep 18, 2024
1 parent 696afdf commit 93eaf72
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 8 deletions.
4 changes: 3 additions & 1 deletion packages/core/src/agent/Dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { CredoError } from '../error'
import { Logger } from '../logger'
import { ProblemReportError, ProblemReportReason } from '../modules/problem-reports'
import { injectable, inject } from '../plugins'
import { EncryptedMessage } from '../types'
import { canHandleMessageType, parseMessageType } from '../utils/messageType'

import { ProblemReportMessage } from './../modules/problem-reports/messages/ProblemReportMessage'
Expand Down Expand Up @@ -64,7 +65,7 @@ class Dispatcher {
await next()
}

public async dispatch(messageContext: InboundMessageContext): Promise<void> {
public async dispatch(messageContext: InboundMessageContext, encryptedMessage?: EncryptedMessage): Promise<void> {
const { agentContext, connection, senderKey, recipientKey, message } = messageContext

// Set default handler if available, middleware can still override the message handler
Expand Down Expand Up @@ -138,6 +139,7 @@ class Dispatcher {
message,
connection,
receivedAt: messageContext.receivedAt,
encryptedMessage,
},
})
}
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/agent/Events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { AgentMessage } from './AgentMessage'
import type { TransportSession } from './TransportService'
import type { OutboundMessageContext, OutboundMessageSendStatus } from './models'
import type { ConnectionRecord } from '../modules/connections'
import type { EncryptedMessage } from '../types'
import type { Observable } from 'rxjs'

import { filter } from 'rxjs'
Expand Down Expand Up @@ -45,6 +46,7 @@ export interface AgentMessageProcessedEvent extends BaseEvent {
message: AgentMessage
connection?: ConnectionRecord
receivedAt?: Date
encryptedMessage?: EncryptedMessage
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/agent/MessageReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ export class MessageReceiver {
await session.close()
}

await this.dispatcher.dispatch(messageContext)
await this.dispatcher.dispatch(messageContext, encryptedMessage)
}

/**
Expand Down
7 changes: 4 additions & 3 deletions packages/node/src/transport/HttpInboundTransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import type {
import type { Express, Request, Response } from 'express'
import type { Server } from 'http'

import { DidCommMimeType, CredoError, TransportService, utils, AgentEventTypes } from '@credo-ts/core'
import { DidCommMimeType, CredoError, TransportService, utils, AgentEventTypes, deepEquality } from '@credo-ts/core'
import express, { text } from 'express'
import { first, firstValueFrom, ReplaySubject, timeout } from 'rxjs'
import { filter, firstValueFrom, ReplaySubject, timeout } from 'rxjs'

const supportedContentTypes: string[] = [DidCommMimeType.V0, DidCommMimeType.V1]

Expand Down Expand Up @@ -66,7 +66,8 @@ export class HttpInboundTransport implements InboundTransport {

observable
.pipe(
first((e) => e.type === AgentEventTypes.AgentMessageProcessed),
filter((e) => e.type === AgentEventTypes.AgentMessageProcessed),
filter((e) => deepEquality(e.payload.encryptedMessage, encryptedMessage)),
timeout({
first: 10000, // timeout after 10 seconds
meta: 'HttpInboundTransport.start',
Expand Down
7 changes: 4 additions & 3 deletions packages/node/src/transport/WsInboundTransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import type {
AgentMessageProcessedEvent,
} from '@credo-ts/core'

import { CredoError, TransportService, utils, AgentEventTypes } from '@credo-ts/core'
import { first, firstValueFrom, ReplaySubject, timeout } from 'rxjs'
import { CredoError, TransportService, utils, AgentEventTypes, deepEquality } from '@credo-ts/core'
import { filter, firstValueFrom, ReplaySubject, timeout } from 'rxjs'
// eslint-disable-next-line import/no-named-as-default
import WebSocket, { Server } from 'ws'

Expand Down Expand Up @@ -79,7 +79,8 @@ export class WsInboundTransport implements InboundTransport {

observable
.pipe(
first((e) => e.type === AgentEventTypes.AgentMessageProcessed),
filter((e) => e.type === AgentEventTypes.AgentMessageProcessed),
filter((e) => deepEquality(e.payload.encryptedMessage, encryptedMessage)),
timeout({
first: 10000, // timeout after 10 seconds
meta: 'WsInboundTransport.listenOnWebSocketMessages',
Expand Down

0 comments on commit 93eaf72

Please sign in to comment.