diff --git a/Mediator-Error_Handling.md b/Mediator-Error_Handling.md new file mode 100644 index 00000000..7266455a --- /dev/null +++ b/Mediator-Error_Handling.md @@ -0,0 +1,65 @@ +# Error Handling + + +More info see https://input-output.atlassian.net/browse/ATL-5236 + +https://identity.foundation/didcomm-messaging/spec/#problem-reports + + +On each step of all our protocols processing, when something wrong is happening, we need to: +Goals +- Update the record to a documented error state +- Log the error in the service logs +- Send the problem report message when appropriate + +Goal other: error recover/resilient +- [optional] Send event that record state changed to error +- Decide on the policy of re-trying sending errors (one of the proposals is just to send it once, and if a recipient did not get this, then it’s on its own requesting record ID state) + + + +Note: most errors in Mediator will be synchronous + +- Store messages when sending (1w) +- Catch Errors and send Problem Reports (1w): + - (sync) e.p.crypto - is message is tampering (any crypto error). + - [WIP] + - (sync) e.p.crypto.unsupported - is message is tampering (any crypto error). + - [WIP] + - (sync & async) e.p.crypto.replay - if the message is replay (possible he replay attack). + - (sync) e.p.req - pickup message before enroling. + - [QA] StatusRequest - https://didcomm.org/messagepickup/3.0/status-request + - [QA] DeliveryRequest - https://didcomm.org/messagepickup/3.0/delivery-request + - (sync) e.p.me.res.storage - connection MongoBD is not working. + - [QA] catch all StorageCollection Error + - (sync) e.p.me.res.storage - business logic MongoBD is not working. + - [QA] catch all StorageThrowable + - (sync) e.p.did - for any DID method that is not `did.peer`. + - (sync) e.p.did.malformed - for any DID method malformed. + - (sync) e.p.msg - for parsing error from the message. + - [QA] All parsing errors from the decrypt function + - [TODO] parsing for a specific data model of each protocol + - (sync) e.p.msg.unsupported - for the message type LiveModeChange and all message that is not role of the mediator + - [QA] MediateGrant + - [QA] MediateDeny + - [QA] KeylistResponse + - [QA] Status - https://didcomm.org/messagepickup/3.0/status + - [TODO] ... + - LiveModeChange Not Supported + - [QA] "e.m.live-mode-not-supported" - https://didcomm.org/messagepickup/3.0/live-delivery-change + - (sync) e.p.msg.unsupported - for parsing error due to unsupported version or protocol. + - [QA] MissingProtocolExecuter (unsupported protocol it also works fine for unsupported versions) + - (sync & async) e.p.req.not_enroll - Get a Forward message to a DID that is not enrolled. + - [QA] Send a Problem Report if the next DID is not enrolled in the Mediator. + - (sync & async) e.p.me - catch all error at the end. + - [WIP] +- Receive a problem report (1w): +- in case of Warnings Reply `w.p` -> log warnings and escalate to an error `e.p` on the reply +- in case of Error `e.p` -> log error + +- Traceability of the MsgID of the Problem Report to the original error (2d) -> ATL-4147 +- [optional] Log - https://input-output.atlassian.net/browse/ATL-4147 +- escalate_to must be configurable (1d) +- [optional] update the protocol with new tokens (2d) + - `e.p.me.res.storage` + - `e.p.me.res.not_enroll` diff --git a/build.sbt b/build.sbt index 1d961fe1..f724bb04 100644 --- a/build.sbt +++ b/build.sbt @@ -9,7 +9,7 @@ inThisBuild( /** Versions */ lazy val V = new { - val scalaDID = "0.1.0-M6" + val scalaDID = "0.1.0-M7" // val scalajsJavaSecureRandom = "1.0.0" // FIXME another bug in the test framework https://github.com/scalameta/munit/issues/554 diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/Error.scala b/mediator/src/main/scala/io/iohk/atala/mediator/Error.scala index 36d17c33..a75f023b 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/Error.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/Error.scala @@ -5,7 +5,7 @@ import fmgp.did.* import fmgp.did.comm.* import zio.json.* -trait MediatorError +sealed trait MediatorError case class MediatorException(fail: MediatorError) extends Exception(fail.toString()) @@ -20,8 +20,9 @@ object MediatorThrowable { } // Storage +case class StorageException(fail: StorageError) extends Exception(fail.toString()) -trait StorageError extends MediatorError { +sealed trait StorageError { // extends MediatorError { def error: String } @@ -35,6 +36,7 @@ object StorageThrowable { def apply(throwable: Throwable) = new StorageThrowable(throwable.getClass.getName() + ":" + throwable.getMessage) } +// ProtocolError sealed trait ProtocolError extends MediatorError { def piuri: PIURI } diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/actions/ActionUtils.scala b/mediator/src/main/scala/io/iohk/atala/mediator/actions/ActionUtils.scala new file mode 100644 index 00000000..12083c34 --- /dev/null +++ b/mediator/src/main/scala/io/iohk/atala/mediator/actions/ActionUtils.scala @@ -0,0 +1,94 @@ +package io.iohk.atala.mediator.actions + +import fmgp.crypto.error.* +import fmgp.did.* +import fmgp.did.comm.* +import fmgp.did.comm.Operations.* +import fmgp.did.comm.protocol.* +import fmgp.did.comm.protocol.basicmessage2.* +import fmgp.did.comm.protocol.trustping2.* +import io.iohk.atala.mediator.* +import io.iohk.atala.mediator.comm.* +import io.iohk.atala.mediator.db.* +import io.iohk.atala.mediator.protocols.NullProtocolExecuter +import zio.* +import zio.json.* +import io.iohk.atala.mediator.protocols.MissingProtocolExecuter + +object ActionUtils { + + def packResponse( + plaintextMessage: Option[PlaintextMessage], + action: Action + ): ZIO[Operations & Agent & Resolver & MessageDispatcher, MediatorError, Option[EncryptedMessage]] = + action match { + case _: NoReply.type => ZIO.succeed(None) + case action: AnyReply => + val reply = action.msg + for { + msg <- { + reply.from match + case Some(value) => authEncrypt(reply) + case None => anonEncrypt(reply) + }.mapError(fail => MediatorDidError(fail)) + // TODO forward message + maybeSyncReplyMsg <- reply.to.map(_.toSeq) match // TODO improve + case None => ZIO.logWarning("Have a reply but the field 'to' is missing") *> ZIO.none + case Some(Seq()) => ZIO.logWarning("Have a reply but the field 'to' is empty") *> ZIO.none + case Some(send2DIDs) => + ZIO + .foreach(send2DIDs)(to => + val job: ZIO[MessageDispatcher & (Resolver & Any), MediatorError, Matchable] = for { + messageDispatcher <- ZIO.service[MessageDispatcher] + resolver <- ZIO.service[Resolver] + doc <- resolver + .didDocument(to) + .mapError(fail => MediatorDidError(fail)) + mURL = doc.service.toSeq.flatten + .filter(_.`type` match { + case str: String => str == DIDService.TYPE_DIDCommMessaging + case seq: Seq[String] => seq.contains(DIDService.TYPE_DIDCommMessaging) + }) match { + case head +: next => // FIXME discarte the next + head.getServiceEndpointAsURIs.headOption // TODO head + case Seq() => None // TODO + } + jobToRun <- mURL match + case None => ZIO.logWarning(s"No url to send message") + case Some(url) => { + ZIO.log(s"Send to url: $url") *> + messageDispatcher + .send( + msg, + url, + None + // url match // FIXME REMOVE (use for local env) + // case http if http.startsWith("http://") => Some(url.drop(7).split(':').head.split('/').head) + // case https if https.startsWith("https://") => + // Some(url.drop(8).split(':').head.split('/').head) + // case _ => None + ) + .catchAll { case DispatcherError(error) => ZIO.logWarning(s"Dispatch Error: $error") } + } + + } yield (jobToRun) + action match + case Reply(_) => job + case SyncReplyOnly(_) => ZIO.unit + case AsyncReplyOnly(_) => job + ) *> ZIO + .succeed(msg) + .when( + { + plaintextMessage.map(_.return_route).contains(ReturnRoute.all) + && { + plaintextMessage.flatMap(_.from.map(_.asTO)) match { + case None => false + case Some(replyTo) => send2DIDs.contains(replyTo) + } + } + } || action.isInstanceOf[SyncReplyOnly] + ) + } yield maybeSyncReplyMsg + } +} diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/actions/ProtocolExecute.scala b/mediator/src/main/scala/io/iohk/atala/mediator/actions/ProtocolExecute.scala index 63ffc2b2..bf122039 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/actions/ProtocolExecute.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/actions/ProtocolExecute.scala @@ -13,23 +13,29 @@ import io.iohk.atala.mediator.db.* import io.iohk.atala.mediator.protocols.NullProtocolExecuter import zio.* import zio.json.* +import io.iohk.atala.mediator.protocols.MissingProtocolExecuter //TODO pick a better name // maybe "Protocol" only -trait ProtocolExecuter[-R] { +trait ProtocolExecuter[-R, +E] { // <: MediatorError | StorageError] { def suportedPIURI: Seq[PIURI] /** @return can return a Sync Reply Msg */ - def execute[R1 <: R](plaintextMessage: PlaintextMessage): ZIO[R1, MediatorError, Option[EncryptedMessage]] = + def execute[R1 <: R]( + plaintextMessage: PlaintextMessage + ): ZIO[R1, E, Option[EncryptedMessage]] = program(plaintextMessage) *> ZIO.none - def program[R1 <: R](plaintextMessage: PlaintextMessage): ZIO[R1, MediatorError, Action] + def program[R1 <: R](plaintextMessage: PlaintextMessage): ZIO[R1, E, Action] } object ProtocolExecuter { type Services = Resolver & Agent & Operations & MessageDispatcher + type Erros = MediatorError | StorageError } -case class ProtocolExecuterCollection[-R](executers: ProtocolExecuter[R]*) extends ProtocolExecuter[R] { +case class ProtocolExecuterCollection[-R <: Agent, +E]( + executers: ProtocolExecuter[R, E]* +) extends ProtocolExecuter[R, E] { override def suportedPIURI: Seq[PIURI] = executers.flatMap(_.suportedPIURI) @@ -37,100 +43,36 @@ case class ProtocolExecuterCollection[-R](executers: ProtocolExecuter[R]*) exten override def execute[R1 <: R]( plaintextMessage: PlaintextMessage, - ): ZIO[R1, MediatorError, Option[EncryptedMessage]] = + ): ZIO[R1, E, Option[EncryptedMessage]] = selectExecutersFor(plaintextMessage.`type`) match - case None => NullProtocolExecuter.execute(plaintextMessage) + // case None => NullProtocolExecuter.execute(plaintextMessage) + case None => MissingProtocolExecuter.execute(plaintextMessage) case Some(px) => px.execute(plaintextMessage) override def program[R1 <: R]( plaintextMessage: PlaintextMessage, - ): ZIO[R1, MediatorError, Action] = + ): ZIO[R1, E, Action] = selectExecutersFor(plaintextMessage.`type`) match - case None => NullProtocolExecuter.program(plaintextMessage) + // case None => NullProtocolExecuter.program(plaintextMessage) + case None => MissingProtocolExecuter.program(plaintextMessage) case Some(px) => px.program(plaintextMessage) } -trait ProtocolExecuterWithServices[-R <: ProtocolExecuter.Services] extends ProtocolExecuter[R] { +trait ProtocolExecuterWithServices[ + -R <: ProtocolExecuter.Services, + +E >: MediatorError // ProtocolExecuter.Erros +] extends ProtocolExecuter[R, E] { override def execute[R1 <: R]( plaintextMessage: PlaintextMessage, // context: Context - ): ZIO[R1, MediatorError, Option[EncryptedMessage]] = + ): ZIO[R1, E, Option[EncryptedMessage]] = program(plaintextMessage) .tap(v => ZIO.logDebug(v.toString)) // DEBUG - .flatMap { - case _: NoReply.type => ZIO.succeed(None) - case action: AnyReply => - val reply = action.msg - for { - msg <- { - reply.from match - case Some(value) => authEncrypt(reply) - case None => anonEncrypt(reply) - }.mapError(fail => MediatorDidError(fail)) - // TODO forward message - maybeSyncReplyMsg <- reply.to.map(_.toSeq) match // TODO improve - case None => ZIO.logWarning("Have a reply but the field 'to' is missing") *> ZIO.none - case Some(Seq()) => ZIO.logWarning("Have a reply but the field 'to' is empty") *> ZIO.none - case Some(send2DIDs) => - ZIO - .foreach(send2DIDs)(to => - val job: ZIO[MessageDispatcher & (Resolver & Any), MediatorError, Matchable] = for { - messageDispatcher <- ZIO.service[MessageDispatcher] - resolver <- ZIO.service[Resolver] - doc <- resolver - .didDocument(to) - .mapError(fail => MediatorDidError(fail)) - mURL = doc.service.toSeq.flatten - .filter(_.`type` match { - case str: String => str == DIDService.TYPE_DIDCommMessaging - case seq: Seq[String] => seq.contains(DIDService.TYPE_DIDCommMessaging) - }) match { - case head +: next => // FIXME discarte the next - head.getServiceEndpointAsURIs.headOption // TODO head - case Seq() => None // TODO - } - jobToRun <- mURL match - case None => ZIO.logWarning(s"No url to send message") - case Some(url) => { - ZIO.log(s"Send to url: $url") *> - messageDispatcher - .send( - msg, - url, // "http://localhost:8080", // FIXME REMOVE (use for local env) - None - // url match // FIXME REMOVE (use for local env) - // case http if http.startsWith("http://") => Some(url.drop(7).split(':').head.split('/').head) - // case https if https.startsWith("https://") => - // Some(url.drop(8).split(':').head.split('/').head) - // case _ => None - ) - .catchAll { case DispatcherError(error) => ZIO.logWarning(s"Dispatch Error: $error") } - } - - } yield (jobToRun) - action match - case Reply(_) => job - case SyncReplyOnly(_) => ZIO.unit - case AsyncReplyOnly(_) => job - ) *> ZIO - .succeed(msg) - .when( - { - plaintextMessage.return_route.contains(ReturnRoute.all) - && { - plaintextMessage.from.map(_.asTO) match { - case None => false - case Some(replyTo) => send2DIDs.contains(replyTo) - } - } - } || action.isInstanceOf[SyncReplyOnly] - ) - } yield maybeSyncReplyMsg - } + .flatMap(action => ActionUtils.packResponse(Some(plaintextMessage), action)) override def program[R1 <: R]( plaintextMessage: PlaintextMessage, // context: Context - ): ZIO[R1, MediatorError, Action] + ): ZIO[R1, E, Action] } diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/app/MediatorAgent.scala b/mediator/src/main/scala/io/iohk/atala/mediator/app/MediatorAgent.scala index 692b65ef..9c0b8eda 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/app/MediatorAgent.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/app/MediatorAgent.scala @@ -6,6 +6,7 @@ import fmgp.did.* import fmgp.did.comm.* import fmgp.did.comm.protocol.* import fmgp.did.comm.protocol.oobinvitation.OOBInvitation +import fmgp.did.comm.protocol.reportproblem2.ProblemReport import io.iohk.atala.mediator.* import io.iohk.atala.mediator.actions.* import io.iohk.atala.mediator.comm.* @@ -36,9 +37,10 @@ case class MediatorAgent( // DynamicResolver.resolverLayer(didSocketManager) type Services = Resolver & Agent & Operations & MessageDispatcher & UserAccountRepo & MessageItemRepo - val protocolHandlerLayer: URLayer[UserAccountRepo & MessageItemRepo, ProtocolExecuter[Services]] = + val protocolHandlerLayer + : URLayer[UserAccountRepo & MessageItemRepo, ProtocolExecuter[Services, MediatorError | StorageError]] = ZLayer.succeed( - ProtocolExecuterCollection[Services]( + ProtocolExecuterCollection[Services, MediatorError | StorageError]( BasicMessageExecuter, new TrustPingExecuter, MediatorCoordinationExecuter, @@ -51,7 +53,7 @@ case class MediatorAgent( MessageDispatcherJVM.layer.mapError(ex => MediatorThrowable(ex)) // TODO move to another place & move validations and build a contex - def decrypt(msg: Message): ZIO[Agent & Resolver & Operations, MediatorError, PlaintextMessage] = + def decrypt(msg: Message): ZIO[Agent & Resolver & Operations, MediatorError | ProblemReport, PlaintextMessage] = { for { ops <- ZIO.service[Operations] plaintextMessage <- msg match @@ -80,13 +82,14 @@ case class MediatorAgent( case Right(msg2) => decrypt(msg2) } } yield (plaintextMessage) + } def receiveMessage( data: String, mSocketID: Option[SocketID], ): ZIO[ Operations & Resolver & MessageDispatcher & MediatorAgent & MessageItemRepo & UserAccountRepo, - MediatorError, + MediatorError | StorageError, Option[EncryptedMessage] ] = for { @@ -107,7 +110,7 @@ case class MediatorAgent( mSocketID: Option[SocketID] ): ZIO[ Operations & Resolver & MessageDispatcher & MediatorAgent & MessageItemRepo & UserAccountRepo, - MediatorError, + MediatorError | StorageError, Option[EncryptedMessage] ] = ZIO @@ -116,32 +119,83 @@ case class MediatorAgent( _ <- ZIO.log("receivedMessage") maybeSyncReplyMsg <- if (!msg.recipientsSubject.contains(id)) - ZIO.logError(s"This mediator '${id.string}' is not a recipient") - *> ZIO.none + ZIO.logError(s"This mediator '${id.string}' is not a recipient") *> ZIO.none else - for { - messageItemRepo <- ZIO.service[MessageItemRepo] - _ <- messageItemRepo.insert(MessageItem(msg)) // store all message - plaintextMessage <- decrypt(msg) - _ <- didSocketManager.get.flatMap { m => // TODO HACK REMOVE !!!!!!!!!!!!!!!!!!!!!!!! - ZIO.foreach(m.tapSockets)(_.socketOutHub.publish(TapMessage(msg, plaintextMessage).toJson)) - } - _ <- mSocketID match - case None => ZIO.unit - case Some(socketID) => - plaintextMessage.from match - case None => ZIO.unit - case Some(from) => - didSocketManager.update { - _.link(from.asFROMTO, socketID) - } - // TODO Store context of the decrypt unwarping - // TODO SreceiveMessagetore context with MsgID and PIURI - protocolHandler <- ZIO.service[ProtocolExecuter[Services]] - ret <- protocolHandler - .execute(plaintextMessage) - .tapError(ex => ZIO.logError(s"Error when execute Protocol: $ex")) - } yield ret + { + for { + messageItemRepo <- ZIO.service[MessageItemRepo] + protocolHandler <- ZIO.service[ProtocolExecuter[Services, MediatorError | StorageError]] + plaintextMessage <- decrypt(msg) + maybeActionStorageError <- messageItemRepo + .insert(MessageItem(msg)) // store all message + .map(_ /*WriteResult*/ => None + // TODO messages already on the database -> so this might be a replay attack + ) + .catchSome { + case StorageCollection(error) => + // This deals with connection errors to the database. + ZIO.logWarning(s"Error StorageCollection: $error") *> + ZIO + .service[Agent] + .map(agent => + Some( + Reply( + Problems + .storageError( + to = plaintextMessage.from.map(_.asTO).toSet, + from = agent.id, + pthid = plaintextMessage.id, + piuri = plaintextMessage.`type`, + ) + .toPlaintextMessage + ) + ) + ) + case StorageThrowable(error) => + ZIO.logWarning(s"Error StorageThrowable: $error") *> + ZIO + .service[Agent] + .map(agent => + Some( + Reply( + Problems + .storageError( + to = plaintextMessage.from.map(_.asTO).toSet, + from = agent.id, + pthid = plaintextMessage.id, + piuri = plaintextMessage.`type`, + ) + .toPlaintextMessage + ) + ) + ) + } + _ <- didSocketManager.get.flatMap { m => // TODO HACK REMOVE !!!!!!!!!!!!!!!!!!!!!!!! + ZIO.foreach(m.tapSockets)(_.socketOutHub.publish(TapMessage(msg, plaintextMessage).toJson)) + } + _ <- mSocketID match + case None => ZIO.unit + case Some(socketID) => + plaintextMessage.from match + case None => ZIO.unit + case Some(from) => + didSocketManager.update { + _.link(from.asFROMTO, socketID) + } + // TODO Store context of the decrypt unwarping + // TODO SreceiveMessagetore context with MsgID and PIURI + ret <- { + maybeActionStorageError match + case Some(reply) => ActionUtils.packResponse(Some(plaintextMessage), reply) + case None => protocolHandler.execute(plaintextMessage) + }.tapError(ex => ZIO.logError(s"Error when execute Protocol: $ex")) + } yield ret + }.catchAll { + case ex: MediatorError => ZIO.fail(ex) + case pr: ProblemReport => ActionUtils.packResponse(None, Reply(pr.toPlaintextMessage)) + case ex: StorageCollection => ZIO.fail(ex) + case ex: StorageThrowable => ZIO.fail(ex) + } } yield maybeSyncReplyMsg } .provideSomeLayer( /*resolverLayer ++ indentityLayer ++*/ protocolHandlerLayer) @@ -164,7 +218,10 @@ case class MediatorAgent( DIDSocketManager .newMessage(ch, text) .flatMap { case (socketID, encryptedMessage) => receiveMessage(encryptedMessage, Some(socketID)) } - .mapError(ex => MediatorException(ex)) + .mapError { + case ex: MediatorError => MediatorException(ex) + case ex: StorageError => StorageException(ex) + } } case ChannelEvent(ch, ChannelEvent.ChannelUnregistered) => ZIO.logAnnotate(LogAnnotation(SOCKET_ID, ch.id), annotationMap: _*) { @@ -302,7 +359,7 @@ object MediatorAgent { // TODO [return_route extension](https://github.com/decentralized-identity/didcomm-messaging/blob/main/extensions/return_route/main.md) case req @ Method.POST -> !! => ZIO - .logError(s"Request Headers : ${req.headers.mkString(",")}") + .logError(s"Request Headers: ${req.headers.mkString(",")}") .as( Response .text(s"The content-type must be ${MediaTypes.SIGNED.typ} or ${MediaTypes.ENCRYPTED.typ}") diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/BasicMessageExecuter.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/BasicMessageExecuter.scala index a430cef9..9b607581 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/BasicMessageExecuter.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/BasicMessageExecuter.scala @@ -3,11 +3,11 @@ package io.iohk.atala.mediator.protocols import fmgp.crypto.error.FailToParse import fmgp.did.comm.{PIURI, PlaintextMessage} import fmgp.did.comm.protocol.basicmessage2.BasicMessage -import io.iohk.atala.mediator.{MediatorDidError, MediatorThrowable} +import io.iohk.atala.mediator.{MediatorError, MediatorDidError, MediatorThrowable} import io.iohk.atala.mediator.actions.{NoReply, ProtocolExecuter} import zio.{Console, ZIO} -object BasicMessageExecuter extends ProtocolExecuter[Any] { +object BasicMessageExecuter extends ProtocolExecuter[Any, MediatorError] { override def suportedPIURI: Seq[PIURI] = Seq(BasicMessage.piuri) override def program[R1 <: Any](plaintextMessage: PlaintextMessage) = for { @@ -15,4 +15,5 @@ object BasicMessageExecuter extends ProtocolExecuter[Any] { case Left(error) => ZIO.fail(MediatorDidError(FailToParse(error))) case Right(bm) => Console.printLine(bm.toString).mapError(ex => MediatorThrowable(ex)) } yield NoReply + } diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecuter.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecuter.scala index e692c157..92b71884 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecuter.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecuter.scala @@ -5,44 +5,65 @@ import fmgp.did.* import fmgp.did.comm.* import fmgp.did.comm.protocol.* import fmgp.did.comm.protocol.routing2.* -import io.iohk.atala.mediator.MediatorError +import io.iohk.atala.mediator._ import io.iohk.atala.mediator.actions.* import io.iohk.atala.mediator.db.* import zio.* import zio.json.* +import fmgp.did.comm.protocol.reportproblem2.ProblemReport object ForwardMessageExecuter extends ProtocolExecuterWithServices[ - ProtocolExecuter.Services & UserAccountRepo & MessageItemRepo + ProtocolExecuter.Services & UserAccountRepo & MessageItemRepo, + ProtocolExecuter.Erros ] { override def suportedPIURI: Seq[PIURI] = Seq(ForwardMessage.piuri) - override def program[R1 <: UserAccountRepo & MessageItemRepo]( + override def program[R1 <: UserAccountRepo & MessageItemRepo & Agent]( plaintextMessage: PlaintextMessage - ): ZIO[R1, MediatorError, Action] = { + ): ZIO[R1, ProtocolExecuter.Erros, Action] = { // the val is from the match to be definitely stable val piuriForwardMessage = ForwardMessage.piuri - (plaintextMessage.`type` match { - case `piuriForwardMessage` => plaintextMessage.toForwardMessage - }).map { case m: ForwardMessage => - for { - _ <- ZIO.logInfo("ForwardMessage") - repoMessageItem <- ZIO.service[MessageItemRepo] - repoDidAccount <- ZIO.service[UserAccountRepo] - recipientsSubject = Set(m.next) // m.msg.recipientsSubject - numbreOfUpdated <- repoDidAccount.addToInboxes(recipientsSubject, m.msg) - msg <- - if (numbreOfUpdated > 0) { // Or maybe we can add all the time - repoMessageItem.insert(MessageItem(m.msg)) *> - ZIO.logInfo("Add next msg (of the ForwardMessage) to the Message Repo") // TODO change to debug level - } else - ZIO.logWarning("Note: No update on the DidAccount of the recipients") - } yield None - } match - case Left(error) => ZIO.logError(error) *> ZIO.succeed(NoReply) - case Right(program) => program *> ZIO.succeed(NoReply) + (plaintextMessage.`type` match { case `piuriForwardMessage` => plaintextMessage.toForwardMessage }) match + case Left(error) => ZIO.logError(error) *> ZIO.succeed(NoReply) + case Right(m: ForwardMessage) => + for { + _ <- ZIO.logInfo("ForwardMessage") + repoMessageItem <- ZIO.service[MessageItemRepo] + repoDidAccount <- ZIO.service[UserAccountRepo] + recipientsSubject = Set(m.next) // m.msg.recipientsSubject + numbreOfUpdated <- repoDidAccount.addToInboxes(recipientsSubject, m.msg) + msg <- + if (numbreOfUpdated > 0) { // Or maybe we can add all the time + for { + _ <- repoMessageItem.insert(MessageItem(m.msg)) + _ <- ZIO.logInfo("Add next msg (of the ForwardMessage) to the Message Repo") + } yield NoReply + } else { + for { + _ <- ZIO.logWarning("Note: No update on the DidAccount of the recipients") + agent <- ZIO.service[Agent] + problem = plaintextMessage.from match { + case Some(to) => + Problems.notEnroledError( + to = Some(to.asTO), + from = agent.id, + pthid = plaintextMessage.id, + piuri = plaintextMessage.`type`, + ) + case None => + Problems.notEnroledError( + to = None, + from = agent.id, + pthid = plaintextMessage.id, + piuri = plaintextMessage.`type`, + ) + } + } yield Reply(problem.toPlaintextMessage) + } + } yield msg } } diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/MediatorCoordinationExecuter.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/MediatorCoordinationExecuter.scala index d04db5af..2a065af3 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/MediatorCoordinationExecuter.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/MediatorCoordinationExecuter.scala @@ -9,10 +9,16 @@ import fmgp.did.comm.protocol.mediatorcoordination2.* import io.iohk.atala.mediator.* import io.iohk.atala.mediator.actions.* import io.iohk.atala.mediator.db.UserAccountRepo +import io.iohk.atala.mediator.db.DidAccount + import zio.* import zio.json.* -import io.iohk.atala.mediator.db.DidAccount -object MediatorCoordinationExecuter extends ProtocolExecuterWithServices[ProtocolExecuter.Services & UserAccountRepo] { + +object MediatorCoordinationExecuter + extends ProtocolExecuterWithServices[ + ProtocolExecuter.Services & UserAccountRepo, + ProtocolExecuter.Erros + ] { override def suportedPIURI: Seq[PIURI] = Seq( MediateRequest.piuri, @@ -26,7 +32,7 @@ object MediatorCoordinationExecuter extends ProtocolExecuterWithServices[Protoco override def program[R1 <: (UserAccountRepo)]( plaintextMessage: PlaintextMessage - ): ZIO[R1, MediatorError, Action] = { + ): ZIO[R1, MediatorError | StorageError, Action] = { // the val is from the match to be definitely stable val piuriMediateRequest = MediateRequest.piuri val piuriMediateGrant = MediateGrant.piuri @@ -45,8 +51,34 @@ object MediatorCoordinationExecuter extends ProtocolExecuterWithServices[Protoco case `piuriKeylistQuery` => plaintextMessage.toKeylistQuery case `piuriKeylist` => plaintextMessage.toKeylist }).map { - case m: MediateGrant => ZIO.logWarning("MediateGrant") *> ZIO.succeed(NoReply) - case m: MediateDeny => ZIO.logWarning("MediateDeny") *> ZIO.succeed(NoReply) + case m: MediateGrant => + ZIO.logWarning("MediateGrant") *> ZIO.succeed(NoReply) *> + ZIO.succeed( + SyncReplyOnly( + Problems + .unsupportedProtocolRole( + from = m.to.asFROM, + to = m.from.asTO, + pthid = m.id, // TODO CHECK pthid + piuri = m.piuri, + ) + .toPlaintextMessage + ) + ) + case m: MediateDeny => + ZIO.logWarning("MediateDeny") *> ZIO.succeed(NoReply) *> + ZIO.succeed( + SyncReplyOnly( + Problems + .unsupportedProtocolRole( + from = m.to.asFROM, + to = m.from.asTO, + pthid = m.id, // TODO CHECK pthid + piuri = m.piuri, + ) + .toPlaintextMessage + ) + ) case m: MediateRequest => for { _ <- ZIO.logInfo("MediateRequest") @@ -78,7 +110,20 @@ object MediatorCoordinationExecuter extends ProtocolExecuterWithServices[Protoco } } } yield SyncReplyOnly(m.makeKeylistResponse(updateResponse).toPlaintextMessage) - case m: KeylistResponse => ZIO.logWarning("KeylistResponse") *> ZIO.succeed(NoReply) + case m: KeylistResponse => + ZIO.logWarning("KeylistResponse") *> ZIO.succeed(NoReply) *> + ZIO.succeed( + SyncReplyOnly( + Problems + .unsupportedProtocolRole( + from = m.to.asFROM, + to = m.from.asTO, + pthid = m.id, // TODO CHECK pthid + piuri = m.piuri, + ) + .toPlaintextMessage + ) + ) case m: KeylistQuery => for { _ <- ZIO.logInfo("KeylistQuery") @@ -98,7 +143,7 @@ object MediatorCoordinationExecuter extends ProtocolExecuterWithServices[Protoco case Some(response) => SyncReplyOnly(response.toPlaintextMessage) case m: Keylist => ZIO.logWarning("Keylist") *> ZIO.succeed(NoReply) } match - case Left(error) => ZIO.logError(error) *> ZIO.succeed(NoReply) + case Left(error) => ZIO.logError(error) *> ZIO.succeed(NoReply) // TODO error report case Right(program) => program } diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/MissingProtocolExecuter.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/MissingProtocolExecuter.scala new file mode 100644 index 00000000..40d481cb --- /dev/null +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/MissingProtocolExecuter.scala @@ -0,0 +1,29 @@ +package io.iohk.atala.mediator.protocols + +import zio.ZIO + +import fmgp.did.* +import fmgp.did.comm.PlaintextMessage +import io.iohk.atala.mediator.MissingProtocolError +import io.iohk.atala.mediator.actions.ProtocolExecuter +import io.iohk.atala.mediator.actions.Reply + +object MissingProtocolExecuter extends ProtocolExecuter[Agent, Nothing] { + + override def suportedPIURI = Seq() + override def program[R1 <: Agent](plaintextMessage: PlaintextMessage) = + ZIO + .service[Agent] + .map(agent => + Reply( + Problems + .unsupportedProtocolType( + to = plaintextMessage.from.map(_.asTO).toSet, + from = agent.id, + pthid = plaintextMessage.id, + piuri = plaintextMessage.`type`, + ) + .toPlaintextMessage + ) + ) +} diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/NullProtocolExecuter.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/NullProtocolExecuter.scala index 657c1b96..9013ca57 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/NullProtocolExecuter.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/NullProtocolExecuter.scala @@ -5,7 +5,7 @@ import io.iohk.atala.mediator.MissingProtocolError import io.iohk.atala.mediator.actions.ProtocolExecuter import zio.ZIO -object NullProtocolExecuter extends ProtocolExecuter[Any] { +object NullProtocolExecuter extends ProtocolExecuter[Any, MissingProtocolError] { override def suportedPIURI = Seq() override def program[R1 <: Any](plaintextMessage: PlaintextMessage) = diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala index f2a5687d..e74d9fcd 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala @@ -12,7 +12,10 @@ import io.iohk.atala.mediator.db.* import zio.* import zio.json.* object PickupExecuter - extends ProtocolExecuterWithServices[ProtocolExecuter.Services & UserAccountRepo & MessageItemRepo] { + extends ProtocolExecuterWithServices[ + ProtocolExecuter.Services & UserAccountRepo & MessageItemRepo, + ProtocolExecuter.Erros + ] { override def suportedPIURI: Seq[PIURI] = Seq( StatusRequest.piuri, @@ -25,7 +28,7 @@ object PickupExecuter override def program[R1 <: UserAccountRepo & MessageItemRepo]( plaintextMessage: PlaintextMessage - ): ZIO[R1, MediatorError, Action] = { + ): ZIO[R1, StorageError, Action] = { // the val is from the match to be definitely stable val piuriStatusRequest = StatusRequest.piuri val piuriStatus = Status.piuri @@ -48,23 +51,45 @@ object PickupExecuter repoDidAccount <- ZIO.service[UserAccountRepo] didRequestingMessages = m.from.asFROMTO mDidAccount <- repoDidAccount.getDidAccount(didRequestingMessages.toDID) - msgHash = mDidAccount match - case None => ??? // TODO FIXME - case Some(didAccount) => didAccount.messagesRef.filter(_.state == false).map(_.hash) - status = Status( - thid = m.id, - from = m.to.asFROM, - to = m.from.asTO, - recipient_did = m.recipient_did, - message_count = msgHash.size, - longest_waited_seconds = None, // TODO - newest_received_time = None, // TODO - oldest_received_time = None, // TODO - total_bytes = None, // TODO - live_delivery = None, // TODO + ret = mDidAccount match + case None => + Problems + .notEnroledError( + from = m.to.asFROM, + to = Some(m.from.asTO), + pthid = m.id, // TODO CHECK pthid + piuri = m.piuri, + ) + .toPlaintextMessage + case Some(didAccount) => + val msgHash = didAccount.messagesRef.filter(_.state == false).map(_.hash) + Status( + thid = m.id, + from = m.to.asFROM, + to = m.from.asTO, + recipient_did = m.recipient_did, + message_count = msgHash.size, + longest_waited_seconds = None, // TODO + newest_received_time = None, // TODO + oldest_received_time = None, // TODO + total_bytes = None, // TODO + live_delivery = None, // TODO + ).toPlaintextMessage + } yield SyncReplyOnly(ret) + case m: Status => + ZIO.logInfo("Status") *> + ZIO.succeed( + SyncReplyOnly( + Problems + .unsupportedProtocolRole( + from = m.to.asFROM, + to = m.from.asTO, + pthid = m.id, // TODO CHECK pthid + piuri = m.piuri, + ) + .toPlaintextMessage + ) ) - } yield SyncReplyOnly(status.toPlaintextMessage) - case m: Status => ZIO.logInfo("Status") *> ZIO.succeed(NoReply) case m: DeliveryRequest => for { _ <- ZIO.logInfo("DeliveryRequest") @@ -72,27 +97,39 @@ object PickupExecuter repoDidAccount <- ZIO.service[UserAccountRepo] didRequestingMessages = m.from.asFROMTO mDidAccount <- repoDidAccount.getDidAccount(didRequestingMessages.toDID) - msgHash = mDidAccount match - case None => ??? - case Some(didAccount) => didAccount.messagesRef.filter(_.state == false).map(_.hash) - allMessagesFor <- repoMessageItem.findByIds(msgHash) - messagesToReturn = - if (m.recipient_did.isEmpty) allMessagesFor - else { - allMessagesFor.filterNot( - _.msg.recipientsSubject - .map(_.did) - .forall(e => !m.recipient_did.map(_.toDID.did).contains(e)) + ret <- mDidAccount match + case None => + ZIO.succeed( + Problems + .notEnroledError( + from = m.to.asFROM, + to = Some(m.from.asTO), + pthid = m.id, // TODO CHECK pthid + piuri = m.piuri, + ) + .toPlaintextMessage ) - } - deliveryRequest = MessageDelivery( - thid = m.id, - from = m.to.asFROM, - to = m.from.asTO, - recipient_did = m.recipient_did, - attachments = messagesToReturn.map(m => (m._id, m.msg)).toMap, - ) - } yield SyncReplyOnly(deliveryRequest.toPlaintextMessage) + case Some(didAccount) => + val msgHash = didAccount.messagesRef.filter(_.state == false).map(_.hash) + for { + allMessagesFor <- repoMessageItem.findByIds(msgHash) + messagesToReturn = + if (m.recipient_did.isEmpty) allMessagesFor + else { + allMessagesFor.filterNot( + _.msg.recipientsSubject + .map(_.did) + .forall(e => !m.recipient_did.map(_.toDID.did).contains(e)) + ) + } + } yield MessageDelivery( + thid = m.id, + from = m.to.asFROM, + to = m.from.asTO, + recipient_did = m.recipient_did, + attachments = messagesToReturn.map(m => (m._id, m.msg)).toMap, + ).toPlaintextMessage + } yield SyncReplyOnly(ret) case m: MessageDelivery => ZIO.logInfo("MessageDelivery") *> ZIO.succeed( @@ -115,8 +152,20 @@ object PickupExecuter m.message_id_list ) } yield NoReply - case m: LiveModeChange => ZIO.logWarning("LiveModeChange not implemented") *> ZIO.succeed(NoReply) // TODO - + case m: LiveModeChange => + ZIO.logInfo("LiveModeChange Not Supported") *> + ZIO.succeed( + SyncReplyOnly( + Problems + .liveModeNotSupported( + from = m.to.asFROM, + to = m.from.asTO, + pthid = m.id, + piuri = m.piuri, + ) + .toPlaintextMessage + ) + ) } match case Left(error) => ZIO.logError(error) *> ZIO.succeed(NoReply) case Right(program) => program diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/Problems.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/Problems.scala new file mode 100644 index 00000000..62673a99 --- /dev/null +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/Problems.scala @@ -0,0 +1,112 @@ +package io.iohk.atala.mediator.protocols + +import fmgp.did.* +import fmgp.did.comm.* +import fmgp.did.comm.protocol.reportproblem2.* + +object Problems { + val email = Some("atala@iohk.io") + + def unsupportedProtocolType( + to: Set[TO], + from: FROM, + pthid: MsgID, + piuri: PIURI, + ) = ProblemReport( + // id: MsgID = MsgID(), + to = to, + from = from, + pthid = pthid, + ack = None, + code = ProblemCode.ErroFail("msg", "unsupported"), + comment = None, + args = None, + escalate_to = email, + ) + + def unsupportedProtocolRole( + to: TO, + from: FROM, + pthid: MsgID, + piuri: PIURI, + ) = ProblemReport( + // id: MsgID = MsgID(), + to = Set(to), + from = from, + pthid = pthid, + ack = None, + code = ProblemCode.ErroFail("msg", "unsupported"), + comment = None, + args = None, + escalate_to = email, + ) + + def protocolNotImplemented( + to: TO, + from: FROM, + pthid: MsgID, + piuri: PIURI, + ) = ProblemReport( + // id: MsgID = MsgID(), + to = Set(to), + from = from, + pthid = pthid, + ack = None, + code = ProblemCode.ErroFail("msg", "unsupported"), + comment = None, + args = None, + escalate_to = email, + ) + + def liveModeNotSupported( + to: TO, + from: FROM, + pthid: MsgID, + piuri: PIURI, + ) = ProblemReport( + // id: MsgID = MsgID(), + to = Set(to), + from = from, + pthid = pthid, + ack = None, + code = ProblemCode.ErroUndo("live-mode-not-supported"), + comment = Some("Connection does not support Live Delivery"), + args = None, + escalate_to = email, + ) + + def storageError( + to: Set[TO], + from: FROM, + pthid: MsgID, + piuri: PIURI, + ) = ProblemReport( + // id: MsgID = MsgID(), + to = to, + from = from, + pthid = pthid, + ack = None, + code = ProblemCode.ErroFail("me", "res", "storage"), + comment = None, + args = None, + escalate_to = email, + ) + + def notEnroledError( + to: Option[TO], + from: FROM, + pthid: MsgID, + piuri: PIURI, + ) = ProblemReport( + // id: MsgID = MsgID(), + to = to.toSet, + from = from, + pthid = pthid, + ack = None, + code = ProblemCode.ErroFail("req", "not_enroll"), + comment = Some("The DID '{1}' is not enroled."), + args = Some(to.map(_.value).toSeq), + escalate_to = email, + ) + +} diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/TrustPingExecuter.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/TrustPingExecuter.scala index 49c27510..4b973c4b 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/TrustPingExecuter.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/TrustPingExecuter.scala @@ -13,7 +13,7 @@ import io.iohk.atala.mediator.{MediatorDidError, MediatorError} import io.iohk.atala.mediator.actions.{Action, NoReply, ProtocolExecuter, ProtocolExecuterWithServices, Reply} import zio.ZIO -class TrustPingExecuter extends ProtocolExecuterWithServices[ProtocolExecuter.Services] { +class TrustPingExecuter extends ProtocolExecuterWithServices[ProtocolExecuter.Services, MediatorError] { override def suportedPIURI: Seq[PIURI] = Seq(TrustPing.piuri, TrustPingResponse.piuri)