Skip to content

Commit

Permalink
feat: Error handling and Send Problem Reports (#65)
Browse files Browse the repository at this point in the history
Error handling and Send Problem Reports
For ATL-5340

- Catch Errors and send Problem Reports:
  - (sync) e.p.crypto - is message is tampering (any crypto error).
    - [WIP]
  - (sync) e.p.crypto.unsupported - is message is tampering (any crypto error).
    - [WIP]
  - (sync & async) e.p.crypto.replay - if the message is reply (possible he replay attack).
  - (sync) e.p.req - pickup message before enroling.
  - [QA] StatusRequest - https://didcomm.org/messagepickup/3.0/status-request
  - [QA] DeliveryRequest - https://didcomm.org/messagepickup/3.0/delivery-request
  - (sync) e.p.me.res.storage - connection MongoBD is not working.
  - [QA] catch all StorageCollection Error
  - (sync) e.p.me.res.storage - business logic MongoBD is not working.
  - [QA] catch all StorageThrowable
  - (sync) e.p.did - for any DID method that is not `did.peer`.
  - (sync) e.p.did.malformed - for any DID method malformed.
  - (sync) e.p.msg - for parsing error from the message.
    - [QA] All parsing errors from the decrypt function
    - [TODO] parsing for a specific data model of each protocol
  - (sync) e.p.msg.unsupported - for the message type LiveModeChange and all message that is not the role of the mediator
    - [QA] MediateGrant
    - [QA] MediateDeny
    - [QA] KeylistResponse
    - [QA] Status - https://didcomm.org/messagepickup/3.0/status
    - [TODO] ...
  - LiveModeChange Not Supported 
    - [QA] "e.m.live-mode-not-supported" - https://didcomm.org/messagepickup/3.0/live-delivery-change
  - (sync) e.p.msg.unsupported - for parsing errors due to unsupported version or protocol.
    - [QA] MissingProtocolExecuter (unsupported protocol it also works fine for unsupported versions)
  - (sync & async) e.p.req.not_enroll - Get a Forward message to a DID that is not enrolled.
    - [QA] Send a Problem Report if the next DID is not enrolled in the Mediator.
  - (sync & async) e.p.me - catch all error at the end.
    - [WIP]

---------

Signed-off-by: Fabio Pinheiro <fabiomgpinheiro@gmail.com>
  • Loading branch information
FabioPinheiro authored and mineme0110 committed Apr 30, 2024
1 parent e303319 commit 9160dca
Show file tree
Hide file tree
Showing 14 changed files with 607 additions and 190 deletions.
65 changes: 65 additions & 0 deletions Mediator-Error_Handling.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Error Handling


More info see https://input-output.atlassian.net/browse/ATL-5236

https://identity.foundation/didcomm-messaging/spec/#problem-reports


On each step of all our protocols processing, when something wrong is happening, we need to:
Goals
- Update the record to a documented error state
- Log the error in the service logs
- Send the problem report message when appropriate

Goal other: error recover/resilient
- [optional] Send event that record state changed to error
- Decide on the policy of re-trying sending errors (one of the proposals is just to send it once, and if a recipient did not get this, then it’s on its own requesting record ID state)



Note: most errors in Mediator will be synchronous

- Store messages when sending (1w)
- Catch Errors and send Problem Reports (1w):
- (sync) e.p.crypto - is message is tampering (any crypto error).
- [WIP]
- (sync) e.p.crypto.unsupported - is message is tampering (any crypto error).
- [WIP]
- (sync & async) e.p.crypto.replay - if the message is replay (possible he replay attack).
- (sync) e.p.req - pickup message before enroling.
- [QA] StatusRequest - https://didcomm.org/messagepickup/3.0/status-request
- [QA] DeliveryRequest - https://didcomm.org/messagepickup/3.0/delivery-request
- (sync) e.p.me.res.storage - connection MongoBD is not working.
- [QA] catch all StorageCollection Error
- (sync) e.p.me.res.storage - business logic MongoBD is not working.
- [QA] catch all StorageThrowable
- (sync) e.p.did - for any DID method that is not `did.peer`.
- (sync) e.p.did.malformed - for any DID method malformed.
- (sync) e.p.msg - for parsing error from the message.
- [QA] All parsing errors from the decrypt function
- [TODO] parsing for a specific data model of each protocol
- (sync) e.p.msg.unsupported - for the message type LiveModeChange and all message that is not role of the mediator
- [QA] MediateGrant
- [QA] MediateDeny
- [QA] KeylistResponse
- [QA] Status - https://didcomm.org/messagepickup/3.0/status
- [TODO] ...
- LiveModeChange Not Supported
- [QA] "e.m.live-mode-not-supported" - https://didcomm.org/messagepickup/3.0/live-delivery-change
- (sync) e.p.msg.unsupported - for parsing error due to unsupported version or protocol.
- [QA] MissingProtocolExecuter (unsupported protocol it also works fine for unsupported versions)
- (sync & async) e.p.req.not_enroll - Get a Forward message to a DID that is not enrolled.
- [QA] Send a Problem Report if the next DID is not enrolled in the Mediator.
- (sync & async) e.p.me - catch all error at the end.
- [WIP]
- Receive a problem report (1w):
- in case of Warnings Reply `w.p` -> log warnings and escalate to an error `e.p` on the reply
- in case of Error `e.p` -> log error

- Traceability of the MsgID of the Problem Report to the original error (2d) -> ATL-4147
- [optional] Log - https://input-output.atlassian.net/browse/ATL-4147
- escalate_to must be configurable (1d)
- [optional] update the protocol with new tokens (2d)
- `e.p.me.res.storage`
- `e.p.me.res.not_enroll`
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ inThisBuild(

/** Versions */
lazy val V = new {
val scalaDID = "0.1.0-M6"
val scalaDID = "0.1.0-M7"
// val scalajsJavaSecureRandom = "1.0.0"

// FIXME another bug in the test framework https://github.com/scalameta/munit/issues/554
Expand Down
6 changes: 4 additions & 2 deletions mediator/src/main/scala/io/iohk/atala/mediator/Error.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import fmgp.did.*
import fmgp.did.comm.*
import zio.json.*

trait MediatorError
sealed trait MediatorError

case class MediatorException(fail: MediatorError) extends Exception(fail.toString())

Expand All @@ -20,8 +20,9 @@ object MediatorThrowable {
}

// Storage
case class StorageException(fail: StorageError) extends Exception(fail.toString())

trait StorageError extends MediatorError {
sealed trait StorageError { // extends MediatorError {
def error: String
}

Expand All @@ -35,6 +36,7 @@ object StorageThrowable {
def apply(throwable: Throwable) = new StorageThrowable(throwable.getClass.getName() + ":" + throwable.getMessage)
}

// ProtocolError
sealed trait ProtocolError extends MediatorError {
def piuri: PIURI
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package io.iohk.atala.mediator.actions

import fmgp.crypto.error.*
import fmgp.did.*
import fmgp.did.comm.*
import fmgp.did.comm.Operations.*
import fmgp.did.comm.protocol.*
import fmgp.did.comm.protocol.basicmessage2.*
import fmgp.did.comm.protocol.trustping2.*
import io.iohk.atala.mediator.*
import io.iohk.atala.mediator.comm.*
import io.iohk.atala.mediator.db.*
import io.iohk.atala.mediator.protocols.NullProtocolExecuter
import zio.*
import zio.json.*
import io.iohk.atala.mediator.protocols.MissingProtocolExecuter

object ActionUtils {

def packResponse(
plaintextMessage: Option[PlaintextMessage],
action: Action
): ZIO[Operations & Agent & Resolver & MessageDispatcher, MediatorError, Option[EncryptedMessage]] =
action match {
case _: NoReply.type => ZIO.succeed(None)
case action: AnyReply =>
val reply = action.msg
for {
msg <- {
reply.from match
case Some(value) => authEncrypt(reply)
case None => anonEncrypt(reply)
}.mapError(fail => MediatorDidError(fail))
// TODO forward message
maybeSyncReplyMsg <- reply.to.map(_.toSeq) match // TODO improve
case None => ZIO.logWarning("Have a reply but the field 'to' is missing") *> ZIO.none
case Some(Seq()) => ZIO.logWarning("Have a reply but the field 'to' is empty") *> ZIO.none
case Some(send2DIDs) =>
ZIO
.foreach(send2DIDs)(to =>
val job: ZIO[MessageDispatcher & (Resolver & Any), MediatorError, Matchable] = for {
messageDispatcher <- ZIO.service[MessageDispatcher]
resolver <- ZIO.service[Resolver]
doc <- resolver
.didDocument(to)
.mapError(fail => MediatorDidError(fail))
mURL = doc.service.toSeq.flatten
.filter(_.`type` match {
case str: String => str == DIDService.TYPE_DIDCommMessaging
case seq: Seq[String] => seq.contains(DIDService.TYPE_DIDCommMessaging)
}) match {
case head +: next => // FIXME discarte the next
head.getServiceEndpointAsURIs.headOption // TODO head
case Seq() => None // TODO
}
jobToRun <- mURL match
case None => ZIO.logWarning(s"No url to send message")
case Some(url) => {
ZIO.log(s"Send to url: $url") *>
messageDispatcher
.send(
msg,
url,
None
// url match // FIXME REMOVE (use for local env)
// case http if http.startsWith("http://") => Some(url.drop(7).split(':').head.split('/').head)
// case https if https.startsWith("https://") =>
// Some(url.drop(8).split(':').head.split('/').head)
// case _ => None
)
.catchAll { case DispatcherError(error) => ZIO.logWarning(s"Dispatch Error: $error") }
}

} yield (jobToRun)
action match
case Reply(_) => job
case SyncReplyOnly(_) => ZIO.unit
case AsyncReplyOnly(_) => job
) *> ZIO
.succeed(msg)
.when(
{
plaintextMessage.map(_.return_route).contains(ReturnRoute.all)
&& {
plaintextMessage.flatMap(_.from.map(_.asTO)) match {
case None => false
case Some(replyTo) => send2DIDs.contains(replyTo)
}
}
} || action.isInstanceOf[SyncReplyOnly]
)
} yield maybeSyncReplyMsg
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,124 +13,66 @@ import io.iohk.atala.mediator.db.*
import io.iohk.atala.mediator.protocols.NullProtocolExecuter
import zio.*
import zio.json.*
import io.iohk.atala.mediator.protocols.MissingProtocolExecuter
//TODO pick a better name // maybe "Protocol" only

trait ProtocolExecuter[-R] {
trait ProtocolExecuter[-R, +E] { // <: MediatorError | StorageError] {

def suportedPIURI: Seq[PIURI]

/** @return can return a Sync Reply Msg */
def execute[R1 <: R](plaintextMessage: PlaintextMessage): ZIO[R1, MediatorError, Option[EncryptedMessage]] =
def execute[R1 <: R](
plaintextMessage: PlaintextMessage
): ZIO[R1, E, Option[EncryptedMessage]] =
program(plaintextMessage) *> ZIO.none

def program[R1 <: R](plaintextMessage: PlaintextMessage): ZIO[R1, MediatorError, Action]
def program[R1 <: R](plaintextMessage: PlaintextMessage): ZIO[R1, E, Action]
}

object ProtocolExecuter {
type Services = Resolver & Agent & Operations & MessageDispatcher
type Erros = MediatorError | StorageError
}
case class ProtocolExecuterCollection[-R](executers: ProtocolExecuter[R]*) extends ProtocolExecuter[R] {
case class ProtocolExecuterCollection[-R <: Agent, +E](
executers: ProtocolExecuter[R, E]*
) extends ProtocolExecuter[R, E] {

override def suportedPIURI: Seq[PIURI] = executers.flatMap(_.suportedPIURI)

def selectExecutersFor(piuri: PIURI) = executers.find(_.suportedPIURI.contains(piuri))

override def execute[R1 <: R](
plaintextMessage: PlaintextMessage,
): ZIO[R1, MediatorError, Option[EncryptedMessage]] =
): ZIO[R1, E, Option[EncryptedMessage]] =
selectExecutersFor(plaintextMessage.`type`) match
case None => NullProtocolExecuter.execute(plaintextMessage)
// case None => NullProtocolExecuter.execute(plaintextMessage)
case None => MissingProtocolExecuter.execute(plaintextMessage)
case Some(px) => px.execute(plaintextMessage)

override def program[R1 <: R](
plaintextMessage: PlaintextMessage,
): ZIO[R1, MediatorError, Action] =
): ZIO[R1, E, Action] =
selectExecutersFor(plaintextMessage.`type`) match
case None => NullProtocolExecuter.program(plaintextMessage)
// case None => NullProtocolExecuter.program(plaintextMessage)
case None => MissingProtocolExecuter.program(plaintextMessage)
case Some(px) => px.program(plaintextMessage)
}

trait ProtocolExecuterWithServices[-R <: ProtocolExecuter.Services] extends ProtocolExecuter[R] {
trait ProtocolExecuterWithServices[
-R <: ProtocolExecuter.Services,
+E >: MediatorError // ProtocolExecuter.Erros
] extends ProtocolExecuter[R, E] {

override def execute[R1 <: R](
plaintextMessage: PlaintextMessage,
// context: Context
): ZIO[R1, MediatorError, Option[EncryptedMessage]] =
): ZIO[R1, E, Option[EncryptedMessage]] =
program(plaintextMessage)
.tap(v => ZIO.logDebug(v.toString)) // DEBUG
.flatMap {
case _: NoReply.type => ZIO.succeed(None)
case action: AnyReply =>
val reply = action.msg
for {
msg <- {
reply.from match
case Some(value) => authEncrypt(reply)
case None => anonEncrypt(reply)
}.mapError(fail => MediatorDidError(fail))
// TODO forward message
maybeSyncReplyMsg <- reply.to.map(_.toSeq) match // TODO improve
case None => ZIO.logWarning("Have a reply but the field 'to' is missing") *> ZIO.none
case Some(Seq()) => ZIO.logWarning("Have a reply but the field 'to' is empty") *> ZIO.none
case Some(send2DIDs) =>
ZIO
.foreach(send2DIDs)(to =>
val job: ZIO[MessageDispatcher & (Resolver & Any), MediatorError, Matchable] = for {
messageDispatcher <- ZIO.service[MessageDispatcher]
resolver <- ZIO.service[Resolver]
doc <- resolver
.didDocument(to)
.mapError(fail => MediatorDidError(fail))
mURL = doc.service.toSeq.flatten
.filter(_.`type` match {
case str: String => str == DIDService.TYPE_DIDCommMessaging
case seq: Seq[String] => seq.contains(DIDService.TYPE_DIDCommMessaging)
}) match {
case head +: next => // FIXME discarte the next
head.getServiceEndpointAsURIs.headOption // TODO head
case Seq() => None // TODO
}
jobToRun <- mURL match
case None => ZIO.logWarning(s"No url to send message")
case Some(url) => {
ZIO.log(s"Send to url: $url") *>
messageDispatcher
.send(
msg,
url, // "http://localhost:8080", // FIXME REMOVE (use for local env)
None
// url match // FIXME REMOVE (use for local env)
// case http if http.startsWith("http://") => Some(url.drop(7).split(':').head.split('/').head)
// case https if https.startsWith("https://") =>
// Some(url.drop(8).split(':').head.split('/').head)
// case _ => None
)
.catchAll { case DispatcherError(error) => ZIO.logWarning(s"Dispatch Error: $error") }
}

} yield (jobToRun)
action match
case Reply(_) => job
case SyncReplyOnly(_) => ZIO.unit
case AsyncReplyOnly(_) => job
) *> ZIO
.succeed(msg)
.when(
{
plaintextMessage.return_route.contains(ReturnRoute.all)
&& {
plaintextMessage.from.map(_.asTO) match {
case None => false
case Some(replyTo) => send2DIDs.contains(replyTo)
}
}
} || action.isInstanceOf[SyncReplyOnly]
)
} yield maybeSyncReplyMsg
}
.flatMap(action => ActionUtils.packResponse(Some(plaintextMessage), action))

override def program[R1 <: R](
plaintextMessage: PlaintextMessage,
// context: Context
): ZIO[R1, MediatorError, Action]
): ZIO[R1, E, Action]
}
Loading

0 comments on commit 9160dca

Please sign in to comment.