Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 New Destination: ClickHouse #7620

Merged
merged 20 commits into from
Dec 13, 2021

Conversation

burmecia
Copy link
Contributor

@burmecia burmecia commented Nov 4, 2021

What

How

Recommended reading order

  1. bootstrap.md
  2. spec.json
  3. ClickhouseDestination.java
  4. ClickhouseSqlOperations.java

Test Runs

Integration test

Note: Some of the normalization and dbt test cases are disabled because,

  • old version of dbt generate incompatible test SQL for ClickHouse, accepted_values test doesn't work in all database engines dbt-labs/dbt-core#3905
  • the normalization container needs native port, while destination container needs HTTP port, we can't inject the port switch statement into DestinationAcceptanceTest.runSync() method. But I think that should be fine because there are more test cases in the following normalization test are covered.

Command used:
airbyte$ ./gradlew :airbyte-integrations:connectors:destination-clickhouse:integrationTest

image

Normalization test

Command used:
airbyte/airbyte-integrations/bases/base-normalization$ NORMALIZATION_TEST_TARGET=clickhouse pytest ./integration_tests/test_normalization.py

clickhouse-test

Pre-merge Checklist

Expand the relevant checklist and delete the others.

New Connector

Community member or Airbyter

  • Community member? Grant edit access to maintainers (instructions)
  • Secrets in the connector's spec are annotated with airbyte_secret
  • Unit & integration tests added and passing. Community members, please provide proof of success locally e.g: screenshot or copy-paste unit, integration, and acceptance test output. To run acceptance tests for a Python connector, follow instructions in the README. For java connectors run ./gradlew :airbyte-integrations:connectors:<name>:integrationTest.
  • Code reviews completed
  • Documentation updated
    • Connector's README.md
    • Connector's bootstrap.md. See description and examples
    • docs/SUMMARY.md
    • docs/integrations/<source or destination>/<name>.md including changelog. See changelog example
    • docs/integrations/README.md
    • airbyte-integrations/builds.md
  • PR name follows PR naming conventions
  • Connector added to connector index like described here

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

  • Create a non-forked branch based on this PR and test the below items on it
  • Build is successful
  • Credentials added to Github CI. Instructions.
  • /test connector=connectors/<name> command is passing.
  • New Connector version released on Dockerhub by running the /publish command described here

Updating a connector

Community member or Airbyter

  • Grant edit access to maintainers (instructions)
  • Secrets in the connector's spec are annotated with airbyte_secret
  • Unit & integration tests added and passing. Community members, please provide proof of success locally e.g: screenshot or copy-paste unit, integration, and acceptance test output. To run acceptance tests for a Python connector, follow instructions in the README. For java connectors run ./gradlew :airbyte-integrations:connectors:<name>:integrationTest.
  • Code reviews completed
  • Documentation updated
    • Connector's README.md
    • Connector's bootstrap.md. See description and examples
    • Changelog updated in docs/integrations/<source or destination>/<name>.md including changelog. See changelog example
  • PR name follows PR naming conventions
  • Connector version bumped like described here

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

  • Create a non-forked branch based on this PR and test the below items on it
  • Build is successful
  • Credentials added to Github CI. Instructions.
  • /test connector=connectors/<name> command is passing.
  • New Connector version released on Dockerhub by running the /publish command described here

Connector Generator

  • Issue acceptance criteria met
  • PR name follows PR naming conventions
  • If adding a new generator, add it to the list of scaffold modules being tested
  • The generator test modules (all connectors with -scaffold in their name) have been updated with the latest scaffold by running ./gradlew :airbyte-integrations:connector-templates:generator:testScaffoldTemplates then checking in your changes
  • Documentation which references the generator is updated as needed.

@github-actions github-actions bot added area/connectors Connector related issues area/documentation Improvements or additions to documentation area/platform issues related to the platform area/worker Related to worker normalization labels Nov 4, 2021
@marcosmarxm
Copy link
Member

This is amazing @burmecia !!! I'll ask the team to review this contribution in mid-time can you run ./gradlew format?

@alexandr-shegeda alexandr-shegeda linked an issue Nov 4, 2021 that may be closed by this pull request
@burmecia
Copy link
Contributor Author

burmecia commented Nov 4, 2021

This is amazing @burmecia !!! I'll ask the team to review this contribution in mid-time can you run ./gradlew format?

Sorry I forgot it, it is done now.

Copy link
Contributor

@ChristopheDuong ChristopheDuong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the contribution!!

Few suggestions to keep the code cleaner

@@ -27,6 +27,7 @@
.put("airbyte/destination-postgres-strict-encrypt", ImmutablePair.of(BASE_NORMALIZATION_IMAGE_NAME, DestinationType.POSTGRES))
.put("airbyte/destination-redshift", ImmutablePair.of(BASE_NORMALIZATION_IMAGE_NAME, DestinationType.REDSHIFT))
.put("airbyte/destination-snowflake", ImmutablePair.of(BASE_NORMALIZATION_IMAGE_NAME, DestinationType.SNOWFLAKE))
.put("airbyte/destination-clickhouse", ImmutablePair.of(BASE_NORMALIZATION_IMAGE_NAME, DestinationType.CLICKHOUSE))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.put("airbyte/destination-clickhouse", ImmutablePair.of(BASE_NORMALIZATION_IMAGE_NAME, DestinationType.CLICKHOUSE))
.put("airbyte/destination-clickhouse", ImmutablePair.of("airbyte/normalization-clickhouse", DestinationType.CLICKHOUSE))

which means this PR is also missing some changes in: https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/bases/base-normalization/docker-compose.build.yaml to publish the normalization docker image for clickhouse

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, done.

Comment on lines 676 to 678
select *,
{{ case_begin }} {{ airbyte_end_at }} is null {{ cdc_active_row }} {{ case_then }} 1 {{ case_else }} 0 {{ case_end }} as {{ active_row }}
from (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of doing nested select (Subquery), it would be best to keep using the CTE syntax (already heavily used in this step)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the suggestion, I've put it in a new CTE

Comment on lines 749 to 763
lag_begin = "lag"
lag_end = ""
if self.destination_type == DestinationType.CLICKHOUSE:
# ClickHouse doesn't support lag() yet, this is a workaround solution
# Ref: https://clickhouse.com/docs/en/sql-reference/window-functions/
lag_begin = "anyOrNull"
lag_end = "ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING"

case_begin = "case when"
case_then = "then"
case_else = "else"
case_end = "end"
if self.destination_type == DestinationType.CLICKHOUSE:
# ClickHouse doesn't have CASE WHEN, use multiIf instead
# Ref: https://clickhouse.com/docs/en/sql-reference/functions/conditional-functions/#multiif
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be a lot of "hacks" because clickhouse doesn't support some SQL syntax...

Maybe it'd be worth breaking the whole generate_scd_type_2_model method into a special clickhouse__generate_scd_type_2_model instead? (with its own jinja template)

It'll make it easier to refactor normalization code when splitting the code base per destination type and break this part of the code into its own class probably where the function is overriden

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but put it in a separate clickhouse__generate_scd_type_2_model function will create a large code duplication because most of them are still in common. There are actually only 4 ClickHouse specific parts:

  1. lag() function
  2. case when
  3. cast
  4. left outer join null

All the other parts are same as other database. Maybe need some code refactoring on generate_scd_type_2_model in the future.

db.close();
}

// @Test
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not clear, why do we have this line commented? Whether add a comment or uncomment it, please.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry forgot removing it, done.

Comment on lines 45 to 46
if (config.has("password")) {
configBuilder.put("password", config.get("password").asText());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like "password" good candidate for constant.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


public class ClickhouseSqlOperations extends JdbcSqlOperations {

private static final Logger LOGGER = LoggerFactory.getLogger(ClickhouseSqlOperations.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like LOGGER is unused in the class. So please remove or add logging.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will use it for logging info.

Comment on lines 83 to 89
} finally {
try {
if (tmpFile != null) {
Files.delete(tmpFile.toPath());
}
} catch (final IOException e) {
throw new RuntimeException(e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Optional] Throwing an exception from within a finally block will mask any exception which was previously thrown in the try or catch block, and the masked’s exception message and stack trace will be lost.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, I copied that code from Postgres connector:

If you have better resolution, I'd happy to change.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about this: (I'm implementing similar pattern for Destination MariaDB ColumnStore now)

Exception primaryException = null;
try {
  // your code
} catch (final Exception e) {
  primaryException = e;
  throw new RuntimeException(primaryException);
} finally {
  try {
    // your code
  } catch (final IOException e) {
    if (primaryException != null) e.addSuppressed(primaryException);
    throw new RuntimeException(e);
  }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks @koji-m, that's great.


private static final String DB_NAME = "default";

private static final Logger LOGGER = LoggerFactory.getLogger(ClickhouseDestinationAcceptanceTest.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we use LOGGER here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed.

private static final String DB_NAME = "default";

private static final Logger LOGGER = LoggerFactory.getLogger(ClickhouseDestinationAcceptanceTest.class);
private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we use JSON_FORMAT in the class?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed.

private final ExtendedNameTransformer namingResolver = new ExtendedNameTransformer();

private ClickHouseContainer db;
private Network network;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like not used field

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

network removed.

Comment on lines 39 to 40
private ProcessFactory processFactory;
private TestDestinationEnv testEnv;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like both of these fields are not used in the class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed.


@Override
protected JsonNode getFailCheckConfig() {
String ipAddress = db.getContainerInfo().getNetworkSettings().getIpAddress();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What purpose of this variable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's a leftover when I was struggling with container's connectivity doing check test, it can be removed. See testcontainers/testcontainers-java#452.

@alexandertsukanov
Copy link
Contributor

Hi, @burmecia! I had run the next command:

./gradlew clean :airbyte-integrations:connectors:destination-clickhouse:integrationTest

Despite your screenshot above, I don't have success to passing all Java integration tests:

image

What OS are you using?
Thanks.

Copy link
Contributor

@alexandertsukanov alexandertsukanov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for PR left some comments.


public class ClickhouseSqlOperations extends JdbcSqlOperations {

private static final Logger LOGGER = LoggerFactory.getLogger(ClickhouseSqlOperations.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LOGGER is never used
It can be used to log exception on try/catch block or for record size logging

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

final String schemaName,
final String tmpTableName)
throws SQLException {
if (records.isEmpty()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you could log record size here:
LOGGER.info("actual size of batch: {}", records.size());

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


private static final String DB_NAME = "default";

private static final Logger LOGGER = LoggerFactory.getLogger(ClickhouseDestinationAcceptanceTest.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LOGGER and JSON_FORMAT are not used, pls remove them

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

private final ExtendedNameTransformer namingResolver = new ExtendedNameTransformer();

private ClickHouseContainer db;
private Network network;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

network, processFactory, testEnv are not used, pls remove them

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

db.close();
}

// @Test
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pls remove commented // @test and check the test

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@mkhokh-33
Copy link
Contributor

mkhokh-33 commented Nov 5, 2021

Thanks for PR, added some comments

@burmecia
Copy link
Contributor Author

burmecia commented Nov 6, 2021

Hi, @burmecia! I had run the next command:

./gradlew clean :airbyte-integrations:connectors:destination-clickhouse:integrationTest

Despite your screenshot above, I don't have success to passing all Java integration tests:

image

What OS are you using? Thanks.

I am on Mac, but I use Ubuntu VM to run the test because docker's host network is not working well on Mac, see my discussion with them testcontainers/testcontainers-java#452. And also remember to build the destination image before run the test.
Screen Shot 2021-11-06 at 12 57 55 pm

@burmecia
Copy link
Contributor Author

I've disabled dbt support and also fixed the reserved keywords bug for #7786, can you try run it again?

@alexandertsukanov
Copy link
Contributor

Hi, @burmecia ! Looks like some tests still fail. Please, see #7786

@burmecia
Copy link
Contributor Author

I've disabled dbt in expect test result, can you try it again? I've tried below test command and it is working on my laptop.

./gradlew :airbyte-integrations:connectors:destination-clickhouse-strict-encrypt:integrationTest

image

@alexandertsukanov
Copy link
Contributor

alexandertsukanov commented Nov 17, 2021

@burmecia
Looks like the build is green for destination-clickhouse-strict-encrypt #7786
Only normalization tests left.

@ChristopheDuong
Copy link
Contributor

If it makes it easier, it might be a good idea to separate this PR into two instead?

  • destination one
  • normalization one

@alexandertsukanov
Copy link
Contributor

If it makes it easier, it might be a good idea to separate this PR into two instead?

  • destination one
  • normalization one

@ChristopheDuong I believe this won't make any difference as the main purpose of PR to verify all tests are passed on CI (as we can't to run /test, /publish on forked repos). For me is more easier to have only one branch as I can to fetch and merge contributors commits to it.

@burmecia I see normalization fails here:

> Task :airbyte-integrations:bases:base-normalization:mypyCheck FAILED
	 normalization/transform_catalog/reserved_keywords.py:2536:1: error: Need type
	 annotation for 'CLICKHOUSE' (hint: "CLICKHOUSE: Dict[<type>, <type>] = ...") 
	 [var-annotated]
	     CLICKHOUSE = ***
	     ^

Do we need this variable?

# In ClickHouse, keywords are not reserved.
# Ref: https://clickhouse.com/docs/en/sql-reference/syntax/#syntax-keywords
CLICKHOUSE = set()

@alexandertsukanov
Copy link
Contributor

Hi, @burmecia any response here? Looks like some conflicts in this PR.
CC: @marcosmarxm, @sherifnada

@burmecia
Copy link
Contributor Author

alright, I added missing type hints, can you try it again?

@alexandertsukanov
Copy link
Contributor

@burmecia could you fix conflicts, please? 🙂 After, I will run tests.

@alexandertsukanov
Copy link
Contributor

alexandertsukanov commented Nov 29, 2021

Looks like destination tests are all in green. But normalization fails https://github.com/airbytehq/airbyte/runs/4352870241?check_suite_focus=true#step:8:13303. Looks like this tests not related to @burmecia changes and normalization tests fails in master as well. Take to the account this fact there are not more comments from my side and approve from my side. Thanks for contribution.
CC: @ChristopheDuong , @sherifnada , @mkhokh-33

@ChristopheDuong
Copy link
Contributor

normalization tests fails in master as well.

Tests should be passing fine on master (maybe there seems to have some errors with credentials when you ran it)

@marcosmarxm marcosmarxm temporarily deployed to more-secrets December 13, 2021 22:23 Inactive
Copy link
Member

@marcosmarxm marcosmarxm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

awesome work @burmecia sorry the long delay to merge your contribution.

@marcosmarxm marcosmarxm merged commit bbcd461 into airbytehq:master Dec 13, 2021
schlattk pushed a commit to schlattk/airbyte that referenced this pull request Jan 4, 2022
* add ClickHouse destination

* update docs

* format code

* code improvement as per code review

* add ssh tunneling and ssl/tls support and code enhancement

* merge from master

* disable testCustomDbtTransformationsFailure test

* fix string format bug

* fix reserved keywords bug and disable dbt

* disable dbt in expect result

* add type hints

* bump connector version

Co-authored-by: Alexander Tsukanov <alexander.tsukanovvv@gmail.com>
Co-authored-by: Marcos Marx <marcosmarxm@gmail.com>
dbt_config = {
"type": "clickhouse",
"host": config["host"],
"port": config["port"],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be changed to hardcode 9000 instead?

see this thread in slack: https://airbytehq.slack.com/archives/C01MFR03D5W/p1641461068011100

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

New Destination: ClickHouse
9 participants