Skip to content

Commit

Permalink
Unavailable connects suppressing (#4061)
Browse files Browse the repository at this point in the history
1. suppressing connect errors when returning list of all connectors
2. minor refactoring of KafkaConnectService.getAllConnectors
  • Loading branch information
iliax authored Aug 1, 2023
1 parent 476cbfb commit 895d27a
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ ConnectorPluginConfigValidationResponseDTO fromClient(
com.provectus.kafka.ui.connect.model.ConnectorPluginConfigValidationResponse
connectorPluginConfigValidationResponse);

default FullConnectorInfoDTO fullConnectorInfoFromTuple(InternalConnectInfo connectInfo) {
default FullConnectorInfoDTO fullConnectorInfo(InternalConnectInfo connectInfo) {
ConnectorDTO connector = connectInfo.getConnector();
List<TaskDTO> tasks = connectInfo.getTasks();
int failedTasksCount = (int) tasks.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
Expand All @@ -39,7 +38,6 @@
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuples;

@Service
@Slf4j
Expand All @@ -61,39 +59,22 @@ public Flux<ConnectDTO> getConnects(KafkaCluster cluster) {
public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
@Nullable final String search) {
return getConnects(cluster)
.flatMap(connect -> getConnectorNames(cluster, connect.getName()).map(cn -> Tuples.of(connect.getName(), cn)))
.flatMap(pair -> getConnector(cluster, pair.getT1(), pair.getT2()))
.flatMap(connector ->
getConnectorConfig(cluster, connector.getConnect(), connector.getName())
.map(config -> InternalConnectInfo.builder()
.connector(connector)
.config(config)
.build()
)
)
.flatMap(connectInfo -> {
ConnectorDTO connector = connectInfo.getConnector();
return getConnectorTasks(cluster, connector.getConnect(), connector.getName())
.collectList()
.map(tasks -> InternalConnectInfo.builder()
.connector(connector)
.config(connectInfo.getConfig())
.tasks(tasks)
.build()
);
})
.flatMap(connectInfo -> {
ConnectorDTO connector = connectInfo.getConnector();
return getConnectorTopics(cluster, connector.getConnect(), connector.getName())
.map(ct -> InternalConnectInfo.builder()
.connector(connector)
.config(connectInfo.getConfig())
.tasks(connectInfo.getTasks())
.topics(ct.getTopics())
.build()
);
})
.map(kafkaConnectMapper::fullConnectorInfoFromTuple)
.flatMap(connect ->
getConnectorNamesWithErrorsSuppress(cluster, connect.getName())
.flatMap(connectorName ->
Mono.zip(
getConnector(cluster, connect.getName(), connectorName),
getConnectorConfig(cluster, connect.getName(), connectorName),
getConnectorTasks(cluster, connect.getName(), connectorName).collectList(),
getConnectorTopics(cluster, connect.getName(), connectorName)
).map(tuple ->
InternalConnectInfo.builder()
.connector(tuple.getT1())
.config(tuple.getT2())
.tasks(tuple.getT3())
.topics(tuple.getT4().getTopics())
.build())))
.map(kafkaConnectMapper::fullConnectorInfo)
.filter(matchesSearchTerm(search));
}

Expand Down Expand Up @@ -132,6 +113,11 @@ public Flux<String> getConnectorNames(KafkaCluster cluster, String connectName)
.flatMapMany(Flux::fromIterable);
}

// returns empty flux if there was an error communicating with Connect
public Flux<String> getConnectorNamesWithErrorsSuppress(KafkaCluster cluster, String connectName) {
return getConnectorNames(cluster, connectName).onErrorComplete();
}

@SneakyThrows
private List<String> parseConnectorsNamesStringToList(String json) {
return objectMapper.readValue(json, new TypeReference<>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class ConnectorsExporter {

Flux<DataEntityList> export(KafkaCluster cluster) {
return kafkaConnectService.getConnects(cluster)
.flatMap(connect -> kafkaConnectService.getConnectorNames(cluster, connect.getName())
.flatMap(connect -> kafkaConnectService.getConnectorNamesWithErrorsSuppress(cluster, connect.getName())
.flatMap(connectorName -> kafkaConnectService.getConnector(cluster, connect.getName(), connectorName))
.flatMap(connectorDTO ->
kafkaConnectService.getConnectorTopics(cluster, connect.getName(), connectorDTO.getName())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ public void initialize(@NotNull ConfigurableApplicationContext context) {
System.setProperty("kafka.clusters.0.kafkaConnect.0.userName", "kafka-connect");
System.setProperty("kafka.clusters.0.kafkaConnect.0.password", "kafka-connect");
System.setProperty("kafka.clusters.0.kafkaConnect.0.address", kafkaConnect.getTarget());
System.setProperty("kafka.clusters.0.kafkaConnect.1.name", "notavailable");
System.setProperty("kafka.clusters.0.kafkaConnect.1.address", "http://notavailable:6666");
System.setProperty("kafka.clusters.0.masking.0.type", "REPLACE");
System.setProperty("kafka.clusters.0.masking.0.replacement", "***");
System.setProperty("kafka.clusters.0.masking.0.topicValuesPattern", "masking-test-.*");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ void exportsConnectorsAsDataTransformers() {
when(kafkaConnectService.getConnects(CLUSTER))
.thenReturn(Flux.just(connect));

when(kafkaConnectService.getConnectorNames(CLUSTER, connect.getName()))
when(kafkaConnectService.getConnectorNamesWithErrorsSuppress(CLUSTER, connect.getName()))
.thenReturn(Flux.just(sinkConnector.getName(), sourceConnector.getName()));

when(kafkaConnectService.getConnector(CLUSTER, connect.getName(), sinkConnector.getName()))
Expand Down

0 comments on commit 895d27a

Please sign in to comment.