Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add supportedType method on source and sink factories #4151

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

import static java.lang.String.format;
Expand Down Expand Up @@ -141,12 +142,18 @@ public void registerFactory(DataSinkFactory factory) {

@Nullable
private DataSourceFactory getSourceFactory(DataFlowStartMessage request) {
return sourceFactories.stream().filter(s -> s.canHandle(request)).findFirst().orElse(null);
return sourceFactories.stream()
.filter(s -> Objects.equals(s.supportedType(), request.getSourceDataAddress().getType()))
.findFirst()
.orElse(null);
}

@Nullable
private DataSinkFactory getSinkFactory(DataFlowStartMessage request) {
return sinkFactories.stream().filter(s -> s.canHandle(request)).findFirst().orElse(null);
return sinkFactories.stream()
.filter(s -> Objects.equals(s.supportedType(), request.getDestinationDataAddress().getType()))
.findFirst()
.orElse(null);
}

@NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtensionContext;
Expand All @@ -45,7 +44,6 @@
import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat;
import static org.junit.jupiter.params.provider.Arguments.arguments;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
Expand All @@ -54,18 +52,14 @@

class PipelineServiceImplTest {

Monitor monitor = mock();
PipelineServiceImpl service = new PipelineServiceImpl(monitor);
DataFlowStartMessage request = DataFlowStartMessage.Builder.newInstance()
.id("1")
.processId("1")
.sourceDataAddress(DataAddress.Builder.newInstance().type("test").build())
.destinationDataAddress(DataAddress.Builder.newInstance().type("test").build())
.build();
DataSourceFactory sourceFactory = mock(DataSourceFactory.class);
DataSinkFactory sinkFactory = mock(DataSinkFactory.class);
DataSource source = mock(DataSource.class);
DataSink sink = mock(DataSink.class);
private final Monitor monitor = mock();

private final DataSourceFactory sourceFactory = mock();

private final DataSinkFactory sinkFactory = mock();
private final DataSource source = mock();
private final DataSink sink = mock();
private final PipelineServiceImpl service = new PipelineServiceImpl(monitor);

@BeforeEach
void setUp() {
Expand All @@ -75,26 +69,25 @@ void setUp() {

@Test
void transfer_invokesSink() {
when(sourceFactory.canHandle(request)).thenReturn(true);
when(sourceFactory.createSource(request)).thenReturn(source);
when(sinkFactory.canHandle(request)).thenReturn(true);
when(sinkFactory.createSink(request)).thenReturn(sink);
when(sink.transfer(source)).thenReturn(completedFuture(StreamResult.success()));
when(sourceFactory.supportedType()).thenReturn("source");
when(sourceFactory.createSource(any())).thenReturn(source);
when(sinkFactory.supportedType()).thenReturn("destination");
when(sinkFactory.createSink(any())).thenReturn(sink);
when(sink.transfer(any())).thenReturn(completedFuture(StreamResult.success()));

service.transfer(request);
var future = service.transfer(dataFlow("source", "destination").toRequest());

verify(sink).transfer(eq(source));
assertThat(future).succeedsWithin(5, TimeUnit.SECONDS).satisfies(result -> {
assertThat(result).isSucceeded();
});
verify(sink).transfer(source);
}

@Test
void transfer_withCustomSink_shouldNotInvokeSinkFactory() throws Exception {
var flowRequest = DataFlow.Builder.newInstance().id("dataFlowId")
.source(DataAddress.Builder.newInstance().type("source").build())
.destination(DataAddress.Builder.newInstance().type("custom-destination").build())
.build()
.toRequest();

when(sourceFactory.canHandle(any())).thenReturn(true);
void transfer_withCustomSink_shouldNotInvokeSinkFactory() {
var flowRequest = dataFlow("source", "custom-destination").toRequest();

when(sourceFactory.supportedType()).thenReturn("source");
when(sourceFactory.createSource(any())).thenReturn(source);

var customSink = new DataSink() {
Expand All @@ -114,13 +107,10 @@ public CompletableFuture<StreamResult<Object>> transfer(DataSource source) {

@Test
void terminate_shouldCloseDataSource() throws Exception {
var dataFlow = DataFlow.Builder.newInstance().id("dPIataFlowId")
.source(DataAddress.Builder.newInstance().type("source").build())
.destination(DataAddress.Builder.newInstance().type("destination").build())
.build();
when(sourceFactory.canHandle(any())).thenReturn(true);
var dataFlow = dataFlow("source", "destination");
when(sourceFactory.supportedType()).thenReturn("source");
when(sourceFactory.createSource(any())).thenReturn(source);
when(sinkFactory.canHandle(any())).thenReturn(true);
when(sinkFactory.supportedType()).thenReturn("destination");
when(sinkFactory.createSink(any())).thenReturn(sink);
when(sink.transfer(any())).thenReturn(completedFuture(StreamResult.success()));

Expand All @@ -134,13 +124,10 @@ void terminate_shouldCloseDataSource() throws Exception {

@Test
void terminate_shouldFail_whenSourceClosureFails() throws Exception {
var dataFlow = DataFlow.Builder.newInstance().id("dataFlowId")
.source(DataAddress.Builder.newInstance().type("source").build())
.destination(DataAddress.Builder.newInstance().type("destination").build())
.build();
when(sourceFactory.canHandle(any())).thenReturn(true);
var dataFlow = dataFlow("source", "destination");
when(sourceFactory.supportedType()).thenReturn("source");
when(sourceFactory.createSource(any())).thenReturn(source);
when(sinkFactory.canHandle(any())).thenReturn(true);
when(sinkFactory.supportedType()).thenReturn("destination");
when(sinkFactory.createSink(any())).thenReturn(sink);
when(sink.transfer(any())).thenReturn(completedFuture(StreamResult.success()));
doThrow(IOException.class).when(source).close();
Expand All @@ -154,10 +141,7 @@ void terminate_shouldFail_whenSourceClosureFails() throws Exception {

@Test
void terminate_shouldFail_whenTransferDoesNotExist() {
var dataFlow = DataFlow.Builder.newInstance().id("dataFlowId")
.source(DataAddress.Builder.newInstance().type("source").build())
.destination(DataAddress.Builder.newInstance().type("destination").build())
.build();
var dataFlow = dataFlow("source", "destination");

var result = service.terminate(dataFlow);

Expand All @@ -167,27 +151,32 @@ void terminate_shouldFail_whenTransferDoesNotExist() {

@ParameterizedTest
@ArgumentsSource(CanHandleArguments.class)
void canHandle_returnsTrue_onlyIfSourceAndSinkCanHandle(
boolean sourceFactoryResponse,
boolean sinkFactoryResponse,
boolean expectedResult
) {
when(sourceFactory.canHandle(request)).thenReturn(sourceFactoryResponse);
when(sinkFactory.canHandle(request)).thenReturn(sinkFactoryResponse);

assertThat(service.canHandle(request))
.isEqualTo(expectedResult);
void canHandle_shouldReturnTrue_whenSourceAndDestinationCanBeHandled(String source, String destination, boolean expected) {
when(sourceFactory.supportedType()).thenReturn("source");
when(sinkFactory.supportedType()).thenReturn("destination");

boolean result = service.canHandle(dataFlow(source, destination).toRequest());

assertThat(result).isEqualTo(expected);
}

private DataFlow dataFlow(String sourceType, String destinationType) {
return DataFlow.Builder.newInstance()
.id("1")
.source(DataAddress.Builder.newInstance().type(sourceType).build())
.destination(DataAddress.Builder.newInstance().type(destinationType).build())
.build();
}

private static class CanHandleArguments implements ArgumentsProvider {

@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext extensionContext) {
return Stream.of(
arguments(true, true, true),
arguments(true, false, false),
arguments(false, true, false),
arguments(false, false, false)
arguments("source", "destination", true),
arguments("unsupported_source", "destination", false),
arguments("source", "unsupported_destination", false),
arguments("unsupported_source", "unsupported_destination", false)
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ private static class FixedEndpoint implements DataSinkFactory {
sink = new MemorySink();
}

@Override
public String supportedType() {
return "any";
}

@Override
public boolean canHandle(DataFlowStartMessage request) {
return true;
Expand Down Expand Up @@ -116,6 +121,11 @@ private static class InputStreamDataFactory implements DataSourceFactory {
this("bytes");
}

@Override
public String supportedType() {
return "any";
}

@Override
public boolean canHandle(DataFlowStartMessage request) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ public HttpDataSinkFactory(EdcHttpClient httpClient,
this.requestFactory = requestFactory;
}

@Override
public String supportedType() {
return HTTP_DATA_TYPE;
}

@Override
public boolean canHandle(DataFlowStartMessage request) {
return HTTP_DATA_TYPE.equals(request.getDestinationDataAddress().getType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ public HttpDataSourceFactory(EdcHttpClient httpClient, HttpRequestParamsProvider
this.requestFactory = requestFactory;
}

@Override
public String supportedType() {
return HTTP_DATA_TYPE;
}

@Override
public boolean canHandle(DataFlowStartMessage request) {
return HTTP_DATA_TYPE.equals(request.getSourceDataAddress().getType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,13 @@ void setUp() {
factory = new HttpDataSinkFactory(httpClient, executorService, 5, monitor, provider, requestFactory);
}

@Deprecated(since = "0.6.2")
@Test
void verifyCanHandle() {
assertThat(factory.canHandle(TestFunctions.createRequest(HTTP_DATA_TYPE).build())).isTrue();
}

@Deprecated(since = "0.6.2")
@Test
void verifyCannotHandle() {
assertThat(factory.canHandle(TestFunctions.createRequest("dummy").build())).isFalse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,13 @@ void setUp() {
factory = new HttpDataSourceFactory(httpClient, provider, monitor, requestFactory);
}

@Deprecated(since = "0.6.2")
@Test
void verifyCanHandle() {
assertThat(factory.canHandle(TestFunctions.createRequest(HTTP_DATA_TYPE).build())).isTrue();
}

@Deprecated(since = "0.6.2")
@Test
void verifyCannotHandle() {
assertThat(factory.canHandle(TestFunctions.createRequest("dummy").build())).isFalse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public KafkaDataSinkFactory(ExecutorService executorService, Monitor monitor, Ka
this.partitionSize = partitionSize;
}

@Override
public String supportedType() {
return KAFKA_TYPE;
}

@Override
public boolean canHandle(DataFlowStartMessage dataRequest) {
return KAFKA_TYPE.equalsIgnoreCase(dataRequest.getDestinationDataAddress().getType());
Expand Down Expand Up @@ -79,4 +84,4 @@ public DataSink createSink(DataFlowStartMessage request) {
.executorService(executorService)
.build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ public KafkaDataSourceFactory(Monitor monitor, KafkaPropertiesFactory properties
this.clock = clock;
}

@Override
public String supportedType() {
return KAFKA_TYPE;
}

@Override
public boolean canHandle(DataFlowStartMessage dataRequest) {
return KAFKA_TYPE.equalsIgnoreCase(dataRequest.getSourceDataAddress().getType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public void setUp() {
factory = new KafkaDataSinkFactory(mock(ExecutorService.class), mock(Monitor.class), propertiesFactory, 1);
}

@Deprecated(since = "0.6.2")
@Test
void verifyCanHandle() {
assertThat(factory.canHandle(createRequest("kafka", emptyMap()))).isTrue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public void setUp() {
factory = new KafkaDataSourceFactory(mock(Monitor.class), propertiesFactory, mock(Clock.class));
}

@Deprecated(since = "0.6.2")
@Test
void verifyCanHandle() {
assertThat(factory.canHandle(createRequest("kafka", emptyMap()))).isTrue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,22 @@
*/
public interface DataSinkFactory {

/**
* Return the supported DataAddress type.
*
* @return supported DataAddress type.
*/
String supportedType();

/**
* Returns true if this factory can create a {@link DataSink} for the request.
*
* @deprecated please use {@link #supportedType()} instead.
*/
boolean canHandle(DataFlowStartMessage request);
@Deprecated(since = "0.6.2")
default boolean canHandle(DataFlowStartMessage request) {
return false;
}

/**
* Creates a sink to send data to.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,22 @@
*/
public interface DataSourceFactory {

/**
* Return the supported DataAddress type.
*
* @return supported DataAddress type.
*/
String supportedType();

/**
* Returns true if this factory can create a {@link DataSource} for the request.
*
* @deprecated please use {@link #supportedType()} instead.
*/
boolean canHandle(DataFlowStartMessage request);
@Deprecated(since = "0.6.2")
default boolean canHandle(DataFlowStartMessage request) {
return false;
}

/**
* Creates a source to access data to be sent.
Expand Down
Loading