diff --git a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceImpl.java b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceImpl.java index 5bed7f9c940..e421228abd0 100644 --- a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceImpl.java +++ b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceImpl.java @@ -32,6 +32,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.CompletableFuture; import static java.lang.String.format; @@ -141,12 +142,18 @@ public void registerFactory(DataSinkFactory factory) { @Nullable private DataSourceFactory getSourceFactory(DataFlowStartMessage request) { - return sourceFactories.stream().filter(s -> s.canHandle(request)).findFirst().orElse(null); + return sourceFactories.stream() + .filter(s -> Objects.equals(s.supportedType(), request.getSourceDataAddress().getType())) + .findFirst() + .orElse(null); } @Nullable private DataSinkFactory getSinkFactory(DataFlowStartMessage request) { - return sinkFactories.stream().filter(s -> s.canHandle(request)).findFirst().orElse(null); + return sinkFactories.stream() + .filter(s -> Objects.equals(s.supportedType(), request.getDestinationDataAddress().getType())) + .findFirst() + .orElse(null); } @NotNull diff --git a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceImplTest.java b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceImplTest.java index 31be90bdc90..ffe0779360d 100644 --- a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceImplTest.java +++ b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceImplTest.java @@ -23,7 +23,6 @@ import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult; import org.eclipse.edc.spi.monitor.Monitor; import org.eclipse.edc.spi.types.domain.DataAddress; -import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtensionContext; @@ -45,7 +44,6 @@ import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat; import static org.junit.jupiter.params.provider.Arguments.arguments; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -54,18 +52,14 @@ class PipelineServiceImplTest { - Monitor monitor = mock(); - PipelineServiceImpl service = new PipelineServiceImpl(monitor); - DataFlowStartMessage request = DataFlowStartMessage.Builder.newInstance() - .id("1") - .processId("1") - .sourceDataAddress(DataAddress.Builder.newInstance().type("test").build()) - .destinationDataAddress(DataAddress.Builder.newInstance().type("test").build()) - .build(); - DataSourceFactory sourceFactory = mock(DataSourceFactory.class); - DataSinkFactory sinkFactory = mock(DataSinkFactory.class); - DataSource source = mock(DataSource.class); - DataSink sink = mock(DataSink.class); + private final Monitor monitor = mock(); + + private final DataSourceFactory sourceFactory = mock(); + + private final DataSinkFactory sinkFactory = mock(); + private final DataSource source = mock(); + private final DataSink sink = mock(); + private final PipelineServiceImpl service = new PipelineServiceImpl(monitor); @BeforeEach void setUp() { @@ -75,26 +69,25 @@ void setUp() { @Test void transfer_invokesSink() { - when(sourceFactory.canHandle(request)).thenReturn(true); - when(sourceFactory.createSource(request)).thenReturn(source); - when(sinkFactory.canHandle(request)).thenReturn(true); - when(sinkFactory.createSink(request)).thenReturn(sink); - when(sink.transfer(source)).thenReturn(completedFuture(StreamResult.success())); + when(sourceFactory.supportedType()).thenReturn("source"); + when(sourceFactory.createSource(any())).thenReturn(source); + when(sinkFactory.supportedType()).thenReturn("destination"); + when(sinkFactory.createSink(any())).thenReturn(sink); + when(sink.transfer(any())).thenReturn(completedFuture(StreamResult.success())); - service.transfer(request); + var future = service.transfer(dataFlow("source", "destination").toRequest()); - verify(sink).transfer(eq(source)); + assertThat(future).succeedsWithin(5, TimeUnit.SECONDS).satisfies(result -> { + assertThat(result).isSucceeded(); + }); + verify(sink).transfer(source); } @Test - void transfer_withCustomSink_shouldNotInvokeSinkFactory() throws Exception { - var flowRequest = DataFlow.Builder.newInstance().id("dataFlowId") - .source(DataAddress.Builder.newInstance().type("source").build()) - .destination(DataAddress.Builder.newInstance().type("custom-destination").build()) - .build() - .toRequest(); - - when(sourceFactory.canHandle(any())).thenReturn(true); + void transfer_withCustomSink_shouldNotInvokeSinkFactory() { + var flowRequest = dataFlow("source", "custom-destination").toRequest(); + + when(sourceFactory.supportedType()).thenReturn("source"); when(sourceFactory.createSource(any())).thenReturn(source); var customSink = new DataSink() { @@ -114,13 +107,10 @@ public CompletableFuture> transfer(DataSource source) { @Test void terminate_shouldCloseDataSource() throws Exception { - var dataFlow = DataFlow.Builder.newInstance().id("dPIataFlowId") - .source(DataAddress.Builder.newInstance().type("source").build()) - .destination(DataAddress.Builder.newInstance().type("destination").build()) - .build(); - when(sourceFactory.canHandle(any())).thenReturn(true); + var dataFlow = dataFlow("source", "destination"); + when(sourceFactory.supportedType()).thenReturn("source"); when(sourceFactory.createSource(any())).thenReturn(source); - when(sinkFactory.canHandle(any())).thenReturn(true); + when(sinkFactory.supportedType()).thenReturn("destination"); when(sinkFactory.createSink(any())).thenReturn(sink); when(sink.transfer(any())).thenReturn(completedFuture(StreamResult.success())); @@ -134,13 +124,10 @@ void terminate_shouldCloseDataSource() throws Exception { @Test void terminate_shouldFail_whenSourceClosureFails() throws Exception { - var dataFlow = DataFlow.Builder.newInstance().id("dataFlowId") - .source(DataAddress.Builder.newInstance().type("source").build()) - .destination(DataAddress.Builder.newInstance().type("destination").build()) - .build(); - when(sourceFactory.canHandle(any())).thenReturn(true); + var dataFlow = dataFlow("source", "destination"); + when(sourceFactory.supportedType()).thenReturn("source"); when(sourceFactory.createSource(any())).thenReturn(source); - when(sinkFactory.canHandle(any())).thenReturn(true); + when(sinkFactory.supportedType()).thenReturn("destination"); when(sinkFactory.createSink(any())).thenReturn(sink); when(sink.transfer(any())).thenReturn(completedFuture(StreamResult.success())); doThrow(IOException.class).when(source).close(); @@ -154,10 +141,7 @@ void terminate_shouldFail_whenSourceClosureFails() throws Exception { @Test void terminate_shouldFail_whenTransferDoesNotExist() { - var dataFlow = DataFlow.Builder.newInstance().id("dataFlowId") - .source(DataAddress.Builder.newInstance().type("source").build()) - .destination(DataAddress.Builder.newInstance().type("destination").build()) - .build(); + var dataFlow = dataFlow("source", "destination"); var result = service.terminate(dataFlow); @@ -167,16 +151,21 @@ void terminate_shouldFail_whenTransferDoesNotExist() { @ParameterizedTest @ArgumentsSource(CanHandleArguments.class) - void canHandle_returnsTrue_onlyIfSourceAndSinkCanHandle( - boolean sourceFactoryResponse, - boolean sinkFactoryResponse, - boolean expectedResult - ) { - when(sourceFactory.canHandle(request)).thenReturn(sourceFactoryResponse); - when(sinkFactory.canHandle(request)).thenReturn(sinkFactoryResponse); - - assertThat(service.canHandle(request)) - .isEqualTo(expectedResult); + void canHandle_shouldReturnTrue_whenSourceAndDestinationCanBeHandled(String source, String destination, boolean expected) { + when(sourceFactory.supportedType()).thenReturn("source"); + when(sinkFactory.supportedType()).thenReturn("destination"); + + boolean result = service.canHandle(dataFlow(source, destination).toRequest()); + + assertThat(result).isEqualTo(expected); + } + + private DataFlow dataFlow(String sourceType, String destinationType) { + return DataFlow.Builder.newInstance() + .id("1") + .source(DataAddress.Builder.newInstance().type(sourceType).build()) + .destination(DataAddress.Builder.newInstance().type(destinationType).build()) + .build(); } private static class CanHandleArguments implements ArgumentsProvider { @@ -184,10 +173,10 @@ private static class CanHandleArguments implements ArgumentsProvider { @Override public Stream provideArguments(ExtensionContext extensionContext) { return Stream.of( - arguments(true, true, true), - arguments(true, false, false), - arguments(false, true, false), - arguments(false, false, false) + arguments("source", "destination", true), + arguments("unsupported_source", "destination", false), + arguments("source", "unsupported_destination", false), + arguments("unsupported_source", "unsupported_destination", false) ); } } diff --git a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceIntegrationTest.java b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceIntegrationTest.java index 9f551c33a48..86c7f4ae71b 100644 --- a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceIntegrationTest.java +++ b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceIntegrationTest.java @@ -89,6 +89,11 @@ private static class FixedEndpoint implements DataSinkFactory { sink = new MemorySink(); } + @Override + public String supportedType() { + return "any"; + } + @Override public boolean canHandle(DataFlowStartMessage request) { return true; @@ -116,6 +121,11 @@ private static class InputStreamDataFactory implements DataSourceFactory { this("bytes"); } + @Override + public String supportedType() { + return "any"; + } + @Override public boolean canHandle(DataFlowStartMessage request) { return true; diff --git a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/edc/connector/dataplane/http/pipeline/HttpDataSinkFactory.java b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/edc/connector/dataplane/http/pipeline/HttpDataSinkFactory.java index a4ee9fd1a38..e03aba577a3 100644 --- a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/edc/connector/dataplane/http/pipeline/HttpDataSinkFactory.java +++ b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/edc/connector/dataplane/http/pipeline/HttpDataSinkFactory.java @@ -54,6 +54,11 @@ public HttpDataSinkFactory(EdcHttpClient httpClient, this.requestFactory = requestFactory; } + @Override + public String supportedType() { + return HTTP_DATA_TYPE; + } + @Override public boolean canHandle(DataFlowStartMessage request) { return HTTP_DATA_TYPE.equals(request.getDestinationDataAddress().getType()); diff --git a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/edc/connector/dataplane/http/pipeline/HttpDataSourceFactory.java b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/edc/connector/dataplane/http/pipeline/HttpDataSourceFactory.java index 12212583432..29fb1c4cef4 100644 --- a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/edc/connector/dataplane/http/pipeline/HttpDataSourceFactory.java +++ b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/edc/connector/dataplane/http/pipeline/HttpDataSourceFactory.java @@ -46,6 +46,11 @@ public HttpDataSourceFactory(EdcHttpClient httpClient, HttpRequestParamsProvider this.requestFactory = requestFactory; } + @Override + public String supportedType() { + return HTTP_DATA_TYPE; + } + @Override public boolean canHandle(DataFlowStartMessage request) { return HTTP_DATA_TYPE.equals(request.getSourceDataAddress().getType()); diff --git a/extensions/data-plane/data-plane-http/src/test/java/org/eclipse/edc/connector/dataplane/http/pipeline/HttpDataSinkFactoryTest.java b/extensions/data-plane/data-plane-http/src/test/java/org/eclipse/edc/connector/dataplane/http/pipeline/HttpDataSinkFactoryTest.java index af1dd8738c8..5016452e9a7 100644 --- a/extensions/data-plane/data-plane-http/src/test/java/org/eclipse/edc/connector/dataplane/http/pipeline/HttpDataSinkFactoryTest.java +++ b/extensions/data-plane/data-plane-http/src/test/java/org/eclipse/edc/connector/dataplane/http/pipeline/HttpDataSinkFactoryTest.java @@ -65,11 +65,13 @@ void setUp() { factory = new HttpDataSinkFactory(httpClient, executorService, 5, monitor, provider, requestFactory); } + @Deprecated(since = "0.6.2") @Test void verifyCanHandle() { assertThat(factory.canHandle(TestFunctions.createRequest(HTTP_DATA_TYPE).build())).isTrue(); } + @Deprecated(since = "0.6.2") @Test void verifyCannotHandle() { assertThat(factory.canHandle(TestFunctions.createRequest("dummy").build())).isFalse(); diff --git a/extensions/data-plane/data-plane-http/src/test/java/org/eclipse/edc/connector/dataplane/http/pipeline/HttpDataSourceFactoryTest.java b/extensions/data-plane/data-plane-http/src/test/java/org/eclipse/edc/connector/dataplane/http/pipeline/HttpDataSourceFactoryTest.java index 1d32bc83e09..05080713e94 100644 --- a/extensions/data-plane/data-plane-http/src/test/java/org/eclipse/edc/connector/dataplane/http/pipeline/HttpDataSourceFactoryTest.java +++ b/extensions/data-plane/data-plane-http/src/test/java/org/eclipse/edc/connector/dataplane/http/pipeline/HttpDataSourceFactoryTest.java @@ -51,11 +51,13 @@ void setUp() { factory = new HttpDataSourceFactory(httpClient, provider, monitor, requestFactory); } + @Deprecated(since = "0.6.2") @Test void verifyCanHandle() { assertThat(factory.canHandle(TestFunctions.createRequest(HTTP_DATA_TYPE).build())).isTrue(); } + @Deprecated(since = "0.6.2") @Test void verifyCannotHandle() { assertThat(factory.canHandle(TestFunctions.createRequest("dummy").build())).isFalse(); diff --git a/extensions/data-plane/data-plane-kafka/src/main/java/org/eclipse/edc/connector/dataplane/kafka/pipeline/KafkaDataSinkFactory.java b/extensions/data-plane/data-plane-kafka/src/main/java/org/eclipse/edc/connector/dataplane/kafka/pipeline/KafkaDataSinkFactory.java index ebd7074b33c..d4594134e84 100644 --- a/extensions/data-plane/data-plane-kafka/src/main/java/org/eclipse/edc/connector/dataplane/kafka/pipeline/KafkaDataSinkFactory.java +++ b/extensions/data-plane/data-plane-kafka/src/main/java/org/eclipse/edc/connector/dataplane/kafka/pipeline/KafkaDataSinkFactory.java @@ -48,6 +48,11 @@ public KafkaDataSinkFactory(ExecutorService executorService, Monitor monitor, Ka this.partitionSize = partitionSize; } + @Override + public String supportedType() { + return KAFKA_TYPE; + } + @Override public boolean canHandle(DataFlowStartMessage dataRequest) { return KAFKA_TYPE.equalsIgnoreCase(dataRequest.getDestinationDataAddress().getType()); @@ -79,4 +84,4 @@ public DataSink createSink(DataFlowStartMessage request) { .executorService(executorService) .build(); } -} \ No newline at end of file +} diff --git a/extensions/data-plane/data-plane-kafka/src/main/java/org/eclipse/edc/connector/dataplane/kafka/pipeline/KafkaDataSourceFactory.java b/extensions/data-plane/data-plane-kafka/src/main/java/org/eclipse/edc/connector/dataplane/kafka/pipeline/KafkaDataSourceFactory.java index 142793654a4..6c42df5bcb7 100644 --- a/extensions/data-plane/data-plane-kafka/src/main/java/org/eclipse/edc/connector/dataplane/kafka/pipeline/KafkaDataSourceFactory.java +++ b/extensions/data-plane/data-plane-kafka/src/main/java/org/eclipse/edc/connector/dataplane/kafka/pipeline/KafkaDataSourceFactory.java @@ -54,6 +54,11 @@ public KafkaDataSourceFactory(Monitor monitor, KafkaPropertiesFactory properties this.clock = clock; } + @Override + public String supportedType() { + return KAFKA_TYPE; + } + @Override public boolean canHandle(DataFlowStartMessage dataRequest) { return KAFKA_TYPE.equalsIgnoreCase(dataRequest.getSourceDataAddress().getType()); diff --git a/extensions/data-plane/data-plane-kafka/src/test/java/org/eclipse/edc/connector/dataplane/kafka/pipeline/KafkaDataSinkFactoryTest.java b/extensions/data-plane/data-plane-kafka/src/test/java/org/eclipse/edc/connector/dataplane/kafka/pipeline/KafkaDataSinkFactoryTest.java index ad0e0d926fe..a3fd32c717a 100644 --- a/extensions/data-plane/data-plane-kafka/src/test/java/org/eclipse/edc/connector/dataplane/kafka/pipeline/KafkaDataSinkFactoryTest.java +++ b/extensions/data-plane/data-plane-kafka/src/test/java/org/eclipse/edc/connector/dataplane/kafka/pipeline/KafkaDataSinkFactoryTest.java @@ -47,6 +47,7 @@ public void setUp() { factory = new KafkaDataSinkFactory(mock(ExecutorService.class), mock(Monitor.class), propertiesFactory, 1); } + @Deprecated(since = "0.6.2") @Test void verifyCanHandle() { assertThat(factory.canHandle(createRequest("kafka", emptyMap()))).isTrue(); diff --git a/extensions/data-plane/data-plane-kafka/src/test/java/org/eclipse/edc/connector/dataplane/kafka/pipeline/KafkaDataSourceFactoryTest.java b/extensions/data-plane/data-plane-kafka/src/test/java/org/eclipse/edc/connector/dataplane/kafka/pipeline/KafkaDataSourceFactoryTest.java index edc1e5784ed..9bfa6a7c2d2 100644 --- a/extensions/data-plane/data-plane-kafka/src/test/java/org/eclipse/edc/connector/dataplane/kafka/pipeline/KafkaDataSourceFactoryTest.java +++ b/extensions/data-plane/data-plane-kafka/src/test/java/org/eclipse/edc/connector/dataplane/kafka/pipeline/KafkaDataSourceFactoryTest.java @@ -47,6 +47,7 @@ public void setUp() { factory = new KafkaDataSourceFactory(mock(Monitor.class), propertiesFactory, mock(Clock.class)); } + @Deprecated(since = "0.6.2") @Test void verifyCanHandle() { assertThat(factory.canHandle(createRequest("kafka", emptyMap()))).isTrue(); diff --git a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/pipeline/DataSinkFactory.java b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/pipeline/DataSinkFactory.java index d8923072e09..5ca834510d1 100644 --- a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/pipeline/DataSinkFactory.java +++ b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/pipeline/DataSinkFactory.java @@ -23,10 +23,22 @@ */ public interface DataSinkFactory { + /** + * Return the supported DataAddress type. + * + * @return supported DataAddress type. + */ + String supportedType(); + /** * Returns true if this factory can create a {@link DataSink} for the request. + * + * @deprecated please use {@link #supportedType()} instead. */ - boolean canHandle(DataFlowStartMessage request); + @Deprecated(since = "0.6.2") + default boolean canHandle(DataFlowStartMessage request) { + return false; + } /** * Creates a sink to send data to. diff --git a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/pipeline/DataSourceFactory.java b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/pipeline/DataSourceFactory.java index a84fdf65cbf..478c8319354 100644 --- a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/pipeline/DataSourceFactory.java +++ b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/pipeline/DataSourceFactory.java @@ -23,10 +23,22 @@ */ public interface DataSourceFactory { + /** + * Return the supported DataAddress type. + * + * @return supported DataAddress type. + */ + String supportedType(); + /** * Returns true if this factory can create a {@link DataSource} for the request. + * + * @deprecated please use {@link #supportedType()} instead. */ - boolean canHandle(DataFlowStartMessage request); + @Deprecated(since = "0.6.2") + default boolean canHandle(DataFlowStartMessage request) { + return false; + } /** * Creates a source to access data to be sent.