From ac46a73e611e2df267abf590d24b8f9ab2b928a6 Mon Sep 17 00:00:00 2001
From: Annie Liang <64233642+xinlian12@users.noreply.github.com>
Date: Thu, 31 Oct 2024 18:37:12 -0700
Subject: [PATCH] resolveFewTODOCommentsInKafka (#42715)

* resolve TODO comments

---------

Co-authored-by: annie-mac
---
 .../dev/Confluent_Cloud_Setup.md              |   2 +-
 .../dev/README_Sink.md                        |  50 +++++----
 .../dev/README_Source.md                      |  54 ++++-----
 .../azure-cosmos-kafka-connect/dev/setup.md   |  12 +-
 sdk/cosmos/azure-cosmos-kafka-connect/pom.xml |  53 ++++++++-
 .../src/docker/resources/sink.example.json    |   2 +-
 .../kafka/connect/CosmosSinkConnector.java    | 102 ++++++++++++++++-
 .../kafka/connect/CosmosSourceConnector.java  | 103 ++++++++++++++++++
 .../implementation/CosmosClientStore.java     |  20 +++-
 .../implementation/KafkaCosmosUtils.java      |  54 +++++++++
 .../implementation/sink/CosmosSinkTask.java   |   9 +-
 .../sink/CosmosSinkTaskConfig.java            |  35 +++++-
 .../sink/ItemWriteStrategy.java               |   2 +-
 .../implementation/sink/StructToJsonMap.java  |   1 -
 .../source/CosmosSourceTask.java              |  16 +--
 .../source/CosmosSourceTaskConfig.java        |  40 +++++++
 .../implementation/source/JsonToStruct.java   |   1 -
 .../source/MetadataMonitorThread.java         |   9 +-
 .../connect/CosmosSinkConnectorTest.java      | 101 ++++++++++++++++-
 .../connect/CosmosSourceConnectorTest.java    |  84 ++++++++++++++
 20 files changed, 666 insertions(+), 84 deletions(-)
 create mode 100644 sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosUtils.java

diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/dev/Confluent_Cloud_Setup.md b/sdk/cosmos/azure-cosmos-kafka-connect/dev/Confluent_Cloud_Setup.md
index c1a2a94c3e0e9..c4ea18c4e9077 100644
--- a/sdk/cosmos/azure-cosmos-kafka-connect/dev/Confluent_Cloud_Setup.md
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/dev/Confluent_Cloud_Setup.md
@@ -9,7 +9,7 @@ This guide walks through setting up Confluent Cloud using Docker containers.
- Java 11+ ([download](https://www.oracle.com/java/technologies/javase-jdk11-downloads.html))
- Maven ([download](https://maven.apache.org/download.cgi))
- Docker ([download](https://www.docker.com/products/docker-desktop))
-- CosmosDB [Setting up an Azure Cosmos DB Instance]
+- CosmosDB [Setting up an Azure Cosmos DB Instance](https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/cosmos/azure-cosmos-kafka-connect/dev/CosmosDB_Setup.md)

## Setup

diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/dev/README_Sink.md b/sdk/cosmos/azure-cosmos-kafka-connect/dev/README_Sink.md
index afb548e716f80..c22c638f3b64e 100644
--- a/sdk/cosmos/azure-cosmos-kafka-connect/dev/README_Sink.md
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/dev/README_Sink.md
@@ -16,9 +16,9 @@ The connector polls data from Kafka to write to container(s) in the database bas

### Prerequisites

-- It is recommended to start with the Confluent Platform (recommended to use this [setup] as this gives you a complete environment to work with.
+- It is recommended to start with the Confluent Platform (use this [setup](https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/cosmos/azure-cosmos-kafka-connect/dev/Confluent_Platform_Setup.md)), as it gives you a complete environment to work with.
  - If you do not wish to use Confluent Platform, then you need to install and configure ZooKeeper, Apache Kafka, and Kafka Connect yourself. You will also need to install and configure the Cosmos DB connectors manually.
-- Cosmos DB Instance ([setup guide]
+- Cosmos DB Instance ([setup guide](https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/cosmos/azure-cosmos-kafka-connect/dev/CosmosDB_Setup.md))
- Bash shell
  - Will not work in Cloud Shell or WSL1
- Java 11+ ([download](https://www.oracle.com/java/technologies/javase-jdk11-downloads.html))
- Maven ([download](https://maven.apache.org/download.cgi))
@@ -33,6 +33,10 @@ Otherwise, you can download the JAR file from the latest [Release](https://mvnre

You can also package a new JAR file from the source code.

```bash
+# clone the azure-sdk-for-java repo if you haven't done so already
+git clone https://github.com/Azure/azure-sdk-for-java.git
+cd sdk/cosmos
+
mvn -e -DskipTests -Dgpg.skip -Dmaven.javadoc.skip=true -Dcodesnippet.skip=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Drevapi.skip=true -pl ,azure-cosmos,azure-cosmos-tests -am clean install

mvn -e -DskipTests -Dgpg.skip -Dmaven.javadoc.skip=true -Dcodesnippet.skip=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Drevapi.skip=true -pl ,azure-cosmos-kafka-connect clean install
@@ -87,28 +91,28 @@ Create the Cosmos DB Sink Connector in Kafka Connect

The following JSON body defines the config for the Cosmos DB Sink Connector.

-> Note: You will need to fill out the values for `azure.cosmos.account.endpoint` and `azure.cosmos.account.key`, which you should have saved from the [Cosmos DB setup guide].
+> Note: You will need to fill out the values for `azure.cosmos.account.endpoint` and `azure.cosmos.account.key`, which you should have saved from the [Cosmos DB setup guide](https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/cosmos/azure-cosmos-kafka-connect/dev/CosmosDB_Setup.md).

Refer to the [sink properties](#sink-configuration-properties) section for more information on each of these configuration properties.

```json
{
-  "name": "cosmos-kafka-connectv2",
-  "config": {
-    "connector.class": "com.azure.cosmos.kafka.connect.CosmosSinkConnector",
-    "tasks.max": "5",
-    "topics": "GreenTaxiRecords",
-    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
-    "value.converter.schemas.enable": "false",
-    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
-    "key.converter.schemas.enable": "false",
-    "azure.cosmos.account.endpoint":"",
-    "azure.cosmos.account.key":"",
-    "azure.cosmos.applicationName": "",
-    "azure.cosmos.sink.database.name":"SampleDatabase",
-    "azure.cosmos.sink.containers.topicMap":"SampleTestContainer#SampleTestContainerSink"
-  }
+  "name": "cosmosdb-sink-connector-v2",
+  "config": {
+    "connector.class": "com.azure.cosmos.kafka.connect.CosmosSinkConnector",
+    "tasks.max": "5",
+    "topics": "{topic}",
+    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+    "value.converter.schemas.enable": "false",
+    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
+    "key.converter.schemas.enable": "false",
+    "azure.cosmos.account.endpoint":"{endpoint}",
+    "azure.cosmos.account.key":"{masterKey}",
+    "azure.cosmos.applicationName": "{applicationName}",
+    "azure.cosmos.sink.database.name":"{databaseName}",
+    "azure.cosmos.sink.containers.topicMap":"{topic}#{container}"
+  }
}
```
@@ -157,14 +161,14 @@ Alternatively, use the Connect REST API.

curl -X DELETE http://localhost:8083/connectors/cosmosdb-sink-connector
```

-To delete the created Azure Cosmos DB service and its resource group using Azure CLI, refer to these [steps].
+To delete the created Azure Cosmos DB service and its resource group using Azure CLI, refer to these [steps](https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/cosmos/azure-cosmos-kafka-connect/dev/CosmosDB_Setup.md#cleanup).

## Sink configuration properties

The following settings are used to configure the Cosmos DB Kafka Sink Connector. These configuration values determine which Kafka topics the data is consumed from, which Cosmos DB containers the data is written into, and the formats used to serialize the data. For an example configuration file with the default values, refer to [this config](https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/cosmos/azure-cosmos-kafka-connect/src/docker/resources/sink.example.json).

-- [Generic Configs For Sink And Source]
-- [Configs only for Sink]
+- [Generic Configs For Sink And Source](https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/cosmos/azure-cosmos-kafka-connect/doc/configuration-reference.md#generic-configurations)
+- [Configs only for Sink](https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/cosmos/azure-cosmos-kafka-connect/doc/configuration-reference.md#sink-connector-configurations)

Data will always be written to the Cosmos DB as JSON without any schema.

@@ -245,7 +249,7 @@ Refer to the [InsertUUID repository](https://github.com/confluentinc/kafka-conne

Using both the `InsertField` and `Cast` SMTs, you can specify the TTL on each item created in Cosmos DB.

> Note: You will need to enable TTL on the Cosmos DB container to enable TTL at an item level. Refer to the [Cosmos DB setup guide](https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/cosmos/azure-cosmos-kafka-connect/dev/CosmosDB_Setup.md) or the [Cosmos DB docs](https://learn.microsoft.com/azure/cosmos-db/nosql/how-to-time-to-live?tabs=dotnet-sdk-v3#enable-time-to-live-on-a-container-using-azure-portal) for more information on setting the TTL.

Inside your Sink connector config, add the following properties to set the TTL (in seconds). In the following example, the TTL is set to 100 seconds.

diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/dev/README_Source.md b/sdk/cosmos/azure-cosmos-kafka-connect/dev/README_Source.md
index 5a090e801627a..f4cffd72daf8f 100644
--- a/sdk/cosmos/azure-cosmos-kafka-connect/dev/README_Source.md
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/dev/README_Source.md
@@ -14,7 +14,7 @@ The Azure Cosmos DB Source connector provides the capability to read data from t

- It is recommended to start with the Confluent Platform (use this setup), as it gives you a complete environment to work with.
  - If you do not wish to use Confluent Platform, then you need to install and configure ZooKeeper, Apache Kafka, and Kafka Connect yourself. You will also need to install and configure the Cosmos DB connectors manually.
-- Cosmos DB Instance ([setup guide])
+- Cosmos DB Instance ([setup guide](https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/cosmos/azure-cosmos-kafka-connect/dev/CosmosDB_Setup.md))
- Bash shell
  - Will not work in Cloud Shell or WSL1
- Java 11+ ([download](https://www.oracle.com/java/technologies/javase-jdk11-downloads.html))
- Maven ([download](https://maven.apache.org/download.cgi))
@@ -29,6 +29,10 @@ Otherwise, you can use JAR file from latest [Release](https://mvnrepository.com/

You can also package a new JAR file from the source code.

```bash
+# clone the azure-sdk-for-java repo if you haven't done so already
+git clone https://github.com/Azure/azure-sdk-for-java.git
+cd sdk/cosmos
+
mvn -e -DskipTests -Dgpg.skip -Dmaven.javadoc.skip=true -Dcodesnippet.skip=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Drevapi.skip=true -pl ,azure-cosmos,azure-cosmos-tests -am clean install

mvn -e -DskipTests -Dgpg.skip -Dmaven.javadoc.skip=true -Dcodesnippet.skip=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Drevapi.skip=true -pl ,azure-cosmos-kafka-connect clean install
@@ -49,7 +53,7 @@ To create a topic inside Control Center, see [here](https://docs.confluent.io/pl

The following JSON body defines the config for the Cosmos DB Source Connector.

>Note: You will need to replace the placeholder values for the below properties, which you should have saved from the [Cosmos DB setup guide](https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/cosmos/azure-cosmos-kafka-connect/dev/CosmosDB_Setup.md).
>`azure.cosmos.account.endpoint`
>`azure.cosmos.account.key`

@@ -57,24 +61,24 @@ Refer to the [source properties](#source-configuration-properties) section for m

```json
{
-  "name": "cosmosdb-source-connectorv2",
-  "config": {
-    "connector.class": "com.azure.cosmos.kafka.connect.CosmosSourceConnector",
-    "tasks.max": "5",
-    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
-    "value.converter.schemas.enable": "false",
-    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
-    "key.converter.schemas.enable": "false",
-    "azure.cosmos.account.endpoint":"{endpoint}",
-    "azure.cosmos.account.key":"{masterKey}",
-    "azure.cosmos.application.name": "{applicationName}",
-    "azure.cosmos.source.database.name":"{database}",
-    "azure.cosmos.source.containers.includedList":"{container}",
-    "azure.cosmos.source.changeFeed.maxItemCountHint":"500",
-    "azure.cosmos.source.containers.topicMap":"{topic}#{container}",
-    "azure.cosmos.source.metadata.storage.type":"Cosmos",
-    "azure.cosmos.source.metadata.storage.name":"{metadataContainerName}"
-  }
+  "name": "cosmosdb-source-connector-v2",
+  "config": {
+    "connector.class": "com.azure.cosmos.kafka.connect.CosmosSourceConnector",
+    "tasks.max": "5",
+    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+    "value.converter.schemas.enable": "false",
+    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
+    "key.converter.schemas.enable": "false",
+    "azure.cosmos.account.endpoint":"{endpoint}",
+    "azure.cosmos.account.key":"{masterKey}",
+    "azure.cosmos.application.name": "{applicationName}",
+    "azure.cosmos.source.database.name":"{database}",
+    "azure.cosmos.source.containers.includedList":"{container}",
+    "azure.cosmos.source.changeFeed.maxItemCountHint":"500",
+    "azure.cosmos.source.containers.topicMap":"{topic}#{container}",
+    "azure.cosmos.source.metadata.storage.type":"Cosmos",
+    "azure.cosmos.source.metadata.storage.name":"{metadataContainerName}"
+  }
}
```
@@ -104,7 +108,7 @@ curl -H "Content-Type: application/json" -X POST -d @

### Insert document into Cosmos DB

Use the [Cosmos DB setup guide](https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/cosmos/azure-cosmos-kafka-connect/dev/CosmosDB_Setup.md) to create and set up a Cosmos DB instance.

- Sign into the [Azure portal](https://portal.azure.com/learn.docs.microsoft.com) using the account you activated.
- On the Azure portal menu (left hand side blue lines at the top), select All services.
@@ -157,11 +161,11 @@

curl -X DELETE http://localhost:8083/connectors/cosmosdb-source-connector
```

To delete the created Azure Cosmos DB service and its resource group using Azure CLI, refer to these [steps](https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/cosmos/azure-cosmos-kafka-connect/dev/CosmosDB_Setup.md#cleanup).

## Source configuration properties

The following settings are used to configure the Cosmos DB Kafka Source Connector. These configuration values determine which Cosmos DB container is consumed, which Kafka topics the data is written into, and the formats used to serialize the data. For an example configuration file with the default values, refer to [this config](https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/cosmos/azure-cosmos-kafka-connect/src/docker/resources/source.example.json).

-- [Generic Configs For Sink And Source]
-- [Configs only for Source]
+- [Generic Configs For Sink And Source](https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/cosmos/azure-cosmos-kafka-connect/doc/configuration-reference.md#generic-configurations)
+- [Configs only for Source](https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/cosmos/azure-cosmos-kafka-connect/doc/configuration-reference.md#source-connector-configurations)

diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/dev/setup.md b/sdk/cosmos/azure-cosmos-kafka-connect/dev/setup.md
index 6ad7ce91d2a4f..7ec4890772054 100644
--- a/sdk/cosmos/azure-cosmos-kafka-connect/dev/setup.md
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/dev/setup.md
@@ -50,7 +50,7 @@ CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=3
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=3
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=3
```
Please follow [Confluent_Cloud_Setup](https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/cosmos/azure-cosmos-kafka-connect/dev/Confluent_Cloud_Setup.md) to set up a Confluent Cloud cluster.
Integration tests have the ITest suffix.
Use the following command to run integration tests:
```bash
mvn -e -Dgpg.skip -Dmaven.javadoc.skip=true -Dcodesnippet.skip=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Drevapi.skip=true -pl ,azure-cosmos-kafka-connect test package -Pkafka-integration
```

### Configure Confluent Platform, Cosmos DB and validate Kafka Connectors

-- [Confluent Platform Setup]
-- [Confluent Cloud Setup]
-- [Setting up an Azure Cosmos DB Instance]
-- [Kafka Connect Cosmos DB Sink Connector]
-- [Kafka Connect Cosmos DB Source Connector]
+- [Confluent Platform Setup](https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/cosmos/azure-cosmos-kafka-connect/dev/Confluent_Platform_Setup.md)
+- [Confluent Cloud Setup](https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/cosmos/azure-cosmos-kafka-connect/dev/Confluent_Cloud_Setup.md)
+- [Setting up an Azure Cosmos DB Instance](https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/cosmos/azure-cosmos-kafka-connect/dev/CosmosDB_Setup.md)
+- [Kafka Connect Cosmos DB Sink Connector](https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/cosmos/azure-cosmos-kafka-connect/dev/README_Sink.md)
+- [Kafka Connect Cosmos DB Source Connector](https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/cosmos/azure-cosmos-kafka-connect/dev/README_Source.md)

diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/pom.xml b/sdk/cosmos/azure-cosmos-kafka-connect/pom.xml
index 8b3ef78fc5831..07647e076e1a8 100644
--- a/sdk/cosmos/azure-cosmos-kafka-connect/pom.xml
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/pom.xml
@@ -45,6 +45,7 @@ Licensed under the MIT License.
UTF-8
true
azure_cosmos_kafka_connect
+azurecosmoskafkaconnect
@@ -55,6 +56,7 @@
--add-opens com.azure.cosmos/com.azure.cosmos.implementation=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.apachecommons.lang=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.caches=ALL-UNNAMED
+--add-opens com.azure.cosmos/com.azure.cosmos.implementation.caches=com.azure.cosmos.kafka.connect
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.faultinjection=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.guava25.base=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.routing=ALL-UNNAMED
@@ -293,6 +295,7 @@
io.confluent:kafka-connect-maven-plugin:[0.12.0]
com.jayway.jsonpath:json-path:[2.9.0]
org.sourcelab:kafka-connect-client:[4.0.4]
+org.apache.maven.plugins:maven-antrun-plugin:[3.1.0]
@@ -356,7 +359,7 @@
io.netty
-${shadingPrefix}.io.netty
+${shadingPrefixNetty}.io.netty
org.codehaus
@@ -451,6 +454,54 @@
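+      <!-- maven-antrun-plugin executions added below: 01-copy-readme-to-javadocTemp runs at prepare-package, deleting any existing ${project.basedir}/javadocTemp and copying README.md into it; 03-repack runs during the package phase. -->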
+ + org.apache.maven.plugins + maven-antrun-plugin + 3.1.0 + + + 01-copy-readme-to-javadocTemp + prepare-package + + + Deleting existing ${project.basedir}/javadocTemp + + + + Copying ${project.basedir}/README.md to + ${project.basedir}/javadocTemp/README.md + + + + + + run + + + + 03-repack + package + + run + + + + + + + + + + + + + + + + + + + diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/docker/resources/sink.example.json b/sdk/cosmos/azure-cosmos-kafka-connect/src/docker/resources/sink.example.json index 782d9fc45f8cd..6640b1e24a504 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/docker/resources/sink.example.json +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/docker/resources/sink.example.json @@ -3,7 +3,7 @@ "config": { "connector.class": "com.azure.cosmos.kafka.connect.CosmosSinkConnector", "tasks.max": "5", - "topics": "GreenTaxiRecords", + "topics": "{topic}", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "false", "key.converter": "org.apache.kafka.connect.json.JsonConverter", diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSinkConnector.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSinkConnector.java index 406993fee3f37..0ee10a07d754c 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSinkConnector.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSinkConnector.java @@ -4,13 +4,21 @@ package com.azure.cosmos.kafka.connect; import com.azure.cosmos.CosmosAsyncClient; +import com.azure.cosmos.CosmosAsyncContainer; +import com.azure.cosmos.CosmosAsyncDatabase; import com.azure.cosmos.implementation.apachecommons.lang.RandomUtils; +import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; import com.azure.cosmos.kafka.connect.implementation.CosmosClientStore; +import com.azure.cosmos.kafka.connect.implementation.CosmosThroughputControlConfig; import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConstants; +import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosExceptionsHelper; +import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosUtils; import com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkConfig; import com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkContainersConfig; import com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkTask; import com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkTaskConfig; +import com.azure.cosmos.models.PartitionKey; +import com.fasterxml.jackson.databind.JsonNode; import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigValue; @@ -18,10 +26,12 @@ import org.apache.kafka.connect.sink.SinkConnector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.function.Function; import java.util.stream.Collectors; @@ -39,6 +49,7 @@ public final class CosmosSinkConnector extends SinkConnector { private CosmosSinkConfig sinkConfig; private String connectorName; + private CosmosAsyncClient cosmosClient; @Override public void start(Map props) { @@ -46,12 +57,12 @@ public void start(Map props) { this.sinkConfig = new CosmosSinkConfig(props); this.connectorName = 
props.containsKey(CONNECTOR_NAME) ? props.get(CONNECTOR_NAME).toString() : "EMPTY"; CosmosSinkContainersConfig containersConfig = this.sinkConfig.getContainersConfig(); - CosmosAsyncClient cosmosAsyncClient = CosmosClientStore.getCosmosClient(this.sinkConfig.getAccountConfig(), this.connectorName); + this.cosmosClient = + CosmosClientStore.getCosmosClient(this.sinkConfig.getAccountConfig(), this.connectorName); validateDatabaseAndContainers( new ArrayList<>(containersConfig.getTopicToContainerMap().values()), - cosmosAsyncClient, + this.cosmosClient, containersConfig.getDatabaseName()); - cosmosAsyncClient.close(); } @Override @@ -63,6 +74,10 @@ public Class taskClass() { public List> taskConfigs(int maxTasks) { LOGGER.info("Setting task configurations with maxTasks {}", maxTasks); List> configs = new ArrayList<>(); + + String clientMetadataCachesString = getClientMetadataCachesSnapshotString(); + String throughputControlClientMetadataCachesString = getThroughputControlClientMetadataCachesSnapshotString(); + for (int i = 0; i < maxTasks; i++) { Map taskConfigs = this.sinkConfig.originalsStrings(); taskConfigs.put(CosmosSinkTaskConfig.SINK_TASK_ID, @@ -70,14 +85,95 @@ public List> taskConfigs(int maxTasks) { "sink", this.connectorName, RandomUtils.nextInt(1, 9999999))); + if (StringUtils.isNotEmpty(clientMetadataCachesString)) { + taskConfigs.put( + CosmosSinkTaskConfig.COSMOS_CLIENT_METADATA_CACHES_SNAPSHOT, + clientMetadataCachesString); + } + + if (StringUtils.isNotEmpty(throughputControlClientMetadataCachesString)) { + taskConfigs.put( + CosmosSinkTaskConfig.THROUGHPUT_CONTROL_COSMOS_CLIENT_METADATA_CACHES_SNAPSHOT, + throughputControlClientMetadataCachesString); + } configs.add(taskConfigs); } return configs; } + private String getClientMetadataCachesSnapshotString() { + CosmosSinkContainersConfig containersConfig = this.sinkConfig.getContainersConfig(); + List containerNames = new ArrayList<>(containersConfig.getTopicToContainerMap().values()); + CosmosAsyncDatabase database = this.cosmosClient.getDatabase(containersConfig.getDatabaseName()); + + // read a random item from each container to populate the collection cache + for (String containerName : containerNames) { + CosmosAsyncContainer container = database.getContainer(containerName); + readRandomItemFromContainer(container); + } + + // read a random item from throughput control container if it is enabled and use the same account config as the cosmos client + CosmosThroughputControlConfig cosmosThroughputControlConfig = this.sinkConfig.getThroughputControlConfig(); + if (cosmosThroughputControlConfig.isThroughputControlEnabled()) { + if (cosmosThroughputControlConfig.getThroughputControlAccountConfig() == null) { + CosmosAsyncContainer throughputControlContainer = + this.cosmosClient + .getDatabase(cosmosThroughputControlConfig.getGlobalThroughputControlDatabaseName()) + .getContainer(cosmosThroughputControlConfig.getGlobalThroughputControlContainerName()); + readRandomItemFromContainer(throughputControlContainer); + } + } + + return KafkaCosmosUtils.convertClientMetadataCacheSnapshotToString(this.cosmosClient); + } + + private String getThroughputControlClientMetadataCachesSnapshotString() { + CosmosAsyncClient throughputControlClient = null; + CosmosThroughputControlConfig throughputControlConfig = this.sinkConfig.getThroughputControlConfig(); + + try { + if (throughputControlConfig.isThroughputControlEnabled() + && throughputControlConfig.getThroughputControlAccountConfig() != null) { + throughputControlClient = 
CosmosClientStore.getCosmosClient( + throughputControlConfig.getThroughputControlAccountConfig(), + this.connectorName + ); + } + + if (throughputControlClient != null) { + readRandomItemFromContainer( + throughputControlClient + .getDatabase(throughputControlConfig.getGlobalThroughputControlDatabaseName()) + .getContainer(throughputControlConfig.getGlobalThroughputControlContainerName())); + } + return KafkaCosmosUtils.convertClientMetadataCacheSnapshotToString(throughputControlClient); + + } finally { + if (throughputControlClient != null) { + throughputControlClient.close(); + } + } + } + + private void readRandomItemFromContainer(CosmosAsyncContainer container) { + if (container != null) { + container.readItem(UUID.randomUUID().toString(), new PartitionKey(UUID.randomUUID().toString()), JsonNode.class) + .onErrorResume(throwable -> { + if (!KafkaCosmosExceptionsHelper.isNotFoundException(throwable)) { + LOGGER.warn("Failed to read item from container {}", container.getId(), throwable); + } + return Mono.empty(); + }) + .block(); + } + } + @Override public void stop() { + if (this.cosmosClient != null) { + this.cosmosClient.close(); + } LOGGER.info("Kafka Cosmos sink connector {} is stopped."); } diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSourceConnector.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSourceConnector.java index 47f4252b1d2ad..84eb8c91b8278 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSourceConnector.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSourceConnector.java @@ -5,13 +5,17 @@ import com.azure.cosmos.CosmosAsyncClient; import com.azure.cosmos.CosmosAsyncContainer; +import com.azure.cosmos.CosmosAsyncDatabase; import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import com.azure.cosmos.implementation.apachecommons.lang.RandomUtils; +import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair; import com.azure.cosmos.kafka.connect.implementation.CosmosClientStore; import com.azure.cosmos.kafka.connect.implementation.CosmosMasterKeyAuthConfig; +import com.azure.cosmos.kafka.connect.implementation.CosmosThroughputControlConfig; import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConstants; import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosExceptionsHelper; +import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosUtils; import com.azure.cosmos.kafka.connect.implementation.source.CosmosMetadataStorageType; import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceConfig; import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceContainersConfig; @@ -29,8 +33,10 @@ import com.azure.cosmos.models.CosmosContainerProperties; import com.azure.cosmos.models.CosmosContainerResponse; import com.azure.cosmos.models.FeedRange; +import com.azure.cosmos.models.PartitionKey; import com.azure.cosmos.models.PartitionKeyDefinition; import com.azure.cosmos.models.ThroughputProperties; +import com.fasterxml.jackson.databind.JsonNode; import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigValue; @@ -49,6 +55,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import 
java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import java.util.stream.Collectors; @@ -237,6 +244,9 @@ private List> getFeedRangeTaskConfigs(List> feedRangeTaskConfigs = new ArrayList<>(); + String clientMetadataCachesString = getClientMetadataCachesSnapshotString(); + String throughputControlClientMetadataCachesString = getThroughputControlClientMetadataCachesSnapshotString(); + partitionedTaskUnits.forEach(feedRangeTaskUnits -> { Map taskConfigs = this.config.originalsStrings(); taskConfigs.putAll( @@ -246,6 +256,18 @@ private List> getFeedRangeTaskConfigs(List getContainersTopicMap(List containerNames = + this.monitorThread + .getAllContainers() + .map(cosmosContainerProperties -> + cosmosContainerProperties + .stream() + .map(CosmosContainerProperties::getId) + .collect(Collectors.toList())) + .block(); + CosmosAsyncDatabase database = this.cosmosClient.getDatabase(containersConfig.getDatabaseName()); + + // read a random item from each container to populate the collection cache + for (String containerName : containerNames) { + CosmosAsyncContainer container = database.getContainer(containerName); + this.readRandomItemFromContainer(container); + } + + // read a random item from throughput control container if it is enabled and use the same account config as the cosmos client + CosmosThroughputControlConfig cosmosThroughputControlConfig = this.config.getThroughputControlConfig(); + if (cosmosThroughputControlConfig.isThroughputControlEnabled()) { + if (cosmosThroughputControlConfig.getThroughputControlAccountConfig() == null) { + CosmosAsyncContainer throughputControlContainer = + this.cosmosClient + .getDatabase(cosmosThroughputControlConfig.getGlobalThroughputControlDatabaseName()) + .getContainer(cosmosThroughputControlConfig.getGlobalThroughputControlContainerName()); + readRandomItemFromContainer(throughputControlContainer); + } + } + + // read a random item from metadata container if COSMOS storage type is used + if (this.config.getMetadataConfig().getStorageType() == CosmosMetadataStorageType.COSMOS) { + readRandomItemFromContainer(database.getContainer(this.config.getMetadataConfig().getStorageName())); + } + + return KafkaCosmosUtils.convertClientMetadataCacheSnapshotToString(this.cosmosClient); + } + + private String getThroughputControlClientMetadataCachesSnapshotString() { + CosmosAsyncClient throughputControlClient = null; + try { + CosmosThroughputControlConfig throughputControlConfig = this.config.getThroughputControlConfig(); + if (throughputControlConfig.isThroughputControlEnabled() + && throughputControlConfig.getThroughputControlAccountConfig() != null) { + throughputControlClient = CosmosClientStore.getCosmosClient( + this.config.getThroughputControlConfig().getThroughputControlAccountConfig(), + this.connectorName + ); + } + + if (throughputControlClient != null) { + this.readRandomItemFromContainer( + throughputControlClient + .getDatabase(throughputControlConfig.getGlobalThroughputControlDatabaseName()) + .getContainer(throughputControlConfig.getGlobalThroughputControlContainerName()) + ); + } + + return KafkaCosmosUtils.convertClientMetadataCacheSnapshotToString(throughputControlClient); + } finally { + if (throughputControlClient != null) { + throughputControlClient.close(); + } + } + } + + private void readRandomItemFromContainer(CosmosAsyncContainer container) { + if (container != null) { + container.readItem(UUID.randomUUID().toString(), new PartitionKey(UUID.randomUUID().toString()), JsonNode.class) + .onErrorResume(throwable -> 
{ + if (!KafkaCosmosExceptionsHelper.isNotFoundException(throwable)) { + LOGGER.warn("Failed to read item from container {}", container.getId(), throwable); + } + return Mono.empty(); + }) + .block(); + } + } + @Override public Config validate(Map connectorConfigs) { Config config = super.validate(connectorConfigs); diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosClientStore.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosClientStore.java index 4bddb0605e954..2ee5588e88512 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosClientStore.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosClientStore.java @@ -7,6 +7,8 @@ import com.azure.cosmos.CosmosClientBuilder; import com.azure.cosmos.GatewayConnectionConfig; import com.azure.cosmos.ThrottlingRetryOptions; +import com.azure.cosmos.implementation.CosmosClientMetadataCachesSnapshot; +import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; import com.azure.identity.ClientSecretCredential; import com.azure.identity.ClientSecretCredentialBuilder; @@ -26,7 +28,17 @@ public class CosmosClientStore { ACTIVE_DIRECTORY_ENDPOINT_MAP.put(CosmosAzureEnvironment.AZURE_GERMANY, "https://login.microsoftonline.de/"); } - public static CosmosAsyncClient getCosmosClient(CosmosAccountConfig accountConfig, String sourceName) { + public static CosmosAsyncClient getCosmosClient( + CosmosAccountConfig accountConfig, + String sourceName) { + + return getCosmosClient(accountConfig, sourceName, null); + } + + public static CosmosAsyncClient getCosmosClient( + CosmosAccountConfig accountConfig, + String sourceName, + CosmosClientMetadataCachesSnapshot snapshot) { if (accountConfig == null) { return null; } @@ -60,6 +72,12 @@ public static CosmosAsyncClient getCosmosClient(CosmosAccountConfig accountConfi throw new IllegalArgumentException("Authorization type " + accountConfig.getCosmosAuthConfig().getClass() + "is not supported"); } + if (snapshot != null) { + ImplementationBridgeHelpers.CosmosClientBuilderHelper + .getCosmosClientBuilderAccessor() + .setCosmosClientMetadataCachesSnapshot(cosmosClientBuilder, snapshot); + } + return cosmosClientBuilder.buildAsyncClient(); } diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosUtils.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosUtils.java new file mode 100644 index 0000000000000..6009821ea1884 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosUtils.java @@ -0,0 +1,54 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. 
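+// KafkaCosmosUtils (below) round-trips a CosmosClientMetadataCachesSnapshot through Java
+// serialization and Base64 encoding, so the connector can hand a warmed-up client metadata
+// cache to each task through the string-valued Kafka Connect task configuration map.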
+ +package com.azure.cosmos.kafka.connect.implementation; + +import com.azure.cosmos.CosmosAsyncClient; +import com.azure.cosmos.implementation.CosmosClientMetadataCachesSnapshot; +import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.Base64; + +public class KafkaCosmosUtils { + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaCosmosUtils.class); + + public static CosmosClientMetadataCachesSnapshot getCosmosClientMetadataFromString(String metadataCacheString) { + if (StringUtils.isNotEmpty(metadataCacheString)) { + byte[] inputByteArray = Base64.getDecoder().decode(metadataCacheString); + try (ObjectInputStream objectInputStream = + new ObjectInputStream(new ByteArrayInputStream(inputByteArray))) { + + return (CosmosClientMetadataCachesSnapshot) objectInputStream.readObject(); + } catch (IOException | ClassNotFoundException e) { + LOGGER.warn("Failed to deserialize cosmos client metadata cache snapshot"); + return null; + } + } + + return null; + } + + public static String convertClientMetadataCacheSnapshotToString(CosmosAsyncClient client) { + if (client == null) { + return null; + } + + CosmosClientMetadataCachesSnapshot clientMetadataCachesSnapshot = new CosmosClientMetadataCachesSnapshot(); + clientMetadataCachesSnapshot.serialize(client); + try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) { + ObjectOutputStream outputStream = new ObjectOutputStream(byteArrayOutputStream); + outputStream.writeObject(clientMetadataCachesSnapshot); + return Base64.getEncoder().encodeToString(byteArrayOutputStream.toByteArray()); + } catch (IOException e) { + LOGGER.warn("Failed to serialize cosmos client metadata cache snapshot", e); + return null; + } + } +} diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java index 1eb1bbcebf4c6..ebc16a74f5012 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java @@ -36,7 +36,11 @@ public String version() { public void start(Map props) { LOGGER.info("Starting the kafka cosmos sink task"); this.sinkTaskConfig = new CosmosSinkTaskConfig(props); - this.cosmosClient = CosmosClientStore.getCosmosClient(this.sinkTaskConfig.getAccountConfig(), this.sinkTaskConfig.getTaskId()); + this.cosmosClient = + CosmosClientStore.getCosmosClient( + this.sinkTaskConfig.getAccountConfig(), + this.sinkTaskConfig.getTaskId(), + this.sinkTaskConfig.getClientMetadataCachesSnapshot()); LOGGER.info("The taskId is " + this.sinkTaskConfig.getTaskId()); this.throughputControlClient = this.getThroughputControlCosmosClient(); this.sinkRecordTransformer = new SinkRecordTransformer(this.sinkTaskConfig); @@ -62,7 +66,8 @@ private CosmosAsyncClient getThroughputControlCosmosClient() { // throughput control is using a different database account config return CosmosClientStore.getCosmosClient( this.sinkTaskConfig.getThroughputControlConfig().getThroughputControlAccountConfig(), - 
this.sinkTaskConfig.getTaskId()); + this.sinkTaskConfig.getTaskId(), + this.sinkTaskConfig.getThroughputControlClientMetadataCachesSnapshot()); } else { return this.cosmosClient; } diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTaskConfig.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTaskConfig.java index 7c4a0c5e2420a..875dddfb3bc73 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTaskConfig.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTaskConfig.java @@ -3,17 +3,30 @@ package com.azure.cosmos.kafka.connect.implementation.sink; +import com.azure.cosmos.implementation.CosmosClientMetadataCachesSnapshot; +import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosUtils; import org.apache.kafka.common.config.ConfigDef; import java.util.Map; public class CosmosSinkTaskConfig extends CosmosSinkConfig { public static final String SINK_TASK_ID = "azure.cosmos.sink.task.id"; + public static final String COSMOS_CLIENT_METADATA_CACHES_SNAPSHOT = "azure.cosmos.client.metadata.caches.snapshot"; + public static final String THROUGHPUT_CONTROL_COSMOS_CLIENT_METADATA_CACHES_SNAPSHOT = + "azure.cosmos.throughputControl.client.metadata.caches.snapshot"; private final String taskId; + private final CosmosClientMetadataCachesSnapshot clientMetadataCachesSnapshot; + private final CosmosClientMetadataCachesSnapshot throughputControlClientMetadataCachesSnapshot; public CosmosSinkTaskConfig(Map parsedConfigs) { super(getConfigDef(), parsedConfigs); this.taskId = this.getString(SINK_TASK_ID); + this.clientMetadataCachesSnapshot = + KafkaCosmosUtils.getCosmosClientMetadataFromString( + this.getString(COSMOS_CLIENT_METADATA_CACHES_SNAPSHOT)); + this.throughputControlClientMetadataCachesSnapshot = + KafkaCosmosUtils.getCosmosClientMetadataFromString( + this.getString(THROUGHPUT_CONTROL_COSMOS_CLIENT_METADATA_CACHES_SNAPSHOT)); } public static ConfigDef getConfigDef() { @@ -29,10 +42,30 @@ private static void defineTaskIdConfig(ConfigDef result) { SINK_TASK_ID, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, - ConfigDef.Importance.MEDIUM); + ConfigDef.Importance.MEDIUM) + .defineInternal( + COSMOS_CLIENT_METADATA_CACHES_SNAPSHOT, + ConfigDef.Type.STRING, + null, + ConfigDef.Importance.LOW + ) + .defineInternal( + THROUGHPUT_CONTROL_COSMOS_CLIENT_METADATA_CACHES_SNAPSHOT, + ConfigDef.Type.STRING, + null, + ConfigDef.Importance.LOW + ); } public String getTaskId() { return taskId; } + + public CosmosClientMetadataCachesSnapshot getClientMetadataCachesSnapshot() { + return clientMetadataCachesSnapshot; + } + + public CosmosClientMetadataCachesSnapshot getThroughputControlClientMetadataCachesSnapshot() { + return throughputControlClientMetadataCachesSnapshot; + } } diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/ItemWriteStrategy.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/ItemWriteStrategy.java index 9f9a04b1c3ed1..2eda513697ccf 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/ItemWriteStrategy.java +++ 
b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/ItemWriteStrategy.java @@ -11,7 +11,7 @@ public enum ItemWriteStrategy { ITEM_OVERWRITE_IF_NOT_MODIFIED("ItemOverwriteIfNotModified"), ITEM_PATCH("ItemPatch"); - // TODO[GA] Add ItemBulkUpdate + // TODO[Post-GA] Add ItemBulkUpdate private final String name; ItemWriteStrategy(String name) { diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/StructToJsonMap.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/StructToJsonMap.java index 25925e6116aa6..6d891de66561a 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/StructToJsonMap.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/StructToJsonMap.java @@ -15,7 +15,6 @@ import java.util.List; import java.util.Map; -// TODO[GA]: Double check logic here, copied over from V1 public class StructToJsonMap { public static Map toJsonMap(Struct struct) { diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/CosmosSourceTask.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/CosmosSourceTask.java index 84eddeadafc5c..4c9ed45687d02 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/CosmosSourceTask.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/CosmosSourceTask.java @@ -61,8 +61,11 @@ public void start(Map map) { this.taskUnitsQueue.addAll(this.taskConfig.getFeedRangeTaskUnits()); LOGGER.info("Creating the cosmos client"); - // TODO[GA]: optimize the client creation, client metadata cache? 
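+        // the connector serializes a CosmosClientMetadataCachesSnapshot into the task config,
+        // so the client created below starts with a pre-populated collection cache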
- this.cosmosClient = CosmosClientStore.getCosmosClient(this.taskConfig.getAccountConfig(), this.taskConfig.getTaskId()); + this.cosmosClient = + CosmosClientStore.getCosmosClient( + this.taskConfig.getAccountConfig(), + this.taskConfig.getTaskId(), + this.taskConfig.getCosmosClientMetadataCachesSnapshot()); this.throughputControlCosmosClient = this.getThroughputControlCosmosClient(); } @@ -72,7 +75,8 @@ private CosmosAsyncClient getThroughputControlCosmosClient() { // throughput control is using a different database account config return CosmosClientStore.getCosmosClient( this.taskConfig.getThroughputControlConfig().getThroughputControlAccountConfig(), - this.taskConfig.getTaskId()); + this.taskConfig.getTaskId(), + this.taskConfig.getThroughputControlCosmosClientMetadataCachesSnapshot()); } else { return this.cosmosClient; } @@ -121,10 +125,8 @@ public List poll() { } return results; } catch (Exception e) { - // for error cases, we should always the task back to the queue + // for error cases, we should always put the task back to the queue this.taskUnitsQueue.add(taskUnit); - - // TODO[Public Preview]: add checking for max retries checking throw KafkaCosmosExceptionsHelper.convertToConnectException(e, "PollTask failed"); } } @@ -263,8 +265,6 @@ private List handleSuccessfulResponse( } private Mono handleFeedRangeGone(FeedRangeTaskUnit feedRangeTaskUnit) { - //TODO (xinlian-public preview): Add more debug logs - // need to find out whether it is split or merge CosmosAsyncContainer container = this.cosmosClient diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/CosmosSourceTaskConfig.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/CosmosSourceTaskConfig.java index 5bd455c02308e..ae17884c7075f 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/CosmosSourceTaskConfig.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/CosmosSourceTaskConfig.java @@ -3,8 +3,10 @@ package com.azure.cosmos.kafka.connect.implementation.source; +import com.azure.cosmos.implementation.CosmosClientMetadataCachesSnapshot; import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; +import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosUtils; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; @@ -22,10 +24,16 @@ public class CosmosSourceTaskConfig extends CosmosSourceConfig { public static final String SOURCE_METADATA_TASK_UNIT = "azure.cosmos.source.task.metadataTaskUnit"; public static final String SOURCE_FEED_RANGE_TASK_UNITS = "azure.cosmos.source.task.feedRangeTaskUnits"; public static final String SOURCE_TASK_ID = "azure.cosmos.source.task.id"; + public static final String COSMOS_CLIENT_METADATA_CACHES_SNAPSHOT = "azure.cosmos.client.metadata.caches.snapshot"; + public static final String THROUGHPUT_CONTROL_COSMOS_CLIENT_METADATA_CACHES_SNAPSHOT = + "azure.cosmos.throughputControl.client.metadata.caches.snapshot"; private final List feedRangeTaskUnits; private final MetadataTaskUnit metadataTaskUnit; private final String taskId; + private final CosmosClientMetadataCachesSnapshot cosmosClientMetadataCachesSnapshot; + private final CosmosClientMetadataCachesSnapshot 
throughputControlCosmosClientMetadataCachesSnapshot; + public CosmosSourceTaskConfig(Map parsedConfigs) { super(getConfigDef(), parsedConfigs); @@ -33,6 +41,8 @@ public CosmosSourceTaskConfig(Map parsedConfigs) { this.feedRangeTaskUnits = this.parseFeedRangeTaskUnits(); this.metadataTaskUnit = this.parseMetadataTaskUnit(); this.taskId = this.getString(SOURCE_TASK_ID); + this.cosmosClientMetadataCachesSnapshot = this.parseClientMetadataCache(); + this.throughputControlCosmosClientMetadataCachesSnapshot = this.parseThroughputControlClientMetadataCache(); } public static ConfigDef getConfigDef() { @@ -61,6 +71,18 @@ private static void defineTaskUnitsConfig(ConfigDef result) { ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM + ) + .defineInternal( + COSMOS_CLIENT_METADATA_CACHES_SNAPSHOT, + ConfigDef.Type.STRING, + null, + ConfigDef.Importance.LOW + ) + .defineInternal( + THROUGHPUT_CONTROL_COSMOS_CLIENT_METADATA_CACHES_SNAPSHOT, + ConfigDef.Type.STRING, + null, + ConfigDef.Importance.LOW ); } @@ -102,6 +124,16 @@ private MetadataTaskUnit parseMetadataTaskUnit() { return null; } + private CosmosClientMetadataCachesSnapshot parseClientMetadataCache() { + return KafkaCosmosUtils.getCosmosClientMetadataFromString( + this.getString(COSMOS_CLIENT_METADATA_CACHES_SNAPSHOT)); + } + + private CosmosClientMetadataCachesSnapshot parseThroughputControlClientMetadataCache() { + return KafkaCosmosUtils.getCosmosClientMetadataFromString( + this.getString(THROUGHPUT_CONTROL_COSMOS_CLIENT_METADATA_CACHES_SNAPSHOT)); + } + public static Map getFeedRangeTaskUnitsConfigMap(List feedRangeTaskUnits) { try { Map taskConfigMap = new HashMap<>(); @@ -148,4 +180,12 @@ public MetadataTaskUnit getMetadataTaskUnit() { public String getTaskId() { return taskId; } + + public CosmosClientMetadataCachesSnapshot getCosmosClientMetadataCachesSnapshot() { + return cosmosClientMetadataCachesSnapshot; + } + + public CosmosClientMetadataCachesSnapshot getThroughputControlCosmosClientMetadataCachesSnapshot() { + return throughputControlCosmosClientMetadataCachesSnapshot; + } } diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/JsonToStruct.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/JsonToStruct.java index c73af26327b5c..cd3876f2b67bf 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/JsonToStruct.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/JsonToStruct.java @@ -93,7 +93,6 @@ private static Schema inferSchema(JsonNode jsonNode) { return structBuilder.build(); case STRING: return Schema.STRING_SCHEMA; - // TODO[GA]: do we need to support binary/pojo? 
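+            // BINARY, MISSING and POJO nodes are intentionally not supported for schema inference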
case BINARY: case MISSING: case POJO: diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/MetadataMonitorThread.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/MetadataMonitorThread.java index 37f226d3004a5..9922e6fedab17 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/MetadataMonitorThread.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/MetadataMonitorThread.java @@ -30,14 +30,7 @@ public class MetadataMonitorThread extends Thread { private static final Logger LOGGER = LoggerFactory.getLogger(MetadataMonitorThread.class); - // TODO[GA]: using a threadPool with less threads or single thread - public static final Scheduler CONTAINERS_MONITORING_SCHEDULER = Schedulers.newBoundedElastic( - Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE, - Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, - "cosmos-source-metadata-monitoring-bounded-elastic", - 60, - true - ); + public static final Scheduler CONTAINERS_MONITORING_SCHEDULER = Schedulers.single(); private final String connectorName; private final CosmosSourceContainersConfig sourceContainersConfig; diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/CosmosSinkConnectorTest.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/CosmosSinkConnectorTest.java index 1ef299561e368..5a1411b023be9 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/CosmosSinkConnectorTest.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/CosmosSinkConnectorTest.java @@ -3,21 +3,30 @@ package com.azure.cosmos.kafka.connect; +import com.azure.cosmos.CosmosAsyncClient; +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.implementation.CosmosClientMetadataCachesSnapshot; +import com.azure.cosmos.implementation.DocumentCollection; import com.azure.cosmos.implementation.Strings; import com.azure.cosmos.implementation.TestConfigurations; +import com.azure.cosmos.implementation.caches.AsyncCache; import com.azure.cosmos.kafka.connect.implementation.CosmosAuthType; +import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosUtils; import com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkConfig; import com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkTask; import com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkTaskConfig; import com.azure.cosmos.kafka.connect.implementation.sink.IdStrategyType; import com.azure.cosmos.kafka.connect.implementation.sink.ItemWriteStrategy; import com.azure.cosmos.kafka.connect.implementation.sink.patch.KafkaCosmosPatchOperationType; +import com.azure.cosmos.models.CosmosContainerProperties; import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigValue; import org.apache.kafka.common.config.types.Password; +import org.testcontainers.shaded.org.apache.commons.lang3.tuple.Pair; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import reactor.core.publisher.Mono; import java.util.ArrayList; import java.util.Arrays; @@ -26,6 +35,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import 
java.util.stream.Collectors; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; @@ -57,7 +67,7 @@ public void config() { CosmosSinkConnector sinkConnector = new CosmosSinkConnector(); ConfigDef configDef = sinkConnector.config(); Map configs = configDef.configKeys(); - List> allValidConfigs = CosmosSinkConnectorTest.SinkConfigs.ALL_VALID_CONFIGS; + List> allValidConfigs = SinkConfigs.ALL_VALID_CONFIGS; for (KafkaCosmosConfigEntry sinkConfigEntry : allValidConfigs) { System.out.println(sinkConfigEntry.getName()); @@ -114,6 +124,95 @@ public void taskConfigs() { } } + @Test(groups = "kafka-emulator") + public void taskConfigsForClientMetadataCachesSnapshot() { + CosmosSinkConnector sinkConnector = new CosmosSinkConnector(); + String connectorName = "test"; + KafkaCosmosReflectionUtils.setConnectorName(sinkConnector, connectorName); + + String throughputControlContainerName = "throughputControl-" + UUID.randomUUID(); + // create throughput control container + CosmosAsyncClient client = null; + try { + client = new CosmosClientBuilder() + .key(KafkaCosmosTestConfigurations.MASTER_KEY) + .endpoint(KafkaCosmosTestConfigurations.HOST) + .buildAsyncClient(); + CosmosContainerProperties containerProperties = new CosmosContainerProperties(throughputControlContainerName, "/groupId"); + client.getDatabase(databaseName).createContainerIfNotExists(containerProperties).block(); + + Map sinkConfigMap = new HashMap<>(); + sinkConfigMap.put("azure.cosmos.account.endpoint", KafkaCosmosTestConfigurations.HOST); + sinkConfigMap.put("azure.cosmos.account.key", KafkaCosmosTestConfigurations.MASTER_KEY); + sinkConfigMap.put("azure.cosmos.sink.database.name", databaseName); + String topicMap = + String.join( + ",", + Arrays.asList( + singlePartitionContainerName + "#" + singlePartitionContainerName, + multiPartitionContainerName + "#" + multiPartitionContainerName)); + sinkConfigMap.put("azure.cosmos.sink.containers.topicMap", topicMap); + sinkConfigMap.put("azure.cosmos.throughputControl.enabled", "true"); + sinkConfigMap.put("azure.cosmos.throughputControl.account.endpoint", KafkaCosmosTestConfigurations.HOST); + sinkConfigMap.put("azure.cosmos.throughputControl.account.key", KafkaCosmosTestConfigurations.MASTER_KEY); + sinkConfigMap.put("azure.cosmos.throughputControl.group.name", "throughputControl-metadataCachesSnapshot"); + sinkConfigMap.put("azure.cosmos.throughputControl.targetThroughput", String.valueOf(400)); + sinkConfigMap.put("azure.cosmos.throughputControl.globalControl.database.name", databaseName); + sinkConfigMap.put("azure.cosmos.throughputControl.globalControl.container.name", throughputControlContainerName); + sinkConnector.start(sinkConfigMap); + + int maxTask = 2; + List> taskConfigs = sinkConnector.taskConfigs(maxTask); + assertThat(taskConfigs.size()).isEqualTo(maxTask); + validateTaskConfigsTaskId(taskConfigs, connectorName); + + for (Map taskConfig : taskConfigs) { + assertThat(taskConfig.get("azure.cosmos.account.endpoint")).isEqualTo(KafkaCosmosTestConfigurations.HOST); + assertThat(taskConfig.get("azure.cosmos.account.key")).isEqualTo(KafkaCosmosTestConfigurations.MASTER_KEY); + assertThat(taskConfig.get("azure.cosmos.sink.database.name")).isEqualTo(databaseName); + assertThat(taskConfig.get("azure.cosmos.sink.containers.topicMap")).isEqualTo(topicMap); + + // validate the client metadata cache snapshot is also populated in the task configs + List> metadataCachesPairList = new ArrayList<>(); + metadataCachesPairList.add( + 
     @Test(groups = "unit")
     public void misFormattedConfig() {
         CosmosSinkConnector sinkConnector = new CosmosSinkConnector();
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/CosmosSourceConnectorTest.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/CosmosSourceConnectorTest.java
index a39d4176d9397..9d06845e511f9 100644
--- a/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/CosmosSourceConnectorTest.java
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/CosmosSourceConnectorTest.java
@@ -5,10 +5,14 @@
 
 import com.azure.cosmos.CosmosAsyncClient;
 import com.azure.cosmos.CosmosAsyncContainer;
+import com.azure.cosmos.CosmosClientBuilder;
+import com.azure.cosmos.implementation.CosmosClientMetadataCachesSnapshot;
+import com.azure.cosmos.implementation.DocumentCollection;
 import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
 import com.azure.cosmos.implementation.Strings;
 import com.azure.cosmos.implementation.Utils;
 import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
+import com.azure.cosmos.implementation.caches.AsyncCache;
 import com.azure.cosmos.implementation.changefeed.common.ChangeFeedMode;
 import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStartFromInternal;
 import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState;
@@ -18,6 +22,7 @@
 import com.azure.cosmos.implementation.query.CompositeContinuationToken;
 import com.azure.cosmos.kafka.connect.implementation.CosmosAuthType;
 import com.azure.cosmos.kafka.connect.implementation.CosmosClientStore;
+import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosUtils;
 import com.azure.cosmos.kafka.connect.implementation.source.CosmosChangeFeedMode;
 import com.azure.cosmos.kafka.connect.implementation.source.CosmosChangeFeedStartFromMode;
 import com.azure.cosmos.kafka.connect.implementation.source.CosmosMetadataStorageType;
@@ -46,7 +51,9 @@
 import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.connect.source.SourceConnectorContext;
 import org.mockito.Mockito;
+import org.testcontainers.shaded.org.apache.commons.lang3.tuple.Pair;
 import org.testng.annotations.Test;
+import reactor.core.publisher.Mono;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -54,6 +61,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
@@ -166,6 +174,82 @@ public void getTaskConfigsWithoutPersistedOffset() throws JsonProcessingExceptio
         }
     }
 
+    @Test(groups = "kafka-emulator")
+    public void taskConfigsForClientMetadataCachesSnapshot() {
+        CosmosSourceConnector sourceConnector = new CosmosSourceConnector();
+        String throughputControlContainerName = "throughputControl-" + UUID.randomUUID();
+        // create throughput control container
+        CosmosAsyncClient client = null;
+
+        try {
+            client = new CosmosClientBuilder()
+                .key(KafkaCosmosTestConfigurations.MASTER_KEY)
+                .endpoint(KafkaCosmosTestConfigurations.HOST)
+                .buildAsyncClient();
+            CosmosContainerProperties containerProperties = new CosmosContainerProperties(throughputControlContainerName, "/groupId");
+            client.getDatabase(databaseName).createContainerIfNotExists(containerProperties).block();
+
+            String connectorName = "kafka-test-getTaskConfigForClientMetadataCachesSnapshot";
+            Map<String, String> sourceConfigMap = new HashMap<>();
+            sourceConfigMap.put("azure.cosmos.account.endpoint", KafkaCosmosTestConfigurations.HOST);
+            sourceConfigMap.put("azure.cosmos.account.key", KafkaCosmosTestConfigurations.MASTER_KEY);
+            sourceConfigMap.put("azure.cosmos.source.database.name", databaseName);
+            List<String> containersIncludedList = Arrays.asList(
+                singlePartitionContainerName,
+                multiPartitionContainerName
+            );
+            sourceConfigMap.put("azure.cosmos.source.containers.includedList", containersIncludedList.toString());
+
+            String singlePartitionContainerTopicName = singlePartitionContainerName + "topic";
+            List<String> containerTopicMapList = Arrays.asList(singlePartitionContainerTopicName + "#" + singlePartitionContainerName);
+            sourceConfigMap.put("azure.cosmos.source.containers.topicMap", containerTopicMapList.toString());
+            sourceConfigMap.put("azure.cosmos.throughputControl.enabled", "true");
+            sourceConfigMap.put("azure.cosmos.throughputControl.account.endpoint", KafkaCosmosTestConfigurations.HOST);
+            sourceConfigMap.put("azure.cosmos.throughputControl.account.key", KafkaCosmosTestConfigurations.MASTER_KEY);
+            sourceConfigMap.put("azure.cosmos.throughputControl.group.name", "throughputControl-metadataCachesSnapshot");
+            sourceConfigMap.put("azure.cosmos.throughputControl.targetThroughput", String.valueOf(400));
+            sourceConfigMap.put("azure.cosmos.throughputControl.globalControl.database.name", databaseName);
+            sourceConfigMap.put("azure.cosmos.throughputControl.globalControl.container.name", throughputControlContainerName);
+
+            // setup the internal state
+            this.setupDefaultConnectorInternalStatesWithMetadataKafkaReader(sourceConnector, sourceConfigMap, connectorName);
+
+            int maxTask = 2;
+            List<Map<String, String>> taskConfigs = sourceConnector.taskConfigs(maxTask);
+            assertThat(taskConfigs.size()).isEqualTo(maxTask);
+            validateTaskConfigsTaskId(taskConfigs, connectorName);
+
+            for (Map<String, String> taskConfig : taskConfigs) {
+                // validate the client metadata cache snapshot is also populated in the task configs
+                List<Pair<String, String>> metadataCachesPairList = new ArrayList<>();
+                metadataCachesPairList.add(
+                    Pair.of("azure.cosmos.client.metadata.caches.snapshot", singlePartitionContainerName));
+                metadataCachesPairList.add(
+                    Pair.of("azure.cosmos.client.metadata.caches.snapshot", multiPartitionContainerName));
+                metadataCachesPairList.add(
+                    Pair.of("azure.cosmos.throughputControl.client.metadata.caches.snapshot", throughputControlContainerName));
+
+                for (Pair<String, String> metadataCachesPair : metadataCachesPairList) {
+                    CosmosClientMetadataCachesSnapshot clientMetadataCachesSnapshot =
+                        KafkaCosmosUtils.getCosmosClientMetadataFromString(taskConfig.get(metadataCachesPair.getLeft()));
+                    assertThat(clientMetadataCachesSnapshot).isNotNull();
+                    AsyncCache<String, DocumentCollection> collectionInfoCache = clientMetadataCachesSnapshot.getCollectionInfoByNameCache();
+
+                    AtomicInteger invokedCount = new AtomicInteger(0);
+                    String cacheKey = String.format("dbs/%s/colls/%s", databaseName, metadataCachesPair.getRight());
+                    collectionInfoCache.getAsync(cacheKey, null, () -> {
+                        invokedCount.incrementAndGet();
+                        return Mono.just(new DocumentCollection());
+                    }).block();
+
+                    assertThat(invokedCount.get()).isEqualTo(0);
+                }
+            }
+        } finally {
+            sourceConnector.stop();
+        }
+    }
+
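The source-side test mirrors the sink-side check, including the `String.format("dbs/%s/colls/%s", ...)` lookup key, which follows Cosmos DB's resource-link addressing for collections. If more call sites appear, a small helper would keep the convention in one place; the class and method names below are illustrative, not part of the SDK:

```java
public final class CosmosCacheKeys {
    private CosmosCacheKeys() {
    }

    // Mirrors the cacheKey built in both taskConfigsForClientMetadataCachesSnapshot tests.
    public static String collectionInfoCacheKey(String databaseName, String containerName) {
        return String.format("dbs/%s/colls/%s", databaseName, containerName);
    }

    public static void main(String[] args) {
        // prints: dbs/testDb/colls/testContainer
        System.out.println(collectionInfoCacheKey("testDb", "testContainer"));
    }
}
```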
     @Test(groups = { "kafka", "kafka-emulator" }, timeOut = TIMEOUT)
     public void getTaskConfigs_withMetadataCosmosStorageManager() throws JsonProcessingException {
         CosmosSourceConnector sourceConnector = new CosmosSourceConnector();