Skip to content

Commit

Permalink
Merge pull request #446 from boozallen/430-test-data-delivery-spark-m…
Browse files Browse the repository at this point in the history
…odel

#430 FIx data delivery steps not injecting smallrye when running test-data-delivery-spark-model
  • Loading branch information
jaebchoi authored Oct 31, 2024
2 parents f758041 + 874c0fb commit c603056
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ public boolean hasVoidOutbound() {
return isVoid(getOutbound());
}

public boolean useEmitter() {
return hasNativeInbound() || hasVoidInbound();
}

public Set<String> getBaseImports() {
getBaseSignature();
addPersistImports();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.reactivestreams.Subscription;
#if (${step.outbound})
import org.eclipse.microprofile.reactive.messaging.Outgoing;
#end
#if ((${step.hasMessagingOutbound()} && ${step.hasNativeInbound()}))
#if ((${step.hasMessagingOutbound()} && ${step.useEmitter()}))
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
#end
Expand Down Expand Up @@ -137,7 +137,7 @@ public abstract class ${step.capitalizedName}Base extends AbstractPipelineStep {
#end
#end

#if ($step.hasMessagingOutbound() && $step.hasNativeInbound())
#if ($step.hasMessagingOutbound() && $step.useEmitter())
@OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 20)
@Inject
@Channel(OUTGOING_CHANNEL)
Expand Down Expand Up @@ -182,7 +182,7 @@ public abstract class ${step.capitalizedName}Base extends AbstractPipelineStep {
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
@OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 20)
#end
#if (${step.hasMessagingOutbound()} && !${step.hasNativeInbound()})
#if (${step.hasMessagingOutbound()} && !${step.hasNativeInbound()} && !${step.hasVoidInbound()})
@Outgoing(OUTGOING_CHANNEL)
#end
${step.baseSignature} {
Expand All @@ -209,7 +209,7 @@ public abstract class ${step.capitalizedName}Base extends AbstractPipelineStep {
#if ($step.billOfMaterial && $step.billOfMaterial.enabled)
recordBOM();
#end
#if ($step.hasMessagingOutbound() && $step.hasNativeInbound())
#if ($step.hasMessagingOutbound() && $step.useEmitter())
${step.baseOutboundType} outboundPayload = executeStepImpl(#if (${step.baseInboundType}) inbound #end);
#if ($pipeline.getDataLineage())
eventEndTime = ZonedDateTime.now(ZoneOffset.UTC);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,14 @@
}
}
},
{
"name": "VoidInboundAndMessagingOutboundAsync",
"type": "asynchronous",
"outbound": {
"type": "messaging",
"channelName": "outboundChannel"
}
},
{
"name": "MessagingOutboundWithCustomTypes",
"type": "synchronous",
Expand All @@ -394,6 +402,18 @@
}
}
},
{
"name": "MessagingOutboundWithCustomTypesAsync",
"type": "asynchronous",
"outbound": {
"type": "messaging",
"channelName": "outboundChannel",
"recordType": {
"name": "CustomRecord",
"package": "com.boozallen.aiops.mda.pattern.record"
}
}
},
{
"name": "MessagingOutboundWithCustomRec",
"type": "synchronous",
Expand All @@ -406,6 +426,18 @@
}
}
},
{
"name": "MessagingOutboundWithCustomRecAsync",
"type": "asynchronous",
"outbound": {
"type": "messaging",
"channelName": "outboundChannel",
"recordType": {
"name": "CustomRec",
"package": "com.boozallen.aiops.mda.pattern.record"
}
}
},
{
"name": "MessagingInboundWithCustomRec",
"type": "synchronous",
Expand Down

0 comments on commit c603056

Please sign in to comment.