Skip to content

Commit

Permalink
Feat/message as component (#541)
Browse files Browse the repository at this point in the history
* feat: move messages to components

* feat: add ChannelMessageReference

allows to link from an operation to the message within a specific channel

* chore: update asyncapi.json

* feat(ui): update after message was moved to components

* fix(ui): map operation binding correctly

* feat(core): remove ChannelMessageReference

* test(example): simplify creation of asyncapi.json files

* test(asyncapi): change channel messages type back to Message

* refactor(asyncapi): rename MessageReference static message

Better indicate what the reference points to

* chore: spotless

* test: fix DefaultChannelsServiceIntegrationTest

* test(kafka): remove duplicated primitite-topic consumer

* feat(core): use AsyncApi 3.0 OperationAction

* feat(ui): show contact info

* chore: update dependency list
  • Loading branch information
timonback committed Jan 19, 2024
1 parent a9e65e6 commit 67cd62d
Show file tree
Hide file tree
Showing 59 changed files with 1,002 additions and 981 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,27 @@ public String getRef() {

/**
* Convenient Builder to create a Message reference to an existing Message
* @param message Message to create the reference to. This Message MUST have a 'messageId' field
*
* @param message Message to create the reference to. This Message MUST have a 'messageName' field
* @return a Message with the 'ref' field pointing to "#/components/messages/{messageName}"
*/
public static MessageReference fromMessage(MessageObject message) {
return fromMessage(message.getName());
public static MessageReference toComponentMessage(MessageObject message) {
return toComponentMessage(message.getName());
}

public static MessageReference fromMessage(String messageName) {
public static MessageReference toComponentMessage(String messageName) {
return new MessageReference("#/components/messages/" + messageName);
}

public static MessageReference fromSchema(String schemaName) {
public static MessageReference toChannelMessage(String channelName, MessageObject message) {
return new MessageReference("#/channels/" + channelName + "/messages/" + message.getName());
}

public static MessageReference toChannelMessage(String channelName, String messageName) {
return new MessageReference("#/channels/" + channelName + "/messages/" + messageName);
}

public static MessageReference toSchema(String schemaName) {
return new MessageReference("#/components/schemas/" + schemaName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ void shouldCreateSimpleAsyncAPI() throws IOException {
var channelUserSignedup = ChannelObject.builder()
.channelId("userSignedup")
.address("user/signedup")
.messages(Map.of(userSignUpMessage.getMessageId(), MessageReference.fromMessage("UserSignedUp")))
.messages(Map.of(userSignUpMessage.getMessageId(), MessageReference.toComponentMessage("UserSignedUp")))
.build();

AsyncAPI asyncAPI = AsyncAPI.builder()
Expand All @@ -74,8 +74,8 @@ void shouldCreateSimpleAsyncAPI() throws IOException {
Operation.builder()
.action(OperationAction.SEND)
.channel(ChannelReference.fromChannel(channelUserSignedup))
.messages(
List.of(new MessageReference("#/channels/userSignedup/messages/UserSignedUp")))
.messages(List.of(MessageReference.toChannelMessage(
"userSignedup", userSignUpMessage.getMessageId())))
.build()))
.components(Components.builder()
.messages(Map.of(userSignUpMessage.getMessageId(), userSignUpMessage))
Expand All @@ -98,7 +98,7 @@ void shouldCreateStreetlightsKafkaAsyncAPI() throws IOException {
.traits(List.of(MessageTrait.builder()
.ref("#/components/messageTraits/commonHeaders")
.build()))
.payload(MessagePayload.of(MessageReference.fromSchema("lightMeasuredPayload")))
.payload(MessagePayload.of(MessageReference.toSchema("lightMeasuredPayload")))
.build();

var turnOnOffMessage = MessageObject.builder()
Expand All @@ -109,7 +109,7 @@ void shouldCreateStreetlightsKafkaAsyncAPI() throws IOException {
.traits(List.of(MessageTrait.builder()
.ref("#/components/messageTraits/commonHeaders")
.build()))
.payload(MessagePayload.of(MessageReference.fromSchema("turnOnOffPayload")))
.payload(MessagePayload.of(MessageReference.toSchema("turnOnOffPayload")))
.build();

var dimLightMessage = MessageObject.builder()
Expand All @@ -120,7 +120,7 @@ void shouldCreateStreetlightsKafkaAsyncAPI() throws IOException {
.traits(List.of(MessageTrait.builder()
.ref("#/components/messageTraits/commonHeaders")
.build()))
.payload(MessagePayload.of(MessageReference.fromSchema("dimLightPayload")))
.payload(MessagePayload.of(MessageReference.toSchema("dimLightPayload")))
.build();

AsyncAPI asyncAPI = AsyncAPI.builder()
Expand Down Expand Up @@ -190,7 +190,8 @@ void shouldCreateStreetlightsKafkaAsyncAPI() throws IOException {
"lightingMeasured",
ChannelObject.builder()
.address("smartylighting.streetlights.1.0.event.{streetlightId}.lighting.measured")
.messages(Map.of("lightMeasured", MessageReference.fromMessage(lightMeasuredMessage)))
.messages(Map.of(
"lightMeasured", MessageReference.toComponentMessage(lightMeasuredMessage)))
.description("The topic on which measured values may be produced and consumed.")
.parameters(Map.of(
"streetlightId",
Expand All @@ -201,7 +202,7 @@ void shouldCreateStreetlightsKafkaAsyncAPI() throws IOException {
"lightTurnOn",
ChannelObject.builder()
.address("smartylighting.streetlights.1.0.action.{streetlightId}.turn.on")
.messages(Map.of("turnOn", MessageReference.fromMessage(turnOnOffMessage)))
.messages(Map.of("turnOn", MessageReference.toComponentMessage(turnOnOffMessage)))
.parameters(Map.of(
"streetlightId",
ChannelParameter.builder()
Expand All @@ -211,7 +212,7 @@ void shouldCreateStreetlightsKafkaAsyncAPI() throws IOException {
"lightTurnOff",
ChannelObject.builder()
.address("smartylighting.streetlights.1.0.action.{streetlightId}.turn.off")
.messages(Map.of("turnOff", MessageReference.fromMessage(turnOnOffMessage)))
.messages(Map.of("turnOff", MessageReference.toComponentMessage(turnOnOffMessage)))
.parameters(Map.of(
"streetlightId",
ChannelParameter.builder()
Expand All @@ -221,7 +222,7 @@ void shouldCreateStreetlightsKafkaAsyncAPI() throws IOException {
"lightsDim",
ChannelObject.builder()
.address("smartylighting.streetlights.1.0.action.{streetlightId}.dim")
.messages(Map.of("dimLight", MessageReference.fromMessage(dimLightMessage)))
.messages(Map.of("dimLight", MessageReference.toComponentMessage(dimLightMessage)))
.parameters(Map.of(
"streetlightId",
ChannelParameter.builder()
Expand All @@ -239,8 +240,8 @@ void shouldCreateStreetlightsKafkaAsyncAPI() throws IOException {
.traits(List.of(OperationTraits.builder()
.ref("#/components/operationTraits/kafka")
.build()))
.messages(List.of(
new MessageReference("#/channels/lightingMeasured/messages/lightMeasured")))
.messages(
List.of(MessageReference.toChannelMessage("lightingMeasured", "lightMeasured")))
.build(),
"turnOn",
Operation.builder()
Expand All @@ -251,7 +252,7 @@ void shouldCreateStreetlightsKafkaAsyncAPI() throws IOException {
.traits(List.of(OperationTraits.builder()
.ref("#/components/operationTraits/kafka")
.build()))
.messages(List.of(new MessageReference("#/channels/lightTurnOn/messages/turnOn")))
.messages(List.of(MessageReference.toChannelMessage("lightTurnOn", "turnOn")))
.build(),
"turnOff",
Operation.builder()
Expand All @@ -262,7 +263,7 @@ void shouldCreateStreetlightsKafkaAsyncAPI() throws IOException {
.traits(List.of(OperationTraits.builder()
.ref("#/components/operationTraits/kafka")
.build()))
.messages(List.of(new MessageReference("#/channels/lightTurnOff/messages/turnOff")))
.messages(List.of(MessageReference.toChannelMessage("lightTurnOff", "turnOff")))
.build(),
"dimLight",
Operation.builder()
Expand All @@ -273,7 +274,7 @@ void shouldCreateStreetlightsKafkaAsyncAPI() throws IOException {
.traits(List.of(OperationTraits.builder()
.ref("#/components/operationTraits/kafka")
.build()))
.messages(List.of(new MessageReference("#/channels/lightsDim/messages/dimLight")))
.messages(List.of(MessageReference.toChannelMessage("lightsDim", "dimLight")))
.build()))
.components(Components.builder()
.messages(Map.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ void shouldSerializeChannelObject() throws IOException {
.description("This channel is used to exchange messages about user events.")
.messages(Map.of(
"userSignedUp",
MessageReference.fromMessage("userSignedUp"),
MessageReference.toComponentMessage("userSignedUp"),
"userCompletedOrder",
MessageReference.fromMessage("userCompletedOrder")))
MessageReference.toComponentMessage("userCompletedOrder")))
.parameters(Map.of(
"userId",
ChannelParameter.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class OperationTest {

@Test
void shouldSerializeOperation() throws IOException {
MessageReference userSignedUpReply = MessageReference.toComponentMessage("userSignedUpReply");
var operation = Operation.builder()
.title("User sign up")
.summary("Action to sign a user up.")
Expand All @@ -37,15 +38,15 @@ void shouldSerializeOperation() throws IOException {
.traits(List.of(OperationTraits.builder()
.ref("#/components/operationTraits/kafka")
.build()))
.messages(List.of(MessageReference.fromMessage("userSignedUp")))
.messages(List.of(MessageReference.toComponentMessage("userSignedUp")))
.reply(OperationReply.builder()
.address(OperationReplyAddress.builder()
.location("$message.header#/replyTo")
.build())
.channel(ChannelReference.builder()
.ref("#/channels/userSignupReply")
.build())
.messages(List.of(MessageReference.fromMessage("userSignedUpReply")))
.messages(List.of(userSignedUpReply))
.build())
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import io.github.stavshamir.springwolf.asyncapi.scanners.classes.ComponentClassScanner;
import io.github.stavshamir.springwolf.asyncapi.scanners.classes.ConfigurationClassScanner;
import io.github.stavshamir.springwolf.asyncapi.scanners.classes.SpringwolfClassScanner;
import io.github.stavshamir.springwolf.asyncapi.types.OperationData;
import io.github.stavshamir.springwolf.asyncapi.v3.model.operation.OperationAction;
import io.github.stavshamir.springwolf.configuration.AsyncApiDocketService;
import io.github.stavshamir.springwolf.schemas.SchemasService;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
Expand Down Expand Up @@ -121,8 +121,8 @@ public AsyncOperation getAsyncOperation(AsyncListener annotation) {
}

@Override
public OperationData.OperationType getOperationType() {
return OperationData.OperationType.PUBLISH;
public OperationAction getOperationType() {
return OperationAction.SEND;
}
};
}
Expand All @@ -141,8 +141,8 @@ public AsyncOperation getAsyncOperation(AsyncPublisher annotation) {
}

@Override
public OperationData.OperationType getOperationType() {
return OperationData.OperationType.SUBSCRIBE;
public OperationAction getOperationType() {
return OperationAction.RECEIVE;
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ protected synchronized void initAsyncAPI() {
Map<String, Operation> operations = channelsService.findOperations();

Components components = Components.builder()
.schemas(schemasService.getDefinitions())
.schemas(schemasService.getSchemas())
.messages(schemasService.getMessages())
.build();

AsyncAPI asyncAPI = AsyncAPI.builder()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
// SPDX-License-Identifier: Apache-2.0
package io.github.stavshamir.springwolf.asyncapi;

import io.github.stavshamir.springwolf.asyncapi.v3.model.channel.message.Message;
import io.github.stavshamir.springwolf.asyncapi.v3.model.channel.message.MessageObject;
import io.github.stavshamir.springwolf.asyncapi.v3.model.channel.message.MessageReference;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

Expand All @@ -23,18 +21,12 @@ public class MessageHelper {

private MessageHelper() {}

public static Map<String, Message> toMessagesMap(Set<MessageObject> messages) {
public static Map<String, MessageReference> toMessagesMap(Set<MessageObject> messages) {
if (messages.isEmpty()) {
throw new IllegalArgumentException("messages must not be empty");
}

return new ArrayList<>(messages.stream().collect(Collectors.toCollection(messageSupplier)))
.stream().collect(Collectors.toMap(MessageObject::getMessageId, Function.identity()));
}

// FIXME: Do we need this method?
@SuppressWarnings("unchecked")
public static Set<Message> messageObjectToSet(Map<String, Message> messages) {
return new HashSet<>(messages.values());
.stream().collect(Collectors.toMap(MessageObject::getName, MessageReference::toComponentMessage));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class PublishingPayloadCreator {
public Result createPayloadObject(MessageDto message) {
String messagePayloadType = message.getPayloadType();

List<String> knownSchemaNames = schemasService.getDefinitions().values().stream()
List<String> knownSchemaNames = schemasService.getSchemas().values().stream()
.map(Schema::getName)
.toList();
for (String schemaPayloadType : knownSchemaNames) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,17 @@
// SPDX-License-Identifier: Apache-2.0
package io.github.stavshamir.springwolf.asyncapi.scanners.channels;

import io.github.stavshamir.springwolf.asyncapi.MessageHelper;
import io.github.stavshamir.springwolf.asyncapi.v3.model.channel.Channel;
import io.github.stavshamir.springwolf.asyncapi.v3.model.channel.ChannelObject;
import io.github.stavshamir.springwolf.asyncapi.v3.model.channel.message.Message;
import io.github.stavshamir.springwolf.asyncapi.v3.model.channel.message.MessageObject;
import io.github.stavshamir.springwolf.asyncapi.v3.model.channel.message.MessageReference;
import io.github.stavshamir.springwolf.asyncapi.v3.model.operation.Operation;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import static io.github.stavshamir.springwolf.asyncapi.MessageHelper.toMessagesMap;

/**
* Util to merge multiple {@link Channel}s
Expand Down Expand Up @@ -79,10 +72,21 @@ public static Map<String, Operation> mergeOperations(List<Map.Entry<String, Oper
private static ChannelObject mergeChannel(ChannelObject channel, ChannelObject otherChannel) {
ChannelObject mergedChannel = channel != null ? channel : otherChannel;

Set<MessageObject> mergedMessages = mergeMessages(getMessages(channel), getMessages(otherChannel));
Map<String, Message> channelMessages = channel.getMessages();
Map<String, Message> otherChannelMessages = otherChannel.getMessages();

Map<String, Message> mergedMessages = new HashMap<>();
if (channelMessages != null) {
mergedMessages.putAll(channelMessages);
}
if (otherChannelMessages != null) {
otherChannelMessages.forEach(mergedMessages::putIfAbsent);
}

if (!mergedMessages.isEmpty()) {
mergedChannel.setMessages(toMessagesMap(mergedMessages));
mergedChannel.setMessages(mergedMessages);
}

return mergedChannel;
}

Expand All @@ -97,24 +101,8 @@ private static Operation mergeOperation(Operation operation, Operation otherOper
return mergedOperation;
}

private static Set<MessageObject> mergeMessages(Set<Message> messages, Set<Message> otherMessages) {
// FIXME: We will lose any MessageReference we get
Map<String, MessageObject> nameToMessage = messages.stream()
.filter(MessageObject.class::isInstance)
.map(MessageObject.class::cast)
.collect(Collectors.toMap(MessageObject::getName, Function.identity()));

for (Message otherMessage : otherMessages) {
if (otherMessage instanceof MessageObject otherMessageObject) {
nameToMessage.putIfAbsent(otherMessageObject.getName(), otherMessageObject);
}
}

return new HashSet<>(nameToMessage.values());
}

private static List<MessageReference> mergeMessageReferences(
List<MessageReference> messages, List<MessageReference> otherMessages) {
Collection<MessageReference> messages, Collection<MessageReference> otherMessages) {
var messageReferences = new HashSet<MessageReference>();
if (messages != null) {
messageReferences.addAll(messages);
Expand All @@ -124,11 +112,4 @@ private static List<MessageReference> mergeMessageReferences(
}
return messageReferences.stream().toList();
}

private static Set<Message> getMessages(ChannelObject channel) {
return Optional.ofNullable(channel)
.map(ChannelObject::getMessages)
.map(MessageHelper::messageObjectToSet)
.orElseGet(HashSet::new);
}
}
Loading

0 comments on commit 67cd62d

Please sign in to comment.