Merge remote-tracking branch 'origin/main' into fix/package-import
disrupted committed Mar 21, 2022
2 parents cb0e360 + c33ec58 commit f966fe0
Showing 15 changed files with 380 additions and 35 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 1.5.0
current_version = 1.6.0
commit = True
tag = True

5 changes: 4 additions & 1 deletion backend/README.md
@@ -119,15 +119,18 @@ The following configuration options are available:

#### General

- `graph.update_interval` Update the graph every X seconds (integer, **required**, default: `300`)
- `graph.update_interval` Update the graph every X seconds (int, **required**, default: `300`)
- `graph.layout_arguments` Arguments passed to graphviz layout (string, **required**, default: `-Grankdir=LR -Gnodesep=0.8 -Gpad=10`)
- `graph.pipeline_distance` Increase/decrease vertical space between pipeline graphs by X pixels (int, **required**, default: `500`)
- `graph.resolve.input_pattern_topics.all` If true, topics that match (extra) input pattern(s) are connected to the streaming app in the graph containing all pipelines (bool, **required**, default: `false`)
- `graph.resolve.input_pattern_topics.pipelines` If true, topics that match (extra) input pattern(s) are connected to the streaming app in pipeline graphs (bool, **required**, default: `false`)

#### Kafka

- `kafka.enable` Enable Kafka (bool, default: `false`)
- `kafka.config` librdkafka configuration properties ([reference](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)) (dict, default: `{"bootstrap.servers": "localhost:9092"}`)
- `kafka.displayed_information` Configuration options of Kafka topics displayed in the frontend (list of dict)
- `kafka.topic_names_cache.ttl` Time-to-live in seconds of the cache for retrieving all topic names (used when input topic patterns are resolved) (int, default: `3600`)

#### Kafka Connect

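For illustration, the new options map onto nested attribute access on the backend's settings object, which is how the code in this commit reads them (e.g. `settings.graph.resolve.input_pattern_topics.all` in `dataflow_graph.py`). A minimal sketch, runnable only inside the backend environment; the variable names are illustrative:

```python
from streams_explorer.core.config import settings

# Nested keys from settings.yaml become attribute chains on the settings object.
interval: int = settings.graph.update_interval                       # 300 by default
resolve_all: bool = settings.graph.resolve.input_pattern_topics.all  # false by default
cache_ttl: int = settings.kafka.topic_names_cache.ttl                # 3600 by default

if resolve_all:
    print(f"resolving input patterns against Kafka, topic name cache TTL {cache_ttl}s")
```
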
2 changes: 1 addition & 1 deletion backend/pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "streams-explorer"
version = "1.5.0"
version = "1.6.0"
description = "Explore Data Pipelines in Apache Kafka."
readme = "README.md"
authors = ["bakdata"]
21 changes: 16 additions & 5 deletions backend/settings.yaml
@@ -3,6 +3,12 @@ graph:
update_interval: 300
layout_arguments: "-Grankdir=LR -Gnodesep=0.8 -Gpad=10"
pipeline_distance: 500
resolve:
input_pattern_topics:
# If true, topics that match (extra) input pattern(s) are connected to the streaming app in the graph containing all pipelines
all: false
# If true, topics that match (extra) input pattern(s) are connected to the streaming app in pipeline graphs
pipelines: false

k8s:
deployment:
@@ -35,17 +41,22 @@ kafka:
displayed_information:
- name: "Cleanup Policy"
key: "cleanup.policy"
# cache for retrieving all topic names (used when input topic patterns are resolved)
topic_names_cache:
ttl: 3600

node_info:
# cache time-to-live in seconds
cache_ttl: 3600

## (optional) configure Kafka Connect url and displayed information
# kafkaconnect:
# url: "http://localhost:8083"
# displayed_information:
# - name: "Transformer"
# key: "transforms.changeTopic.regex"
# kafkaconnect:
# url: "http://localhost:8083"
# displayed_information:
# - name: "Transformer"
# key: "transforms.changeTopic.regex"
# - name: "Type"
# key: "transforms.changeTopic.type"

## (optional) configure Schema Registry for topic information (supports Karapace and Confluent Schema Registry)
# schemaregistry:
9 changes: 9 additions & 0 deletions backend/streams_explorer/core/config.py
@@ -13,6 +13,14 @@
Validator("graph.update_interval", must_exist=True, is_type_of=int),
Validator("graph.layout_arguments", must_exist=True, is_type_of=str),
Validator("graph.pipeline_distance", must_exist=True, is_type_of=int),
Validator(
"graph.resolve.input_pattern_topics.all", must_exist=True, is_type_of=bool
),
Validator(
"graph.resolve.input_pattern_topics.pipelines",
must_exist=True,
is_type_of=bool,
),
Validator("k8s.deployment.cluster", must_exist=True, is_type_of=bool),
Validator("k8s.deployment.context", is_type_of=str),
Validator("k8s.deployment.namespaces", must_exist=True, is_type_of=list),
@@ -31,6 +39,7 @@
)
),
Validator("kafka.displayed_information", is_type_of=list, default=[]),
Validator("kafka.topic_names_cache.ttl", is_type_of=int, default=3600),
Validator("node_info.cache_ttl", must_exist=True, is_type_of=int),
Validator("kafkaconnect.url", default=None),
Validator("kafkaconnect.displayed_information", is_type_of=list, default=[]),
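These look like Dynaconf validators, which their signature suggests. As a rough sketch of how that validation behaves (the standalone settings object below is hypothetical and mirrors only two of the entries above):

```python
from dynaconf import Dynaconf, Validator
from dynaconf.validator import ValidationError

settings = Dynaconf(
    settings_files=["settings.yaml"],
    validators=[
        Validator(
            "graph.resolve.input_pattern_topics.all", must_exist=True, is_type_of=bool
        ),
        Validator("kafka.topic_names_cache.ttl", is_type_of=int, default=3600),
    ],
)

try:
    # Raises if the required boolean is missing or has the wrong type.
    settings.validators.validate()
except ValidationError as e:
    print(f"invalid settings.yaml: {e}")

# Falls back to the declared default of 3600 when the key is absent.
print(settings.kafka.topic_names_cache.ttl)
```
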
2 changes: 2 additions & 0 deletions backend/streams_explorer/core/k8s_app.py
@@ -33,10 +33,12 @@ def __init__(self, k8s_object: K8sObject):
self.id: str
self.name: str
self.input_topics: List[str] = []
self.input_pattern: Optional[str] = None
self.output_topic: Optional[str] = None
self.error_topic: Optional[str] = None
self.extra_input_topics: List[str] = []
self.extra_output_topics: List[str] = []
self.extra_input_patterns: List[str] = []
self.attributes: Dict[str, str] = {}
self.setup()

4 changes: 4 additions & 0 deletions backend/streams_explorer/core/k8s_config_parser.py
@@ -52,6 +52,10 @@ def parse_config(self, name: str, value: str):
self.config.extra_input_topics = self.parse_extra_topics(value)
elif name == "EXTRA_OUTPUT_TOPICS":
self.config.extra_output_topics = self.parse_extra_topics(value)
elif name == "INPUT_PATTERN":
self.config.input_pattern = value
elif name == "EXTRA_INPUT_PATTERNS":
self.config.extra_input_patterns = self.parse_extra_topics(value)
else:
self.config.extra[name] = value

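A self-contained sketch of the dispatch pattern used by this parser, with a trimmed-down stand-in for `K8sConfig` (the comma-separated value format for `EXTRA_INPUT_PATTERNS` is an assumption here, since `parse_extra_topics` is not shown in this diff):

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class Config:
    # Stand-in for streams_explorer.models.k8s_config.K8sConfig
    input_pattern: Optional[str] = None
    extra_input_patterns: List[str] = field(default_factory=list)
    extra: Dict[str, str] = field(default_factory=dict)


def parse_env(env: Dict[str, str]) -> Config:
    config = Config()
    for name, value in env.items():
        if name == "INPUT_PATTERN":
            config.input_pattern = value
        elif name == "EXTRA_INPUT_PATTERNS":
            config.extra_input_patterns = value.split(",")  # assumed format
        else:
            config.extra[name] = value
    return config


print(parse_env({"INPUT_PATTERN": ".*-events", "EXTRA_INPUT_PATTERNS": "dead-letter-.*"}))
```
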
137 changes: 128 additions & 9 deletions backend/streams_explorer/core/services/dataflow_graph.py
@@ -1,3 +1,6 @@
import re
from collections import defaultdict
from enum import Enum
from typing import Dict, List, Optional, Set, Type

import networkx as nx
@@ -7,6 +10,7 @@

from streams_explorer.core.config import settings
from streams_explorer.core.k8s_app import ATTR_PIPELINE, K8sApp
from streams_explorer.core.services.kafka_admin_client import KafkaAdminClient
from streams_explorer.core.services.metric_providers import MetricProvider
from streams_explorer.models.graph import GraphEdge, GraphNode, Metric
from streams_explorer.models.kafka_connector import (
@@ -22,13 +26,23 @@ class NodeNotFound(Exception):
pass


class NodeDataFields(str, Enum):
NODE_TYPE = "node_type"
LABEL = "label"


class DataFlowGraph:
def __init__(self, metric_provider: Type[MetricProvider]):
def __init__(self, metric_provider: Type[MetricProvider], kafka: KafkaAdminClient):
self.graph = nx.DiGraph()
self.json_graph: dict = {}
self.pipelines: Dict[str, nx.DiGraph] = {}
self.json_pipelines: Dict[str, dict] = {}
self.metric_provider_class = metric_provider
self.kafka = kafka

self._topic_pattern_queue: Dict[str, Set[str]] = defaultdict(
set
) # topic pattern -> set of target node ids

async def store_json_graph(self):
self.json_graph = await self.get_positioned_graph()
@@ -54,20 +68,25 @@ def _add_streaming_app(self, graph: nx.DiGraph, app: K8sApp):
node_type=NodeTypesEnum.STREAMING_APP,
**app.attributes,
)

for input_topic in app.input_topics:
self._add_topic(graph, input_topic)
self._add_input_topic(graph, app.id, input_topic)
if app.output_topic:
self._add_topic(graph, app.output_topic)
self._add_output_topic(graph, app.id, app.output_topic)
if app.error_topic:
self._add_error_topic(graph, app.id, app.error_topic)
for input_topic in app.input_topics:
self._add_topic(graph, input_topic)
self._add_input_topic(graph, app.id, input_topic)
if app.input_pattern:
self._enqueue_input_pattern(app.input_pattern, app.id)
for extra_input in app.extra_input_topics:
self._add_topic(graph, extra_input)
self._add_input_topic(graph, app.id, extra_input)
for extra_output in app.extra_output_topics:
self._add_topic(graph, extra_output)
self._add_output_topic(graph, app.id, extra_output)
for extra_pattern in app.extra_input_patterns:
self._enqueue_input_pattern(extra_pattern, app.id)

def add_connector(self, connector: KafkaConnector, pipeline: Optional[str] = None):
graph = self.graph
@@ -99,12 +118,21 @@ def add_connector(self, connector: KafkaConnector, pipeline: Optional[str] = Non
self.add_connector(connector, pipeline=pipeline)

def add_source(self, source: Source):
node = (source.name, {"label": source.name, "node_type": source.node_type})
node = (
source.name,
{
NodeDataFields.LABEL: source.name,
NodeDataFields.NODE_TYPE: source.node_type,
},
)
edge = (source.name, source.target)
self.add_to_graph(node, edge)

def add_sink(self, sink: Sink):
node = (sink.name, {"label": sink.name, "node_type": sink.node_type})
node = (
sink.name,
{NodeDataFields.LABEL: sink.name, NodeDataFields.NODE_TYPE: sink.node_type},
)
edge = (sink.source, sink.name)
self.add_to_graph(node, edge, reverse=True)

@@ -144,7 +172,7 @@ async def get_metrics(self) -> List[Metric]:

def get_node_type(self, id: str) -> str:
try:
return self.graph.nodes[id].get("node_type")
return self.graph.nodes[id][NodeDataFields.NODE_TYPE]
except KeyError:
raise NodeNotFound()

@@ -170,6 +198,15 @@ def find_associated_pipelines(
def _add_topic(graph: nx.DiGraph, name: str):
graph.add_node(name, label=name, node_type=NodeTypesEnum.TOPIC)

@staticmethod
def _filter_topic_node_ids(graph: nx.DiGraph) -> Set[str]:
return {
node_id
for node_id, data in graph.nodes(data=True)
if data[NodeDataFields.NODE_TYPE] == NodeTypesEnum.TOPIC
or data[NodeDataFields.NODE_TYPE] == NodeTypesEnum.ERROR_TOPIC
}

@staticmethod
def _add_input_topic(graph: nx.DiGraph, app_id: str, topic_name: str):
graph.add_edge(topic_name, app_id)
@@ -183,6 +220,86 @@ def _add_output_topic(
self._add_topic(graph, topic_name)
graph.add_edge(app_id, topic_name)

def _enqueue_input_pattern(self, pattern: str, node_id: str):
"""
Enqueue an input topic pattern for an app or Kafka connector.
"""
self._topic_pattern_queue[pattern].add(node_id)

def apply_input_pattern_edges(self):
topics = DataFlowGraph._filter_topic_node_ids(self.graph)
kafka_topics = self.kafka.get_all_topic_names() if self.kafka.enabled else set()
for pattern, node_ids in self._topic_pattern_queue.items():
regex = re.compile(pattern)
matching_graph_known_topics = set(filter(regex.match, topics))
# for unknown topics (i.e. topics not already present in the graph)
# we first have to create the topic node
matching_unknown_kafka_topics = set(
filter(regex.match, kafka_topics)
).difference(matching_graph_known_topics)
for node_id in node_ids: # node_id can be an app or a Kafka connector
pipeline = self.graph.nodes[node_id].get(ATTR_PIPELINE)
# handle matching topics that are already present in the graph
self.handle_matching_topics(
matching_graph_known_topics,
node_id,
pattern,
pipeline,
add_topic=False,
)
# handle unknown topics that are not present in the graph
self.handle_matching_topics(
matching_unknown_kafka_topics,
node_id,
pattern,
pipeline,
add_topic=True,
)

def handle_matching_topics(
self,
matching_topics: Set[str],
node_id: str,
pattern: str,
pipeline: Optional[str],
add_topic: bool = False,
):
for matched_topic in matching_topics:
if add_topic:
self._add_topic(self.graph, matched_topic)
if pipeline is not None:
self.resolve_topic_pattern_in_pipeline(
matched_topic, node_id, pipeline, pattern
)
self.resolve_topic_pattern_in_all_graph(matched_topic, node_id, pattern)

def resolve_topic_pattern_in_all_graph(
self, matched_topic: str, node_id: str, pattern: str
):
# resolve topic pattern in the overall graph containing all pipelines
if settings.graph.resolve.input_pattern_topics.all:
# connect topic to graph
self.graph.add_edge(matched_topic, node_id)
else:
self.add_pattern_as_topic(self.graph, node_id, pattern)

def add_pattern_as_topic(self, graph: nx.DiGraph, node_id: str, pattern: str):
# visualize the pattern itself as a topic
self._add_topic(graph, pattern)
graph.add_edge(pattern, node_id)

def resolve_topic_pattern_in_pipeline(
self, matched_topic: str, node_id: str, pipeline: str, pattern: str
):
# resolve topic patterns in pipelines
if settings.graph.resolve.input_pattern_topics.pipelines:
if matched_topic not in self.pipelines[pipeline].nodes:
node = self.graph.nodes[matched_topic]
self.pipelines[pipeline].add_node(matched_topic, **node)
self.pipelines[pipeline].add_edge(matched_topic, node_id)
else:
self.add_pattern_as_topic(self.pipelines[pipeline], node_id, pattern)

@staticmethod
def _add_error_topic(
graph: nx.DiGraph,
@@ -191,8 +308,10 @@ def _add_error_topic(
):
graph.add_node(
topic_name,
label=topic_name,
node_type=NodeTypesEnum.ERROR_TOPIC,
**{
NodeDataFields.LABEL: topic_name,
NodeDataFields.NODE_TYPE: NodeTypesEnum.ERROR_TOPIC,
},
)
graph.add_edge(app_id, topic_name)

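At its core, the new pattern resolution is plain `re.match` against the set of known topic names, with the `graph.resolve.input_pattern_topics` settings deciding whether matches are connected directly or the pattern itself is shown as a topic node. A stripped-down sketch of that idea, independent of networkx (topic names are made up, and the flag stands in for the settings lookup):

```python
import re
from collections import defaultdict
from typing import Dict, Set

RESOLVE_PATTERNS = True  # stands in for settings.graph.resolve.input_pattern_topics.all

# pattern -> ids of apps/connectors waiting on it (mirrors _topic_pattern_queue)
pattern_queue: Dict[str, Set[str]] = defaultdict(set)
pattern_queue[".*-events"].add("my-streaming-app")

graph_topics = {"click-events", "order-events", "orders-avro"}   # topic nodes already in the graph
kafka_topics = {"click-events", "order-events", "audit-events"}  # all topic names from Kafka

for pattern, node_ids in pattern_queue.items():
    regex = re.compile(pattern)
    known = set(filter(regex.match, graph_topics))
    unknown = set(filter(regex.match, kafka_topics)) - known  # would need new topic nodes
    for node_id in node_ids:
        if RESOLVE_PATTERNS:
            for topic in known | unknown:
                print(f"edge: {topic} -> {node_id}")
        else:
            # with resolution disabled, the pattern itself is rendered as a topic node
            print(f"edge: {pattern} -> {node_id}")
```
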
backend/streams_explorer/core/services/kafka_admin_client.py
@@ -1,6 +1,7 @@
import concurrent.futures
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Set

from cachetools.func import ttl_cache
from confluent_kafka.admin import (
AdminClient,
ConfigEntry,
@@ -12,7 +13,7 @@
from streams_explorer.core.config import settings


class Kafka:
class KafkaAdminClient:
def __init__(self):
self._enabled: bool = settings.kafka.enable
self._client: AdminClient
@@ -53,6 +54,10 @@ def get_topic_partitions(
return None
return metadata.partitions

@ttl_cache(maxsize=1, ttl=settings.kafka.topic_names_cache.ttl)
def get_all_topic_names(self) -> Set[str]:
return set(self._client.list_topics().topics.keys())

@staticmethod
def format_values(values: List[ConfigEntry]) -> Dict[str, Any]:
return {
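`ttl_cache` from cachetools memoizes the decorated call for the configured number of seconds, so repeated graph updates within `kafka.topic_names_cache.ttl` reuse a single topic-name lookup. A small illustration of the decorator itself (the `slow_lookup` function is made up):

```python
import time

from cachetools.func import ttl_cache


@ttl_cache(maxsize=1, ttl=2)  # cache a single result for 2 seconds
def slow_lookup() -> set:
    print("fetching topic names...")
    time.sleep(1)
    return {"click-events", "order-events"}


slow_lookup()   # prints "fetching topic names..." and takes about a second
slow_lookup()   # served from the cache, returns immediately
time.sleep(2)
slow_lookup()   # TTL expired, fetches again
```
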
2 changes: 2 additions & 0 deletions backend/streams_explorer/models/k8s_config.py
@@ -9,6 +9,8 @@ class K8sConfig:
input_topics: List[str] = field(default_factory=list) # required for streaming app
output_topic: Optional[str] = None # required for streaming app
error_topic: Optional[str] = None
input_pattern: Optional[str] = None
extra_input_topics: List[str] = field(default_factory=list)
extra_output_topics: List[str] = field(default_factory=list)
extra: Dict[str, str] = field(default_factory=dict)
extra_input_patterns: List[str] = field(default_factory=list)
