diff --git a/foundation/foundation-mda/src/main/java/com/boozallen/aiops/mda/metamodel/element/java/JavaStep.java b/foundation/foundation-mda/src/main/java/com/boozallen/aiops/mda/metamodel/element/java/JavaStep.java index c2f5a67b2..7571ef598 100644 --- a/foundation/foundation-mda/src/main/java/com/boozallen/aiops/mda/metamodel/element/java/JavaStep.java +++ b/foundation/foundation-mda/src/main/java/com/boozallen/aiops/mda/metamodel/element/java/JavaStep.java @@ -158,6 +158,10 @@ public boolean hasVoidOutbound() { return isVoid(getOutbound()); } + public boolean useEmitter() { + return hasNativeInbound() || hasVoidInbound(); + } + public Set getBaseImports() { getBaseSignature(); addPersistImports(); diff --git a/foundation/foundation-mda/src/main/resources/templates/data-delivery-spark/asynchronous.processor.base.java.vm b/foundation/foundation-mda/src/main/resources/templates/data-delivery-spark/asynchronous.processor.base.java.vm index 2df072dd7..f545d0f3b 100644 --- a/foundation/foundation-mda/src/main/resources/templates/data-delivery-spark/asynchronous.processor.base.java.vm +++ b/foundation/foundation-mda/src/main/resources/templates/data-delivery-spark/asynchronous.processor.base.java.vm @@ -26,7 +26,7 @@ import org.reactivestreams.Subscription; #if (${step.outbound}) import org.eclipse.microprofile.reactive.messaging.Outgoing; #end -#if ((${step.hasMessagingOutbound()} && ${step.hasNativeInbound()})) +#if ((${step.hasMessagingOutbound()} && ${step.useEmitter()})) import org.eclipse.microprofile.reactive.messaging.Channel; import org.eclipse.microprofile.reactive.messaging.Emitter; #end @@ -137,7 +137,7 @@ public abstract class ${step.capitalizedName}Base extends AbstractPipelineStep { #end #end -#if ($step.hasMessagingOutbound() && $step.hasNativeInbound()) +#if ($step.hasMessagingOutbound() && $step.useEmitter()) @OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 20) @Inject @Channel(OUTGOING_CHANNEL) @@ -182,7 +182,7 @@ public abstract class ${step.capitalizedName}Base extends AbstractPipelineStep { @Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING) @OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 20) #end - #if (${step.hasMessagingOutbound()} && !${step.hasNativeInbound()}) + #if (${step.hasMessagingOutbound()} && !${step.hasNativeInbound()} && !${step.hasVoidInbound()}) @Outgoing(OUTGOING_CHANNEL) #end ${step.baseSignature} { @@ -209,7 +209,7 @@ public abstract class ${step.capitalizedName}Base extends AbstractPipelineStep { #if ($step.billOfMaterial && $step.billOfMaterial.enabled) recordBOM(); #end - #if ($step.hasMessagingOutbound() && $step.hasNativeInbound()) + #if ($step.hasMessagingOutbound() && $step.useEmitter()) ${step.baseOutboundType} outboundPayload = executeStepImpl(#if (${step.baseInboundType}) inbound #end); #if ($pipeline.getDataLineage()) eventEndTime = ZonedDateTime.now(ZoneOffset.UTC); diff --git a/test/test-mda-models/test-data-delivery-spark-model/src/main/resources/pipelines/SparkJavaDataDeliveryPatterns.json b/test/test-mda-models/test-data-delivery-spark-model/src/main/resources/pipelines/SparkJavaDataDeliveryPatterns.json index 4c4e820c3..bcfdfc58b 100644 --- a/test/test-mda-models/test-data-delivery-spark-model/src/main/resources/pipelines/SparkJavaDataDeliveryPatterns.json +++ b/test/test-mda-models/test-data-delivery-spark-model/src/main/resources/pipelines/SparkJavaDataDeliveryPatterns.json @@ -382,6 +382,14 @@ } } }, + { + "name": "VoidInboundAndMessagingOutboundAsync", + "type": "asynchronous", + "outbound": { + "type": "messaging", + "channelName": "outboundChannel" + } + }, { "name": "MessagingOutboundWithCustomTypes", "type": "synchronous", @@ -394,6 +402,18 @@ } } }, + { + "name": "MessagingOutboundWithCustomTypesAsync", + "type": "asynchronous", + "outbound": { + "type": "messaging", + "channelName": "outboundChannel", + "recordType": { + "name": "CustomRecord", + "package": "com.boozallen.aiops.mda.pattern.record" + } + } + }, { "name": "MessagingOutboundWithCustomRec", "type": "synchronous", @@ -406,6 +426,18 @@ } } }, + { + "name": "MessagingOutboundWithCustomRecAsync", + "type": "asynchronous", + "outbound": { + "type": "messaging", + "channelName": "outboundChannel", + "recordType": { + "name": "CustomRec", + "package": "com.boozallen.aiops.mda.pattern.record" + } + } + }, { "name": "MessagingInboundWithCustomRec", "type": "synchronous",