Skip to content

Commit

Permalink
add a bit of cluster magic
Browse files Browse the repository at this point in the history
  • Loading branch information
fuziontech committed Aug 23, 2023
1 parent 71b2d43 commit 66652c8
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 2 deletions.
41 changes: 41 additions & 0 deletions housewatch/clickhouse/client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
from typing import Dict, Optional
from clickhouse_pool import ChPool
from clickhouse_driver import Client
from housewatch.clickhouse.queries.sql import EXISTING_TABLES_SQL
from housewatch.utils import str_to_bool
from django.core.cache import cache
Expand All @@ -27,13 +28,52 @@
)


def run_query_on_shards(
query: str,
params: Dict[str, str | int] = {},
settings: Dict[str, str | int] = {},
query_id: Optional[str] = None,
substitute_params: bool = True,
cluster: Optional[str] = None,
):
from housewatch.clickhouse.clusters import get_node_per_shard

final_query = query % (params or {}) if substitute_params else query

nodes = get_node_per_shard(cluster)
responses = []
for shard, node in nodes:
client = Client(
host=node["host_address"],
database=settings.CLICKHOUSE_DATABASE,
user=settings.CLICKHOUSE_USER,
secure=settings.CLICKHOUSE_SECURE,
ca_certs=settings.CLICKHOUSE_CA,
verify=settings.CLICKHOUSE_VERIFY,
settings={"max_result_rows": "2000"},
send_receive_timeout=30,
password=settings.CLICKHOUSE_PASSWORD,
)
result = client.execute(final_query, settings=settings, with_column_types=True, query_id=query_id)
response = []
for res in result[0]:
item = {}
for index, key in enumerate(result[1]):
item[key[0]] = res[index]

response.append(item)
responses.append((shard, response))
return response


def run_query(
query: str,
params: Dict[str, str | int] = {},
settings: Dict[str, str | int] = {},
query_id: Optional[str] = None,
use_cache: bool = True, # defaulting to True for now for simplicity, but ideally we should default this to False
substitute_params: bool = True,
cluster: Optional[str] = None,
):
final_query = query % (params or {}) if substitute_params else query
query_hash = ""
Expand All @@ -43,6 +83,7 @@ def run_query(
cached_result = cache.get(query_hash)
if cached_result:
return json.loads(cached_result)

with pool.get_client() as client:
result = client.execute(final_query, settings=settings, with_column_types=True, query_id=query_id)
response = []
Expand Down
20 changes: 18 additions & 2 deletions housewatch/clickhouse/clusters.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@


def get_clusters():
QUERY = """Select * FROM system.clusters"""
QUERY = """Select cluster, shard_num, shard_weight, replica_num, host_name, host_address, port, is_local, user, default_database, errors_count, slowdowns_count, estimated_recovery_time FROM system.clusters"""
res = run_query(QUERY)
clusters = defaultdict(list)
for c_node in res:
Expand All @@ -15,5 +15,21 @@ def get_clusters():


def get_cluster(cluster):
QUERY = """Select * FROM system.clusters WHERE cluster = '%(cluster_name)s' """
QUERY = """Select cluster, shard_num, shard_weight, replica_num, host_name, host_address, port, is_local, user, default_database, errors_count, slowdowns_count, estimated_recovery_time FROM system.clusters WHERE cluster = '%(cluster_name)s' """
return run_query(QUERY, {"cluster_name": cluster})


def get_shards(cluster):
cluster = get_cluster(cluster)
nodes = defaultdict(list)
for node in cluster:
nodes[node["shard_number"]].append(node)
return nodes


def get_node_per_shard(cluster):
shards = get_shards(cluster)
nodes = []
for shard, n in shards.items():
nodes.append((shard, n[0]))
return nodes

0 comments on commit 66652c8

Please sign in to comment.