diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/actions/ActionUtils.scala b/mediator/src/main/scala/io/iohk/atala/mediator/actions/ActionUtils.scala index 31e82b6f..498c8728 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/actions/ActionUtils.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/actions/ActionUtils.scala @@ -32,6 +32,7 @@ object ActionUtils { for { outboxRepo <- ZIO.service[OutboxMessageRepo] + xRequestId <- ZIO.logAnnotations.map(_.get(XRequestId.value)) // TODO forward message maybeSyncReplyMsg: Option[SignedMessage | EncryptedMessage] <- reply.to.map(_.toSeq) match // TODO improve case None => @@ -99,6 +100,7 @@ object ActionUtils { case str: String => Some(str) case _: Unit => None , + xRequestId = xRequestId ) ) // Maybe fork .catchAll { case error => ZIO.logError(s"Store Outbox Error: $error") } @@ -132,6 +134,7 @@ object ActionUtils { distination = None, sendMethod = MessageSendMethod.INLINE_REPLY, result = None, + xRequestId = xRequestId, ) ) .catchAll { case error => ZIO.logError(s"Store Outbox Error: $error") } diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/app/MediatorAgent.scala b/mediator/src/main/scala/io/iohk/atala/mediator/app/MediatorAgent.scala index 48489471..3b38b278 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/app/MediatorAgent.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/app/MediatorAgent.scala @@ -121,7 +121,7 @@ case class MediatorAgent( protocolHandler <- ZIO.service[ProtocolExecuter[Services, MediatorError | StorageError]] plaintextMessage <- decrypt(msg) maybeActionStorageError <- messageItemRepo - .insert(MessageItem(msg)) // store all message + .insert(msg) // store all message .map(_ /*WriteResult*/ => None // TODO messages already on the database -> so this might be a replay attack ) diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/db/DataModels.scala b/mediator/src/main/scala/io/iohk/atala/mediator/db/DataModels.scala index c4a04116..ac6e929e 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/db/DataModels.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/db/DataModels.scala @@ -9,22 +9,35 @@ import zio.json._ type HASH = String // messages +type XRequestID = String // x-request-id -case class MessageItem(_id: HASH, msg: EncryptedMessage, headers: ProtectedHeader) +case class MessageItem( + _id: HASH, + msg: EncryptedMessage, + headers: ProtectedHeader, + ts: String, + xRequestId: Option[XRequestID] +) object MessageItem { - def apply(msg: EncryptedMessage): MessageItem = { - new MessageItem(msg.sha1, msg, msg.`protected`.obj) + def apply(msg: EncryptedMessage, xRequestId: Option[XRequestID]): MessageItem = { + new MessageItem(msg.sha1, msg, msg.`protected`.obj, Instant.now().toString, xRequestId) } given BSONDocumentWriter[MessageItem] = Macros.writer[MessageItem] given BSONDocumentReader[MessageItem] = Macros.reader[MessageItem] } -case class MessageMetaData(hash: HASH, recipient: DIDSubject, state: Boolean, ts: String) +case class MessageMetaData( + hash: HASH, + recipient: DIDSubject, + state: Boolean, + ts: String, + xRequestId: Option[XRequestID] +) object MessageMetaData { given BSONDocumentWriter[MessageMetaData] = Macros.writer[MessageMetaData] given BSONDocumentReader[MessageMetaData] = Macros.reader[MessageMetaData] - def apply(hash: HASH, recipient: DIDSubject) = { - new MessageMetaData(hash = hash, recipient = recipient, state = false, ts = Instant.now().toString) + def apply(hash: HASH, recipient: DIDSubject, xRequestId: Option[XRequestID]): MessageMetaData = { + new MessageMetaData(hash = hash, recipient = recipient, state = false, ts = Instant.now().toString, xRequestId) } } @@ -59,7 +72,8 @@ object SentMessageItem { recipient: Set[TO], distination: Option[String], sendMethod: MessageSendMethod, - result: Option[String] + result: Option[String], + xRequestId: Option[XRequestID] ): SentMessageItem = { msg match case sMsg: SignedMessage => @@ -69,7 +83,13 @@ object SentMessageItem { headers = sMsg.signatures.headOption.flatMap(_.`protected`.obj.toJsonAST.toOption).getOrElse(ast.Json.Null), plaintext = plaintext, transport = Seq( - TransportInfo(recipient = recipient, distination = distination, sendMethod = sendMethod, result = result) + TransportInfo( + recipient = recipient, + distination = distination, + sendMethod = sendMethod, + result = result, + xRequestId = xRequestId + ) ) ) case eMsg: EncryptedMessage => @@ -79,7 +99,13 @@ object SentMessageItem { headers = eMsg.`protected`.obj.toJsonAST.getOrElse(ast.Json.Null), plaintext = plaintext, transport = Seq( - TransportInfo(recipient = recipient, distination = distination, sendMethod = sendMethod, result = result) + TransportInfo( + recipient = recipient, + distination = distination, + sendMethod = sendMethod, + result = result, + xRequestId = xRequestId + ) ) ) @@ -100,6 +126,7 @@ object SentMessageItem { sendMethod: MessageSendMethod, timestamp: BSONDateTime = BSONDateTime(Instant.now().toEpochMilli()), // Long, result: Option[String], + xRequestId: Option[XRequestID] ) object SentMessageItem { given BSONDocumentWriter[TransportInfo] = Macros.writer[TransportInfo] diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/db/MessageItemRepo.scala b/mediator/src/main/scala/io/iohk/atala/mediator/db/MessageItemRepo.scala index 931d10ba..f5143477 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/db/MessageItemRepo.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/db/MessageItemRepo.scala @@ -1,6 +1,7 @@ package io.iohk.atala.mediator.db import fmgp.did.* +import fmgp.did.comm.EncryptedMessage import io.iohk.atala.mediator.{StorageCollection, StorageError, StorageThrowable} import reactivemongo.api.bson.* import reactivemongo.api.bson.collection.BSONCollection @@ -25,12 +26,13 @@ class MessageItemRepo(reactiveMongoApi: ReactiveMongoApi)(using ec: ExecutionCon .map(_.collection(collectionName)) .mapError(ex => StorageCollection(ex)) - def insert(value: MessageItem): IO[StorageError, WriteResult] = { + def insert(msg: EncryptedMessage): IO[StorageError, WriteResult] = { for { _ <- ZIO.logInfo("insert") + xRequestId <- ZIO.logAnnotations.map(_.get(XRequestId.value)) coll <- collection result <- ZIO - .fromFuture(implicit ec => coll.insert.one(value)) + .fromFuture(implicit ec => coll.insert.one(MessageItem(msg, xRequestId))) .tapError(err => ZIO.logError(s"insert : ${err.getMessage}")) .mapError(ex => StorageThrowable(ex)) } yield result diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/db/UserAccountRepo.scala b/mediator/src/main/scala/io/iohk/atala/mediator/db/UserAccountRepo.scala index f330b622..47362a84 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/db/UserAccountRepo.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/db/UserAccountRepo.scala @@ -52,10 +52,10 @@ class UserAccountRepo(reactiveMongoApi: ReactiveMongoApi)(using ec: ExecutionCon .collect[Seq](1, Cursor.FailOnError[Seq[DidAccount]]()) // Just one .map(_.headOption) ) - .tapError(err => ZIO.logError(s"Insert newDidAccount (check condition setp): ${err.getMessage}")) + .tapError(err => ZIO.logError(s"Insert newDidAccount (check condition step): ${err.getMessage}")) .mapError(ex => StorageThrowable(ex)) result <- findR match - case Some(data) if data.did != did => ZIO.left("Fail found document: " + data) + case Some(data) if data.did != did => ZIO.left(s"Fail found document: $data") case Some(old) => ZIO.right(old) case None => val value = DidAccount( @@ -68,7 +68,7 @@ class UserAccountRepo(reactiveMongoApi: ReactiveMongoApi)(using ec: ExecutionCon .map(e => e.n match case 1 => Right(value) - case _ => Left("Fail to inserte: " + e.toString()) + case _ => Left(s"Fail to insert: ${e.toString}") ) .tapError(err => ZIO.logError(s"Insert newDidAccount : ${err.getMessage}")) .mapError(ex => StorageThrowable(ex)) @@ -143,7 +143,10 @@ class UserAccountRepo(reactiveMongoApi: ReactiveMongoApi)(using ec: ExecutionCon /** @return * number of documents updated in DB */ - def addToInboxes(recipients: Set[DIDSubject], msg: EncryptedMessage): ZIO[Any, StorageError, Int] = { + def addToInboxes( + recipients: Set[DIDSubject], + msg: EncryptedMessage + ): ZIO[Any, StorageError, Int] = { def selector = BSONDocument( "alias" -> BSONDocument("$in" -> recipients.map(_.did)), @@ -160,22 +163,23 @@ class UserAccountRepo(reactiveMongoApi: ReactiveMongoApi)(using ec: ExecutionCon ) ) - def update: BSONDocument = BSONDocument( + def update(xRequestId: Option[XRequestID]): BSONDocument = BSONDocument( "$push" -> BSONDocument( "messagesRef" -> BSONDocument( "$each" -> - recipients.map(recipient => MessageMetaData(msg.sha1, recipient)) + recipients.map(recipient => MessageMetaData(msg.sha1, recipient, xRequestId)) ) ) ) for { _ <- ZIO.logInfo("addToInboxes") + xRequestId <- ZIO.logAnnotations.map(_.get(XRequestId.value)) coll <- collection result <- ZIO .fromFuture(implicit ec => coll.update - .one(selector, update) // Just one + .one(selector, update(xRequestId)) // Just one ) .tapError(err => ZIO.logError(s"addToInboxes : ${err.getMessage}")) .mapError(ex => StorageThrowable(ex)) diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/db/XRequestId.scala b/mediator/src/main/scala/io/iohk/atala/mediator/db/XRequestId.scala new file mode 100644 index 00000000..d86e4698 --- /dev/null +++ b/mediator/src/main/scala/io/iohk/atala/mediator/db/XRequestId.scala @@ -0,0 +1,4 @@ +package io.iohk.atala.mediator.db + +case object XRequestId: + inline def value: String = "x-request-id" diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecuter.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecuter.scala index cf0794c2..15275274 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecuter.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecuter.scala @@ -38,7 +38,7 @@ object ForwardMessageExecuter msg <- if (numbreOfUpdated > 0) { // Or maybe we can add all the time for { - _ <- repoMessageItem.insert(MessageItem(m.msg)) + _ <- repoMessageItem.insert(m.msg) _ <- ZIO.logInfo("Add next msg (of the ForwardMessage) to the Message Repo") } yield NoReply } else { diff --git a/mediator/src/test/scala/io/iohk/atala/mediator/db/MessageItemRepoSpec.scala b/mediator/src/test/scala/io/iohk/atala/mediator/db/MessageItemRepoSpec.scala index f2f7e158..e8f0d444 100644 --- a/mediator/src/test/scala/io/iohk/atala/mediator/db/MessageItemRepoSpec.scala +++ b/mediator/src/test/scala/io/iohk/atala/mediator/db/MessageItemRepoSpec.scala @@ -14,14 +14,18 @@ object MessageItemRepoSpec extends ZIOSpecDefault with AccountStubSetup { override def spec = suite("MessageItemSpec")( test("insert message") { - for { - messageItem <- ZIO.service[MessageItemRepo] - msg <- ZIO.fromEither(encryptedMessageAlice) - result <- messageItem.insert(MessageItem(msg)) - } yield { - assertTrue(result.writeErrors == Nil) && - assertTrue(result.n == 1) + ZIO.logAnnotate(XRequestId.value, "b373423c-c78f-4cbc-a3fe-89cbc1351835") { + for { + messageItem <- ZIO.service[MessageItemRepo] + + msg <- ZIO.fromEither(encryptedMessageAlice) + result <- messageItem.insert(msg) + } yield { + assertTrue(result.writeErrors == Nil) && + assertTrue(result.n == 1) + } } + }, test("findById message") { for { @@ -29,19 +33,30 @@ object MessageItemRepoSpec extends ZIOSpecDefault with AccountStubSetup { msg <- ZIO.fromEither(encryptedMessageAlice) result <- messageItem.findById(msg.sha1) } yield { - assertTrue(result.contains(MessageItem(msg))) + val outcome = result.forall { messageItem => + messageItem.msg == msg && + messageItem._id == msg.sha1 && + messageItem.xRequestId.contains("b373423c-c78f-4cbc-a3fe-89cbc1351835") + } + assertTrue(outcome) } }, test("findByIds messages") { - for { - messageItem <- ZIO.service[MessageItemRepo] - msg <- ZIO.fromEither(encryptedMessageAlice) - msg2 <- ZIO.fromEither(encryptedMessageBob) - msg2Added <- messageItem.insert(MessageItem(msg2)) - result <- messageItem.findByIds(Seq(msg.sha1, msg2.sha1)) - } yield { - assertTrue(result.contains(MessageItem(msg))) && - assertTrue(result.contains(MessageItem(msg2))) + ZIO.logAnnotate(XRequestId.value, "b373423c-c78f-4cbc-a3fe-89cbc1351835") { + for { + messageItem <- ZIO.service[MessageItemRepo] + msg <- ZIO.fromEither(encryptedMessageAlice) + msg2 <- ZIO.fromEither(encryptedMessageBob) + msg2Added <- messageItem.insert(msg2) + result <- messageItem.findByIds(Seq(msg.sha1, msg2.sha1)) + } yield { + val outcome = result.forall { messageItem => + Seq(msg, msg2).contains(messageItem.msg) && + Seq(msg.sha1, msg2.sha1).contains(messageItem._id) && + messageItem.xRequestId.contains("b373423c-c78f-4cbc-a3fe-89cbc1351835") + } + assertTrue(outcome) + } } } ).provideLayerShared( diff --git a/mediator/src/test/scala/io/iohk/atala/mediator/db/UserAccountRepoSpec.scala b/mediator/src/test/scala/io/iohk/atala/mediator/db/UserAccountRepoSpec.scala index 7d0a2021..844e0130 100644 --- a/mediator/src/test/scala/io/iohk/atala/mediator/db/UserAccountRepoSpec.scala +++ b/mediator/src/test/scala/io/iohk/atala/mediator/db/UserAccountRepoSpec.scala @@ -133,28 +133,32 @@ object UserAccountRepoSpec extends ZIOSpecDefault with AccountStubSetup { } }, test("addMessage to inbox for given Account") { - for { - userAccount <- ZIO.service[UserAccountRepo] - messageItem <- ZIO.service[MessageItemRepo] - result <- userAccount.addAlias(DIDSubject(alice), DIDSubject(bob)) - msg <- ZIO.fromEither(encryptedMessageAlice) - msgAdded <- messageItem.insert(MessageItem(msg)) - addedToInbox <- userAccount.addToInboxes(Set(DIDSubject(bob)), msg) - didAccount <- userAccount.getDidAccount(DIDSubject(alice)) - } yield { - val messageMetaData: Seq[MessageMetaData] = didAccount.map(_.messagesRef).getOrElse(Seq.empty) + val xRequestId = "b373423c-c78f-4cbc-a3fe-89cbc1351835" + ZIO.logAnnotate(XRequestId.value, xRequestId) { + for { + userAccount <- ZIO.service[UserAccountRepo] + messageItem <- ZIO.service[MessageItemRepo] + result <- userAccount.addAlias(DIDSubject(alice), DIDSubject(bob)) + msg <- ZIO.fromEither(encryptedMessageAlice) + msgAdded <- messageItem.insert(msg) + addedToInbox <- userAccount.addToInboxes(Set(DIDSubject(bob)), msg) + didAccount <- userAccount.getDidAccount(DIDSubject(alice)) + } yield { + val messageMetaData: Seq[MessageMetaData] = didAccount.map(_.messagesRef).getOrElse(Seq.empty) - assertTrue(result.isRight) && - assertTrue(result == Right(1)) && - assertTrue(msgAdded.writeErrors == Nil) && - assertTrue(msgAdded.n == 1) && - assertTrue(addedToInbox == 1) && - assert(messageMetaData)( - forall( - hasField("hash", (m: MessageMetaData) => m.hash, equalTo(msg.sha1)) - && hasField("recipient", (m: MessageMetaData) => m.recipient, equalTo(DIDSubject(bob))) + assertTrue(result.isRight) && + assertTrue(result == Right(1)) && + assertTrue(msgAdded.writeErrors == Nil) && + assertTrue(msgAdded.n == 1) && + assertTrue(addedToInbox == 1) && + assert(messageMetaData)( + forall( + hasField("hash", (m: MessageMetaData) => m.hash, equalTo(msg.sha1)) + && hasField("recipient", (m: MessageMetaData) => m.recipient, equalTo(DIDSubject(bob))) + && hasField("xRequestId", (m: MessageMetaData) => m.xRequestId, equalTo(Some(xRequestId))) + ) ) - ) + } } }, test("mark message as delivered given did") {