Skip to content

Commit

Permalink
Cloud test fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
genzgd committed Sep 24, 2024
1 parent 5c4388f commit 0475afd
Show file tree
Hide file tree
Showing 12 changed files with 87 additions and 64 deletions.
5 changes: 3 additions & 2 deletions clickhouse_connect/driver/httpclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -513,10 +513,11 @@ def ping(self):
logger.debug('ping failed', exc_info=True)
return False

def close_connections(self):
self.http.clear()

def close(self):
if self._owns_pool_manager:
self.http.clear()
all_managers.pop(self.http, None)

def close_connections(self):
self.http.clear()
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ build-backend = "setuptools.build_meta"
log_cli = true
log_cli_level = "INFO"
env_files = ["test.env"]
asyncio_default_fixture_loop_scope = "session"
65 changes: 34 additions & 31 deletions tests/integration_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,16 @@
import random
import time
from subprocess import Popen, PIPE
from typing import Iterator, NamedTuple, Sequence, Optional
from typing import Iterator, NamedTuple, Sequence, Optional, Callable

from pytest import fixture

from clickhouse_connect import create_client
from clickhouse_connect import common
from clickhouse_connect.driver.common import coerce_bool
from clickhouse_connect.driver.exceptions import OperationalError
from clickhouse_connect.tools.testing import TableContext
from clickhouse_connect.driver.httpclient import HttpClient
from clickhouse_connect.driver import AsyncClient, Client
from clickhouse_connect.driver import AsyncClient, Client, create_client
from tests.helpers import PROJECT_ROOT_DIR


Expand Down Expand Up @@ -60,14 +59,40 @@ def test_db_fixture(test_config: TestConfig) -> Iterator[str]:
yield test_config.test_database or 'default'


@fixture(scope='session', name='test_create_client')
def test_create_client_fixture(test_config: TestConfig) -> Callable:
def f(**kwargs):
client = create_client(host=test_config.host,
port=test_config.port,
user=test_config.username,
password=test_config.password,
settings={'allow_suspicious_low_cardinality_types': 1},
client_name='int_tests/test',
**kwargs)
if client.min_version('22.8'):
client.set_client_setting('database_replicated_enforce_synchronous_settings', 1)
if client.min_version('24.8') and not test_config.cloud:
client.set_client_setting('allow_experimental_json_type', 1)
client.set_client_setting('allow_experimental_dynamic_type', 1)
client.set_client_setting('allow_experimental_variant_type', 1)
if test_config.insert_quorum:
client.set_client_setting('insert_quorum', test_config.insert_quorum)
elif test_config.cloud:
client.set_client_setting('select_sequential_consistency', 1)
client.database = test_config.test_database
return client

return f


@fixture(scope='session', name='test_table_engine')
def test_table_engine_fixture() -> Iterator[str]:
yield 'MergeTree'


# pylint: disable=too-many-branches
@fixture(scope='session', autouse=True, name='test_client')
def test_client_fixture(test_config: TestConfig, test_db: str) -> Iterator[Client]:
def test_client_fixture(test_config: TestConfig, test_create_client: Callable) -> Iterator[Client]:
compose_file = f'{PROJECT_ROOT_DIR}/docker-compose.yml'
if test_config.docker:
run_cmd(['docker', 'compose', '-f', compose_file, 'down', '-v'])
Expand All @@ -80,41 +105,19 @@ def test_client_fixture(test_config: TestConfig, test_db: str) -> Iterator[Client]:
raise TestException(f'Failed to start docker: {up_result[2]}')
time.sleep(5)
tries = 0
if test_config.docker:
HttpClient.params = {'SQL_test_setting': 'value'}
HttpClient.valid_transport_settings.add('SQL_test')
while True:
tries += 1
try:
if test_config.docker:
HttpClient.params = {'SQL_test_setting': 'value'}
HttpClient.valid_transport_settings.add('SQL_test')
client = create_client(
host=test_config.host,
port=test_config.port,
username=test_config.username,
password=test_config.password,
query_limit=0,
compress=test_config.compress,
client_name='int_tests/test',
settings={'allow_suspicious_low_cardinality_types': True,
'insert_deduplicate': False,
'async_insert': 0}
)
client = test_create_client()
break
except OperationalError as ex:
if tries > 10:
raise TestException('Failed to connect to ClickHouse server after 30 seconds') from ex
time.sleep(3)
if client.min_version('22.8'):
client.set_client_setting('database_replicated_enforce_synchronous_settings', 1)
if client.min_version('24.8'):
client.set_client_setting('allow_experimental_json_type', 1)
client.set_client_setting('allow_experimental_dynamic_type', 1)
client.set_client_setting('allow_experimental_variant_type', 1)
if test_config.insert_quorum:
client.set_client_setting('insert_quorum', test_config.insert_quorum)
elif test_config.cloud:
client.set_client_setting('select_sequential_consistency', 1)
client.command(f'CREATE DATABASE IF NOT EXISTS {test_db}', use_database=False)
client.database = test_db
client.command(f'CREATE DATABASE IF NOT EXISTS {test_config.test_database}', use_database=False)
yield client

# client.command(f'DROP database IF EXISTS {test_db}', use_database=False)
Expand Down
8 changes: 3 additions & 5 deletions tests/integration_tests/test_arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,10 @@ def test_arrow_map(test_client: Client, table_context: Callable):
'update_time DateTime DEFAULT now()']):
data = [[date(2023, 10, 15), 'C1', {'k': 2.5, 'd': 0, 'j': 0}],
[date(2023, 10, 16), 'C2', {'k': 3.5, 'd': 0, 'j': -.372}]]
insert_result = test_client.insert('test_arrow_map', data, column_names=('trade_date', 'code', 'kdj'))
assert 2 == insert_result.written_rows
test_client.insert('test_arrow_map', data, column_names=('trade_date', 'code', 'kdj'),
settings={'insert_deduplication_token': '10381'})
arrow_table = test_client.query_arrow('SELECT * FROM test_arrow_map ORDER BY trade_date',
use_strings=True)
print(arrow_table)
assert isinstance(arrow_table.schema, arrow.Schema)
insert_result = test_client.insert_arrow('test_arrow_map', arrow_table)
test_client.insert_arrow('test_arrow_map', arrow_table, settings={'insert_deduplication_token': '10382'})
assert 4 == test_client.command('SELECT count() FROM test_arrow_map')
assert 2 == insert_result.written_rows
5 changes: 3 additions & 2 deletions tests/integration_tests/test_inserts.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ def test_insert(test_client: Client, test_table_engine: str):
test_client.command('DROP TABLE IF EXISTS test_system_insert SYNC')
test_client.command(f'CREATE TABLE test_system_insert AS system.tables Engine {test_table_engine} ORDER BY name')
tables_result = test_client.query('SELECT * from system.tables')
insert_result = test_client.insert(table='test_system_insert', column_names='*', data=tables_result.result_set)
assert int(tables_result.summary['read_rows']) == insert_result.written_rows
test_client.insert(table='test_system_insert', column_names='*', data=tables_result.result_set)
copy_result = test_client.command('SELECT count() from test_system_insert')
assert tables_result.row_count == copy_result
test_client.command('DROP TABLE IF EXISTS test_system_insert')


Expand Down
4 changes: 2 additions & 2 deletions tests/integration_tests/test_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@


def test_basic_json(test_client: Client, table_context: Callable):
if not test_client.min_version('24.8'):
if not test_client.get_client_setting('allow_experimental_json_type'):
pytest.skip(f'New JSON type not available in this version: {test_client.server_version}')
with table_context('new_json_basic', [
'key Int32',
Expand Down Expand Up @@ -48,7 +48,7 @@ def test_basic_json(test_client: Client, table_context: Callable):


def test_typed_json(test_client: Client, table_context: Callable):
if not test_client.min_version('24.8'):
if not test_client.get_client_setting('allow_experimental_json_type'):
pytest.skip(f'New JSON type not available in this version: {test_client.server_version}')
with table_context('new_json_typed', [
'key Int32',
Expand Down
7 changes: 6 additions & 1 deletion tests/integration_tests/test_multithreading.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import threading

import pytest

from clickhouse_connect.driver import Client
from clickhouse_connect.driver.exceptions import ProgrammingError
from tests.integration_tests.conftest import TestConfig


def test_threading_error(test_client: Client):
def test_threading_error(test_config: TestConfig, test_client: Client):
if test_config.cloud:
pytest.skip('Skipping threading test in ClickHouse Cloud')
thrown = None

class QueryThread (threading.Thread):
Expand Down
12 changes: 5 additions & 7 deletions tests/integration_tests/test_native.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,25 +167,23 @@ def test_tuple_inserts(test_client: Client, table_context: Callable):
with table_context('insert_tuple_test', ['key Int32', 'named Tuple(fl Float64, `ns space` Nullable(String))',
'unnamed Tuple(Float64, Nullable(String))']):
data = [[1, (3.55, 'str1'), (555, None)], [2, (-43.2, None), (0, 'str2')]]
result = test_client.insert('insert_tuple_test', data)
assert 2 == result.written_rows
test_client.insert('insert_tuple_test', data, settings={'insert_deduplication_token': 5772})

data = [[1, {'fl': 3.55, 'ns space': 'str1'}, (555, None)], [2, {'fl': -43.2}, (0, 'str2')]]
result = test_client.insert('insert_tuple_test', data)
assert 2 == result.written_rows

test_client.insert('insert_tuple_test', data, settings={'insert_deduplication_token': 5773})
query_result = test_client.query('SELECT * FROM insert_tuple_test ORDER BY key').result_rows
assert len(query_result) == 4
assert query_result[0] == query_result[1]
assert query_result[2] == query_result[3]


def test_point_inserts(test_client: Client, table_context: Callable):
with table_context('insert_point_test', ['key Int32', 'point Point']):
data = [[1, (3.55, 3.55)], [2, (4.55, 4.55)]]
result = test_client.insert('insert_point_test', data)
assert 2 == result.written_rows
test_client.insert('insert_point_test', data)

query_result = test_client.query('SELECT * FROM insert_point_test ORDER BY key').result_rows
assert len(query_result) == 2
assert query_result[0] == (1, (3.55, 3.55))
assert query_result[1] == (2, (4.55, 4.55))

Expand Down
5 changes: 4 additions & 1 deletion tests/integration_tests/test_pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from clickhouse_connect.driver.exceptions import DataError
from clickhouse_connect.driver.options import np, pd
from tests.helpers import random_query
from tests.integration_tests.conftest import TestConfig
from tests.integration_tests.datasets import null_ds, null_ds_columns, null_ds_types

pytestmark = pytest.mark.skipif(pd is None, reason='Pandas package not installed')
Expand Down Expand Up @@ -276,7 +277,9 @@ def test_pandas_null_strings(test_client: Client, table_context:Callable):
test_client.insert_df('test_pandas_null_strings', df)


def test_pandas_small_blocks(test_client: Client):
def test_pandas_small_blocks(test_config: TestConfig, test_client: Client):
if test_config.cloud:
pytest.skip('Skipping performance test in ClickHouse Cloud')
res = test_client.query_df('SELECT number, randomString(512) FROM numbers(1000000)',
settings={'max_block_size': 250})
assert len(res) == 1000000
3 changes: 1 addition & 2 deletions tests/integration_tests/test_raw_insert.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@ def test_raw_insert_compression(test_client: Client, table_context: Callable):
with open(data_file, mode='rb') as movies_file:
data = movies_file.read()
with table_context('test_gzip_movies', ['movie String', 'year UInt16', 'rating Decimal32(3)']):
insert_result = test_client.raw_insert('test_gzip_movies', None, data, fmt='CSV', compression='gzip',
test_client.raw_insert('test_gzip_movies', None, data, fmt='CSV', compression='gzip',
settings={'input_format_allow_errors_ratio': .2,
'input_format_allow_errors_num': 5}
)
assert 248 == insert_result.written_rows
res = test_client.query(
'SELECT count() as count, sum(rating) as rating, max(year) as year FROM test_gzip_movies').first_item
assert res['count'] == 248
Expand Down
34 changes: 25 additions & 9 deletions tests/integration_tests/test_session_id.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@

import pytest

from typing import Callable

from clickhouse_connect.driver import create_async_client, create_client
from tests.integration_tests.conftest import TestConfig

SESSION_KEY = 'session_id'


def test_client_default_session_id(test_config: TestConfig):
def test_client_default_session_id(test_create_client: Callable):
# by default, the sync client will autogenerate the session id
client = create_client(host=test_config.host, port=test_config.port)
client = test_create_client()
session_id = client.get_client_setting(SESSION_KEY)
try:
uuid.UUID(session_id)
Expand All @@ -19,33 +21,42 @@ def test_client_default_session_id(test_config: TestConfig):
client.close()


def test_client_autogenerate_session_id(test_config: TestConfig):
client = create_client(host=test_config.host, port=test_config.port)
def test_client_autogenerate_session_id(test_create_client: Callable):
client = test_create_client()
session_id = client.get_client_setting(SESSION_KEY)
try:
uuid.UUID(session_id)
except ValueError:
pytest.fail(f"Invalid session_id: {session_id}")


def test_client_custom_session_id(test_config: TestConfig):
def test_client_custom_session_id(test_create_client: Callable):
session_id = 'custom_session_id'
client = create_client(host=test_config.host, port=test_config.port, session_id=session_id)
client = test_create_client(session_id=session_id)
assert client.get_client_setting(SESSION_KEY) == session_id
client.close()


@pytest.mark.asyncio
async def test_async_client_default_session_id(test_config: TestConfig):
# by default, the async client will NOT autogenerate the session id
async_client = await create_async_client(host=test_config.host, port=test_config.port)
async_client = await create_async_client(database=test_config.test_database,
host=test_config.host,
port=test_config.port,
user=test_config.username,
password=test_config.password)
assert async_client.get_client_setting(SESSION_KEY) is None
async_client.close()


@pytest.mark.asyncio
async def test_async_client_autogenerate_session_id(test_config: TestConfig):
async_client = await create_async_client(host=test_config.host, port=test_config.port, autogenerate_session_id=True)
async_client = await create_async_client(database=test_config.test_database,
host=test_config.host,
port=test_config.port,
user=test_config.username,
password=test_config.password,
autogenerate_session_id=True)
session_id = async_client.get_client_setting(SESSION_KEY)
try:
uuid.UUID(session_id)
Expand All @@ -57,6 +68,11 @@ async def test_async_client_autogenerate_session_id(test_config: TestConfig):
@pytest.mark.asyncio
async def test_async_client_custom_session_id(test_config: TestConfig):
session_id = 'custom_session_id'
async_client = await create_async_client(host=test_config.host, port=test_config.port, session_id=session_id)
async_client = await create_async_client(database=test_config.test_database,
host=test_config.host,
port=test_config.port,
user=test_config.username,
password=test_config.password,
session_id=session_id)
assert async_client.get_client_setting(SESSION_KEY) == session_id
async_client.close()
2 changes: 0 additions & 2 deletions tests/integration_tests/test_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ def test_csv_upload(test_client: Client, table_context: Callable):
insert_result = insert_file(test_client, 'test_csv_upload', data_file,
settings={'input_format_allow_errors_ratio': .2,
'input_format_allow_errors_num': 5})
assert 248 == insert_result.written_rows
res = test_client.query(
'SELECT count() as count, sum(rating) as rating, max(year) as year FROM test_csv_upload').first_item
assert res['count'] == 248
Expand All @@ -25,7 +24,6 @@ def test_parquet_upload(test_config: TestConfig, test_client: Client, table_cont
with table_context(full_table, ['movie String', 'year UInt16', 'rating Float64']):
insert_result = insert_file(test_client, full_table, data_file, 'Parquet',
settings={'output_format_parquet_string_as_string': 1})
assert 250 == insert_result.written_rows
res = test_client.query(
f'SELECT count() as count, sum(rating) as rating, max(year) as year FROM {full_table}').first_item
assert res['count'] == 250
Expand Down

0 comments on commit 0475afd

Please sign in to comment.