From 15a727ce9b2551ceb230c767c943fba711d340b6 Mon Sep 17 00:00:00 2001 From: FabioPinheiro Date: Mon, 24 Jul 2023 17:28:20 +0100 Subject: [PATCH 1/9] feat: Send problem report on missing protocol or wrong version or worng role --- TASKs.md | 77 +++++++++++++++++++ build.sbt | 2 +- .../mediator/actions/ProtocolExecute.scala | 9 ++- .../MediatorCoordinationExecuter.scala | 47 ++++++++++- .../protocols/MissingProtocolExecuter.scala | 29 +++++++ .../mediator/protocols/PickupExecuter.scala | 32 +++++++- .../atala/mediator/protocols/Problems.scala | 59 ++++++++++++++ 7 files changed, 244 insertions(+), 11 deletions(-) create mode 100644 TASKs.md create mode 100644 mediator/src/main/scala/io/iohk/atala/mediator/protocols/MissingProtocolExecuter.scala create mode 100644 mediator/src/main/scala/io/iohk/atala/mediator/protocols/Problems.scala diff --git a/TASKs.md b/TASKs.md new file mode 100644 index 00000000..a7312e78 --- /dev/null +++ b/TASKs.md @@ -0,0 +1,77 @@ +# ATL-5236 - Error Handling (PRISM Agent and Mediator) + +https://input-output.atlassian.net/browse/ATL-5236 + +https://identity.foundation/didcomm-messaging/spec/#problem-reports + + +On each step of all our protocols processing, when something wrong is happening, we need to: +Goals +- Update the record to a documented error state +- Log the error in the service logs +- Send the problem report message when appropriate + +Goal other: error recover/resilient +- [optional] Send event that record state changed to error +- Decide on the policy of re-trying sending errors (one of the proposals is just to send it once, and if a recipient did not get this, then it’s on its own requesting record ID state) + + + +Tasks: +- (2W * 2Dev) Mediator (Note: most errors in Mediator will be synchronous) + - Store messages when sending (1w) + - Catch Errors and send Problem Reports (1w): + - (sync) e.p.crypto - is message is tampering (any crypto error). + - (sync) e.p.crypto.unsupported - is message is tampering (any crypto error). + - (sync & async) e.p.crypto.replay - if the message is replay (possible he replay attack). + - (sync) e.p.req - pickup message before enroling. + - (sync) e.p.me.res.storage - connection MongoBD is not working. + - (sync) e.p.me.res.storage - business logic MongoBD is not working. + - (sync) e.p.did - for any DID method that is not `did.peer`. + - (sync) e.p.did.malformed - for any DID method malformed. + - (sync) e.p.msg - for parsing error from the message. + - (sync) e.p.msg.unsupported - for the message type LiveModeChange and all message that is not role of the mediator + - [DONE] MediateGrant + - [DONE] MediateDeny + - [DONE] KeylistResponse + - [DONE] Status = https://didcomm.org/messagepickup/3.0/status + - [DONE] LiveModeChange - https://didcomm.org/messagepickup/3.0/live-delivery-change + - [TODO] ... + - (sync) e.p.msg.unsupported - for parsing error due to unsupported version or protocol. + - [DONE] MissingProtocolExecuter (unsupported protocol it also works fine for unsupported versions) + - (sync & async) e.p.req.not_enroll - Get a Forward message to a DID that is not enrolled. + - (sync & async) e.p.me - catch all error at the end. + - Receive a problem report (1w): + - in case of Warnings Reply `w.p` -> log warnings and escalate to an error `e.p` on the reply + - in case of Error `e.p` -> log error + +- (4/6W * 2Dev) PRISM Agent (Note: most error in PRISM Agent will be asynchronous) + - Store DID Comm messages in a searchable way (4W) + - Store receiving messages (in a searchable way) + - Store sending messages (in a searchable way) + - both plaintext and encrypted + - [optional] Maybe later MongoBD for PRISM Agent + - Implement Problem Reports in Mercury (DONE??) + - Catch all the Errors and send Problem Reports: (4W) + - e.p.xfer - if it found nobody listening on the specified port. (service endpoint unavailable) + - (async) e.p.crypto - is message is tampering (any crypto error). + - (async) e.p.crypto.unsupported - is message is tampering (any crypto error). + - (sync & async) e.p.crypto.replay - if the message is replay (possible he replay attack). + - (async) e.p.me.res.storage - connection MongoBD is not working. + - (async) e.p.me.res.storage - business logic MongoBD is not working. + - (async) e.p.did - for any DID method that is not `did.peer`. + - (async) e.p.did.malformed - for any DID method malformed. + - (async) e.p.msg - for parsing errors from the message. + - (async) e.p.msg.unsupported - for the message with the wrong role of the agent. + - (async) e.p.msg.unsupported - for parsing error due to unsupported version or protocol. + - (sync & async) e.p.me - catch all error at the end. + - (async) TODO `w.m` + - Receive e Problem Reports (1W): + - support for scope `m` -> state update (of the protocol execution) + - support for scope `p` -> state update (and fail protocol execution) + +- Other task (not for this ticket) + - Traceability of the MsgID of the Problem Report to the original error (2d) -> ATL-4147 + - [optional] Log - https://input-output.atlassian.net/browse/ATL-4147 + - escalate_to must be configurable (1d) + - [optional] update the protocol with new tokens (2d) diff --git a/build.sbt b/build.sbt index ac9639e5..c0d62393 100644 --- a/build.sbt +++ b/build.sbt @@ -9,7 +9,7 @@ inThisBuild( /** Versions */ lazy val V = new { - val scalaDID = "0.1.0-M6" + val scalaDID = "0.1.0-M6+0-b02d2b9e+20230724-1637-SNAPSHOT" // val scalajsJavaSecureRandom = "1.0.0" // FIXME another bug in the test framework https://github.com/scalameta/munit/issues/554 diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/actions/ProtocolExecute.scala b/mediator/src/main/scala/io/iohk/atala/mediator/actions/ProtocolExecute.scala index 63ffc2b2..a94c476c 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/actions/ProtocolExecute.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/actions/ProtocolExecute.scala @@ -13,6 +13,7 @@ import io.iohk.atala.mediator.db.* import io.iohk.atala.mediator.protocols.NullProtocolExecuter import zio.* import zio.json.* +import io.iohk.atala.mediator.protocols.MissingProtocolExecuter //TODO pick a better name // maybe "Protocol" only trait ProtocolExecuter[-R] { @@ -29,7 +30,7 @@ trait ProtocolExecuter[-R] { object ProtocolExecuter { type Services = Resolver & Agent & Operations & MessageDispatcher } -case class ProtocolExecuterCollection[-R](executers: ProtocolExecuter[R]*) extends ProtocolExecuter[R] { +case class ProtocolExecuterCollection[-R <: Agent](executers: ProtocolExecuter[R]*) extends ProtocolExecuter[R] { override def suportedPIURI: Seq[PIURI] = executers.flatMap(_.suportedPIURI) @@ -39,14 +40,16 @@ case class ProtocolExecuterCollection[-R](executers: ProtocolExecuter[R]*) exten plaintextMessage: PlaintextMessage, ): ZIO[R1, MediatorError, Option[EncryptedMessage]] = selectExecutersFor(plaintextMessage.`type`) match - case None => NullProtocolExecuter.execute(plaintextMessage) + // case None => NullProtocolExecuter.execute(plaintextMessage) + case None => MissingProtocolExecuter.execute(plaintextMessage) case Some(px) => px.execute(plaintextMessage) override def program[R1 <: R]( plaintextMessage: PlaintextMessage, ): ZIO[R1, MediatorError, Action] = selectExecutersFor(plaintextMessage.`type`) match - case None => NullProtocolExecuter.program(plaintextMessage) + // case None => NullProtocolExecuter.program(plaintextMessage) + case None => MissingProtocolExecuter.program(plaintextMessage) case Some(px) => px.program(plaintextMessage) } diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/MediatorCoordinationExecuter.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/MediatorCoordinationExecuter.scala index ddcfe424..0800b47c 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/MediatorCoordinationExecuter.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/MediatorCoordinationExecuter.scala @@ -44,8 +44,34 @@ object MediatorCoordinationExecuter extends ProtocolExecuterWithServices[Protoco case `piuriKeylistQuery` => plaintextMessage.toKeylistQuery case `piuriKeylist` => plaintextMessage.toKeylist }).map { - case m: MediateGrant => ZIO.logWarning("MediateGrant") *> ZIO.succeed(NoReply) - case m: MediateDeny => ZIO.logWarning("MediateDeny") *> ZIO.succeed(NoReply) + case m: MediateGrant => + ZIO.logWarning("MediateGrant") *> ZIO.succeed(NoReply) *> + ZIO.succeed( + SyncReplyOnly( + Problems + .unsupportedProtocolRole( + from = m.to.asFROM, + to = m.from.asTO, + pthid = m.id, // TODO CHECK pthid + piuri = m.piuri, + ) + .toPlaintextMessage + ) + ) + case m: MediateDeny => + ZIO.logWarning("MediateDeny") *> ZIO.succeed(NoReply) *> + ZIO.succeed( + SyncReplyOnly( + Problems + .unsupportedProtocolRole( + from = m.to.asFROM, + to = m.from.asTO, + pthid = m.id, // TODO CHECK pthid + piuri = m.piuri, + ) + .toPlaintextMessage + ) + ) case m: MediateRequest => for { _ <- ZIO.logInfo("MediateRequest") @@ -74,7 +100,20 @@ object MediatorCoordinationExecuter extends ProtocolExecuterWithServices[Protoco } } } yield SyncReplyOnly(m.makeKeylistResponse(updateResponse).toPlaintextMessage) - case m: KeylistResponse => ZIO.logWarning("KeylistResponse") *> ZIO.succeed(NoReply) + case m: KeylistResponse => + ZIO.logWarning("KeylistResponse") *> ZIO.succeed(NoReply) *> + ZIO.succeed( + SyncReplyOnly( + Problems + .unsupportedProtocolRole( + from = m.to.asFROM, + to = m.from.asTO, + pthid = m.id, // TODO CHECK pthid + piuri = m.piuri, + ) + .toPlaintextMessage + ) + ) case m: KeylistQuery => for { _ <- ZIO.logInfo("KeylistQuery") @@ -94,7 +133,7 @@ object MediatorCoordinationExecuter extends ProtocolExecuterWithServices[Protoco case Some(response) => SyncReplyOnly(response.toPlaintextMessage) case m: Keylist => ZIO.logWarning("Keylist") *> ZIO.succeed(NoReply) } match - case Left(error) => ZIO.logError(error) *> ZIO.succeed(NoReply) + case Left(error) => ZIO.logError(error) *> ZIO.succeed(NoReply) // TODO error report case Right(program) => program } diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/MissingProtocolExecuter.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/MissingProtocolExecuter.scala new file mode 100644 index 00000000..99e54771 --- /dev/null +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/MissingProtocolExecuter.scala @@ -0,0 +1,29 @@ +package io.iohk.atala.mediator.protocols + +import zio.ZIO + +import fmgp.did.* +import fmgp.did.comm.PlaintextMessage +import io.iohk.atala.mediator.MissingProtocolError +import io.iohk.atala.mediator.actions.ProtocolExecuter +import io.iohk.atala.mediator.actions.Reply + +object MissingProtocolExecuter extends ProtocolExecuter[Agent] { + + override def suportedPIURI = Seq() + override def program[R1 <: Agent](plaintextMessage: PlaintextMessage) = + ZIO + .service[Agent] + .map(agent => + Reply( + Problems + .unsupportedProtocolType( + to = plaintextMessage.from.map(_.asTO).toSet, + from = agent.id, + pthid = plaintextMessage.id, + piuri = plaintextMessage.`type`, + ) + .toPlaintextMessage + ) + ) +} diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala index f2a5687d..77b06ee4 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala @@ -64,7 +64,20 @@ object PickupExecuter live_delivery = None, // TODO ) } yield SyncReplyOnly(status.toPlaintextMessage) - case m: Status => ZIO.logInfo("Status") *> ZIO.succeed(NoReply) + case m: Status => + ZIO.logInfo("Status") *> + ZIO.succeed( + SyncReplyOnly( + Problems + .unsupportedProtocolRole( + from = m.to.asFROM, + to = m.from.asTO, + pthid = m.id, // TODO CHECK pthid + piuri = m.piuri, + ) + .toPlaintextMessage + ) + ) case m: DeliveryRequest => for { _ <- ZIO.logInfo("DeliveryRequest") @@ -73,7 +86,7 @@ object PickupExecuter didRequestingMessages = m.from.asFROMTO mDidAccount <- repoDidAccount.getDidAccount(didRequestingMessages.toDID) msgHash = mDidAccount match - case None => ??? + case None => ??? // TODO ERROR case Some(didAccount) => didAccount.messagesRef.filter(_.state == false).map(_.hash) allMessagesFor <- repoMessageItem.findByIds(msgHash) messagesToReturn = @@ -115,7 +128,20 @@ object PickupExecuter m.message_id_list ) } yield NoReply - case m: LiveModeChange => ZIO.logWarning("LiveModeChange not implemented") *> ZIO.succeed(NoReply) // TODO + case m: LiveModeChange => + ZIO.logWarning("LiveModeChange not implemented") *> + ZIO.succeed( + SyncReplyOnly( + Problems + .protocolNotImplemented( + from = m.to.asFROM, + to = m.from.asTO, + pthid = m.id, // TODO CHECK pthid + piuri = m.piuri, + ) + .toPlaintextMessage + ) + ) } match case Left(error) => ZIO.logError(error) *> ZIO.succeed(NoReply) diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/Problems.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/Problems.scala new file mode 100644 index 00000000..41bfedc6 --- /dev/null +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/Problems.scala @@ -0,0 +1,59 @@ +package io.iohk.atala.mediator.protocols + +import fmgp.did.* +import fmgp.did.comm.* +import fmgp.did.comm.protocol.reportproblem2.* + +object Problems { + def unsupportedProtocolType( + to: Set[TO], + from: FROM, + pthid: MsgID, + piuri: PIURI, + ) = ProblemReport( + // id: MsgID = MsgID(), + to = to, + from = from, // Can it be Option? + pthid = pthid, + ack = None, // Option[Seq[MsgID]], + code = ProblemCode.ErroFail("msg", "unsupported"), + comment = None, // Option[String], + args = None, // Option[Seq[String]], + escalate_to = None, // Option[String], + ) + + def unsupportedProtocolRole( + to: TO, + from: FROM, + pthid: MsgID, + piuri: PIURI, + ) = ProblemReport( + // id: MsgID = MsgID(), + to = Set(to), + from = from, // Can it be Option? + pthid = pthid, + ack = None, // Option[Seq[MsgID]], + code = ProblemCode.ErroFail("msg", "unsupported"), + comment = None, // Option[String], + args = None, // Option[Seq[String]], + escalate_to = None, // Option[String], + ) + + def protocolNotImplemented( + to: TO, + from: FROM, + pthid: MsgID, + piuri: PIURI, + ) = ProblemReport( + // id: MsgID = MsgID(), + to = Set(to), + from = from, // Can it be Option? + pthid = pthid, + ack = None, // Option[Seq[MsgID]], + code = ProblemCode.ErroFail("msg", "unsupported"), + comment = None, // Option[String], + args = None, // Option[Seq[String]], + escalate_to = None, // Option[String], + ) + +} From 491a64050b65d2084c9913d72d2a8501322a9573 Mon Sep 17 00:00:00 2001 From: FabioPinheiro Date: Tue, 25 Jul 2023 18:14:37 +0100 Subject: [PATCH 2/9] Catch All StorageError --- TASKs.md | 15 +-- .../atala/mediator/actions/ActionUtils.scala | 94 +++++++++++++++++++ .../mediator/actions/ProtocolExecute.scala | 71 +------------- .../atala/mediator/app/MediatorAgent.scala | 61 ++++++++++-- .../mediator/protocols/PickupExecuter.scala | 2 +- .../atala/mediator/protocols/Problems.scala | 17 ++++ 6 files changed, 175 insertions(+), 85 deletions(-) create mode 100644 mediator/src/main/scala/io/iohk/atala/mediator/actions/ActionUtils.scala diff --git a/TASKs.md b/TASKs.md index a7312e78..1bfb43c6 100644 --- a/TASKs.md +++ b/TASKs.md @@ -26,19 +26,21 @@ Tasks: - (sync & async) e.p.crypto.replay - if the message is replay (possible he replay attack). - (sync) e.p.req - pickup message before enroling. - (sync) e.p.me.res.storage - connection MongoBD is not working. + - [QA] catch all StorageCollection Error - (sync) e.p.me.res.storage - business logic MongoBD is not working. + - [QA] catch all StorageThrowable - (sync) e.p.did - for any DID method that is not `did.peer`. - (sync) e.p.did.malformed - for any DID method malformed. - (sync) e.p.msg - for parsing error from the message. - (sync) e.p.msg.unsupported - for the message type LiveModeChange and all message that is not role of the mediator - - [DONE] MediateGrant - - [DONE] MediateDeny - - [DONE] KeylistResponse - - [DONE] Status = https://didcomm.org/messagepickup/3.0/status - - [DONE] LiveModeChange - https://didcomm.org/messagepickup/3.0/live-delivery-change + - [QA] MediateGrant + - [QA] MediateDeny + - [QA] KeylistResponse + - [QA] Status = https://didcomm.org/messagepickup/3.0/status + - [QA] LiveModeChange - https://didcomm.org/messagepickup/3.0/live-delivery-change - [TODO] ... - (sync) e.p.msg.unsupported - for parsing error due to unsupported version or protocol. - - [DONE] MissingProtocolExecuter (unsupported protocol it also works fine for unsupported versions) + - [QA] MissingProtocolExecuter (unsupported protocol it also works fine for unsupported versions) - (sync & async) e.p.req.not_enroll - Get a Forward message to a DID that is not enrolled. - (sync & async) e.p.me - catch all error at the end. - Receive a problem report (1w): @@ -75,3 +77,4 @@ Tasks: - [optional] Log - https://input-output.atlassian.net/browse/ATL-4147 - escalate_to must be configurable (1d) - [optional] update the protocol with new tokens (2d) + - `e.p.me.res.storage` diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/actions/ActionUtils.scala b/mediator/src/main/scala/io/iohk/atala/mediator/actions/ActionUtils.scala new file mode 100644 index 00000000..4e94a349 --- /dev/null +++ b/mediator/src/main/scala/io/iohk/atala/mediator/actions/ActionUtils.scala @@ -0,0 +1,94 @@ +package io.iohk.atala.mediator.actions + +import fmgp.crypto.error.* +import fmgp.did.* +import fmgp.did.comm.* +import fmgp.did.comm.Operations.* +import fmgp.did.comm.protocol.* +import fmgp.did.comm.protocol.basicmessage2.* +import fmgp.did.comm.protocol.trustping2.* +import io.iohk.atala.mediator.* +import io.iohk.atala.mediator.comm.* +import io.iohk.atala.mediator.db.* +import io.iohk.atala.mediator.protocols.NullProtocolExecuter +import zio.* +import zio.json.* +import io.iohk.atala.mediator.protocols.MissingProtocolExecuter + +object ActionUtils { + + def packResponse( + plaintextMessage: PlaintextMessage, + action: Action + ): ZIO[Operations & Agent & (Resolver & MessageDispatcher), MediatorError, Option[EncryptedMessage]] = + action match { + case _: NoReply.type => ZIO.succeed(None) + case action: AnyReply => + val reply = action.msg + for { + msg <- { + reply.from match + case Some(value) => authEncrypt(reply) + case None => anonEncrypt(reply) + }.mapError(fail => MediatorDidError(fail)) + // TODO forward message + maybeSyncReplyMsg <- reply.to.map(_.toSeq) match // TODO improve + case None => ZIO.logWarning("Have a reply but the field 'to' is missing") *> ZIO.none + case Some(Seq()) => ZIO.logWarning("Have a reply but the field 'to' is empty") *> ZIO.none + case Some(send2DIDs) => + ZIO + .foreach(send2DIDs)(to => + val job: ZIO[MessageDispatcher & (Resolver & Any), MediatorError, Matchable] = for { + messageDispatcher <- ZIO.service[MessageDispatcher] + resolver <- ZIO.service[Resolver] + doc <- resolver + .didDocument(to) + .mapError(fail => MediatorDidError(fail)) + mURL = doc.service.toSeq.flatten + .filter(_.`type` match { + case str: String => str == DIDService.TYPE_DIDCommMessaging + case seq: Seq[String] => seq.contains(DIDService.TYPE_DIDCommMessaging) + }) match { + case head +: next => // FIXME discarte the next + head.getServiceEndpointAsURIs.headOption // TODO head + case Seq() => None // TODO + } + jobToRun <- mURL match + case None => ZIO.logWarning(s"No url to send message") + case Some(url) => { + ZIO.log(s"Send to url: $url") *> + messageDispatcher + .send( + msg, + url, // "http://localhost:8080", // FIXME REMOVE (use for local env) + None + // url match // FIXME REMOVE (use for local env) + // case http if http.startsWith("http://") => Some(url.drop(7).split(':').head.split('/').head) + // case https if https.startsWith("https://") => + // Some(url.drop(8).split(':').head.split('/').head) + // case _ => None + ) + .catchAll { case DispatcherError(error) => ZIO.logWarning(s"Dispatch Error: $error") } + } + + } yield (jobToRun) + action match + case Reply(_) => job + case SyncReplyOnly(_) => ZIO.unit + case AsyncReplyOnly(_) => job + ) *> ZIO + .succeed(msg) + .when( + { + plaintextMessage.return_route.contains(ReturnRoute.all) + && { + plaintextMessage.from.map(_.asTO) match { + case None => false + case Some(replyTo) => send2DIDs.contains(replyTo) + } + } + } || action.isInstanceOf[SyncReplyOnly] + ) + } yield maybeSyncReplyMsg + } +} diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/actions/ProtocolExecute.scala b/mediator/src/main/scala/io/iohk/atala/mediator/actions/ProtocolExecute.scala index a94c476c..dad9959b 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/actions/ProtocolExecute.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/actions/ProtocolExecute.scala @@ -61,76 +61,7 @@ trait ProtocolExecuterWithServices[-R <: ProtocolExecuter.Services] extends Prot ): ZIO[R1, MediatorError, Option[EncryptedMessage]] = program(plaintextMessage) .tap(v => ZIO.logDebug(v.toString)) // DEBUG - .flatMap { - case _: NoReply.type => ZIO.succeed(None) - case action: AnyReply => - val reply = action.msg - for { - msg <- { - reply.from match - case Some(value) => authEncrypt(reply) - case None => anonEncrypt(reply) - }.mapError(fail => MediatorDidError(fail)) - // TODO forward message - maybeSyncReplyMsg <- reply.to.map(_.toSeq) match // TODO improve - case None => ZIO.logWarning("Have a reply but the field 'to' is missing") *> ZIO.none - case Some(Seq()) => ZIO.logWarning("Have a reply but the field 'to' is empty") *> ZIO.none - case Some(send2DIDs) => - ZIO - .foreach(send2DIDs)(to => - val job: ZIO[MessageDispatcher & (Resolver & Any), MediatorError, Matchable] = for { - messageDispatcher <- ZIO.service[MessageDispatcher] - resolver <- ZIO.service[Resolver] - doc <- resolver - .didDocument(to) - .mapError(fail => MediatorDidError(fail)) - mURL = doc.service.toSeq.flatten - .filter(_.`type` match { - case str: String => str == DIDService.TYPE_DIDCommMessaging - case seq: Seq[String] => seq.contains(DIDService.TYPE_DIDCommMessaging) - }) match { - case head +: next => // FIXME discarte the next - head.getServiceEndpointAsURIs.headOption // TODO head - case Seq() => None // TODO - } - jobToRun <- mURL match - case None => ZIO.logWarning(s"No url to send message") - case Some(url) => { - ZIO.log(s"Send to url: $url") *> - messageDispatcher - .send( - msg, - url, // "http://localhost:8080", // FIXME REMOVE (use for local env) - None - // url match // FIXME REMOVE (use for local env) - // case http if http.startsWith("http://") => Some(url.drop(7).split(':').head.split('/').head) - // case https if https.startsWith("https://") => - // Some(url.drop(8).split(':').head.split('/').head) - // case _ => None - ) - .catchAll { case DispatcherError(error) => ZIO.logWarning(s"Dispatch Error: $error") } - } - - } yield (jobToRun) - action match - case Reply(_) => job - case SyncReplyOnly(_) => ZIO.unit - case AsyncReplyOnly(_) => job - ) *> ZIO - .succeed(msg) - .when( - { - plaintextMessage.return_route.contains(ReturnRoute.all) - && { - plaintextMessage.from.map(_.asTO) match { - case None => false - case Some(replyTo) => send2DIDs.contains(replyTo) - } - } - } || action.isInstanceOf[SyncReplyOnly] - ) - } yield maybeSyncReplyMsg - } + .flatMap(action => ActionUtils.packResponse(plaintextMessage, action)) override def program[R1 <: R]( plaintextMessage: PlaintextMessage, diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/app/MediatorAgent.scala b/mediator/src/main/scala/io/iohk/atala/mediator/app/MediatorAgent.scala index 692b65ef..f58fcd17 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/app/MediatorAgent.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/app/MediatorAgent.scala @@ -112,17 +112,60 @@ case class MediatorAgent( ] = ZIO .logAnnotate("msgHash", msg.sha1) { - for { + val xxx = for { _ <- ZIO.log("receivedMessage") maybeSyncReplyMsg <- if (!msg.recipientsSubject.contains(id)) - ZIO.logError(s"This mediator '${id.string}' is not a recipient") - *> ZIO.none + ZIO.logError(s"This mediator '${id.string}' is not a recipient") *> ZIO.none else for { messageItemRepo <- ZIO.service[MessageItemRepo] - _ <- messageItemRepo.insert(MessageItem(msg)) // store all message + protocolHandler <- ZIO.service[ProtocolExecuter[Services]] plaintextMessage <- decrypt(msg) + maybeActionStorageError <- messageItemRepo + .insert(MessageItem(msg)) // store all message + .map(_ /*WriteResult*/ => None + // TODO messages already on the database -> so this might be a replay attack + ) + .catchSome { + case StorageCollection(error) => + // This deals with connection errors to the database. + ZIO.logWarning(s"Error StorageCollection: $error") *> + ZIO + .service[Agent] + .map(agent => + Some( + Reply( + Problems + .storageError( + to = plaintextMessage.from.map(_.asTO).toSet, + from = agent.id, + pthid = plaintextMessage.id, + piuri = plaintextMessage.`type`, + ) + .toPlaintextMessage + ) + ) + ) + case StorageThrowable(error) => + ZIO.logWarning(s"Error StorageThrowable: $error") *> + ZIO + .service[Agent] + .map(agent => + Some( + Reply( + Problems + .storageError( + to = plaintextMessage.from.map(_.asTO).toSet, + from = agent.id, + pthid = plaintextMessage.id, + piuri = plaintextMessage.`type`, + ) + .toPlaintextMessage + ) + ) + ) + } _ <- didSocketManager.get.flatMap { m => // TODO HACK REMOVE !!!!!!!!!!!!!!!!!!!!!!!! ZIO.foreach(m.tapSockets)(_.socketOutHub.publish(TapMessage(msg, plaintextMessage).toJson)) } @@ -137,12 +180,14 @@ case class MediatorAgent( } // TODO Store context of the decrypt unwarping // TODO SreceiveMessagetore context with MsgID and PIURI - protocolHandler <- ZIO.service[ProtocolExecuter[Services]] - ret <- protocolHandler - .execute(plaintextMessage) - .tapError(ex => ZIO.logError(s"Error when execute Protocol: $ex")) + ret <- { + maybeActionStorageError match + case Some(reply) => ActionUtils.packResponse(plaintextMessage, reply) + case None => protocolHandler.execute(plaintextMessage) + }.tapError(ex => ZIO.logError(s"Error when execute Protocol: $ex")) } yield ret } yield maybeSyncReplyMsg + xxx } .provideSomeLayer( /*resolverLayer ++ indentityLayer ++*/ protocolHandlerLayer) diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala index 77b06ee4..013996f1 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala @@ -25,7 +25,7 @@ object PickupExecuter override def program[R1 <: UserAccountRepo & MessageItemRepo]( plaintextMessage: PlaintextMessage - ): ZIO[R1, MediatorError, Action] = { + ): ZIO[R1, MediatorError & StorageError, Action] = { // the val is from the match to be definitely stable val piuriStatusRequest = StatusRequest.piuri val piuriStatus = Status.piuri diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/Problems.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/Problems.scala index 41bfedc6..27c5c903 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/Problems.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/Problems.scala @@ -56,4 +56,21 @@ object Problems { escalate_to = None, // Option[String], ) + def storageError( + to: Set[TO], + from: FROM, + pthid: MsgID, + piuri: PIURI, + ) = ProblemReport( + // id: MsgID = MsgID(), + to = to, + from = from, // Can it be Option? + pthid = pthid, + ack = None, // Option[Seq[MsgID]], + code = ProblemCode.ErroFail("me", "res", "storage"), + comment = None, // Option[String], + args = None, // Option[Seq[String]], + escalate_to = None, // Option[String], + ) + } From 4e7e465034bd1f7684b6ae45d87b2ea3e824466d Mon Sep 17 00:00:00 2001 From: FabioPinheiro Date: Wed, 26 Jul 2023 16:15:03 +0100 Subject: [PATCH 3/9] Error for the pickup protocol before enroling --- TASKs.md | 4 +- .../mediator/protocols/PickupExecuter.scala | 95 +++++++++++-------- .../atala/mediator/protocols/Problems.scala | 59 ++++++++---- 3 files changed, 100 insertions(+), 58 deletions(-) diff --git a/TASKs.md b/TASKs.md index 1bfb43c6..6158edfa 100644 --- a/TASKs.md +++ b/TASKs.md @@ -25,6 +25,8 @@ Tasks: - (sync) e.p.crypto.unsupported - is message is tampering (any crypto error). - (sync & async) e.p.crypto.replay - if the message is replay (possible he replay attack). - (sync) e.p.req - pickup message before enroling. + - [QA] StatusRequest - https://didcomm.org/messagepickup/3.0/status-request + - [QA] DeliveryRequest - https://didcomm.org/messagepickup/3.0/delivery-request - (sync) e.p.me.res.storage - connection MongoBD is not working. - [QA] catch all StorageCollection Error - (sync) e.p.me.res.storage - business logic MongoBD is not working. @@ -36,7 +38,7 @@ Tasks: - [QA] MediateGrant - [QA] MediateDeny - [QA] KeylistResponse - - [QA] Status = https://didcomm.org/messagepickup/3.0/status + - [QA] Status - https://didcomm.org/messagepickup/3.0/status - [QA] LiveModeChange - https://didcomm.org/messagepickup/3.0/live-delivery-change - [TODO] ... - (sync) e.p.msg.unsupported - for parsing error due to unsupported version or protocol. diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala index 013996f1..35aa047c 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala @@ -48,22 +48,31 @@ object PickupExecuter repoDidAccount <- ZIO.service[UserAccountRepo] didRequestingMessages = m.from.asFROMTO mDidAccount <- repoDidAccount.getDidAccount(didRequestingMessages.toDID) - msgHash = mDidAccount match - case None => ??? // TODO FIXME - case Some(didAccount) => didAccount.messagesRef.filter(_.state == false).map(_.hash) - status = Status( - thid = m.id, - from = m.to.asFROM, - to = m.from.asTO, - recipient_did = m.recipient_did, - message_count = msgHash.size, - longest_waited_seconds = None, // TODO - newest_received_time = None, // TODO - oldest_received_time = None, // TODO - total_bytes = None, // TODO - live_delivery = None, // TODO - ) - } yield SyncReplyOnly(status.toPlaintextMessage) + ret = mDidAccount match + case None => + Problems + .notEnroledError( + from = m.to.asFROM, + to = m.from.asTO, + pthid = m.id, // TODO CHECK pthid + piuri = m.piuri, + ) + .toPlaintextMessage + case Some(didAccount) => + val msgHash = didAccount.messagesRef.filter(_.state == false).map(_.hash) + Status( + thid = m.id, + from = m.to.asFROM, + to = m.from.asTO, + recipient_did = m.recipient_did, + message_count = msgHash.size, + longest_waited_seconds = None, // TODO + newest_received_time = None, // TODO + oldest_received_time = None, // TODO + total_bytes = None, // TODO + live_delivery = None, // TODO + ).toPlaintextMessage + } yield SyncReplyOnly(ret) case m: Status => ZIO.logInfo("Status") *> ZIO.succeed( @@ -85,27 +94,39 @@ object PickupExecuter repoDidAccount <- ZIO.service[UserAccountRepo] didRequestingMessages = m.from.asFROMTO mDidAccount <- repoDidAccount.getDidAccount(didRequestingMessages.toDID) - msgHash = mDidAccount match - case None => ??? // TODO ERROR - case Some(didAccount) => didAccount.messagesRef.filter(_.state == false).map(_.hash) - allMessagesFor <- repoMessageItem.findByIds(msgHash) - messagesToReturn = - if (m.recipient_did.isEmpty) allMessagesFor - else { - allMessagesFor.filterNot( - _.msg.recipientsSubject - .map(_.did) - .forall(e => !m.recipient_did.map(_.toDID.did).contains(e)) + ret <- mDidAccount match + case None => + ZIO.succeed( + Problems + .notEnroledError( + from = m.to.asFROM, + to = m.from.asTO, + pthid = m.id, // TODO CHECK pthid + piuri = m.piuri, + ) + .toPlaintextMessage ) - } - deliveryRequest = MessageDelivery( - thid = m.id, - from = m.to.asFROM, - to = m.from.asTO, - recipient_did = m.recipient_did, - attachments = messagesToReturn.map(m => (m._id, m.msg)).toMap, - ) - } yield SyncReplyOnly(deliveryRequest.toPlaintextMessage) + case Some(didAccount) => + val msgHash = didAccount.messagesRef.filter(_.state == false).map(_.hash) + for { + allMessagesFor <- repoMessageItem.findByIds(msgHash) + messagesToReturn = + if (m.recipient_did.isEmpty) allMessagesFor + else { + allMessagesFor.filterNot( + _.msg.recipientsSubject + .map(_.did) + .forall(e => !m.recipient_did.map(_.toDID.did).contains(e)) + ) + } + } yield MessageDelivery( + thid = m.id, + from = m.to.asFROM, + to = m.from.asTO, + recipient_did = m.recipient_did, + attachments = messagesToReturn.map(m => (m._id, m.msg)).toMap, + ).toPlaintextMessage + } yield SyncReplyOnly(ret) case m: MessageDelivery => ZIO.logInfo("MessageDelivery") *> ZIO.succeed( @@ -133,7 +154,7 @@ object PickupExecuter ZIO.succeed( SyncReplyOnly( Problems - .protocolNotImplemented( + .protocolNotImplemented( // TODO from = m.to.asFROM, to = m.from.asTO, pthid = m.id, // TODO CHECK pthid diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/Problems.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/Problems.scala index 27c5c903..3c2aa62d 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/Problems.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/Problems.scala @@ -5,6 +5,8 @@ import fmgp.did.comm.* import fmgp.did.comm.protocol.reportproblem2.* object Problems { + val email = Some("atala@iohk.io") + def unsupportedProtocolType( to: Set[TO], from: FROM, @@ -13,13 +15,13 @@ object Problems { ) = ProblemReport( // id: MsgID = MsgID(), to = to, - from = from, // Can it be Option? + from = from, pthid = pthid, - ack = None, // Option[Seq[MsgID]], + ack = None, code = ProblemCode.ErroFail("msg", "unsupported"), - comment = None, // Option[String], - args = None, // Option[Seq[String]], - escalate_to = None, // Option[String], + comment = None, + args = None, + escalate_to = email, ) def unsupportedProtocolRole( @@ -30,13 +32,13 @@ object Problems { ) = ProblemReport( // id: MsgID = MsgID(), to = Set(to), - from = from, // Can it be Option? + from = from, pthid = pthid, - ack = None, // Option[Seq[MsgID]], + ack = None, code = ProblemCode.ErroFail("msg", "unsupported"), - comment = None, // Option[String], - args = None, // Option[Seq[String]], - escalate_to = None, // Option[String], + comment = None, + args = None, + escalate_to = email, ) def protocolNotImplemented( @@ -47,13 +49,13 @@ object Problems { ) = ProblemReport( // id: MsgID = MsgID(), to = Set(to), - from = from, // Can it be Option? + from = from, pthid = pthid, - ack = None, // Option[Seq[MsgID]], + ack = None, code = ProblemCode.ErroFail("msg", "unsupported"), - comment = None, // Option[String], - args = None, // Option[Seq[String]], - escalate_to = None, // Option[String], + comment = None, + args = None, + escalate_to = email, ) def storageError( @@ -64,13 +66,30 @@ object Problems { ) = ProblemReport( // id: MsgID = MsgID(), to = to, - from = from, // Can it be Option? + from = from, pthid = pthid, - ack = None, // Option[Seq[MsgID]], + ack = None, code = ProblemCode.ErroFail("me", "res", "storage"), - comment = None, // Option[String], - args = None, // Option[Seq[String]], - escalate_to = None, // Option[String], + comment = None, + args = None, + escalate_to = email, + ) + + def notEnroledError( + to: TO, + from: FROM, + pthid: MsgID, + piuri: PIURI, + ) = ProblemReport( + // id: MsgID = MsgID(), + to = Set(to), + from = from, + pthid = pthid, + ack = None, + code = ProblemCode.ErroFail("req"), + comment = Some("The DID '{1}' is not enroled."), + args = Some(Seq(to.value)), + escalate_to = email, ) } From 1f542d2ae2926cd4bd510d529b6bb61f4900d24b Mon Sep 17 00:00:00 2001 From: FabioPinheiro Date: Thu, 27 Jul 2023 17:19:47 +0100 Subject: [PATCH 4/9] Parameterize error in ProtocolExecuter --- .../scala/io/iohk/atala/mediator/Error.scala | 6 +++-- .../atala/mediator/actions/ActionUtils.scala | 2 +- .../mediator/actions/ProtocolExecute.scala | 23 +++++++++++-------- .../atala/mediator/app/MediatorAgent.scala | 18 +++++++++------ .../protocols/BasicMessageExecuter.scala | 5 ++-- .../protocols/ForwardMessageExecuter.scala | 8 +++---- .../MediatorCoordinationExecuter.scala | 2 +- .../protocols/MissingProtocolExecuter.scala | 2 +- .../protocols/NullProtocolExecuter.scala | 2 +- .../mediator/protocols/PickupExecuter.scala | 2 +- 10 files changed, 40 insertions(+), 30 deletions(-) diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/Error.scala b/mediator/src/main/scala/io/iohk/atala/mediator/Error.scala index 36d17c33..a75f023b 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/Error.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/Error.scala @@ -5,7 +5,7 @@ import fmgp.did.* import fmgp.did.comm.* import zio.json.* -trait MediatorError +sealed trait MediatorError case class MediatorException(fail: MediatorError) extends Exception(fail.toString()) @@ -20,8 +20,9 @@ object MediatorThrowable { } // Storage +case class StorageException(fail: StorageError) extends Exception(fail.toString()) -trait StorageError extends MediatorError { +sealed trait StorageError { // extends MediatorError { def error: String } @@ -35,6 +36,7 @@ object StorageThrowable { def apply(throwable: Throwable) = new StorageThrowable(throwable.getClass.getName() + ":" + throwable.getMessage) } +// ProtocolError sealed trait ProtocolError extends MediatorError { def piuri: PIURI } diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/actions/ActionUtils.scala b/mediator/src/main/scala/io/iohk/atala/mediator/actions/ActionUtils.scala index 4e94a349..d4ecb706 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/actions/ActionUtils.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/actions/ActionUtils.scala @@ -20,7 +20,7 @@ object ActionUtils { def packResponse( plaintextMessage: PlaintextMessage, action: Action - ): ZIO[Operations & Agent & (Resolver & MessageDispatcher), MediatorError, Option[EncryptedMessage]] = + ): ZIO[Operations & Agent & Resolver & MessageDispatcher, MediatorError, Option[EncryptedMessage]] = action match { case _: NoReply.type => ZIO.succeed(None) case action: AnyReply => diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/actions/ProtocolExecute.scala b/mediator/src/main/scala/io/iohk/atala/mediator/actions/ProtocolExecute.scala index dad9959b..47978ab4 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/actions/ProtocolExecute.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/actions/ProtocolExecute.scala @@ -16,21 +16,25 @@ import zio.json.* import io.iohk.atala.mediator.protocols.MissingProtocolExecuter //TODO pick a better name // maybe "Protocol" only -trait ProtocolExecuter[-R] { +trait ProtocolExecuter[-R, +E] { // <: MediatorError | StorageError] { def suportedPIURI: Seq[PIURI] /** @return can return a Sync Reply Msg */ - def execute[R1 <: R](plaintextMessage: PlaintextMessage): ZIO[R1, MediatorError, Option[EncryptedMessage]] = + def execute[R1 <: R]( + plaintextMessage: PlaintextMessage + ): ZIO[R1, E, Option[EncryptedMessage]] = program(plaintextMessage) *> ZIO.none - def program[R1 <: R](plaintextMessage: PlaintextMessage): ZIO[R1, MediatorError, Action] + def program[R1 <: R](plaintextMessage: PlaintextMessage): ZIO[R1, E, Action] } object ProtocolExecuter { type Services = Resolver & Agent & Operations & MessageDispatcher } -case class ProtocolExecuterCollection[-R <: Agent](executers: ProtocolExecuter[R]*) extends ProtocolExecuter[R] { +case class ProtocolExecuterCollection[-R <: Agent, +E]( // <: MediatorError | StorageError + executers: ProtocolExecuter[R, E]* +) extends ProtocolExecuter[R, E] { override def suportedPIURI: Seq[PIURI] = executers.flatMap(_.suportedPIURI) @@ -38,7 +42,7 @@ case class ProtocolExecuterCollection[-R <: Agent](executers: ProtocolExecuter[R override def execute[R1 <: R]( plaintextMessage: PlaintextMessage, - ): ZIO[R1, MediatorError, Option[EncryptedMessage]] = + ): ZIO[R1, E, Option[EncryptedMessage]] = selectExecutersFor(plaintextMessage.`type`) match // case None => NullProtocolExecuter.execute(plaintextMessage) case None => MissingProtocolExecuter.execute(plaintextMessage) @@ -46,19 +50,20 @@ case class ProtocolExecuterCollection[-R <: Agent](executers: ProtocolExecuter[R override def program[R1 <: R]( plaintextMessage: PlaintextMessage, - ): ZIO[R1, MediatorError, Action] = + ): ZIO[R1, E, Action] = selectExecutersFor(plaintextMessage.`type`) match // case None => NullProtocolExecuter.program(plaintextMessage) case None => MissingProtocolExecuter.program(plaintextMessage) case Some(px) => px.program(plaintextMessage) } -trait ProtocolExecuterWithServices[-R <: ProtocolExecuter.Services] extends ProtocolExecuter[R] { +trait ProtocolExecuterWithServices[-R <: ProtocolExecuter.Services] + extends ProtocolExecuter[R, MediatorError | StorageError] { override def execute[R1 <: R]( plaintextMessage: PlaintextMessage, // context: Context - ): ZIO[R1, MediatorError, Option[EncryptedMessage]] = + ): ZIO[R1, MediatorError | StorageError, Option[EncryptedMessage]] = program(plaintextMessage) .tap(v => ZIO.logDebug(v.toString)) // DEBUG .flatMap(action => ActionUtils.packResponse(plaintextMessage, action)) @@ -66,5 +71,5 @@ trait ProtocolExecuterWithServices[-R <: ProtocolExecuter.Services] extends Prot override def program[R1 <: R]( plaintextMessage: PlaintextMessage, // context: Context - ): ZIO[R1, MediatorError, Action] + ): ZIO[R1, MediatorError | StorageError, Action] } diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/app/MediatorAgent.scala b/mediator/src/main/scala/io/iohk/atala/mediator/app/MediatorAgent.scala index f58fcd17..ee4b2b59 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/app/MediatorAgent.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/app/MediatorAgent.scala @@ -36,9 +36,10 @@ case class MediatorAgent( // DynamicResolver.resolverLayer(didSocketManager) type Services = Resolver & Agent & Operations & MessageDispatcher & UserAccountRepo & MessageItemRepo - val protocolHandlerLayer: URLayer[UserAccountRepo & MessageItemRepo, ProtocolExecuter[Services]] = + val protocolHandlerLayer + : URLayer[UserAccountRepo & MessageItemRepo, ProtocolExecuter[Services, MediatorError | StorageError]] = ZLayer.succeed( - ProtocolExecuterCollection[Services]( + ProtocolExecuterCollection[Services, MediatorError | StorageError]( BasicMessageExecuter, new TrustPingExecuter, MediatorCoordinationExecuter, @@ -86,7 +87,7 @@ case class MediatorAgent( mSocketID: Option[SocketID], ): ZIO[ Operations & Resolver & MessageDispatcher & MediatorAgent & MessageItemRepo & UserAccountRepo, - MediatorError, + MediatorError | StorageError, Option[EncryptedMessage] ] = for { @@ -107,7 +108,7 @@ case class MediatorAgent( mSocketID: Option[SocketID] ): ZIO[ Operations & Resolver & MessageDispatcher & MediatorAgent & MessageItemRepo & UserAccountRepo, - MediatorError, + MediatorError | StorageError, Option[EncryptedMessage] ] = ZIO @@ -120,7 +121,7 @@ case class MediatorAgent( else for { messageItemRepo <- ZIO.service[MessageItemRepo] - protocolHandler <- ZIO.service[ProtocolExecuter[Services]] + protocolHandler <- ZIO.service[ProtocolExecuter[Services, MediatorError | StorageError]] plaintextMessage <- decrypt(msg) maybeActionStorageError <- messageItemRepo .insert(MessageItem(msg)) // store all message @@ -209,7 +210,10 @@ case class MediatorAgent( DIDSocketManager .newMessage(ch, text) .flatMap { case (socketID, encryptedMessage) => receiveMessage(encryptedMessage, Some(socketID)) } - .mapError(ex => MediatorException(ex)) + .mapError { + case ex: MediatorError => MediatorException(ex) + case ex: StorageError => StorageException(ex) + } } case ChannelEvent(ch, ChannelEvent.ChannelUnregistered) => ZIO.logAnnotate(LogAnnotation(SOCKET_ID, ch.id), annotationMap: _*) { @@ -347,7 +351,7 @@ object MediatorAgent { // TODO [return_route extension](https://github.com/decentralized-identity/didcomm-messaging/blob/main/extensions/return_route/main.md) case req @ Method.POST -> !! => ZIO - .logError(s"Request Headers : ${req.headers.mkString(",")}") + .logError(s"Request Headers: ${req.headers.mkString(",")}") .as( Response .text(s"The content-type must be ${MediaTypes.SIGNED.typ} or ${MediaTypes.ENCRYPTED.typ}") diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/BasicMessageExecuter.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/BasicMessageExecuter.scala index a430cef9..9b607581 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/BasicMessageExecuter.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/BasicMessageExecuter.scala @@ -3,11 +3,11 @@ package io.iohk.atala.mediator.protocols import fmgp.crypto.error.FailToParse import fmgp.did.comm.{PIURI, PlaintextMessage} import fmgp.did.comm.protocol.basicmessage2.BasicMessage -import io.iohk.atala.mediator.{MediatorDidError, MediatorThrowable} +import io.iohk.atala.mediator.{MediatorError, MediatorDidError, MediatorThrowable} import io.iohk.atala.mediator.actions.{NoReply, ProtocolExecuter} import zio.{Console, ZIO} -object BasicMessageExecuter extends ProtocolExecuter[Any] { +object BasicMessageExecuter extends ProtocolExecuter[Any, MediatorError] { override def suportedPIURI: Seq[PIURI] = Seq(BasicMessage.piuri) override def program[R1 <: Any](plaintextMessage: PlaintextMessage) = for { @@ -15,4 +15,5 @@ object BasicMessageExecuter extends ProtocolExecuter[Any] { case Left(error) => ZIO.fail(MediatorDidError(FailToParse(error))) case Right(bm) => Console.printLine(bm.toString).mapError(ex => MediatorThrowable(ex)) } yield NoReply + } diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecuter.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecuter.scala index e692c157..dc1fa5a0 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecuter.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecuter.scala @@ -5,22 +5,20 @@ import fmgp.did.* import fmgp.did.comm.* import fmgp.did.comm.protocol.* import fmgp.did.comm.protocol.routing2.* -import io.iohk.atala.mediator.MediatorError +import io.iohk.atala.mediator._ import io.iohk.atala.mediator.actions.* import io.iohk.atala.mediator.db.* import zio.* import zio.json.* object ForwardMessageExecuter - extends ProtocolExecuterWithServices[ - ProtocolExecuter.Services & UserAccountRepo & MessageItemRepo - ] { + extends ProtocolExecuterWithServices[ProtocolExecuter.Services & UserAccountRepo & MessageItemRepo] { override def suportedPIURI: Seq[PIURI] = Seq(ForwardMessage.piuri) override def program[R1 <: UserAccountRepo & MessageItemRepo]( plaintextMessage: PlaintextMessage - ): ZIO[R1, MediatorError, Action] = { + ): ZIO[R1, MediatorError | StorageError, Action] = { // the val is from the match to be definitely stable val piuriForwardMessage = ForwardMessage.piuri diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/MediatorCoordinationExecuter.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/MediatorCoordinationExecuter.scala index 0800b47c..b46ff956 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/MediatorCoordinationExecuter.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/MediatorCoordinationExecuter.scala @@ -25,7 +25,7 @@ object MediatorCoordinationExecuter extends ProtocolExecuterWithServices[Protoco override def program[R1 <: (UserAccountRepo)]( plaintextMessage: PlaintextMessage - ): ZIO[R1, MediatorError, Action] = { + ): ZIO[R1, MediatorError | StorageError, Action] = { // the val is from the match to be definitely stable val piuriMediateRequest = MediateRequest.piuri val piuriMediateGrant = MediateGrant.piuri diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/MissingProtocolExecuter.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/MissingProtocolExecuter.scala index 99e54771..40d481cb 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/MissingProtocolExecuter.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/MissingProtocolExecuter.scala @@ -8,7 +8,7 @@ import io.iohk.atala.mediator.MissingProtocolError import io.iohk.atala.mediator.actions.ProtocolExecuter import io.iohk.atala.mediator.actions.Reply -object MissingProtocolExecuter extends ProtocolExecuter[Agent] { +object MissingProtocolExecuter extends ProtocolExecuter[Agent, Nothing] { override def suportedPIURI = Seq() override def program[R1 <: Agent](plaintextMessage: PlaintextMessage) = diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/NullProtocolExecuter.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/NullProtocolExecuter.scala index 657c1b96..9013ca57 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/NullProtocolExecuter.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/NullProtocolExecuter.scala @@ -5,7 +5,7 @@ import io.iohk.atala.mediator.MissingProtocolError import io.iohk.atala.mediator.actions.ProtocolExecuter import zio.ZIO -object NullProtocolExecuter extends ProtocolExecuter[Any] { +object NullProtocolExecuter extends ProtocolExecuter[Any, MissingProtocolError] { override def suportedPIURI = Seq() override def program[R1 <: Any](plaintextMessage: PlaintextMessage) = diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala index 35aa047c..3d13b047 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala @@ -25,7 +25,7 @@ object PickupExecuter override def program[R1 <: UserAccountRepo & MessageItemRepo]( plaintextMessage: PlaintextMessage - ): ZIO[R1, MediatorError & StorageError, Action] = { + ): ZIO[R1, MediatorError | StorageError, Action] = { // the val is from the match to be definitely stable val piuriStatusRequest = StatusRequest.piuri val piuriStatus = Status.piuri From 1eee225db807fc42848712d3bdaa8d51ff71609f Mon Sep 17 00:00:00 2001 From: FabioPinheiro Date: Thu, 27 Jul 2023 18:57:05 +0100 Subject: [PATCH 5/9] Minor improvements for parameterize error in ProtocolExecuter --- .../atala/mediator/actions/ProtocolExecute.scala | 13 ++++++++----- .../mediator/protocols/ForwardMessageExecuter.scala | 7 +++++-- .../protocols/MediatorCoordinationExecuter.scala | 6 +++++- .../atala/mediator/protocols/PickupExecuter.scala | 7 +++++-- .../mediator/protocols/TrustPingExecuter.scala | 2 +- 5 files changed, 24 insertions(+), 11 deletions(-) diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/actions/ProtocolExecute.scala b/mediator/src/main/scala/io/iohk/atala/mediator/actions/ProtocolExecute.scala index 47978ab4..1ce2c864 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/actions/ProtocolExecute.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/actions/ProtocolExecute.scala @@ -31,8 +31,9 @@ trait ProtocolExecuter[-R, +E] { // <: MediatorError | StorageError] { object ProtocolExecuter { type Services = Resolver & Agent & Operations & MessageDispatcher + type Erros = MediatorError | StorageError } -case class ProtocolExecuterCollection[-R <: Agent, +E]( // <: MediatorError | StorageError +case class ProtocolExecuterCollection[-R <: Agent, +E]( executers: ProtocolExecuter[R, E]* ) extends ProtocolExecuter[R, E] { @@ -57,13 +58,15 @@ case class ProtocolExecuterCollection[-R <: Agent, +E]( // <: MediatorError | St case Some(px) => px.program(plaintextMessage) } -trait ProtocolExecuterWithServices[-R <: ProtocolExecuter.Services] - extends ProtocolExecuter[R, MediatorError | StorageError] { +trait ProtocolExecuterWithServices[ + -R <: ProtocolExecuter.Services, + +E >: MediatorError // ProtocolExecuter.Erros +] extends ProtocolExecuter[R, E] { override def execute[R1 <: R]( plaintextMessage: PlaintextMessage, // context: Context - ): ZIO[R1, MediatorError | StorageError, Option[EncryptedMessage]] = + ): ZIO[R1, E, Option[EncryptedMessage]] = program(plaintextMessage) .tap(v => ZIO.logDebug(v.toString)) // DEBUG .flatMap(action => ActionUtils.packResponse(plaintextMessage, action)) @@ -71,5 +74,5 @@ trait ProtocolExecuterWithServices[-R <: ProtocolExecuter.Services] override def program[R1 <: R]( plaintextMessage: PlaintextMessage, // context: Context - ): ZIO[R1, MediatorError | StorageError, Action] + ): ZIO[R1, E, Action] } diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecuter.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecuter.scala index dc1fa5a0..299ffc65 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecuter.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecuter.scala @@ -12,13 +12,16 @@ import zio.* import zio.json.* object ForwardMessageExecuter - extends ProtocolExecuterWithServices[ProtocolExecuter.Services & UserAccountRepo & MessageItemRepo] { + extends ProtocolExecuterWithServices[ + ProtocolExecuter.Services & UserAccountRepo & MessageItemRepo, + ProtocolExecuter.Erros + ] { override def suportedPIURI: Seq[PIURI] = Seq(ForwardMessage.piuri) override def program[R1 <: UserAccountRepo & MessageItemRepo]( plaintextMessage: PlaintextMessage - ): ZIO[R1, MediatorError | StorageError, Action] = { + ): ZIO[R1, ProtocolExecuter.Erros, Action] = { // the val is from the match to be definitely stable val piuriForwardMessage = ForwardMessage.piuri diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/MediatorCoordinationExecuter.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/MediatorCoordinationExecuter.scala index b46ff956..e63acb8e 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/MediatorCoordinationExecuter.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/MediatorCoordinationExecuter.scala @@ -11,7 +11,11 @@ import io.iohk.atala.mediator.actions.* import io.iohk.atala.mediator.db.UserAccountRepo import zio.* import zio.json.* -object MediatorCoordinationExecuter extends ProtocolExecuterWithServices[ProtocolExecuter.Services & UserAccountRepo] { +object MediatorCoordinationExecuter + extends ProtocolExecuterWithServices[ + ProtocolExecuter.Services & UserAccountRepo, + ProtocolExecuter.Erros + ] { override def suportedPIURI: Seq[PIURI] = Seq( MediateRequest.piuri, diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala index 3d13b047..c6625c32 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala @@ -12,7 +12,10 @@ import io.iohk.atala.mediator.db.* import zio.* import zio.json.* object PickupExecuter - extends ProtocolExecuterWithServices[ProtocolExecuter.Services & UserAccountRepo & MessageItemRepo] { + extends ProtocolExecuterWithServices[ + ProtocolExecuter.Services & UserAccountRepo & MessageItemRepo, + ProtocolExecuter.Erros + ] { override def suportedPIURI: Seq[PIURI] = Seq( StatusRequest.piuri, @@ -25,7 +28,7 @@ object PickupExecuter override def program[R1 <: UserAccountRepo & MessageItemRepo]( plaintextMessage: PlaintextMessage - ): ZIO[R1, MediatorError | StorageError, Action] = { + ): ZIO[R1, StorageError, Action] = { // the val is from the match to be definitely stable val piuriStatusRequest = StatusRequest.piuri val piuriStatus = Status.piuri diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/TrustPingExecuter.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/TrustPingExecuter.scala index 49c27510..4b973c4b 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/TrustPingExecuter.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/TrustPingExecuter.scala @@ -13,7 +13,7 @@ import io.iohk.atala.mediator.{MediatorDidError, MediatorError} import io.iohk.atala.mediator.actions.{Action, NoReply, ProtocolExecuter, ProtocolExecuterWithServices, Reply} import zio.ZIO -class TrustPingExecuter extends ProtocolExecuterWithServices[ProtocolExecuter.Services] { +class TrustPingExecuter extends ProtocolExecuterWithServices[ProtocolExecuter.Services, MediatorError] { override def suportedPIURI: Seq[PIURI] = Seq(TrustPing.piuri, TrustPingResponse.piuri) From 8ba775c1c2bff3348c9ffd392e66646d66c332b0 Mon Sep 17 00:00:00 2001 From: FabioPinheiro Date: Fri, 28 Jul 2023 16:01:01 +0100 Subject: [PATCH 6/9] Problem Report for Forward message when next DID is not enrolled --- TASKs.md | 2 + .../protocols/ForwardMessageExecuter.scala | 60 ++++++++++++------- .../mediator/protocols/PickupExecuter.scala | 4 +- .../atala/mediator/protocols/Problems.scala | 8 +-- 4 files changed, 48 insertions(+), 26 deletions(-) diff --git a/TASKs.md b/TASKs.md index 6158edfa..d6f9d5ef 100644 --- a/TASKs.md +++ b/TASKs.md @@ -44,6 +44,7 @@ Tasks: - (sync) e.p.msg.unsupported - for parsing error due to unsupported version or protocol. - [QA] MissingProtocolExecuter (unsupported protocol it also works fine for unsupported versions) - (sync & async) e.p.req.not_enroll - Get a Forward message to a DID that is not enrolled. + - [QA] Send Problem Report if the next DID is not enrolled in the Mediator. - (sync & async) e.p.me - catch all error at the end. - Receive a problem report (1w): - in case of Warnings Reply `w.p` -> log warnings and escalate to an error `e.p` on the reply @@ -80,3 +81,4 @@ Tasks: - escalate_to must be configurable (1d) - [optional] update the protocol with new tokens (2d) - `e.p.me.res.storage` + - `e.p.me.res.not_enroll` diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecuter.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecuter.scala index 299ffc65..92b71884 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecuter.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecuter.scala @@ -10,6 +10,7 @@ import io.iohk.atala.mediator.actions.* import io.iohk.atala.mediator.db.* import zio.* import zio.json.* +import fmgp.did.comm.protocol.reportproblem2.ProblemReport object ForwardMessageExecuter extends ProtocolExecuterWithServices[ @@ -19,31 +20,50 @@ object ForwardMessageExecuter override def suportedPIURI: Seq[PIURI] = Seq(ForwardMessage.piuri) - override def program[R1 <: UserAccountRepo & MessageItemRepo]( + override def program[R1 <: UserAccountRepo & MessageItemRepo & Agent]( plaintextMessage: PlaintextMessage ): ZIO[R1, ProtocolExecuter.Erros, Action] = { // the val is from the match to be definitely stable val piuriForwardMessage = ForwardMessage.piuri - (plaintextMessage.`type` match { - case `piuriForwardMessage` => plaintextMessage.toForwardMessage - }).map { case m: ForwardMessage => - for { - _ <- ZIO.logInfo("ForwardMessage") - repoMessageItem <- ZIO.service[MessageItemRepo] - repoDidAccount <- ZIO.service[UserAccountRepo] - recipientsSubject = Set(m.next) // m.msg.recipientsSubject - numbreOfUpdated <- repoDidAccount.addToInboxes(recipientsSubject, m.msg) - msg <- - if (numbreOfUpdated > 0) { // Or maybe we can add all the time - repoMessageItem.insert(MessageItem(m.msg)) *> - ZIO.logInfo("Add next msg (of the ForwardMessage) to the Message Repo") // TODO change to debug level - } else - ZIO.logWarning("Note: No update on the DidAccount of the recipients") - } yield None - } match - case Left(error) => ZIO.logError(error) *> ZIO.succeed(NoReply) - case Right(program) => program *> ZIO.succeed(NoReply) + (plaintextMessage.`type` match { case `piuriForwardMessage` => plaintextMessage.toForwardMessage }) match + case Left(error) => ZIO.logError(error) *> ZIO.succeed(NoReply) + case Right(m: ForwardMessage) => + for { + _ <- ZIO.logInfo("ForwardMessage") + repoMessageItem <- ZIO.service[MessageItemRepo] + repoDidAccount <- ZIO.service[UserAccountRepo] + recipientsSubject = Set(m.next) // m.msg.recipientsSubject + numbreOfUpdated <- repoDidAccount.addToInboxes(recipientsSubject, m.msg) + msg <- + if (numbreOfUpdated > 0) { // Or maybe we can add all the time + for { + _ <- repoMessageItem.insert(MessageItem(m.msg)) + _ <- ZIO.logInfo("Add next msg (of the ForwardMessage) to the Message Repo") + } yield NoReply + } else { + for { + _ <- ZIO.logWarning("Note: No update on the DidAccount of the recipients") + agent <- ZIO.service[Agent] + problem = plaintextMessage.from match { + case Some(to) => + Problems.notEnroledError( + to = Some(to.asTO), + from = agent.id, + pthid = plaintextMessage.id, + piuri = plaintextMessage.`type`, + ) + case None => + Problems.notEnroledError( + to = None, + from = agent.id, + pthid = plaintextMessage.id, + piuri = plaintextMessage.`type`, + ) + } + } yield Reply(problem.toPlaintextMessage) + } + } yield msg } } diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala index c6625c32..258ba98a 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala @@ -56,7 +56,7 @@ object PickupExecuter Problems .notEnroledError( from = m.to.asFROM, - to = m.from.asTO, + to = Some(m.from.asTO), pthid = m.id, // TODO CHECK pthid piuri = m.piuri, ) @@ -103,7 +103,7 @@ object PickupExecuter Problems .notEnroledError( from = m.to.asFROM, - to = m.from.asTO, + to = Some(m.from.asTO), pthid = m.id, // TODO CHECK pthid piuri = m.piuri, ) diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/Problems.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/Problems.scala index 3c2aa62d..1e2e50a1 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/Problems.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/Problems.scala @@ -76,19 +76,19 @@ object Problems { ) def notEnroledError( - to: TO, + to: Option[TO], from: FROM, pthid: MsgID, piuri: PIURI, ) = ProblemReport( // id: MsgID = MsgID(), - to = Set(to), + to = to.toSet, from = from, pthid = pthid, ack = None, - code = ProblemCode.ErroFail("req"), + code = ProblemCode.ErroFail("req", "not_enroll"), comment = Some("The DID '{1}' is not enroled."), - args = Some(Seq(to.value)), + args = Some(to.map(_.value).toSeq), escalate_to = email, ) From 6ec26175e3c1a054241d56cd95c0078971f68c7e Mon Sep 17 00:00:00 2001 From: FabioPinheiro Date: Fri, 28 Jul 2023 20:25:19 +0100 Subject: [PATCH 7/9] All parsing errors from the decrypt function --- TASKs.md | 7 +- .../atala/mediator/actions/ActionUtils.scala | 6 +- .../mediator/actions/ProtocolExecute.scala | 2 +- .../atala/mediator/app/MediatorAgent.scala | 140 +++++++++--------- 4 files changed, 84 insertions(+), 71 deletions(-) diff --git a/TASKs.md b/TASKs.md index d6f9d5ef..c1c040cd 100644 --- a/TASKs.md +++ b/TASKs.md @@ -22,7 +22,9 @@ Tasks: - Store messages when sending (1w) - Catch Errors and send Problem Reports (1w): - (sync) e.p.crypto - is message is tampering (any crypto error). + - [WIP] - (sync) e.p.crypto.unsupported - is message is tampering (any crypto error). + - [WIP] - (sync & async) e.p.crypto.replay - if the message is replay (possible he replay attack). - (sync) e.p.req - pickup message before enroling. - [QA] StatusRequest - https://didcomm.org/messagepickup/3.0/status-request @@ -34,6 +36,8 @@ Tasks: - (sync) e.p.did - for any DID method that is not `did.peer`. - (sync) e.p.did.malformed - for any DID method malformed. - (sync) e.p.msg - for parsing error from the message. + - [QA] All parsing errors from the decrypt function + - [TODO] parsing for a specific data model of each protocol - (sync) e.p.msg.unsupported - for the message type LiveModeChange and all message that is not role of the mediator - [QA] MediateGrant - [QA] MediateDeny @@ -44,8 +48,9 @@ Tasks: - (sync) e.p.msg.unsupported - for parsing error due to unsupported version or protocol. - [QA] MissingProtocolExecuter (unsupported protocol it also works fine for unsupported versions) - (sync & async) e.p.req.not_enroll - Get a Forward message to a DID that is not enrolled. - - [QA] Send Problem Report if the next DID is not enrolled in the Mediator. + - [QA] Send a Problem Report if the next DID is not enrolled in the Mediator. - (sync & async) e.p.me - catch all error at the end. + - [WIP] - Receive a problem report (1w): - in case of Warnings Reply `w.p` -> log warnings and escalate to an error `e.p` on the reply - in case of Error `e.p` -> log error diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/actions/ActionUtils.scala b/mediator/src/main/scala/io/iohk/atala/mediator/actions/ActionUtils.scala index d4ecb706..ba30f3f8 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/actions/ActionUtils.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/actions/ActionUtils.scala @@ -18,7 +18,7 @@ import io.iohk.atala.mediator.protocols.MissingProtocolExecuter object ActionUtils { def packResponse( - plaintextMessage: PlaintextMessage, + plaintextMessage: Option[PlaintextMessage], action: Action ): ZIO[Operations & Agent & Resolver & MessageDispatcher, MediatorError, Option[EncryptedMessage]] = action match { @@ -80,9 +80,9 @@ object ActionUtils { .succeed(msg) .when( { - plaintextMessage.return_route.contains(ReturnRoute.all) + plaintextMessage.map(_.return_route).contains(ReturnRoute.all) && { - plaintextMessage.from.map(_.asTO) match { + plaintextMessage.flatMap(_.from.map(_.asTO)) match { case None => false case Some(replyTo) => send2DIDs.contains(replyTo) } diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/actions/ProtocolExecute.scala b/mediator/src/main/scala/io/iohk/atala/mediator/actions/ProtocolExecute.scala index 1ce2c864..bf122039 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/actions/ProtocolExecute.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/actions/ProtocolExecute.scala @@ -69,7 +69,7 @@ trait ProtocolExecuterWithServices[ ): ZIO[R1, E, Option[EncryptedMessage]] = program(plaintextMessage) .tap(v => ZIO.logDebug(v.toString)) // DEBUG - .flatMap(action => ActionUtils.packResponse(plaintextMessage, action)) + .flatMap(action => ActionUtils.packResponse(Some(plaintextMessage), action)) override def program[R1 <: R]( plaintextMessage: PlaintextMessage, diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/app/MediatorAgent.scala b/mediator/src/main/scala/io/iohk/atala/mediator/app/MediatorAgent.scala index ee4b2b59..9c0b8eda 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/app/MediatorAgent.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/app/MediatorAgent.scala @@ -6,6 +6,7 @@ import fmgp.did.* import fmgp.did.comm.* import fmgp.did.comm.protocol.* import fmgp.did.comm.protocol.oobinvitation.OOBInvitation +import fmgp.did.comm.protocol.reportproblem2.ProblemReport import io.iohk.atala.mediator.* import io.iohk.atala.mediator.actions.* import io.iohk.atala.mediator.comm.* @@ -52,7 +53,7 @@ case class MediatorAgent( MessageDispatcherJVM.layer.mapError(ex => MediatorThrowable(ex)) // TODO move to another place & move validations and build a contex - def decrypt(msg: Message): ZIO[Agent & Resolver & Operations, MediatorError, PlaintextMessage] = + def decrypt(msg: Message): ZIO[Agent & Resolver & Operations, MediatorError | ProblemReport, PlaintextMessage] = { for { ops <- ZIO.service[Operations] plaintextMessage <- msg match @@ -81,6 +82,7 @@ case class MediatorAgent( case Right(msg2) => decrypt(msg2) } } yield (plaintextMessage) + } def receiveMessage( data: String, @@ -113,82 +115,88 @@ case class MediatorAgent( ] = ZIO .logAnnotate("msgHash", msg.sha1) { - val xxx = for { + for { _ <- ZIO.log("receivedMessage") maybeSyncReplyMsg <- if (!msg.recipientsSubject.contains(id)) ZIO.logError(s"This mediator '${id.string}' is not a recipient") *> ZIO.none else - for { - messageItemRepo <- ZIO.service[MessageItemRepo] - protocolHandler <- ZIO.service[ProtocolExecuter[Services, MediatorError | StorageError]] - plaintextMessage <- decrypt(msg) - maybeActionStorageError <- messageItemRepo - .insert(MessageItem(msg)) // store all message - .map(_ /*WriteResult*/ => None - // TODO messages already on the database -> so this might be a replay attack - ) - .catchSome { - case StorageCollection(error) => - // This deals with connection errors to the database. - ZIO.logWarning(s"Error StorageCollection: $error") *> - ZIO - .service[Agent] - .map(agent => - Some( - Reply( - Problems - .storageError( - to = plaintextMessage.from.map(_.asTO).toSet, - from = agent.id, - pthid = plaintextMessage.id, - piuri = plaintextMessage.`type`, - ) - .toPlaintextMessage + { + for { + messageItemRepo <- ZIO.service[MessageItemRepo] + protocolHandler <- ZIO.service[ProtocolExecuter[Services, MediatorError | StorageError]] + plaintextMessage <- decrypt(msg) + maybeActionStorageError <- messageItemRepo + .insert(MessageItem(msg)) // store all message + .map(_ /*WriteResult*/ => None + // TODO messages already on the database -> so this might be a replay attack + ) + .catchSome { + case StorageCollection(error) => + // This deals with connection errors to the database. + ZIO.logWarning(s"Error StorageCollection: $error") *> + ZIO + .service[Agent] + .map(agent => + Some( + Reply( + Problems + .storageError( + to = plaintextMessage.from.map(_.asTO).toSet, + from = agent.id, + pthid = plaintextMessage.id, + piuri = plaintextMessage.`type`, + ) + .toPlaintextMessage + ) ) ) - ) - case StorageThrowable(error) => - ZIO.logWarning(s"Error StorageThrowable: $error") *> - ZIO - .service[Agent] - .map(agent => - Some( - Reply( - Problems - .storageError( - to = plaintextMessage.from.map(_.asTO).toSet, - from = agent.id, - pthid = plaintextMessage.id, - piuri = plaintextMessage.`type`, - ) - .toPlaintextMessage + case StorageThrowable(error) => + ZIO.logWarning(s"Error StorageThrowable: $error") *> + ZIO + .service[Agent] + .map(agent => + Some( + Reply( + Problems + .storageError( + to = plaintextMessage.from.map(_.asTO).toSet, + from = agent.id, + pthid = plaintextMessage.id, + piuri = plaintextMessage.`type`, + ) + .toPlaintextMessage + ) ) ) - ) + } + _ <- didSocketManager.get.flatMap { m => // TODO HACK REMOVE !!!!!!!!!!!!!!!!!!!!!!!! + ZIO.foreach(m.tapSockets)(_.socketOutHub.publish(TapMessage(msg, plaintextMessage).toJson)) } - _ <- didSocketManager.get.flatMap { m => // TODO HACK REMOVE !!!!!!!!!!!!!!!!!!!!!!!! - ZIO.foreach(m.tapSockets)(_.socketOutHub.publish(TapMessage(msg, plaintextMessage).toJson)) - } - _ <- mSocketID match - case None => ZIO.unit - case Some(socketID) => - plaintextMessage.from match - case None => ZIO.unit - case Some(from) => - didSocketManager.update { - _.link(from.asFROMTO, socketID) - } - // TODO Store context of the decrypt unwarping - // TODO SreceiveMessagetore context with MsgID and PIURI - ret <- { - maybeActionStorageError match - case Some(reply) => ActionUtils.packResponse(plaintextMessage, reply) - case None => protocolHandler.execute(plaintextMessage) - }.tapError(ex => ZIO.logError(s"Error when execute Protocol: $ex")) - } yield ret + _ <- mSocketID match + case None => ZIO.unit + case Some(socketID) => + plaintextMessage.from match + case None => ZIO.unit + case Some(from) => + didSocketManager.update { + _.link(from.asFROMTO, socketID) + } + // TODO Store context of the decrypt unwarping + // TODO SreceiveMessagetore context with MsgID and PIURI + ret <- { + maybeActionStorageError match + case Some(reply) => ActionUtils.packResponse(Some(plaintextMessage), reply) + case None => protocolHandler.execute(plaintextMessage) + }.tapError(ex => ZIO.logError(s"Error when execute Protocol: $ex")) + } yield ret + }.catchAll { + case ex: MediatorError => ZIO.fail(ex) + case pr: ProblemReport => ActionUtils.packResponse(None, Reply(pr.toPlaintextMessage)) + case ex: StorageCollection => ZIO.fail(ex) + case ex: StorageThrowable => ZIO.fail(ex) + } } yield maybeSyncReplyMsg - xxx } .provideSomeLayer( /*resolverLayer ++ indentityLayer ++*/ protocolHandlerLayer) From 52b6ed4ccf857510fefad28c3af57e1891eb0be1 Mon Sep 17 00:00:00 2001 From: FabioPinheiro Date: Wed, 2 Aug 2023 16:14:46 +0100 Subject: [PATCH 8/9] cleanup & live-mode-not-supported --- TASKs.md | 89 ------------------- build.sbt | 2 +- .../atala/mediator/actions/ActionUtils.scala | 2 +- .../mediator/protocols/PickupExecuter.scala | 7 +- .../atala/mediator/protocols/Problems.scala | 17 ++++ 5 files changed, 22 insertions(+), 95 deletions(-) delete mode 100644 TASKs.md diff --git a/TASKs.md b/TASKs.md deleted file mode 100644 index c1c040cd..00000000 --- a/TASKs.md +++ /dev/null @@ -1,89 +0,0 @@ -# ATL-5236 - Error Handling (PRISM Agent and Mediator) - -https://input-output.atlassian.net/browse/ATL-5236 - -https://identity.foundation/didcomm-messaging/spec/#problem-reports - - -On each step of all our protocols processing, when something wrong is happening, we need to: -Goals -- Update the record to a documented error state -- Log the error in the service logs -- Send the problem report message when appropriate - -Goal other: error recover/resilient -- [optional] Send event that record state changed to error -- Decide on the policy of re-trying sending errors (one of the proposals is just to send it once, and if a recipient did not get this, then it’s on its own requesting record ID state) - - - -Tasks: -- (2W * 2Dev) Mediator (Note: most errors in Mediator will be synchronous) - - Store messages when sending (1w) - - Catch Errors and send Problem Reports (1w): - - (sync) e.p.crypto - is message is tampering (any crypto error). - - [WIP] - - (sync) e.p.crypto.unsupported - is message is tampering (any crypto error). - - [WIP] - - (sync & async) e.p.crypto.replay - if the message is replay (possible he replay attack). - - (sync) e.p.req - pickup message before enroling. - - [QA] StatusRequest - https://didcomm.org/messagepickup/3.0/status-request - - [QA] DeliveryRequest - https://didcomm.org/messagepickup/3.0/delivery-request - - (sync) e.p.me.res.storage - connection MongoBD is not working. - - [QA] catch all StorageCollection Error - - (sync) e.p.me.res.storage - business logic MongoBD is not working. - - [QA] catch all StorageThrowable - - (sync) e.p.did - for any DID method that is not `did.peer`. - - (sync) e.p.did.malformed - for any DID method malformed. - - (sync) e.p.msg - for parsing error from the message. - - [QA] All parsing errors from the decrypt function - - [TODO] parsing for a specific data model of each protocol - - (sync) e.p.msg.unsupported - for the message type LiveModeChange and all message that is not role of the mediator - - [QA] MediateGrant - - [QA] MediateDeny - - [QA] KeylistResponse - - [QA] Status - https://didcomm.org/messagepickup/3.0/status - - [QA] LiveModeChange - https://didcomm.org/messagepickup/3.0/live-delivery-change - - [TODO] ... - - (sync) e.p.msg.unsupported - for parsing error due to unsupported version or protocol. - - [QA] MissingProtocolExecuter (unsupported protocol it also works fine for unsupported versions) - - (sync & async) e.p.req.not_enroll - Get a Forward message to a DID that is not enrolled. - - [QA] Send a Problem Report if the next DID is not enrolled in the Mediator. - - (sync & async) e.p.me - catch all error at the end. - - [WIP] - - Receive a problem report (1w): - - in case of Warnings Reply `w.p` -> log warnings and escalate to an error `e.p` on the reply - - in case of Error `e.p` -> log error - -- (4/6W * 2Dev) PRISM Agent (Note: most error in PRISM Agent will be asynchronous) - - Store DID Comm messages in a searchable way (4W) - - Store receiving messages (in a searchable way) - - Store sending messages (in a searchable way) - - both plaintext and encrypted - - [optional] Maybe later MongoBD for PRISM Agent - - Implement Problem Reports in Mercury (DONE??) - - Catch all the Errors and send Problem Reports: (4W) - - e.p.xfer - if it found nobody listening on the specified port. (service endpoint unavailable) - - (async) e.p.crypto - is message is tampering (any crypto error). - - (async) e.p.crypto.unsupported - is message is tampering (any crypto error). - - (sync & async) e.p.crypto.replay - if the message is replay (possible he replay attack). - - (async) e.p.me.res.storage - connection MongoBD is not working. - - (async) e.p.me.res.storage - business logic MongoBD is not working. - - (async) e.p.did - for any DID method that is not `did.peer`. - - (async) e.p.did.malformed - for any DID method malformed. - - (async) e.p.msg - for parsing errors from the message. - - (async) e.p.msg.unsupported - for the message with the wrong role of the agent. - - (async) e.p.msg.unsupported - for parsing error due to unsupported version or protocol. - - (sync & async) e.p.me - catch all error at the end. - - (async) TODO `w.m` - - Receive e Problem Reports (1W): - - support for scope `m` -> state update (of the protocol execution) - - support for scope `p` -> state update (and fail protocol execution) - -- Other task (not for this ticket) - - Traceability of the MsgID of the Problem Report to the original error (2d) -> ATL-4147 - - [optional] Log - https://input-output.atlassian.net/browse/ATL-4147 - - escalate_to must be configurable (1d) - - [optional] update the protocol with new tokens (2d) - - `e.p.me.res.storage` - - `e.p.me.res.not_enroll` diff --git a/build.sbt b/build.sbt index c0d62393..c814c63a 100644 --- a/build.sbt +++ b/build.sbt @@ -9,7 +9,7 @@ inThisBuild( /** Versions */ lazy val V = new { - val scalaDID = "0.1.0-M6+0-b02d2b9e+20230724-1637-SNAPSHOT" + val scalaDID = "0.1.0-M7" // val scalajsJavaSecureRandom = "1.0.0" // FIXME another bug in the test framework https://github.com/scalameta/munit/issues/554 diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/actions/ActionUtils.scala b/mediator/src/main/scala/io/iohk/atala/mediator/actions/ActionUtils.scala index ba30f3f8..12083c34 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/actions/ActionUtils.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/actions/ActionUtils.scala @@ -60,7 +60,7 @@ object ActionUtils { messageDispatcher .send( msg, - url, // "http://localhost:8080", // FIXME REMOVE (use for local env) + url, None // url match // FIXME REMOVE (use for local env) // case http if http.startsWith("http://") => Some(url.drop(7).split(':').head.split('/').head) diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala index 258ba98a..e74d9fcd 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala @@ -153,20 +153,19 @@ object PickupExecuter ) } yield NoReply case m: LiveModeChange => - ZIO.logWarning("LiveModeChange not implemented") *> + ZIO.logInfo("LiveModeChange Not Supported") *> ZIO.succeed( SyncReplyOnly( Problems - .protocolNotImplemented( // TODO + .liveModeNotSupported( from = m.to.asFROM, to = m.from.asTO, - pthid = m.id, // TODO CHECK pthid + pthid = m.id, piuri = m.piuri, ) .toPlaintextMessage ) ) - } match case Left(error) => ZIO.logError(error) *> ZIO.succeed(NoReply) case Right(program) => program diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/Problems.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/Problems.scala index 1e2e50a1..62673a99 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/Problems.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/Problems.scala @@ -58,6 +58,23 @@ object Problems { escalate_to = email, ) + def liveModeNotSupported( + to: TO, + from: FROM, + pthid: MsgID, + piuri: PIURI, + ) = ProblemReport( + // id: MsgID = MsgID(), + to = Set(to), + from = from, + pthid = pthid, + ack = None, + code = ProblemCode.ErroUndo("live-mode-not-supported"), + comment = Some("Connection does not support Live Delivery"), + args = None, + escalate_to = email, + ) + def storageError( to: Set[TO], from: FROM, From e98449c1c03fd35cf2fb68b551025c3a48964215 Mon Sep 17 00:00:00 2001 From: FabioPinheiro Date: Fri, 4 Aug 2023 16:08:01 +0100 Subject: [PATCH 9/9] file Mediator-Error_Handling.md --- Mediator-Error_Handling.md | 65 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) create mode 100644 Mediator-Error_Handling.md diff --git a/Mediator-Error_Handling.md b/Mediator-Error_Handling.md new file mode 100644 index 00000000..7266455a --- /dev/null +++ b/Mediator-Error_Handling.md @@ -0,0 +1,65 @@ +# Error Handling + + +More info see https://input-output.atlassian.net/browse/ATL-5236 + +https://identity.foundation/didcomm-messaging/spec/#problem-reports + + +On each step of all our protocols processing, when something wrong is happening, we need to: +Goals +- Update the record to a documented error state +- Log the error in the service logs +- Send the problem report message when appropriate + +Goal other: error recover/resilient +- [optional] Send event that record state changed to error +- Decide on the policy of re-trying sending errors (one of the proposals is just to send it once, and if a recipient did not get this, then it’s on its own requesting record ID state) + + + +Note: most errors in Mediator will be synchronous + +- Store messages when sending (1w) +- Catch Errors and send Problem Reports (1w): + - (sync) e.p.crypto - is message is tampering (any crypto error). + - [WIP] + - (sync) e.p.crypto.unsupported - is message is tampering (any crypto error). + - [WIP] + - (sync & async) e.p.crypto.replay - if the message is replay (possible he replay attack). + - (sync) e.p.req - pickup message before enroling. + - [QA] StatusRequest - https://didcomm.org/messagepickup/3.0/status-request + - [QA] DeliveryRequest - https://didcomm.org/messagepickup/3.0/delivery-request + - (sync) e.p.me.res.storage - connection MongoBD is not working. + - [QA] catch all StorageCollection Error + - (sync) e.p.me.res.storage - business logic MongoBD is not working. + - [QA] catch all StorageThrowable + - (sync) e.p.did - for any DID method that is not `did.peer`. + - (sync) e.p.did.malformed - for any DID method malformed. + - (sync) e.p.msg - for parsing error from the message. + - [QA] All parsing errors from the decrypt function + - [TODO] parsing for a specific data model of each protocol + - (sync) e.p.msg.unsupported - for the message type LiveModeChange and all message that is not role of the mediator + - [QA] MediateGrant + - [QA] MediateDeny + - [QA] KeylistResponse + - [QA] Status - https://didcomm.org/messagepickup/3.0/status + - [TODO] ... + - LiveModeChange Not Supported + - [QA] "e.m.live-mode-not-supported" - https://didcomm.org/messagepickup/3.0/live-delivery-change + - (sync) e.p.msg.unsupported - for parsing error due to unsupported version or protocol. + - [QA] MissingProtocolExecuter (unsupported protocol it also works fine for unsupported versions) + - (sync & async) e.p.req.not_enroll - Get a Forward message to a DID that is not enrolled. + - [QA] Send a Problem Report if the next DID is not enrolled in the Mediator. + - (sync & async) e.p.me - catch all error at the end. + - [WIP] +- Receive a problem report (1w): +- in case of Warnings Reply `w.p` -> log warnings and escalate to an error `e.p` on the reply +- in case of Error `e.p` -> log error + +- Traceability of the MsgID of the Problem Report to the original error (2d) -> ATL-4147 +- [optional] Log - https://input-output.atlassian.net/browse/ATL-4147 +- escalate_to must be configurable (1d) +- [optional] update the protocol with new tokens (2d) + - `e.p.me.res.storage` + - `e.p.me.res.not_enroll`