Skip to content

Commit

Permalink
AsyncAPI v3 Migration Fixes (#533)
Browse files Browse the repository at this point in the history
* fix: Multiple fixes related with Channel Messages

Some of the tests are failing due to some AssertJ recursion issue. Help is welcome

* fix: Minor properties fixes

* fix: Fixes to AMQP

Some fixes to the AMQP plugin and example (Still not finished)

* fix: Minor properties fixes

* fix: Migrated asyncapi.json examples to v3

* fix: Migration to AsyncAPI v3

As per https://www.asyncapi.com/docs/migration/migrating-to-v3

For a channel with multiple messages, you specify multiple key-value pairs. For a channel with just one message, you use a single key-value pair.

That means that the use of 'oneOf' is removed.

* fix: Improved support for Channel Messages

Fixed different bugs and limitations with the publishing of Messages in a Channel with the AsyncAPI v3 spec

* fix: Fixed Channel Merger

When merging Channels, the messages were not merged. This is now fixed.

* fix: Added basic support for Operations

Still plenty of errors, but Operations are now appearing in the AsyncAPI v3 output.

* fix: AsyncAPI module test
  • Loading branch information
ctasada authored Jan 12, 2024
1 parent b7ede27 commit c44f5b7
Show file tree
Hide file tree
Showing 58 changed files with 1,571 additions and 903 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,14 @@ public class MessageObject extends ExtendableObject implements Message {
*/
@JsonProperty(value = "traits")
private List<MessageTrait> traits;

/*
* Override the getMessageId to guarantee that there's always a value. Defaults to 'name'
*/
public String getMessageId() {
if (messageId == null) {
return this.name;
}
return messageId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,10 @@ public String getRef() {
/**
* Convenient Builder to create a Message reference to an existing Message
* @param message Message to create the reference to. This Message MUST have a 'messageId' field
* @return a Message with the 'ref' field pointing to "#/components/messages/{messageId"
* @return a Message with the 'ref' field pointing to "#/components/messages/{messageName}"
*/
public static MessageReference fromMessage(MessageObject message) {
var messageId = message.getMessageId();
if (messageId == null) {
throw new IllegalArgumentException("The message must have a 'messageId' defined");
}
return new MessageReference("#/components/messages/" + messageId);
return fromMessage(message.getName());
}

public static MessageReference fromMessage(String messageName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ void shouldCreateSimpleAsyncAPI() throws IOException {
var channelUserSignedup = ChannelObject.builder()
.channelId("userSignedup")
.address("user/signedup")
.messages(Map.of(userSignUpMessage.getMessageId(), MessageReference.fromMessage(userSignUpMessage)))
.messages(Map.of(userSignUpMessage.getMessageId(), MessageReference.fromMessage("UserSignedUp")))
.build();

AsyncAPI asyncAPI = AsyncAPI.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package io.github.stavshamir.springwolf.asyncapi;

import io.github.stavshamir.springwolf.asyncapi.v3.model.channel.ChannelObject;
import io.github.stavshamir.springwolf.asyncapi.v3.model.operation.Operation;

import java.util.Map;

Expand All @@ -11,9 +12,16 @@
public interface ChannelsService {

/**
* Detects all available AsyncAPI ChannelItem in the spring context.
* Detects all available AsyncAPI ChannelObject in the spring context.
*
* @return Map of channel names mapping to detected ChannelItems
*/
Map<String, ChannelObject> findChannels();

/**
* Detects all available AsyncAPI Operation in the spring context.
*
* @return Map of operation names mapping to detected Operations
*/
Map<String, Operation> findOperations();
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.github.stavshamir.springwolf.asyncapi.types.AsyncAPI;
import io.github.stavshamir.springwolf.asyncapi.types.Components;
import io.github.stavshamir.springwolf.asyncapi.v3.model.channel.ChannelObject;
import io.github.stavshamir.springwolf.asyncapi.v3.model.operation.Operation;
import io.github.stavshamir.springwolf.configuration.AsyncApiDocket;
import io.github.stavshamir.springwolf.configuration.AsyncApiDocketService;
import io.github.stavshamir.springwolf.schemas.SchemasService;
Expand Down Expand Up @@ -64,6 +65,8 @@ protected synchronized void initAsyncAPI() {
// SchemasService.
Map<String, ChannelObject> channels = channelsService.findChannels();

Map<String, Operation> operations = channelsService.findOperations();

Components components = Components.builder()
.schemas(schemasService.getDefinitions())
.build();
Expand All @@ -74,6 +77,7 @@ protected synchronized void initAsyncAPI() {
.defaultContentType(docket.getDefaultContentType())
.servers(docket.getServers())
.channels(channels)
.operations(operations)
.components(components)
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ChannelMerger;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ChannelsScanner;
import io.github.stavshamir.springwolf.asyncapi.v3.model.channel.ChannelObject;
import io.github.stavshamir.springwolf.asyncapi.v3.model.operation.Operation;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -31,12 +32,28 @@ public Map<String, ChannelObject> findChannels() {

for (ChannelsScanner scanner : channelsScanners) {
try {
Map<String, ChannelObject> channels = scanner.scan();
Map<String, ChannelObject> channels = scanner.scanChannels();
foundChannelItems.addAll(channels.entrySet());
} catch (Exception e) {
log.error("An error was encountered during channel scanning with {}: {}", scanner, e.getMessage(), e);
}
}
return ChannelMerger.merge(foundChannelItems);
return ChannelMerger.mergeChannels(foundChannelItems);
}

// FIXME
@Override
public Map<String, Operation> findOperations() {
List<Map.Entry<String, Operation>> foundOperations = new ArrayList<>();
for (ChannelsScanner scanner : channelsScanners) {
try {
Map<String, Operation> channels = scanner.scanOperations();
foundOperations.addAll(channels.entrySet());
} catch (Exception e) {
log.error("An error was encountered during operation scanning with {}: {}", scanner, e.getMessage(), e);
}
}

return ChannelMerger.mergeOperations(foundOperations);
}
}
Original file line number Diff line number Diff line change
@@ -1,51 +1,40 @@
// SPDX-License-Identifier: Apache-2.0
package io.github.stavshamir.springwolf.asyncapi;

import io.github.stavshamir.springwolf.asyncapi.v3.model.channel.message.Message;
import io.github.stavshamir.springwolf.asyncapi.v3.model.channel.message.MessageObject;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

@Slf4j
public class MessageHelper {
private static final String ONE_OF = "oneOf";

private static final Comparator<MessageObject> byMessageName = Comparator.comparing(MessageObject::getName);

private static final Supplier<Set<MessageObject>> messageSupplier = () -> new TreeSet<>(byMessageName);

public static Object toMessageObjectOrComposition(Set<MessageObject> messages) {
return switch (messages.size()) {
case 0 -> throw new IllegalArgumentException("messages must not be empty");
case 1 -> messages.toArray()[0];
default -> Map.of(
ONE_OF, new ArrayList<>(messages.stream().collect(Collectors.toCollection(messageSupplier))));
};
}
private MessageHelper() {}

@SuppressWarnings("unchecked")
public static Set<MessageObject> messageObjectToSet(Object messageObject) {
if (messageObject instanceof MessageObject message) {
return new HashSet<>(Collections.singletonList(message));
public static Map<String, Message> toMessagesMap(Set<MessageObject> messages) {
if (messages.isEmpty()) {
throw new IllegalArgumentException("messages must not be empty");
}

if (messageObject instanceof Map) {
List<MessageObject> messages = ((Map<String, List<MessageObject>>) messageObject).get(ONE_OF);
return new HashSet<>(messages);
}
return new ArrayList<>(messages.stream().collect(Collectors.toCollection(messageSupplier)))
.stream().collect(Collectors.toMap(MessageObject::getMessageId, Function.identity()));
}

log.warn(
"Message object must contain either a Message or a Map<String, Set<Message>, but contained: {}",
messageObject.getClass());
return new HashSet<>();
// FIXME: Do we need this method?
@SuppressWarnings("unchecked")
public static Set<Message> messageObjectToSet(Map<String, Message> messages) {
return new HashSet<>(messages.values());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import io.github.stavshamir.springwolf.asyncapi.MessageHelper;
import io.github.stavshamir.springwolf.asyncapi.v3.model.channel.Channel;
import io.github.stavshamir.springwolf.asyncapi.v3.model.channel.ChannelObject;
import io.github.stavshamir.springwolf.asyncapi.v3.model.channel.message.Message;
import io.github.stavshamir.springwolf.asyncapi.v3.model.channel.message.MessageObject;
import io.github.stavshamir.springwolf.asyncapi.v3.model.channel.message.MessageReference;
import io.github.stavshamir.springwolf.asyncapi.v3.model.operation.Operation;

import java.util.HashMap;
Expand All @@ -16,6 +18,8 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static io.github.stavshamir.springwolf.asyncapi.MessageHelper.toMessagesMap;

/**
* Util to merge multiple {@link Channel}s
*/
Expand All @@ -27,52 +31,103 @@ private ChannelMerger() {}
* Merges multiple channels by channel name
* <p>
* Given two channels for the same channel name, the first seen Channel is used
* If an operation is null, the next non-null operation is used
* Messages within operations are merged
* Messages within channels are merged
*
* @param channelEntries Ordered pairs of channel name to Channel
* @return A map of channelName to a single Channel
*/
public static Map<String, ChannelObject> merge(List<Map.Entry<String, ChannelObject>> channelEntries) {
public static Map<String, ChannelObject> mergeChannels(List<Map.Entry<String, ChannelObject>> channelEntries) {
Map<String, ChannelObject> mergedChannels = new HashMap<>();

for (Map.Entry<String, ChannelObject> entry : channelEntries) {
if (!mergedChannels.containsKey(entry.getKey())) {
mergedChannels.put(entry.getKey(), entry.getValue());
} else {
ChannelObject channel = mergedChannels.get(entry.getKey());
// channel.setPublish(mergeOperation(channel.getPublish(), entry.getValue().getPublish()));
// channel.setSubscribe(mergeOperation(channel.getSubscribe(), entry.getValue().getSubscribe()));
ChannelObject channel = mergeChannel(mergedChannels.get(entry.getKey()), entry.getValue());
mergedChannels.put(entry.getKey(), channel);
}
}

return mergedChannels;
}

/**
* Merges multiple operations by operation name
* <p>
* Given two operations for the same operation name, the first seen Operation is used
* If an operation is null, the next non-null operation is used
* Messages within operations are merged
*
* @param operationEntries Ordered pairs of operation name to Operation
* @return A map of operationName to a single Operation
*/
public static Map<String, Operation> mergeOperations(List<Map.Entry<String, Operation>> operationEntries) {
Map<String, Operation> mergedOperations = new HashMap<>();

for (Map.Entry<String, Operation> entry : operationEntries) {
if (!mergedOperations.containsKey(entry.getKey())) {
mergedOperations.put(entry.getKey(), entry.getValue());
} else {
Operation operation = mergeOperation(mergedOperations.get(entry.getKey()), entry.getValue());
mergedOperations.put(entry.getKey(), operation);
}
}

return mergedOperations;
}

private static ChannelObject mergeChannel(ChannelObject channel, ChannelObject otherChannel) {
ChannelObject mergedChannel = channel != null ? channel : otherChannel;

Set<MessageObject> mergedMessages = mergeMessages(getMessages(channel), getMessages(otherChannel));
if (!mergedMessages.isEmpty()) {
mergedChannel.setMessages(toMessagesMap(mergedMessages));
}
return mergedChannel;
}

private static Operation mergeOperation(Operation operation, Operation otherOperation) {
Operation mergedOperation = operation != null ? operation : otherOperation;

Set<MessageObject> mergedMessages = mergeMessages(getMessages(operation), getMessages(otherOperation));
// if (!mergedMessages.isEmpty()) {
// mergedOperation.setMessage(toMessageObjectOrComposition(mergedMessages)); FIXME
// }
List<MessageReference> mergedMessages =
mergeMessageReferences(operation.getMessages(), otherOperation.getMessages());
if (!mergedMessages.isEmpty()) {
mergedOperation.setMessages(mergedMessages);
}
return mergedOperation;
}

private static Set<MessageObject> mergeMessages(Set<MessageObject> messages, Set<MessageObject> otherMessages) {
Map<String, MessageObject> nameToMessage =
messages.stream().collect(Collectors.toMap(MessageObject::getName, Function.identity()));
private static Set<MessageObject> mergeMessages(Set<Message> messages, Set<Message> otherMessages) {
// FIXME: We will lose any MessageReference we get
Map<String, MessageObject> nameToMessage = messages.stream()
.filter(MessageObject.class::isInstance)
.map(MessageObject.class::cast)
.collect(Collectors.toMap(MessageObject::getName, Function.identity()));

for (MessageObject otherMessage : otherMessages) {
nameToMessage.putIfAbsent(otherMessage.getName(), otherMessage);
for (Message otherMessage : otherMessages) {
if (otherMessage instanceof MessageObject otherMessageObject) {
nameToMessage.putIfAbsent(otherMessageObject.getName(), otherMessageObject);
}
}

return new HashSet<>(nameToMessage.values());
}

private static Set<MessageObject> getMessages(Operation operation) {
return Optional.ofNullable(operation)
.map(Operation::getMessages)
private static List<MessageReference> mergeMessageReferences(
List<MessageReference> messages, List<MessageReference> otherMessages) {
var messageReferences = new HashSet<MessageReference>();
if (messages != null) {
messageReferences.addAll(messages);
}
if (otherMessages != null) {
messageReferences.addAll(otherMessages);
}
return messageReferences.stream().toList();
}

private static Set<Message> getMessages(ChannelObject channel) {
return Optional.ofNullable(channel)
.map(ChannelObject::getMessages)
.map(MessageHelper::messageObjectToSet)
.orElseGet(HashSet::new);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package io.github.stavshamir.springwolf.asyncapi.scanners.channels;

import io.github.stavshamir.springwolf.asyncapi.v3.model.channel.ChannelObject;
import io.github.stavshamir.springwolf.asyncapi.v3.model.operation.Operation;

import java.util.Map;

Expand All @@ -10,5 +11,10 @@ public interface ChannelsScanner {
/**
* @return A mapping of channel names to their respective channel object for a given protocol.
*/
Map<String, ChannelObject> scan();
Map<String, ChannelObject> scanChannels();

/**
* @return A mapping of operation names to their respective operation object for a given protocol.
*/
Map<String, Operation> scanOperations();
}
Loading

0 comments on commit c44f5b7

Please sign in to comment.