Skip to content

Commit

Permalink
Added XRequestId as part of MessageMetaData and MessageItem Transport… (
Browse files Browse the repository at this point in the history
#113)

Added XRequestId as part of MessageMetaData and MessageItem TransportInfo for identifying the message received and send message

Signed-off-by: Shailesh Patil <shailesh.patil@iohk.io>
Signed-off-by: Shailesh <Patil>
  • Loading branch information
mineme0110 authored and Shailesh committed Apr 30, 2024
1 parent da56d94 commit 6ad106f
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ object ActionUtils {
for {

outboxRepo <- ZIO.service[OutboxMessageRepo]
xRequestId <- ZIO.logAnnotations.map(_.get(XRequestId.value))
// TODO forward message
maybeSyncReplyMsg: Option[SignedMessage | EncryptedMessage] <- reply.to.map(_.toSeq) match // TODO improve
case None =>
Expand Down Expand Up @@ -99,6 +100,7 @@ object ActionUtils {
case str: String => Some(str)
case _: Unit => None
,
xRequestId = xRequestId
)
) // Maybe fork
.catchAll { case error => ZIO.logError(s"Store Outbox Error: $error") }
Expand Down Expand Up @@ -132,6 +134,7 @@ object ActionUtils {
distination = None,
sendMethod = MessageSendMethod.INLINE_REPLY,
result = None,
xRequestId = xRequestId,
)
)
.catchAll { case error => ZIO.logError(s"Store Outbox Error: $error") }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ case class MediatorAgent(
protocolHandler <- ZIO.service[ProtocolExecuter[Services, MediatorError | StorageError]]
plaintextMessage <- decrypt(msg)
maybeActionStorageError <- messageItemRepo
.insert(MessageItem(msg)) // store all message
.insert(msg) // store all message
.map(_ /*WriteResult*/ => None
// TODO messages already on the database -> so this might be a replay attack
)
Expand Down
45 changes: 36 additions & 9 deletions mediator/src/main/scala/io/iohk/atala/mediator/db/DataModels.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,35 @@ import zio.json._

type HASH = String
// messages
type XRequestID = String // x-request-id

case class MessageItem(_id: HASH, msg: EncryptedMessage, headers: ProtectedHeader)
case class MessageItem(
_id: HASH,
msg: EncryptedMessage,
headers: ProtectedHeader,
ts: String,
xRequestId: Option[XRequestID]
)
object MessageItem {
def apply(msg: EncryptedMessage): MessageItem = {
new MessageItem(msg.sha1, msg, msg.`protected`.obj)
def apply(msg: EncryptedMessage, xRequestId: Option[XRequestID]): MessageItem = {
new MessageItem(msg.sha1, msg, msg.`protected`.obj, Instant.now().toString, xRequestId)
}
given BSONDocumentWriter[MessageItem] = Macros.writer[MessageItem]
given BSONDocumentReader[MessageItem] = Macros.reader[MessageItem]
}

case class MessageMetaData(hash: HASH, recipient: DIDSubject, state: Boolean, ts: String)
case class MessageMetaData(
hash: HASH,
recipient: DIDSubject,
state: Boolean,
ts: String,
xRequestId: Option[XRequestID]
)
object MessageMetaData {
given BSONDocumentWriter[MessageMetaData] = Macros.writer[MessageMetaData]
given BSONDocumentReader[MessageMetaData] = Macros.reader[MessageMetaData]
def apply(hash: HASH, recipient: DIDSubject) = {
new MessageMetaData(hash = hash, recipient = recipient, state = false, ts = Instant.now().toString)
def apply(hash: HASH, recipient: DIDSubject, xRequestId: Option[XRequestID]): MessageMetaData = {
new MessageMetaData(hash = hash, recipient = recipient, state = false, ts = Instant.now().toString, xRequestId)
}
}

Expand Down Expand Up @@ -59,7 +72,8 @@ object SentMessageItem {
recipient: Set[TO],
distination: Option[String],
sendMethod: MessageSendMethod,
result: Option[String]
result: Option[String],
xRequestId: Option[XRequestID]
): SentMessageItem = {
msg match
case sMsg: SignedMessage =>
Expand All @@ -69,7 +83,13 @@ object SentMessageItem {
headers = sMsg.signatures.headOption.flatMap(_.`protected`.obj.toJsonAST.toOption).getOrElse(ast.Json.Null),
plaintext = plaintext,
transport = Seq(
TransportInfo(recipient = recipient, distination = distination, sendMethod = sendMethod, result = result)
TransportInfo(
recipient = recipient,
distination = distination,
sendMethod = sendMethod,
result = result,
xRequestId = xRequestId
)
)
)
case eMsg: EncryptedMessage =>
Expand All @@ -79,7 +99,13 @@ object SentMessageItem {
headers = eMsg.`protected`.obj.toJsonAST.getOrElse(ast.Json.Null),
plaintext = plaintext,
transport = Seq(
TransportInfo(recipient = recipient, distination = distination, sendMethod = sendMethod, result = result)
TransportInfo(
recipient = recipient,
distination = distination,
sendMethod = sendMethod,
result = result,
xRequestId = xRequestId
)
)
)

Expand All @@ -100,6 +126,7 @@ object SentMessageItem {
sendMethod: MessageSendMethod,
timestamp: BSONDateTime = BSONDateTime(Instant.now().toEpochMilli()), // Long,
result: Option[String],
xRequestId: Option[XRequestID]
)
object SentMessageItem {
given BSONDocumentWriter[TransportInfo] = Macros.writer[TransportInfo]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.iohk.atala.mediator.db

import fmgp.did.*
import fmgp.did.comm.EncryptedMessage
import io.iohk.atala.mediator.{StorageCollection, StorageError, StorageThrowable}
import reactivemongo.api.bson.*
import reactivemongo.api.bson.collection.BSONCollection
Expand All @@ -25,12 +26,13 @@ class MessageItemRepo(reactiveMongoApi: ReactiveMongoApi)(using ec: ExecutionCon
.map(_.collection(collectionName))
.mapError(ex => StorageCollection(ex))

def insert(value: MessageItem): IO[StorageError, WriteResult] = {
def insert(msg: EncryptedMessage): IO[StorageError, WriteResult] = {
for {
_ <- ZIO.logInfo("insert")
xRequestId <- ZIO.logAnnotations.map(_.get(XRequestId.value))
coll <- collection
result <- ZIO
.fromFuture(implicit ec => coll.insert.one(value))
.fromFuture(implicit ec => coll.insert.one(MessageItem(msg, xRequestId)))
.tapError(err => ZIO.logError(s"insert : ${err.getMessage}"))
.mapError(ex => StorageThrowable(ex))
} yield result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ class UserAccountRepo(reactiveMongoApi: ReactiveMongoApi)(using ec: ExecutionCon
.collect[Seq](1, Cursor.FailOnError[Seq[DidAccount]]()) // Just one
.map(_.headOption)
)
.tapError(err => ZIO.logError(s"Insert newDidAccount (check condition setp): ${err.getMessage}"))
.tapError(err => ZIO.logError(s"Insert newDidAccount (check condition step): ${err.getMessage}"))
.mapError(ex => StorageThrowable(ex))
result <- findR match
case Some(data) if data.did != did => ZIO.left("Fail found document: " + data)
case Some(data) if data.did != did => ZIO.left(s"Fail found document: $data")
case Some(old) => ZIO.right(old)
case None =>
val value = DidAccount(
Expand All @@ -68,7 +68,7 @@ class UserAccountRepo(reactiveMongoApi: ReactiveMongoApi)(using ec: ExecutionCon
.map(e =>
e.n match
case 1 => Right(value)
case _ => Left("Fail to inserte: " + e.toString())
case _ => Left(s"Fail to insert: ${e.toString}")
)
.tapError(err => ZIO.logError(s"Insert newDidAccount : ${err.getMessage}"))
.mapError(ex => StorageThrowable(ex))
Expand Down Expand Up @@ -143,7 +143,10 @@ class UserAccountRepo(reactiveMongoApi: ReactiveMongoApi)(using ec: ExecutionCon
/** @return
* number of documents updated in DB
*/
def addToInboxes(recipients: Set[DIDSubject], msg: EncryptedMessage): ZIO[Any, StorageError, Int] = {
def addToInboxes(
recipients: Set[DIDSubject],
msg: EncryptedMessage
): ZIO[Any, StorageError, Int] = {
def selector =
BSONDocument(
"alias" -> BSONDocument("$in" -> recipients.map(_.did)),
Expand All @@ -160,22 +163,23 @@ class UserAccountRepo(reactiveMongoApi: ReactiveMongoApi)(using ec: ExecutionCon
)
)

def update: BSONDocument = BSONDocument(
def update(xRequestId: Option[XRequestID]): BSONDocument = BSONDocument(
"$push" -> BSONDocument(
"messagesRef" -> BSONDocument(
"$each" ->
recipients.map(recipient => MessageMetaData(msg.sha1, recipient))
recipients.map(recipient => MessageMetaData(msg.sha1, recipient, xRequestId))
)
)
)

for {
_ <- ZIO.logInfo("addToInboxes")
xRequestId <- ZIO.logAnnotations.map(_.get(XRequestId.value))
coll <- collection
result <- ZIO
.fromFuture(implicit ec =>
coll.update
.one(selector, update) // Just one
.one(selector, update(xRequestId)) // Just one
)
.tapError(err => ZIO.logError(s"addToInboxes : ${err.getMessage}"))
.mapError(ex => StorageThrowable(ex))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package io.iohk.atala.mediator.db

case object XRequestId:
inline def value: String = "x-request-id"
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ object ForwardMessageExecuter
msg <-
if (numbreOfUpdated > 0) { // Or maybe we can add all the time
for {
_ <- repoMessageItem.insert(MessageItem(m.msg))
_ <- repoMessageItem.insert(m.msg)
_ <- ZIO.logInfo("Add next msg (of the ForwardMessage) to the Message Repo")
} yield NoReply
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,34 +14,49 @@ object MessageItemRepoSpec extends ZIOSpecDefault with AccountStubSetup {

override def spec = suite("MessageItemSpec")(
test("insert message") {
for {
messageItem <- ZIO.service[MessageItemRepo]
msg <- ZIO.fromEither(encryptedMessageAlice)
result <- messageItem.insert(MessageItem(msg))
} yield {
assertTrue(result.writeErrors == Nil) &&
assertTrue(result.n == 1)
ZIO.logAnnotate(XRequestId.value, "b373423c-c78f-4cbc-a3fe-89cbc1351835") {
for {
messageItem <- ZIO.service[MessageItemRepo]

msg <- ZIO.fromEither(encryptedMessageAlice)
result <- messageItem.insert(msg)
} yield {
assertTrue(result.writeErrors == Nil) &&
assertTrue(result.n == 1)
}
}

},
test("findById message") {
for {
messageItem <- ZIO.service[MessageItemRepo]
msg <- ZIO.fromEither(encryptedMessageAlice)
result <- messageItem.findById(msg.sha1)
} yield {
assertTrue(result.contains(MessageItem(msg)))
val outcome = result.forall { messageItem =>
messageItem.msg == msg &&
messageItem._id == msg.sha1 &&
messageItem.xRequestId.contains("b373423c-c78f-4cbc-a3fe-89cbc1351835")
}
assertTrue(outcome)
}
},
test("findByIds messages") {
for {
messageItem <- ZIO.service[MessageItemRepo]
msg <- ZIO.fromEither(encryptedMessageAlice)
msg2 <- ZIO.fromEither(encryptedMessageBob)
msg2Added <- messageItem.insert(MessageItem(msg2))
result <- messageItem.findByIds(Seq(msg.sha1, msg2.sha1))
} yield {
assertTrue(result.contains(MessageItem(msg))) &&
assertTrue(result.contains(MessageItem(msg2)))
ZIO.logAnnotate(XRequestId.value, "b373423c-c78f-4cbc-a3fe-89cbc1351835") {
for {
messageItem <- ZIO.service[MessageItemRepo]
msg <- ZIO.fromEither(encryptedMessageAlice)
msg2 <- ZIO.fromEither(encryptedMessageBob)
msg2Added <- messageItem.insert(msg2)
result <- messageItem.findByIds(Seq(msg.sha1, msg2.sha1))
} yield {
val outcome = result.forall { messageItem =>
Seq(msg, msg2).contains(messageItem.msg) &&
Seq(msg.sha1, msg2.sha1).contains(messageItem._id) &&
messageItem.xRequestId.contains("b373423c-c78f-4cbc-a3fe-89cbc1351835")
}
assertTrue(outcome)
}
}
}
).provideLayerShared(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,28 +133,32 @@ object UserAccountRepoSpec extends ZIOSpecDefault with AccountStubSetup {
}
},
test("addMessage to inbox for given Account") {
for {
userAccount <- ZIO.service[UserAccountRepo]
messageItem <- ZIO.service[MessageItemRepo]
result <- userAccount.addAlias(DIDSubject(alice), DIDSubject(bob))
msg <- ZIO.fromEither(encryptedMessageAlice)
msgAdded <- messageItem.insert(MessageItem(msg))
addedToInbox <- userAccount.addToInboxes(Set(DIDSubject(bob)), msg)
didAccount <- userAccount.getDidAccount(DIDSubject(alice))
} yield {
val messageMetaData: Seq[MessageMetaData] = didAccount.map(_.messagesRef).getOrElse(Seq.empty)
val xRequestId = "b373423c-c78f-4cbc-a3fe-89cbc1351835"
ZIO.logAnnotate(XRequestId.value, xRequestId) {
for {
userAccount <- ZIO.service[UserAccountRepo]
messageItem <- ZIO.service[MessageItemRepo]
result <- userAccount.addAlias(DIDSubject(alice), DIDSubject(bob))
msg <- ZIO.fromEither(encryptedMessageAlice)
msgAdded <- messageItem.insert(msg)
addedToInbox <- userAccount.addToInboxes(Set(DIDSubject(bob)), msg)
didAccount <- userAccount.getDidAccount(DIDSubject(alice))
} yield {
val messageMetaData: Seq[MessageMetaData] = didAccount.map(_.messagesRef).getOrElse(Seq.empty)

assertTrue(result.isRight) &&
assertTrue(result == Right(1)) &&
assertTrue(msgAdded.writeErrors == Nil) &&
assertTrue(msgAdded.n == 1) &&
assertTrue(addedToInbox == 1) &&
assert(messageMetaData)(
forall(
hasField("hash", (m: MessageMetaData) => m.hash, equalTo(msg.sha1))
&& hasField("recipient", (m: MessageMetaData) => m.recipient, equalTo(DIDSubject(bob)))
assertTrue(result.isRight) &&
assertTrue(result == Right(1)) &&
assertTrue(msgAdded.writeErrors == Nil) &&
assertTrue(msgAdded.n == 1) &&
assertTrue(addedToInbox == 1) &&
assert(messageMetaData)(
forall(
hasField("hash", (m: MessageMetaData) => m.hash, equalTo(msg.sha1))
&& hasField("recipient", (m: MessageMetaData) => m.recipient, equalTo(DIDSubject(bob)))
&& hasField("xRequestId", (m: MessageMetaData) => m.xRequestId, equalTo(Some(xRequestId)))
)
)
)
}
}
},
test("mark message as delivered given did") {
Expand Down

0 comments on commit 6ad106f

Please sign in to comment.