From f53b6656c886bca5097120a43072e9da9454d2d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Victor=20K=C3=BCnstler?= Date: Thu, 17 Mar 2022 08:48:33 +0100 Subject: [PATCH 1/2] Resolve (extra) input topic patterns (#224) * Support input pattern configs * Add support for kafka(-only) topics --- README.md | 5 +- backend/settings.yaml | 21 ++- backend/streams_explorer/core/config.py | 9 + backend/streams_explorer/core/k8s_app.py | 2 + .../core/k8s_config_parser.py | 4 + .../core/services/dataflow_graph.py | 137 +++++++++++++++- .../{kafka.py => kafka_admin_client.py} | 9 +- backend/streams_explorer/models/k8s_config.py | 2 + backend/streams_explorer/streams_explorer.py | 10 +- backend/tests/test_dataflow_graph.py | 155 +++++++++++++++++- backend/tests/test_kafka.py | 18 +- backend/tests/test_streams_explorer.py | 13 +- backend/tests/utils.py | 26 +++ 13 files changed, 378 insertions(+), 33 deletions(-) rename backend/streams_explorer/core/services/{kafka.py => kafka_admin_client.py} (85%) diff --git a/README.md b/README.md index 0f587840..07a219de 100644 --- a/README.md +++ b/README.md @@ -119,15 +119,18 @@ The following configuration options are available: #### General -- `graph.update_interval` Update the graph every X seconds (integer, **required**, default: `300`) +- `graph.update_interval` Update the graph every X seconds (int, **required**, default: `300`) - `graph.layout_arguments` Arguments passed to graphviz layout (string, **required**, default: `-Grankdir=LR -Gnodesep=0.8 -Gpad=10`) - `graph.pipeline_distance` Increase/decrease vertical space between pipeline graphs by X pixels (int, **required**, default: `500`) +- `graph.resolve.input_pattern_topics.all` If true topics that match (extra) input pattern(s) are connected to the streaming app in the graph containing all pipelines (bool, **required**, default: `false`) +- `graph.resolve.input_pattern_topics.pipelines` If true topics that match (extra) input pattern(s) are connected to the streaming app in pipeline graphs (bool, **required**, default: `false`) #### Kafka - `kafka.enable` Enable Kafka (bool, default: `false`) - `kafka.config` librdkafka configuration properties ([reference](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)) (dict, default: `{"bootstrap.servers": "localhost:9092"}`) - `kafka.displayed_information` Configuration options of Kafka topics displayed in the frontend (list of dict) +- `kafka.topic_names_cache.ttl` Cache for retrieving all topic names (used when input topic patterns are resolved) (int, default: `3600`) #### Kafka Connect diff --git a/backend/settings.yaml b/backend/settings.yaml index 6f3f8236..5eecb89d 100644 --- a/backend/settings.yaml +++ b/backend/settings.yaml @@ -3,6 +3,12 @@ graph: update_interval: 300 layout_arguments: "-Grankdir=LR -Gnodesep=0.8 -Gpad=10" pipeline_distance: 500 + resolve: + input_pattern_topics: + # If true topics that match (extra) input pattern(s) are connected to the streaming app in the graph containing all pipelines + all: false + # If true topics that match (extra) input pattern(s) are connected to the streaming app in pipeline graphs + pipelines: false k8s: deployment: @@ -35,17 +41,22 @@ kafka: displayed_information: - name: "Cleanup Policy" key: "cleanup.policy" + # cache for retrieving all topic names (used when input topic patterns are resolved) + topic_names_cache: + ttl: 3600 node_info: # cache time-to-live in seconds cache_ttl: 3600 ## (optional) configure Kafka Connect url and displayed information -# kafkaconnect: -# url: 
"http://localhost:8083" -# displayed_information: -# - name: "Transformer" -# key: "transforms.changeTopic.regex" +# kafkaconnect: +# url: "http://localhost:8083" +# displayed_information: +# - name: "Transformer" +# key: "transforms.changeTopic.regex" +# - name: "Type" +# key: "transforms.changeTopic.type" ## (optional) configure Schema Registry for topic information (supports Karapace and Confluent Schema Registry) # schemaregistry: diff --git a/backend/streams_explorer/core/config.py b/backend/streams_explorer/core/config.py index b33309f2..4245fc59 100644 --- a/backend/streams_explorer/core/config.py +++ b/backend/streams_explorer/core/config.py @@ -13,6 +13,14 @@ Validator("graph.update_interval", must_exist=True, is_type_of=int), Validator("graph.layout_arguments", must_exist=True, is_type_of=str), Validator("graph.pipeline_distance", must_exist=True, is_type_of=int), + Validator( + "graph.resolve.input_pattern_topics.all", must_exist=True, is_type_of=bool + ), + Validator( + "graph.resolve.input_pattern_topics.pipelines", + must_exist=True, + is_type_of=bool, + ), Validator("k8s.deployment.cluster", must_exist=True, is_type_of=bool), Validator("k8s.deployment.context", is_type_of=str), Validator("k8s.deployment.namespaces", must_exist=True, is_type_of=list), @@ -31,6 +39,7 @@ ) ), Validator("kafka.displayed_information", is_type_of=list, default=[]), + Validator("kafka.topic_names_cache.ttl", is_type_of=int, default=3600), Validator("node_info.cache_ttl", must_exist=True, is_type_of=int), Validator("kafkaconnect.url", default=None), Validator("kafkaconnect.displayed_information", is_type_of=list, default=[]), diff --git a/backend/streams_explorer/core/k8s_app.py b/backend/streams_explorer/core/k8s_app.py index e2454e8d..0388a48f 100644 --- a/backend/streams_explorer/core/k8s_app.py +++ b/backend/streams_explorer/core/k8s_app.py @@ -33,10 +33,12 @@ def __init__(self, k8s_object: K8sObject): self.id: str self.name: str self.input_topics: List[str] = [] + self.input_pattern: Optional[str] = None self.output_topic: Optional[str] = None self.error_topic: Optional[str] = None self.extra_input_topics: List[str] = [] self.extra_output_topics: List[str] = [] + self.extra_input_patterns: List[str] = [] self.attributes: Dict[str, str] = {} self.setup() diff --git a/backend/streams_explorer/core/k8s_config_parser.py b/backend/streams_explorer/core/k8s_config_parser.py index 848361ba..4c3e73c2 100644 --- a/backend/streams_explorer/core/k8s_config_parser.py +++ b/backend/streams_explorer/core/k8s_config_parser.py @@ -52,6 +52,10 @@ def parse_config(self, name: str, value: str): self.config.extra_input_topics = self.parse_extra_topics(value) elif name == "EXTRA_OUTPUT_TOPICS": self.config.extra_output_topics = self.parse_extra_topics(value) + elif name == "INPUT_PATTERN": + self.config.input_pattern = value + elif name == "EXTRA_INPUT_PATTERNS": + self.config.extra_input_patterns = self.parse_extra_topics(value) else: self.config.extra[name] = value diff --git a/backend/streams_explorer/core/services/dataflow_graph.py b/backend/streams_explorer/core/services/dataflow_graph.py index eda8935c..ece36c85 100644 --- a/backend/streams_explorer/core/services/dataflow_graph.py +++ b/backend/streams_explorer/core/services/dataflow_graph.py @@ -1,3 +1,6 @@ +import re +from collections import defaultdict +from enum import Enum from typing import Dict, List, Optional, Set, Type import networkx as nx @@ -7,6 +10,7 @@ from streams_explorer.core.config import settings from streams_explorer.core.k8s_app import 
ATTR_PIPELINE, K8sApp +from streams_explorer.core.services.kafka_admin_client import KafkaAdminClient from streams_explorer.core.services.metric_providers import MetricProvider from streams_explorer.models.graph import GraphEdge, GraphNode, Metric from streams_explorer.models.kafka_connector import ( @@ -22,13 +26,23 @@ class NodeNotFound(Exception): pass +class NodeDataFields(str, Enum): + NODE_TYPE = "node_type" + LABEL = "label" + + class DataFlowGraph: - def __init__(self, metric_provider: Type[MetricProvider]): + def __init__(self, metric_provider: Type[MetricProvider], kafka: KafkaAdminClient): self.graph = nx.DiGraph() self.json_graph: dict = {} self.pipelines: Dict[str, nx.DiGraph] = {} self.json_pipelines: Dict[str, dict] = {} self.metric_provider_class = metric_provider + self.kafka = kafka + + self._topic_pattern_queue: Dict[str, Set[str]] = defaultdict( + set + ) # topic pattern -> set of target node ids async def store_json_graph(self): self.json_graph = await self.get_positioned_graph() @@ -54,20 +68,25 @@ def _add_streaming_app(self, graph: nx.DiGraph, app: K8sApp): node_type=NodeTypesEnum.STREAMING_APP, **app.attributes, ) + + for input_topic in app.input_topics: + self._add_topic(graph, input_topic) + self._add_input_topic(graph, app.id, input_topic) if app.output_topic: self._add_topic(graph, app.output_topic) self._add_output_topic(graph, app.id, app.output_topic) if app.error_topic: self._add_error_topic(graph, app.id, app.error_topic) - for input_topic in app.input_topics: - self._add_topic(graph, input_topic) - self._add_input_topic(graph, app.id, input_topic) + if app.input_pattern: + self._enqueue_input_pattern(app.input_pattern, app.id) for extra_input in app.extra_input_topics: self._add_topic(graph, extra_input) self._add_input_topic(graph, app.id, extra_input) for extra_output in app.extra_output_topics: self._add_topic(graph, extra_output) self._add_output_topic(graph, app.id, extra_output) + for extra_pattern in app.extra_input_patterns: + self._enqueue_input_pattern(extra_pattern, app.id) def add_connector(self, connector: KafkaConnector, pipeline: Optional[str] = None): graph = self.graph @@ -99,12 +118,21 @@ def add_connector(self, connector: KafkaConnector, pipeline: Optional[str] = Non self.add_connector(connector, pipeline=pipeline) def add_source(self, source: Source): - node = (source.name, {"label": source.name, "node_type": source.node_type}) + node = ( + source.name, + { + NodeDataFields.LABEL: source.name, + NodeDataFields.NODE_TYPE: source.node_type, + }, + ) edge = (source.name, source.target) self.add_to_graph(node, edge) def add_sink(self, sink: Sink): - node = (sink.name, {"label": sink.name, "node_type": sink.node_type}) + node = ( + sink.name, + {NodeDataFields.LABEL: sink.name, NodeDataFields.NODE_TYPE: sink.node_type}, + ) edge = (sink.source, sink.name) self.add_to_graph(node, edge, reverse=True) @@ -144,7 +172,7 @@ async def get_metrics(self) -> List[Metric]: def get_node_type(self, id: str) -> str: try: - return self.graph.nodes[id].get("node_type") + return self.graph.nodes[id][NodeDataFields.NODE_TYPE] except KeyError: raise NodeNotFound() @@ -170,6 +198,15 @@ def find_associated_pipelines( def _add_topic(graph: nx.DiGraph, name: str): graph.add_node(name, label=name, node_type=NodeTypesEnum.TOPIC) + @staticmethod + def _filter_topic_node_ids(graph: nx.DiGraph) -> Set[str]: + return { + node_id + for node_id, data in graph.nodes(data=True) + if data[NodeDataFields.NODE_TYPE] == NodeTypesEnum.TOPIC + or data[NodeDataFields.NODE_TYPE] 
== NodeTypesEnum.ERROR_TOPIC
+        }
+
     @staticmethod
     def _add_input_topic(graph: nx.DiGraph, app_id: str, topic_name: str):
         graph.add_edge(topic_name, app_id)
@@ -183,6 +220,86 @@ def _add_output_topic(
         self._add_topic(graph, topic_name)
         graph.add_edge(app_id, topic_name)
 
+    def _enqueue_input_pattern(self, pattern: str, node_id: str):
+        """
+        Enqueue an input topic pattern for an app or Kafka Connector
+        """
+        self._topic_pattern_queue[pattern].add(node_id)
+
+    def apply_input_pattern_edges(self):
+        topics = DataFlowGraph._filter_topic_node_ids(self.graph)
+        kafka_topics = self.kafka.get_all_topic_names() if self.kafka.enabled else set()
+        for pattern, node_ids in self._topic_pattern_queue.items():
+            regex = re.compile(pattern)
+            matching_graph_known_topics = set(filter(regex.match, topics))
+            # for unknown topics (i.e. topics not already present in the graph)
+            # we have to create the topic node
+            matching_unknown_kafka_topics = set(
+                filter(regex.match, kafka_topics)
+            ).difference(matching_graph_known_topics)
+            for node_id in node_ids:  # node_id can be an app or a kafka connector
+                pipeline = self.graph.nodes[node_id].get(ATTR_PIPELINE)
+                # handle matching topics that are already present in the graph
+                self.handle_matching_topics(
+                    matching_graph_known_topics,
+                    node_id,
+                    pattern,
+                    pipeline,
+                    add_topic=False,
+                )
+                # handle unknown topics that are not present in the graph
+                self.handle_matching_topics(
+                    matching_unknown_kafka_topics,
+                    node_id,
+                    pattern,
+                    pipeline,
+                    add_topic=True,
+                )
+
+    def handle_matching_topics(
+        self,
+        matching_topics: Set[str],
+        node_id: str,
+        pattern: str,
+        pipeline: Optional[str],
+        add_topic: bool = False,
+    ):
+        for matched_topic in matching_topics:
+            if add_topic:
+                self._add_topic(self.graph, matched_topic)
+            if pipeline is not None:
+                self.resolve_topic_pattern_in_pipeline(
+                    matched_topic, node_id, pipeline, pattern
+                )
+            self.resolve_topic_pattern_in_all_graph(matched_topic, node_id, pattern)
+
+    def resolve_topic_pattern_in_all_graph(
+        self, matched_topic: str, node_id: str, pattern: str
+    ):
+        # resolve topic pattern in the overall graph containing all pipelines
+        if settings.graph.resolve.input_pattern_topics.all:
+            # connect topic to graph
+            self.graph.add_edge(matched_topic, node_id)
+        else:
+            self.add_pattern_as_topic(self.graph, node_id, pattern)
+
+    def add_pattern_as_topic(self, graph: nx.DiGraph, node_id: str, pattern: str):
+        # visualize the pattern as topic
+        self._add_topic(graph, pattern)
+        graph.add_edge(pattern, node_id)
+
+    def resolve_topic_pattern_in_pipeline(
+        self, matched_topic: str, node_id: str, pipeline: str, pattern: str
+    ):
+        # resolve topic patterns in pipelines
+        if settings.graph.resolve.input_pattern_topics.pipelines:
+            if matched_topic not in self.pipelines[pipeline].nodes:
+                node = self.graph.nodes[matched_topic]
+                self.pipelines[pipeline].add_node(matched_topic, **node)
+            self.pipelines[pipeline].add_edge(matched_topic, node_id)
+        else:
+            self.add_pattern_as_topic(self.pipelines[pipeline], node_id, pattern)
+
     @staticmethod
     def _add_error_topic(
         graph: nx.DiGraph,
@@ -191,8 +308,10 @@ def _add_error_topic(
     ):
         graph.add_node(
             topic_name,
-            label=topic_name,
-            node_type=NodeTypesEnum.ERROR_TOPIC,
+            **{
+                NodeDataFields.LABEL: topic_name,
+                NodeDataFields.NODE_TYPE: NodeTypesEnum.ERROR_TOPIC,
+            },
         )
         graph.add_edge(app_id, topic_name)
 
diff --git a/backend/streams_explorer/core/services/kafka.py b/backend/streams_explorer/core/services/kafka_admin_client.py
similarity index 85%
rename
from backend/streams_explorer/core/services/kafka.py rename to backend/streams_explorer/core/services/kafka_admin_client.py index 7192435a..f0ea50eb 100644 --- a/backend/streams_explorer/core/services/kafka.py +++ b/backend/streams_explorer/core/services/kafka_admin_client.py @@ -1,6 +1,7 @@ import concurrent.futures -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Set +from cachetools.func import ttl_cache from confluent_kafka.admin import ( AdminClient, ConfigEntry, @@ -12,7 +13,7 @@ from streams_explorer.core.config import settings -class Kafka: +class KafkaAdminClient: def __init__(self): self._enabled: bool = settings.kafka.enable self._client: AdminClient @@ -53,6 +54,10 @@ def get_topic_partitions( return None return metadata.partitions + @ttl_cache(maxsize=1, ttl=settings.kafka.topic_names_cache.ttl) + def get_all_topic_names(self) -> Set[str]: + return set(self._client.list_topics().topics.keys()) + @staticmethod def format_values(values: List[ConfigEntry]) -> Dict[str, Any]: return { diff --git a/backend/streams_explorer/models/k8s_config.py b/backend/streams_explorer/models/k8s_config.py index 4726ff0b..df7611a3 100644 --- a/backend/streams_explorer/models/k8s_config.py +++ b/backend/streams_explorer/models/k8s_config.py @@ -9,6 +9,8 @@ class K8sConfig: input_topics: List[str] = field(default_factory=list) # required for streaming app output_topic: Optional[str] = None # required for streaming app error_topic: Optional[str] = None + input_pattern: Optional[str] = None extra_input_topics: List[str] = field(default_factory=list) extra_output_topics: List[str] = field(default_factory=list) extra: Dict[str, str] = field(default_factory=dict) + extra_input_patterns: List[str] = field(default_factory=list) diff --git a/backend/streams_explorer/streams_explorer.py b/backend/streams_explorer/streams_explorer.py index d003e5bf..01fe4976 100644 --- a/backend/streams_explorer/streams_explorer.py +++ b/backend/streams_explorer/streams_explorer.py @@ -14,7 +14,7 @@ get_displayed_information_topic, ) from streams_explorer.core.services.dataflow_graph import DataFlowGraph, NodeTypesEnum -from streams_explorer.core.services.kafka import Kafka +from streams_explorer.core.services.kafka_admin_client import KafkaAdminClient from streams_explorer.core.services.kafkaconnect import KafkaConnect from streams_explorer.core.services.linking_services import LinkingService from streams_explorer.core.services.metric_providers import MetricProvider @@ -37,9 +37,11 @@ def __init__( ): self.applications: Dict[str, K8sApp] = {} self.kafka_connectors: List[KafkaConnector] = [] - self.data_flow = DataFlowGraph(metric_provider=metric_provider) + self.kafka = KafkaAdminClient() + self.data_flow = DataFlowGraph( + metric_provider=metric_provider, kafka=self.kafka + ) self.linking_service = linking_service - self.kafka = Kafka() def setup(self): self.__setup_k8s_environment() @@ -221,3 +223,5 @@ def __create_graph(self): for sink in sinks: self.data_flow.add_sink(sink) + + self.data_flow.apply_input_pattern_edges() diff --git a/backend/tests/test_dataflow_graph.py b/backend/tests/test_dataflow_graph.py index d8662bb9..2c2f22a2 100644 --- a/backend/tests/test_dataflow_graph.py +++ b/backend/tests/test_dataflow_graph.py @@ -1,8 +1,11 @@ +from typing import Set + import pytest from streams_explorer.core.config import settings from streams_explorer.core.k8s_app import ATTR_PIPELINE, K8sApp from streams_explorer.core.services.dataflow_graph import DataFlowGraph +from 
streams_explorer.core.services.kafka_admin_client import KafkaAdminClient from streams_explorer.core.services.metric_providers import MetricProvider from streams_explorer.models.kafka_connector import ( KafkaConnector, @@ -19,7 +22,7 @@ class TestDataFlowGraph: @pytest.fixture() def df(self) -> DataFlowGraph: - return DataFlowGraph(metric_provider=MetricProvider) + return DataFlowGraph(metric_provider=MetricProvider, kafka=KafkaAdminClient()) @pytest.mark.asyncio async def test_positioned_pipeline_graph_not_found(self, df: DataFlowGraph): @@ -63,6 +66,156 @@ def test_add_streaming_app(self, df: DataFlowGraph): assert df.graph.has_edge("test-app", "extra-output1") assert df.graph.has_edge("test-app", "extra-output2") + def test_resolve_input_pattern(self, df: DataFlowGraph): + df.add_streaming_app( + K8sApp.factory( + get_streaming_app_deployment(error_topic="fake-dead-letter-topic") + ) + ) + + assert len(df.graph.nodes) == 4 + + settings.graph.resolve.input_pattern_topics.all = True + df.add_streaming_app( + K8sApp.factory( + get_streaming_app_deployment( + name="test-app2", + input_topics=None, + output_topic="output-topic2", + error_topic="fake2-dead-letter-topic", + input_pattern=".*-dead-letter-topic", + ) + ) + ) + df.apply_input_pattern_edges() + assert len(df.graph.nodes) == 7 + assert df.graph.has_edge("fake-dead-letter-topic", "test-app2") + assert df.graph.has_edge( + "fake2-dead-letter-topic", "test-app2" + ), "Should match on app's own error topic" + assert df.graph.has_edge("test-app2", "output-topic2") + assert df.graph.has_edge("test-app2", "fake2-dead-letter-topic") + + def test_resolve_input_patterns_for_topics_in_kafka(self, monkeypatch): + kafka = KafkaAdminClient() + kafka._enabled = True + + def get_all_topic_names() -> Set[str]: + return {"another-dead-letter-topic", "another-non-matching-topic"} + + monkeypatch.setattr(kafka, "get_all_topic_names", get_all_topic_names) + df = DataFlowGraph(metric_provider=MetricProvider, kafka=kafka) + + settings.graph.resolve.input_pattern_topics.all = True + df.add_streaming_app( + K8sApp.factory( + get_streaming_app_deployment( + name="test-app2", + input_topics=None, + output_topic="output-topic2", + error_topic="fake2-dead-letter-topic", + input_pattern=".*-dead-letter-topic", + ) + ) + ) + df.apply_input_pattern_edges() + assert len(df.graph.nodes) == 4 + assert df.graph.has_edge("another-dead-letter-topic", "test-app2") + assert df.graph.has_edge("fake2-dead-letter-topic", "test-app2") + assert df.graph.has_node("another-dead-letter-topic") + assert not df.graph.has_node("another-non-matching-topic") + + def test_no_resolve_input_pattern(self, df: DataFlowGraph): + df.add_streaming_app( + K8sApp.factory( + get_streaming_app_deployment(error_topic="fake-dead-letter-topic") + ) + ) + + assert len(df.graph.nodes) == 4 + + settings.graph.resolve.input_pattern_topics.all = False + df.add_streaming_app( + K8sApp.factory( + get_streaming_app_deployment( + name="test-app2", + input_topics=None, + output_topic="output-topic2", + error_topic="fake2-dead-letter-topic", + input_pattern=".*-dead-letter-topic", + ) + ) + ) + df.apply_input_pattern_edges() + assert len(df.graph.nodes) == 8 + assert df.graph.has_edge(".*-dead-letter-topic", "test-app2") + assert not df.graph.has_edge("fake2-dead-letter-topic", "test-app2") + assert df.graph.has_edge("test-app2", "output-topic2") + assert df.graph.has_edge("test-app2", "fake2-dead-letter-topic") + + def test_resolve_extra_input_patterns(self, df: DataFlowGraph): + df.add_streaming_app( + 
K8sApp.factory( + get_streaming_app_deployment( + multiple_outputs="out=output-non-match-topic", + error_topic="fake-dead-letter-topic", + ) + ) + ) + + assert len(df.graph.nodes) == 5 + assert df.graph.has_edge("input-topic", "test-app") + assert df.graph.has_edge("test-app", "output-topic") + assert df.graph.has_edge("test-app", "fake-dead-letter-topic") + + settings.graph.resolve.input_pattern_topics.all = True + df.add_streaming_app( + K8sApp.factory( + get_streaming_app_deployment( + name="test-app2", + input_topics="output-topic", + output_topic="another-topic", + error_topic="fake2-dead-letter-topic", + extra_input_patterns="fake1=.*-dead-letter-topic,fake2=.*-output-topic", + ) + ) + ) + df.apply_input_pattern_edges() + assert len(df.graph.nodes) == 8 + assert df.graph.has_edge("fake-dead-letter-topic", "test-app2") + assert df.graph.has_edge( + "fake2-dead-letter-topic", "test-app2" + ), "Should match on app's own error topic" + assert not df.graph.has_edge("another-topic", "test-app2") + assert df.graph.has_edge("fake-dead-letter-topic", "test-app2") + + def test_no_resolve_extra_input_patterns(self, df: DataFlowGraph): + settings.graph.resolve.input_pattern_topics.all = False + df.add_streaming_app( + K8sApp.factory( + get_streaming_app_deployment( + multiple_outputs="out=output-non-match-topic", + error_topic="fake-dead-letter-topic", + ) + ) + ) + + df.add_streaming_app( + K8sApp.factory( + get_streaming_app_deployment( + name="test-app2", + input_topics="output-topic", + output_topic="output-topic2", + error_topic="fake2-dead-letter-topic", + extra_input_patterns="fake1=.*-dead-letter-topic,fake2=.*output-topic", + ) + ) + ) + df.apply_input_pattern_edges() + assert len(df.graph.nodes) == 10 + assert df.graph.has_edge(".*-dead-letter-topic", "test-app2") + assert df.graph.has_edge(".*output-topic", "test-app2") + def test_add_connector(self, df: DataFlowGraph): sink_connector = KafkaConnector( name="test-sink-connector", diff --git a/backend/tests/test_kafka.py b/backend/tests/test_kafka.py index fbf2b001..b97fdaf9 100644 --- a/backend/tests/test_kafka.py +++ b/backend/tests/test_kafka.py @@ -6,7 +6,7 @@ from dynaconf.validator import ValidationError from streams_explorer.core.config import settings -from streams_explorer.core.services.kafka import Kafka +from streams_explorer.core.services.kafka_admin_client import KafkaAdminClient test_topic = "test-topic" @@ -36,12 +36,12 @@ def test_format_values(self): ConfigEntry(name="key", value="value"), ConfigEntry(name="sensitive", value="supersecret", is_sensitive=True), ] - assert Kafka.format_values(raw) == {"key": "value"} - assert Kafka.format_values([]) == {} + assert KafkaAdminClient.format_values(raw) == {"key": "value"} + assert KafkaAdminClient.format_values([]) == {} @pytest.fixture() - def kafka(self, monkeypatch) -> Kafka: - kafka = Kafka() + def kafka(self, monkeypatch) -> KafkaAdminClient: + kafka = KafkaAdminClient() def mock_get_resource(resource: ConfigResource, *_) -> List[ConfigEntry]: if resource == ConfigResource(ConfigResource.Type.TOPIC, test_topic): @@ -51,7 +51,7 @@ def mock_get_resource(resource: ConfigResource, *_) -> List[ConfigEntry]: ] return [] - monkeypatch.setattr(kafka, "_Kafka__get_resource", mock_get_resource) + monkeypatch.setattr(kafka, "_KafkaAdminClient__get_resource", mock_get_resource) @dataclass class MockTopicMetadata: @@ -65,18 +65,18 @@ def mock_get_topic(topic: str) -> Optional[MockTopicMetadata]: return MockTopicMetadata(test_topic, partitions) return None - 
monkeypatch.setattr(kafka, "_Kafka__get_topic", mock_get_topic) + monkeypatch.setattr(kafka, "_KafkaAdminClient__get_topic", mock_get_topic) return kafka - def test_get_topic_config(self, kafka: Kafka): + def test_get_topic_config(self, kafka: KafkaAdminClient): assert kafka.get_topic_config(test_topic) == { "cleanup.policy": "delete", "retention.ms": "-1", } assert kafka.get_topic_config("doesnt-exist") == {} - def test_get_topic_partitions(self, kafka: Kafka): + def test_get_topic_partitions(self, kafka: KafkaAdminClient): partitions = kafka.get_topic_partitions(test_topic) assert type(partitions) is dict assert len(partitions) == 10 diff --git a/backend/tests/test_streams_explorer.py b/backend/tests/test_streams_explorer.py index 76bfc4bb..113a9ae2 100644 --- a/backend/tests/test_streams_explorer.py +++ b/backend/tests/test_streams_explorer.py @@ -1,4 +1,4 @@ -from typing import List, Optional +from typing import List, Optional, Set import pytest from kubernetes.client import V1beta1CronJob @@ -143,6 +143,9 @@ def get_topic_partitions(_, topic) -> Optional[dict]: return {i: _ for i in range(5)} return None + def get_all_topic_names(_) -> Set[str]: + return set() + mocker.patch( "streams_explorer.core.services.kafkaconnect.KafkaConnect.get_connectors", get_connectors, @@ -156,13 +159,17 @@ def get_topic_partitions(_, topic) -> Optional[dict]: lambda config: config, ) mocker.patch( - "streams_explorer.core.services.kafka.Kafka.get_topic_config", + "streams_explorer.core.services.kafka_admin_client.KafkaAdminClient.get_topic_config", get_topic_config, ) mocker.patch( - "streams_explorer.core.services.kafka.Kafka.get_topic_partitions", + "streams_explorer.core.services.kafka_admin_client.KafkaAdminClient.get_topic_partitions", get_topic_partitions, ) + mocker.patch( + "streams_explorer.core.services.kafka_admin_client.KafkaAdminClient.get_all_topic_names", + get_all_topic_names, + ) return explorer diff --git a/backend/tests/utils.py b/backend/tests/utils.py index 1603024b..64260336 100644 --- a/backend/tests/utils.py +++ b/backend/tests/utils.py @@ -31,8 +31,10 @@ def get_streaming_app_deployment( input_topics: Optional[str] = "input-topic", output_topic: Optional[str] = "output-topic", error_topic: Optional[str] = "error-topic", + input_pattern: Optional[str] = None, multiple_inputs: Optional[str] = None, multiple_outputs: Optional[str] = None, + extra_input_patterns: Optional[str] = None, extra: Dict[str, str] = {}, env_prefix: str = "APP_", pipeline: Optional[str] = None, @@ -43,8 +45,10 @@ def get_streaming_app_deployment( input_topics, output_topic, error_topic, + input_pattern=input_pattern, multiple_inputs=multiple_inputs, multiple_outputs=multiple_outputs, + extra_input_patterns=extra_input_patterns, extra=extra, env_prefix=env_prefix, consumer_group=consumer_group, @@ -60,8 +64,10 @@ def get_streaming_app_stateful_set( input_topics: str = "input-topic", output_topic: str = "output-topic", error_topic: str = "error-topic", + input_pattern: Optional[str] = None, multiple_inputs: Optional[str] = None, multiple_outputs: Optional[str] = None, + extra_input_patterns: Optional[str] = None, extra: Dict[str, str] = {}, env_prefix: str = "APP_", pipeline: Optional[str] = None, @@ -73,8 +79,10 @@ def get_streaming_app_stateful_set( input_topics, output_topic, error_topic, + input_pattern, multiple_inputs=multiple_inputs, multiple_outputs=multiple_outputs, + extra_input_patterns=extra_input_patterns, extra=extra, env_prefix=env_prefix, consumer_group=consumer_group, @@ -138,8 +146,10 @@ def 
get_env( input_topics: Optional[str], output_topic: Optional[str], error_topic: Optional[str], + input_pattern: Optional[str] = None, multiple_inputs: Optional[str] = None, multiple_outputs: Optional[str] = None, + extra_input_patterns: Optional[str] = None, extra: Dict[str, str] = {}, env_prefix: str = "APP_", ) -> List[V1EnvVar]: @@ -150,6 +160,8 @@ def get_env( env.append(V1EnvVar(name=env_prefix + "OUTPUT_TOPIC", value=output_topic)) if error_topic: env.append(V1EnvVar(name=env_prefix + "ERROR_TOPIC", value=error_topic)) + if input_pattern: + env.append(V1EnvVar(name=env_prefix + "INPUT_PATTERN", value=input_pattern)) if multiple_inputs: env.append( V1EnvVar(name=env_prefix + "EXTRA_INPUT_TOPICS", value=multiple_inputs) @@ -158,6 +170,12 @@ def get_env( env.append( V1EnvVar(name=env_prefix + "EXTRA_OUTPUT_TOPICS", value=multiple_outputs) ) + if extra_input_patterns: + env.append( + V1EnvVar( + name=env_prefix + "EXTRA_INPUT_PATTERNS", value=extra_input_patterns + ) + ) if extra: for k, v in extra.items(): env.append(V1EnvVar(name=env_prefix + k, value=v)) @@ -174,6 +192,7 @@ def get_args( error_topic: Optional[str], multiple_inputs: Optional[str], multiple_outputs: Optional[str], + extra_input_patterns: Optional[str], extra: Dict[str, str], ) -> List[str]: args = [] @@ -187,6 +206,8 @@ def get_args( args.append(_create_arg("extra-input-topics", multiple_inputs)) if multiple_outputs: args.append(_create_arg("extra-output-topics", multiple_outputs)) + if extra_input_patterns: + args.append(_create_arg("extra-input-patterns", extra_input_patterns)) if extra: for k, v in extra.items(): args.append(_create_arg(k, v)) @@ -197,8 +218,10 @@ def get_template( input_topics: Optional[str], output_topic: Optional[str], error_topic: Optional[str], + input_pattern: Optional[str], multiple_inputs: Optional[str], multiple_outputs: Optional[str], + extra_input_patterns: Optional[str], extra: Dict[str, str], env_prefix: str = "APP_", consumer_group: Optional[str] = None, @@ -211,8 +234,10 @@ def get_template( input_topics, output_topic, error_topic, + input_pattern, multiple_inputs, multiple_outputs, + extra_input_patterns=extra_input_patterns, env_prefix=env_prefix, extra=extra, ) @@ -223,6 +248,7 @@ def get_template( error_topic, multiple_inputs, multiple_outputs, + extra_input_patterns, extra, ) container = V1Container(name="test-container", env=env, args=args) From c33ec581ff672127f93c2520b17a7f0962601aae Mon Sep 17 00:00:00 2001 From: bakdata-bot <31185348+bakdata-bot@users.noreply.github.com> Date: Thu, 17 Mar 2022 08:02:07 +0000 Subject: [PATCH 2/2] =?UTF-8?q?Bump=20version:=201.5.0=20=E2=86=92=201.6.0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .bumpversion.cfg | 2 +- backend/pyproject.toml | 2 +- backend/streams_explorer/__init__.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 8f6134a2..798372be 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 1.5.0 +current_version = 1.6.0 commit = True tag = True diff --git a/backend/pyproject.toml b/backend/pyproject.toml index 509bd46e..824e3992 100644 --- a/backend/pyproject.toml +++ b/backend/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "streams-explorer" -version = "1.5.0" +version = "1.6.0" description = "Explore Data Pipelines in Apache Kafka." 
readme = "../README.md" authors = ["bakdata"] diff --git a/backend/streams_explorer/__init__.py b/backend/streams_explorer/__init__.py index 383e9567..9fc92a31 100644 --- a/backend/streams_explorer/__init__.py +++ b/backend/streams_explorer/__init__.py @@ -1,2 +1,2 @@ """Streams Explorer.""" -__version__ = "1.5.0" +__version__ = "1.6.0"
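
For anyone trying out the feature from PATCH 1/2: below is a minimal `settings.yaml` sketch with the new options enabled. The keys and defaults are the ones added by this patch; turning `all`/`pipelines` on and enabling Kafka is only for illustration.

```yaml
graph:
  resolve:
    input_pattern_topics:
      # connect topics matched by (extra) input patterns in the overall graph
      all: true
      # connect matched topics in the per-pipeline graphs as well
      pipelines: true

kafka:
  # must be enabled so that topics only known to Kafka (not yet in the graph) can be matched
  enable: true
  config:
    bootstrap.servers: "localhost:9092"  # documented default, adjust for your cluster
  topic_names_cache:
    ttl: 3600  # seconds to cache the full list of topic names
```

Note that `apply_input_pattern_edges` only queries `get_all_topic_names()` when the admin client is enabled, so with `kafka.enable: false` patterns are resolved against graph-known topics only.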