diff --git a/eng/ignore-links.txt b/eng/ignore-links.txt
index 3d29da5e83526..b283f02db5901 100644
--- a/eng/ignore-links.txt
+++ b/eng/ignore-links.txt
@@ -1 +1,7 @@
 https://github.com/Azure/azure-sdk-tools/blob/main/eng/common/testproxy/transition-scripts/generate-assets-json.ps1
+
+# Confluent platform local addresses
+http://localhost:9021/
+http://localhost:9000/
+http://localhost:9001/
+http://localhost:9004/
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/dev/Confluent_Cloud_Setup.md b/sdk/cosmos/azure-cosmos-kafka-connect/dev/Confluent_Cloud_Setup.md
new file mode 100644
index 0000000000000..c1a2a94c3e0e9
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/dev/Confluent_Cloud_Setup.md
@@ -0,0 +1,132 @@
+# Confluent Cloud Setup
+
+This guide walks through setting up a Confluent Cloud cluster.
+
+## Prerequisites
+
+- Bash shell
+  - Will not work in Cloud Shell or WSL1
+- Java 11+ ([download](https://www.oracle.com/java/technologies/javase-jdk11-downloads.html))
+- Maven ([download](https://maven.apache.org/download.cgi))
+- Docker ([download](https://www.docker.com/products/docker-desktop))
+- CosmosDB [Setting up an Azure Cosmos DB Instance]
+
+## Setup
+
+### Create Confluent Cloud Account and Set Up Cluster
+Go to [create account](https://www.confluent.io/get-started/) and fill out the appropriate fields.
+
+![SignupConfluentCloud](images/SignUpConfluentCloud.png)
+
+---
+
+Select environments.
+
+![EnvironmentClick](images/environment-click.png)
+
+---
+
+Select default, which is an environment automatically set up by Confluent.
+
+![DefaultClick](images/click-default.png)
+
+---
+
+Select add cluster.
+
+![Add Cluster](images/click-add-cluster.png)
+
+---
+
+Select Azure, create the cluster, and choose the same region as the Cosmos DB instance you created.
+
+![Select Azure](images/select-azure.png)
+
+---
+
+Name the cluster, and then select launch cluster.
+
+![Name and Launch](images/select-name-launch.png)
+
+### Create ksqlDB Cluster
+From inside the cluster, select ksqlDB. Select add cluster. Select continue, name the cluster, and then select launch.
+
+![ksqlDB](images/select-ksqlDB.png)
+
+### Update Configurations
+- The cluster key and secret can be found under API keys in the cluster. Choose the one for ksqlDB. Or generate a client config using the CLI and Tools. ![CLI and Tools](images/cli-and-tools.png)
+- The `BOOTSTRAP_SERVERS` endpoint can be found in the cluster under cluster settings and endpoints. Or generate a client config using the CLI and Tools. ![CLI and Tools](images/cli-and-tools.png)
+- The schema registry key and secret can be found on the bottom of the right panel inside the Confluent environment under credentials.
+- The schema registry URL can be found on the bottom of the right panel inside the Confluent environment under Endpoint.
+
+![Schema Registry url](images/schema-registry.png)
+![Schema Registry key and secret](images/schema-key-and-secret.png)
+
+### Run Integration Tests
+To run the integration tests against a Confluent Cloud cluster, create ~/kafka-cosmos-local.properties with the following content:
+```
+ACCOUNT_HOST=[emulator endpoint or your cosmos account endpoint]
+ACCOUNT_KEY=[emulator masterKey or your cosmos masterKey]
+ACCOUNT_TENANT_ID=[update if AAD auth is required in the integration tests]
+ACCOUNT_AAD_CLIENT_ID=[update if AAD auth is required in the integration tests]
+ACCOUNT_AAD_CLIENT_SECRET=[update if AAD auth is required in the integration tests]
+SASL_JAAS=[credential configured on the confluent cloud cluster]
+BOOTSTRAP_SERVER=[bootstrap server endpoint of the confluent cloud cluster]
+SCHEMA_REGISTRY_URL=[schema registry url of the cloud cluster]
+SCHEMA_REGISTRY_KEY=[schema registry key of the cloud cluster]
+SCHEMA_REGISTRY_SECRET=[schema registry secret of the cloud cluster]
+CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=3
+CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=3
+CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=3
+```
+Integration tests have the ITest suffix. Use the following command to run the integration tests ([create the topic ahead of time](#create-topic-in-confluent-cloud-ui)):
+```bash
+mvn -e -Dgpg.skip -Dmaven.javadoc.skip=true -Dcodesnippet.skip=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Drevapi.skip=true -pl ,azure-cosmos-kafka-connect test package -Pkafka-integration
+```
+
+### Run a local sink/source workload by using Confluent Platform locally
+- Follow [Install Confluent Platform using ZIP and TAR](https://docs.confluent.io/platform/current/installation/installing_cp/zip-tar.html#prod-kafka-cli-install) to download the Confluent Platform package
+- Copy src/docker/resources/sink.example.json to the above unzipped confluent folder
+- Copy src/docker/resources/source.example.json to the above unzipped confluent folder
+- Update the sink.example.json and source.example.json with your Cosmos DB endpoint
+- Build the cosmos kafka connector jar
+```bash
+mvn -e -DskipTests -Dgpg.skip -Dmaven.javadoc.skip=true -Dcodesnippet.skip=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Drevapi.skip=true -pl ,azure-cosmos,azure-cosmos-tests -am clean install
+mvn -e -DskipTests -Dgpg.skip -Dmaven.javadoc.skip=true -Dcodesnippet.skip=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Drevapi.skip=true -pl ,azure-cosmos-kafka-connect clean install
+```
+- Copy the built cosmos kafka connector jar to the plugin path folder (you can find the folder from the plugin.path config in etc/distributed.properties)
+- ```cd unzipped confluent folder```
+- Update the etc/distributed.properties file with your confluent cloud cluster config
+- Run ```./bin/connect-distributed ./etc/distributed.properties```
+- Start your sink connector or source connector: ```curl -s -H "Content-Type: application/json" -X POST -d @ http://localhost:8083/connectors/ | jq .```
+- Monitor the logs for any exceptions, and also monitor the throughput and other metrics from your confluent cloud cluster
+
+> If you want to delete your connector: ```curl -X DELETE http://localhost:8083/connectors/cosmosdb-source-connector-v2```. The connector name should match the one in your json config.
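+
+> To check the current status of your connector and its tasks before restarting or deleting it (a quick sanity check; adjust the connector name to match your config): ```curl -s http://localhost:8083/connectors/cosmosdb-source-connector-v2/status | jq .```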
+
+> If you want to restart your connector: ```curl -s -H "Content-Type: application/json" -X POST http://localhost:8083/connectors/cosmosdb-source-connector-v2/restart | jq .```
+
+> Follow [Kafka Connect REST Interface for Confluent Platform](https://docs.confluent.io/platform/current/connect/references/restapi.html) to check other options.
+
+### Create Topic in Confluent Cloud UI
+For some cluster types, you will need to create the topic ahead of time. You can use the UI or the [Confluent CLI](https://docs.confluent.io/cloud/current/client-apps/topics/manage.html#:~:text=Confluent%20CLI%20Follow%20these%20steps%20to%20create%20a,aren%E2%80%99t%20any%20topics%20created%20yet%2C%20click%20Create%20topic.) (requires installing the Confluent CLI first).
+
+Inside the Cluster Overview, scroll down and select topics and partitions.
+
+![topic-partition](images/Topics-Partitions.png)
+
+---
+
+Select add topic.
+
+![add-topic](images/add-topic.png)
+
+---
+
+Name the topic and select create with defaults. Afterward, a prompt will appear about creating a schema. This can be skipped as the tests will create the schemas.
+
+## Resources to Improve Infrastructure
+- [Docker Configurations](https://docs.confluent.io/platform/current/installation/docker/config-reference.html)
+- [Configuration Options](https://docs.confluent.io/platform/current/installation/configuration/index.html)
+- [Connect Confluent Platform Components to Confluent Cloud](https://docs.confluent.io/cloud/current/cp-component/index.html)
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/dev/Confluent_Platform_Setup.md b/sdk/cosmos/azure-cosmos-kafka-connect/dev/Confluent_Platform_Setup.md
new file mode 100644
index 0000000000000..976e05e3361cb
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/dev/Confluent_Platform_Setup.md
@@ -0,0 +1,68 @@
+# Confluent Platform Setup
+
+This guide walks through setting up Confluent Platform using Docker containers.
+
+## Prerequisites
+
+- Bash shell
+  - Will not work in Cloud Shell or WSL1
+- Java 11+ ([download](https://www.oracle.com/java/technologies/javase-jdk11-downloads.html))
+- Maven ([download](https://maven.apache.org/download.cgi))
+- Docker ([download](https://www.docker.com/products/docker-desktop))
+- PowerShell (optional) ([download](https://learn.microsoft.com/powershell/scripting/install/installing-powershell))
+
+### Startup
+
+> Running either script for the first time may take several minutes in order to download the Docker images for the Confluent Platform components.
+
+```bash
+
+cd $REPO_ROOT/src/docker
+
+# Option 1: Use the bash script to set up
+./startup.sh
+
+# Option 2: Use the powershell script to set up
+pwsh startup.ps1
+
+# verify the services are up and running
+docker-compose ps
+
+```
+
+> You may need to increase the memory allocation for Docker to 3 GB or more.
+>
+> Rerun the startup script to reinitialize the docker containers.
+
+Your Confluent Platform setup is now ready to use!
+
+### Running Kafka Connect standalone mode
+
+The Kafka Connect container included with the Confluent Platform setup runs Kafka Connect in `distributed` mode. Using Kafka Connect in `distributed` mode is *recommended* since you can interact with connectors using the Control Center UI.
+
+If you instead would like to run Kafka Connect in `standalone` mode, which is useful for quick testing, continue through this section.
+For more information on Kafka Connect standalone and distributed modes, refer to these [Confluent docs](https://docs.confluent.io/home/connect/userguide.html#standalone-vs-distributed-mode).
+
+### Access Confluent Platform components
+
+| Name | Address | Description |
+| --- |-------------------------| --- |
+| Control Center | http://localhost:9021/ | The main webpage for all Confluent services where you can create topics, configure connectors, interact with the Connect cluster (only for distributed mode) and more. |
+| Kafka Topics UI | http://localhost:9000/ | Useful for viewing Kafka topics and the messages within them. |
+| Schema Registry UI | http://localhost:9001/ | Can view and create new schemas, ideal for interacting with Avro data. |
+| ZooNavigator | http://localhost:9004/ | Web interface for Zookeeper. Refer to the [docs](https://zoonavigator.elkozmon.com/en/stable/) for more information. |
+
+### Cleanup
+
+Tear down the Confluent Platform setup and clean up any unneeded resources:
+
+```bash
+
+cd $REPO_ROOT/src/docker
+
+# bring down all docker containers
+docker-compose down
+
+# remove dangling volumes and networks
+docker system prune -f --volumes --filter "label=io.confluent.docker"
+
+```
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/dev/CosmosDB_Setup.md b/sdk/cosmos/azure-cosmos-kafka-connect/dev/CosmosDB_Setup.md
new file mode 100644
index 0000000000000..1859dc8a8f866
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/dev/CosmosDB_Setup.md
@@ -0,0 +1,95 @@
+# Setting up an Azure Cosmos DB Instance
+
+## Prerequisites
+
+- Azure subscription with permissions to create:
+  - Resource Groups, Cosmos DB
+- Bash shell (tested on Visual Studio Codespaces, Cloud Shell, Mac, Ubuntu, Windows with WSL2)
+  - Will not work with WSL1
+- Azure CLI ([download](https://learn.microsoft.com/cli/azure/install-azure-cli?view=azure-cli-latest))
+
+## Create Azure Cosmos DB Instance, Database and Container
+
+Log in to Azure and select a subscription.
+
+```bash
+
+az login
+
+# show your Azure accounts
+az account list -o table
+
+# select the Azure subscription if necessary
+az account set -s {subscription name or Id}
+
+```
+
+Create a new Azure Resource Group for this quickstart, then add to it a Cosmos DB Account, Database and Container using the Azure CLI.
+
+> The `az cosmosdb sql` extension is currently in preview and is subject to change
+
+```bash
+
+# replace with a unique name
+# do not use punctuation or uppercase (a-z, 0-9)
+export Cosmos_Name={your Cosmos DB name}
+
+## if this returns true, change the name to avoid DNS failure on create
+az cosmosdb check-name-exists -n ${Cosmos_Name}
+
+# set environment variables
+export Cosmos_Location="centralus"
+export Cosmos_Database="kafkaconnect"
+export Cosmos_Container="kafka"
+
+# Resource Group Name
+export Cosmos_RG=${Cosmos_Name}-rg-cosmos
+
+# create a new resource group
+az group create -n $Cosmos_RG -l $Cosmos_Location
+
+# create the Cosmos DB server
+# this command takes several minutes to run
+az cosmosdb create -g $Cosmos_RG -n $Cosmos_Name
+
+# create the database
+# 400 is the minimum --throughput (RUs)
+az cosmosdb sql database create -a $Cosmos_Name -n $Cosmos_Database -g $Cosmos_RG --throughput 400
+
+# create the container
+# /id is the partition key (case sensitive)
+az cosmosdb sql container create -p /id -g $Cosmos_RG -a $Cosmos_Name -d $Cosmos_Database -n $Cosmos_Container
+
+# OPTIONAL: Enable Time to Live (TTL) on the container
+export Cosmos_Container_TTL=1000
+az cosmosdb sql container update -g $Cosmos_RG -a $Cosmos_Name -d $Cosmos_Database -n $Cosmos_Container --ttl=$Cosmos_Container_TTL
+
+```
+
+With the Azure Cosmos DB instance set up, you will need to get the Cosmos DB endpoint URI and primary connection key. These values will be used to set up the Cosmos DB Source and Sink connectors.
+
+```bash
+
+# Keep note of both of the following values as they will be used later
+
+# get Cosmos DB endpoint URI
+echo https://${Cosmos_Name}.documents.azure.com:443/
+
+# get Cosmos DB primary connection key
+az cosmosdb keys list -n $Cosmos_Name -g $Cosmos_RG --query primaryMasterKey -o tsv
+
+```
+
+### Cleanup
+
+Remove the Cosmos DB instance and the associated resource group:
+
+```bash
+
+# delete Cosmos DB instance
+az cosmosdb delete -g $Cosmos_RG -n $Cosmos_Name
+
+# delete Cosmos DB resource group
+az group delete --no-wait -y -n $Cosmos_RG
+
+```
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/dev/README_Sink.md b/sdk/cosmos/azure-cosmos-kafka-connect/dev/README_Sink.md
new file mode 100644
index 0000000000000..afb548e716f80
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/dev/README_Sink.md
@@ -0,0 +1,361 @@
+# Kafka Connect Cosmos DB Sink Connector
+
+The Azure Cosmos DB sink connector allows you to export data from Apache Kafka® topics to an Azure Cosmos DB database.
+The connector polls data from Kafka and writes it to containers in the database based on the topic subscription.
+
+## Topics covered
+
+- [Quickstart](#quickstart)
+- [Sink configuration properties](#sink-configuration-properties)
+- [Supported Data Types](#supported-data-types)
+- [Single Message Transforms (SMTs)](#single-message-transforms)
+- [Troubleshooting common issues](#troubleshooting-common-issues)
+- [Limitations](#limitations)
+
+## Quickstart
+
+### Prerequisites
+
+- It is recommended to start with the Confluent Platform (ideally using this [setup]), as this gives you a complete environment to work with.
+  - If you do not wish to use Confluent Platform, then you need to install and configure ZooKeeper, Apache Kafka, and Kafka Connect yourself. You will also need to install and configure the Cosmos DB connectors manually.
+- Cosmos DB Instance ([setup guide] +- Bash shell + - Will not work in Cloud Shell or WSL1 +- Java 11+ ([download](https://www.oracle.com/java/technologies/javase-jdk11-downloads.html)) +- Maven ([download](https://maven.apache.org/download.cgi)) + +### Install sink connector + +If you are using the Confluent Platform setup from this repo, the Cosmos DB Sink Connector is included in the installation and you can skip this step. + +Otherwise, you can download the JAR file from the latest [Release](https://mvnrepository.com/artifact/com.azure.cosmos.kafka/azure-cosmos-kafka-connect) or package this repo to create a new JAR file. To install the connector manually using the JAR file, refer to these [instructions](https://docs.confluent.io/current/connect/managing/install.html#install-connector-manually). + +You can also package a new JAR file from the source code. + +```bash +mvn -e -DskipTests -Dgpg.skip -Dmaven.javadoc.skip=true -Dcodesnippet.skip=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Drevapi.skip=true -pl ,azure-cosmos,azure-cosmos-tests -am clean install +mvn -e -DskipTests -Dgpg.skip -Dmaven.javadoc.skip=true -Dcodesnippet.skip=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Drevapi.skip=true -pl ,azure-cosmos-kafka-connect clean install + +# include the following JAR file in Kafka Connect installation +ls target/azure-cosmos-kafka-connect-*.jar +``` + +### Create Kafka topic and write data + +If you are using the Confluent Platform, the easiest way to create a Kafka topic is by using the supplied Control Center UX. +Otherwise, you can create a Kafka topic manually using the following syntax: + +```bash +./kafka-topics.sh --create --zookeeper --replication-factor --partitions --topic +``` + +For this quickstart, we will create a Kafka topic named `hotels` and will write JSON data (non-schema embedded) to the topic. + +To create a topic inside Control Center, see [here](https://docs.confluent.io/platform/current/quickstart/ce-docker-quickstart.html#step-2-create-ak-topics). + +Next, start the Kafka console producer to write a few records to the `hotels` topic. + +```powershell + +# Option 1: If using Codespaces, use the built-in CLI utility +kafka-console-producer --broker-list localhost:9092 --topic hotels + +# Option 2: Using this repo's Confluent Platform setup, first exec into the broker container +docker exec -it broker /bin/bash +kafka-console-producer --broker-list localhost:9092 --topic hotels + +# Option 3: Using your Confluent Platform setup and CLI install +/bin/kafka-console-producer --broker-list --topic hotels + +``` + +In the console producer, enter: + +```json + +{"id": "h1", "HotelName": "Marriott", "Description": "Marriott description"} +{"id": "h2", "HotelName": "HolidayInn", "Description": "HolidayInn description"} +{"id": "h3", "HotelName": "Motel8", "Description": "Motel8 description"} + +``` + +The three records entered are published to the `hotels` Kafka topic in JSON format. + +### Create the sink connector + +Create the Cosmos DB Sink Connector in Kafka Connect + +The following JSON body defines the config for the Cosmos DB Sink Connector. + +> Note: You will need to fill out the values for `azure.cosmos.account.endpoint` and `azure.cosmos.account.key`, which you should have saved from the [Cosmos DB setup guide]. + +Refer to the [sink properties](#sink-configuration-properties) section for more information on each of these configuration properties. 
+ +```json + +{ + "name": "cosmos-kafka-connectv2", + "config": { + "connector.class": "com.azure.cosmos.kafka.connect.CosmosSinkConnector", + "tasks.max": "5", + "topics": "GreenTaxiRecords", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + "key.converter": "org.apache.kafka.connect.json.JsonConverter", + "key.converter.schemas.enable": "false", + "azure.cosmos.account.endpoint":"", + "azure.cosmos.account.key":"", + "azure.cosmos.applicationName": "", + "azure.cosmos.sink.database.name":"SampleDatabase", + "azure.cosmos.sink.containers.topicMap":"SampleTestContainer#SampleTestContainerSink" + } +} + +``` + +Once you have all the values filled out, save the JSON file somewhere locally. You can use this file to create the connector using the REST API. + +#### Create connector using Control Center + +An easy option to create the connector is by going through the Control Center webpage. + +Follow this [guide](https://docs.confluent.io/platform/current/quickstart/ce-docker-quickstart.html#step-3-install-a-ak-connector-and-generate-sample-data) to create a connector from Control Center but instead of using the `DatagenConnector` option, use the `CosmosSinkConnector` tile instead. When configuring the sink connector, fill out the values as you have filled in the JSON file. + +Alternatively, in the connectors page, you can upload the JSON file from earlier by using the `Upload connector config file` option. + +![Upload connector config](images/upload-connector-config.png "Upload connector config") + +#### Create connector using REST API + +Create the sink connector using the Connect REST API + +```bash + +# Curl to Kafka connect service +curl -H "Content-Type: application/json" -X POST -d @ http://localhost:8083/connectors + +``` + +### Confirm data written to Cosmos DB + +Check that the three records from the `hotels` topic are created in Cosmos DB. + +Navigate to your Cosmos DB instance on Azure portal You should see something like this: + +![CosmosDB sink records](images/cosmosdb-sink-records.png "CosmosDB sink records") + +### Cleanup + +To delete the connector from the Control Center, navigate to the sink connector you created and click the `Delete` icon. + +![Delete connector](images/delete-connector.png "Delete connector") + +Alternatively, use the Connect REST API. + +```bash +# Curl to Kafka connect service +curl -X DELETE http://localhost:8083/connectors/cosmosdb-sink-connector +``` + +To delete the created Azure Cosmos DB service and its resource group using Azure CLI, refer to these [steps]. + +## Sink configuration properties + +The following settings are used to configure the Cosmos DB Kafka Sink Connector. These configuration values determine which Kafka topics data is consumed, which Cosmos DB containers data is written into and formats to serialize the data. For an example configuration file with the default values, refer to [this config]. + +- [Generic Configs For Sink And Source] +- [Configs only for Sink] + +Data will always be written to the Cosmos DB as JSON without any schema. 
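+
+As a quick sanity check, you can also read back the configuration and status of a deployed sink connector through the Kafka Connect REST API. This is a minimal sketch assuming the Connect worker is reachable on `localhost:8083` and the connector was created with the name `cosmos-kafka-connectv2` from the JSON above; adjust both to match your setup:
+
+```bash
+# show the configuration the connector is actually running with
+curl -s http://localhost:8083/connectors/cosmos-kafka-connectv2/config | jq .
+
+# show the state of the connector and each of its tasks
+curl -s http://localhost:8083/connectors/cosmos-kafka-connectv2/status | jq .
+```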
+
+## Supported Data Types
+The Azure Cosmos DB sink connector converts a SinkRecord into a JSON document, supporting the following schema types from the valid [Schema.Types](https://kafka.apache.org/21/javadoc/org/apache/kafka/connect/data/Schema.Type.html):
+
+| Schema Type | JSON Data Type |
+| :--- | :--- |
+| Array | Array |
+| Boolean | Boolean |
+| Float32 | Number |
+| Float64 | Number |
+| Int8 | Number |
+| Int16 | Number |
+| Int32 | Number |
+| Int64 | Number |
+| Map | Object (JSON) |
+| String | String<br>Null |
+| Struct | Object (JSON) |
+
+The Cosmos DB sink connector also supports the following Avro logical types:
+
+| Schema Type | JSON Data Type |
+| :--- | :--- |
+| Date | Number |
+| Time | Number |
+| Timestamp | Number |
+
+>**Note:** Byte deserialization is currently not supported by the Azure Cosmos DB sink connector.
+
+## Single Message Transforms
+
+Along with the Sink connector settings, you can specify the use of Single Message Transformations (SMTs) to modify messages flowing through the Kafka Connect platform. Refer to the [Confluent SMT Documentation](https://docs.confluent.io/platform/current/connect/transforms/overview.html) for more information.
+
+### Using the InsertUUID SMT to automatically add item IDs
+
+With the custom `InsertUUID` SMT, you can insert the `id` field with a random UUID value for each message, before it is written to Cosmos DB.
+
+> WARNING: Only use this SMT if the messages do **NOT** contain the `id` field. Otherwise, the `id` values will be **overwritten** and you may end up with duplicate items in your database.
+
+Note: Using UUIDs as the message ID can be quick and easy, but they are [not an ideal partition key](https://stackoverflow.com/questions/49031461/would-using-a-substring-of-a-guid-in-cosmosdb-as-partitionkey-be-a-bad-idea) to use in Cosmos DB.
+
+#### Install the SMT
+
+Before you can use the `InsertUUID` SMT, you will need to install this transform in your Confluent Platform setup. If you are using the Confluent Platform setup from this repo, the transform is already included in the installation and you can skip this step.
+
+Alternatively, you can package the [InsertUUID source](https://github.com/confluentinc/kafka-connect-insert-uuid) to create a new JAR file. To install the connector manually using the JAR file, refer to these [instructions](https://docs.confluent.io/current/connect/managing/install.html#install-connector-manually).
+
+```bash
+
+# clone the kafka-connect-insert-uuid repo
+git clone https://github.com/confluentinc/kafka-connect-insert-uuid.git
+cd kafka-connect-insert-uuid
+
+# package the source code into a JAR file
+mvn clean package
+
+# include the following JAR file in Confluent Platform installation
+ls target/*.jar
+
+```
+
+#### Configure the SMT
+
+Inside your Sink connector config, add the following properties to set the `id`.
+
+```json
+
+"transforms": "insertID",
+"transforms.insertID.type": "com.github.cjmatta.kafka.connect.smt.InsertUuid$Value",
+"transforms.insertID.uuid.field.name": "id"
+
+```
+
+Refer to the [InsertUUID repository](https://github.com/confluentinc/kafka-connect-insert-uuid) for more information on using this SMT.
+
+### Using SMTs to configure Time to live (TTL) for Cosmos DB items
+
+Using both the `InsertField` and `Cast` SMTs, you can specify the TTL on each item created in Cosmos DB.
+
+> Note: You will need to enable TTL on the Cosmos DB container to enable TTL at an item level. Refer to the [Cosmos DB setup guide] or the [Cosmos DB docs](https://learn.microsoft.com/azure/cosmos-db/nosql/how-to-time-to-live?tabs=dotnet-sdk-v3#enable-time-to-live-on-a-container-using-azure-portal) for more information on setting the TTL.
+
+Inside your Sink connector config, add the following properties to set the TTL (in seconds). In the following example, the TTL is set to 100 seconds.
+
+> Note: If the message already contains the `TTL` field, the `TTL` value will be overwritten by these SMTs.
+ +```json + +"transforms": "insertTTL,castTTLInt", +"transforms.insertTTL.type": "org.apache.kafka.connect.transforms.InsertField$Value", +"transforms.insertTTL.static.field": "ttl", +"transforms.insertTTL.static.value": "100", +"transforms.castTTLInt.type": "org.apache.kafka.connect.transforms.Cast$Value", +"transforms.castTTLInt.spec": "ttl:int32" + +``` + +Refer to the [InsertField](https://docs.confluent.io/platform/current/connect/transforms/insertfield.html) and [Cast](https://docs.confluent.io/platform/current/connect/transforms/cast.html) documentation for more information on using these SMTs. + +## Troubleshooting common issues + +Here are solutions to some common problems that you may encounter when working with the Cosmos DB Kafka Sink Connector. + +### Reading non-JSON data with JsonConverter + +If you have non-JSON data on your source topic in Kafka and attempt to read it using the JsonConverter, you will see the following exception: + +```none + +org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: +… +org.apache.kafka.common.errors.SerializationException: java.io.CharConversionException: Invalid UTF-32 character 0x1cfa7e2 (above 0x0010ffff) at char #1, byte #7) + +``` + +This is likely caused by data in the source topic being serialized in either Avro or another format (like CSV string). + +**Solution**: If the topic data is actually in Avro, then change your Kafka Connect sink connector to use the AvroConverter as shown below. + +```json + +"value.converter": "io.confluent.connect.avro.AvroConverter", +"value.converter.schema.registry.url": "http://schema-registry:8081", + +``` + +### Reading non-Avro data with AvroConverter + +When you try to use the Avro converter to read data from a topic that is not Avro. This would include data written by an Avro serializer other than the Confluent Schema Registry’s Avro serializer, which has its own wire format. + +```none + +org.apache.kafka.connect.errors.DataException: my-topic-name +at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97) +… +org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1 +org.apache.kafka.common.errors.SerializationException: Unknown magic byte! + +``` + +**Solution**: Check the source topic’s serialization format. Then, either switch Kafka Connect’s sink connector to use the correct converter, or switch the upstream format to Avro. + +### Reading a JSON message without the expected schema/payload structure + +Kafka Connect supports a special structure of JSON messages containing both payload and schema as follows. + + ```json + +{ + "schema": { + "type": "struct", + "fields": [ + { + "type": "int32", + "optional": false, + "field": "userid" + }, + { + "type": "string", + "optional": false, + "field": "name" + } + ] + }, + "payload": { + "userid": 123, + "name": "Sam" + } +} + +``` + +If you try to read JSON data that does not contain the data in this structure, you will get this error: + +```none + +org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration. + +``` + +To be clear, the only JSON structure that is valid for schemas.enable=true has schema and payload fields as the top-level elements (shown above). 
+
+As the message itself states, if you just have plain JSON data, you should change your connector’s configuration to:
+
+```json
+
+"value.converter": "org.apache.kafka.connect.json.JsonConverter",
+"value.converter.schemas.enable": "false",
+
+```
+
+## Limitations
+
+- Auto-creation of databases and containers within Cosmos DB is not supported. The database and containers must already exist, and the connector must be configured to use them.
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/dev/README_Source.md b/sdk/cosmos/azure-cosmos-kafka-connect/dev/README_Source.md
new file mode 100644
index 0000000000000..5a090e801627a
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/dev/README_Source.md
@@ -0,0 +1,167 @@
+# Kafka Connect Cosmos DB Source Connector
+
+The Azure Cosmos DB Source connector provides the capability to read data from the Cosmos DB Change Feed and publish this data to a Kafka topic.
+
+## Topics covered
+
+- [Quickstart](#quickstart)
+- [Source configuration properties](#source-configuration-properties)
+- [Supported Data Types](#supported-data-types)
+
+## Quickstart
+
+### Prerequisites
+
+- It is recommended to start with the Confluent Platform (ideally using this setup), as this gives you a complete environment to work with.
+  - If you do not wish to use Confluent Platform, then you need to install and configure ZooKeeper, Apache Kafka, and Kafka Connect yourself. You will also need to install and configure the Cosmos DB connectors manually.
+- Cosmos DB Instance ([setup guide])
+- Bash shell
+  - Will not work in Cloud Shell or WSL1
+- Java 11+ ([download](https://www.oracle.com/java/technologies/javase-jdk11-downloads.html))
+- Maven ([download](https://maven.apache.org/download.cgi))
+
+### Install source connector
+
+If you are using the Confluent Platform setup from this repo, the Cosmos DB Source Connector is included in the installation and you can skip this step.
+
+Otherwise, you can use the JAR file from the latest [Release](https://mvnrepository.com/artifact/com.azure.cosmos.kafka/azure-cosmos-kafka-connect) and install the connector manually; refer to these [instructions](https://docs.confluent.io/current/connect/managing/install.html#install-connector-manually).
+
+You can also package a new JAR file from the source code.
+
+```bash
+mvn -e -DskipTests -Dgpg.skip -Dmaven.javadoc.skip=true -Dcodesnippet.skip=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Drevapi.skip=true -pl ,azure-cosmos,azure-cosmos-tests -am clean install
+mvn -e -DskipTests -Dgpg.skip -Dmaven.javadoc.skip=true -Dcodesnippet.skip=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Drevapi.skip=true -pl ,azure-cosmos-kafka-connect clean install
+
+# include the following JAR file in Kafka Connect installation
+ls target/azure-cosmos-kafka-connect-*.jar
+
+```
+
+### Create Kafka topic
+
+Create a Kafka topic using Confluent Control Center. For this quickstart, we will create a Kafka topic named `apparels` and will write JSON data (non-schema embedded) to the topic.
+
+To create a topic inside Control Center, see [here](https://docs.confluent.io/platform/current/quickstart/ce-docker-quickstart.html#step-2-create-ak-topics).
+
+### Create the source connector
+
+#### Create the Cosmos DB Source Connector in Kafka Connect
+
+The following JSON body defines the config for the Cosmos DB Source Connector.
+
+>Note: You will need to replace the placeholder values for the properties below, which you should have saved from the [Cosmos DB setup guide].
+>`azure.cosmos.account.endpoint` +>`azure.cosmos.account.key` + +Refer to the [source properties](#source-configuration-properties) section for more information on each of these configuration properties. + +```json +{ + "name": "cosmosdb-source-connectorv2", + "config": { + "connector.class": "com.azure.cosmos.kafka.connect.CosmosSourceConnector", + "tasks.max": "5", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + "key.converter": "org.apache.kafka.connect.json.JsonConverter", + "key.converter.schemas.enable": "false", + "azure.cosmos.account.endpoint":"{endpoint}", + "azure.cosmos.account.key":"{masterKey}", + "azure.cosmos.application.name": "{applicationName}", + "azure.cosmos.source.database.name":"{database}", + "azure.cosmos.source.containers.includedList":"{container}", + "azure.cosmos.source.changeFeed.maxItemCountHint":"500", + "azure.cosmos.source.containers.topicMap":"{topic}#{container}", + "azure.cosmos.source.metadata.storage.type":"Cosmos", + "azure.cosmos.source.metadata.storage.name":"{metadataContainerName}" + } +} + +``` + +Once you have all the values filled out, save the JSON file somewhere locally. You can use this file to create the connector using the REST API. + +#### Create connector using Control Center + +An easy option to create the connector is by going through the Control Center webpage. + +Follow this [guide](https://docs.confluent.io/platform/current/quickstart/ce-docker-quickstart.html#step-3-install-a-ak-connector-and-generate-sample-data) to create a connector from Control Center but instead of using the `DatagenConnector` option, use the `CosmosSourceConnector` tile instead. When configuring the source connector, fill out the values as you have filled in the JSON file. + +Alternatively, in the connectors page, you can upload the JSON file from earlier by using the `Upload connector config file` option. + +![Upload connector config](images/upload-connector-config.png "Upload connector config") + +#### Create connector using REST API + +Create the source connector using the Connect REST API + +```bash + +# Curl to Kafka connect service +curl -H "Content-Type: application/json" -X POST -d @ http://localhost:8083/connectors + +``` + +### Insert document in to Cosmos DB + +Use [Cosmos DB setup guide] to create and set up Cosmos DB Instance. + +- Sign into the [Azure portal](https://portal.azure.com/learn.docs.microsoft.com) using the account you activated. +- On the Azure portal menu (left hand side blue lines at the top), select All services. +- Select Databases > Azure Cosmos DB. Then select the DB you just created, click Data Explorer at the top. +- To create a new JSON document, in the SQL API pane, expand `kafka`, select Items, then select New Item in the toolbar. +- Now, add a document to the container with the following structure. Paste the following sample JSON block into the Items tab, overwriting the current content: + + ``` json + + { + "id": "2", + "productId": "33218897", + "category": "Women's Outerwear", + "manufacturer": "Contoso", + "description": "Black wool pea-coat", + "price": "49.99", + "shipping": { + "weight": 2, + "dimensions": { + "width": 8, + "height": 11, + "depth": 3 + } + } + } + + ``` + +- Select Save. +- Confirm the document has been saved by clicking Items on the left-hand menu. 
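+
+Before moving on, you can also watch for the change from the command line instead of using the Kafka Topic UI described in the next section. This is a minimal sketch assuming this repo's Confluent Platform Docker setup, where the Kafka broker container is named `broker` and listens on `localhost:9092`:
+
+```bash
+# exec into the broker container
+docker exec -it broker /bin/bash
+
+# consume the apparels topic from the beginning; the inserted document should appear as a JSON record
+kafka-console-consumer --bootstrap-server localhost:9092 --topic apparels --from-beginning
+```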
+ +### Confirm data written to Kafka Topic + +- Open Kafka Topic UI on +- Select the Kafka topic `apparels` you created +- Verify that the document inserted in to Cosmos DB earlier appears in the Kafka topic. + +### Cleanup + +To delete the connector from the Control Center, navigate to the source connector you created and click the `Delete` icon. + +![Delete connector](images/delete-source-connector.png "Delete connector") + +Alternatively, use the Connect REST API. + +```bash + +# Curl to Kafka connect service +curl -X DELETE http://localhost:8083/connectors/cosmosdb-source-connector + +``` + +To delete the created Azure Cosmos DB service and its resource group using Azure CLI, refer to these [steps]. + +## Source configuration properties + +The following settings are used to configure the Cosmos DB Kafka Source Connector. These configuration values determine which Cosmos DB container is consumed, which Kafka topics data is written into and formats to serialize the data. For an example configuration file with the default values, refer to [this config]. + +- [Generic Configs For Sink And Source] +- [Configs only for Source] diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/SignUpConfluentCloud.png b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/SignUpConfluentCloud.png new file mode 100644 index 0000000000000..66bf0562f3cc9 Binary files /dev/null and b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/SignUpConfluentCloud.png differ diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/Topics-Partitions.png b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/Topics-Partitions.png new file mode 100644 index 0000000000000..05e8e4981c071 Binary files /dev/null and b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/Topics-Partitions.png differ diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/add-topic.png b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/add-topic.png new file mode 100644 index 0000000000000..d5cc688ebf90b Binary files /dev/null and b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/add-topic.png differ diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/attach-pr-to-issue.PNG b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/attach-pr-to-issue.PNG new file mode 100644 index 0000000000000..95b780b335100 Binary files /dev/null and b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/attach-pr-to-issue.PNG differ diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/cli-and-tools.png b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/cli-and-tools.png new file mode 100644 index 0000000000000..97b8052da8ade Binary files /dev/null and b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/cli-and-tools.png differ diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/click-add-cluster.png b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/click-add-cluster.png new file mode 100644 index 0000000000000..45370b97c8695 Binary files /dev/null and b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/click-add-cluster.png differ diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/click-default.png b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/click-default.png new file mode 100644 index 0000000000000..84c498c950562 Binary files /dev/null and b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/click-default.png differ diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/converter-misconfigurations.png b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/converter-misconfigurations.png new file mode 100644 index 
0000000000000..ee6ae42e3d803 Binary files /dev/null and b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/converter-misconfigurations.png differ diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/cosmosdb-sink-records.png b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/cosmosdb-sink-records.png new file mode 100644 index 0000000000000..6660ae55ae5bb Binary files /dev/null and b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/cosmosdb-sink-records.png differ diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/delete-connector.png b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/delete-connector.png new file mode 100644 index 0000000000000..f3e7b07fba5aa Binary files /dev/null and b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/delete-connector.png differ diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/delete-source-connector.png b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/delete-source-connector.png new file mode 100644 index 0000000000000..879a57d28de1e Binary files /dev/null and b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/delete-source-connector.png differ diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/env-configurations.png b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/env-configurations.png new file mode 100644 index 0000000000000..04bd4e4a08632 Binary files /dev/null and b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/env-configurations.png differ diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/environment-click.png b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/environment-click.png new file mode 100644 index 0000000000000..90c3ceffd11c3 Binary files /dev/null and b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/environment-click.png differ diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/doc/images/microsoft.png b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/microsoft.png similarity index 100% rename from sdk/cosmos/azure-cosmos-kafka-connect/doc/images/microsoft.png rename to sdk/cosmos/azure-cosmos-kafka-connect/dev/images/microsoft.png diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/name-topic.png b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/name-topic.png new file mode 100644 index 0000000000000..2304affe1f1b0 Binary files /dev/null and b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/name-topic.png differ diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/schema-key-and-secret.png b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/schema-key-and-secret.png new file mode 100644 index 0000000000000..3b4b44c15c467 Binary files /dev/null and b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/schema-key-and-secret.png differ diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/schema-registry.png b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/schema-registry.png new file mode 100644 index 0000000000000..5e0188466d366 Binary files /dev/null and b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/schema-registry.png differ diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/select-azure.png b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/select-azure.png new file mode 100644 index 0000000000000..4e4528f5d249f Binary files /dev/null and b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/select-azure.png differ diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/select-enable-schema.png b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/select-enable-schema.png new file mode 100644 index 
0000000000000..c11c530aaeba2 Binary files /dev/null and b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/select-enable-schema.png differ diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/select-ksqlDB.png b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/select-ksqlDB.png new file mode 100644 index 0000000000000..c547a69c4fef2 Binary files /dev/null and b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/select-ksqlDB.png differ diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/select-name-launch.png b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/select-name-launch.png new file mode 100644 index 0000000000000..367d2901185e0 Binary files /dev/null and b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/select-name-launch.png differ diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/select-schema-region.png b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/select-schema-region.png new file mode 100644 index 0000000000000..06c3fd6059c23 Binary files /dev/null and b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/select-schema-region.png differ diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/upload-connector-config.png b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/upload-connector-config.png new file mode 100644 index 0000000000000..ec3dec8048d38 Binary files /dev/null and b/sdk/cosmos/azure-cosmos-kafka-connect/dev/images/upload-connector-config.png differ diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/dev/setup.md b/sdk/cosmos/azure-cosmos-kafka-connect/dev/setup.md new file mode 100644 index 0000000000000..6ad7ce91d2a4f --- /dev/null +++ b/sdk/cosmos/azure-cosmos-kafka-connect/dev/setup.md @@ -0,0 +1,65 @@ +# Dev azure cosmos kafka connect client library for Java + +### Prerequisites + +Ensure you have the following prerequisites installed. +- Bash shell + - Will not work in Cloud Shell or WSL1 +- Docker ([download](https://www.docker.com/products/docker-desktop)) +- Git +- Java 11+ ([download](https://www.oracle.com/java/technologies/javase-jdk11-downloads.html)) +- Maven ([download](https://maven.apache.org/download.cgi)) + +### How to run validation + +#### To run unit tests: +``` +mvn -e -Dgpg.skip -Dmaven.javadoc.skip=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Drevapi.skip=true -pl ,azure-cosmos-kafka-connect test package -Punit +``` + +#### To run integration tests: +You can run the integration tests against a local kafka cluster by using [Test Containers](https://testcontainers.com), or you can run the integration tests against confluent cloud cluster. 
+ +To run the integration tests against a local kafka cluster, create ~/kafka-cosmos-local.properties with the following content: +``` +ACCOUNT_HOST=[emulator endpoint or you cosmos masterKey] +ACCOUNT_KEY=[emulator masterKey or your cosmos masterKey] +ACCOUNT_TENANT_ID=[update if AAD auth is required in the integration tests] +ACCOUNT_AAD_CLIENT_ID=[update if AAD auth is required in the integration tests] +ACCOUNT_AAD_CLIENT_SECRET=[update is AAD auth is required in the integration tests] +SASL_JAAS= +BOOTSTRAP_SERVER= +CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 +CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 +CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 +``` + +To run the integration tests against a confluent cloud cluster, create ~/kafka-cosmos-local.properties with the following content: +``` +ACCOUNT_HOST=[emulator endpoint or you cosmos masterKey] +ACCOUNT_KEY=[emulator masterKey or your cosmos masterKey] +ACCOUNT_TENANT_ID=[update if AAD auth is required in the integration tests] +ACCOUNT_AAD_CLIENT_ID=[update if AAD auth is required in the integration tests] +ACCOUNT_AAD_CLIENT_SECRET=[update is AAD auth is required in the integration tests] +SASL_JAAS=[credential configured on the confluent cloud cluster] +BOOTSTRAP_SERVER=[bootstrap server endpoint of the confluent cloud cluster] +SCHEMA_REGISTRY_URL=[schema registry url of the cloud cluster] +SCHEMA_REGISTRY_KEY=[schema registry key of the cloud cluster] +SCHEMA_REGISTRY_SECRET=[schema registry secret of the cloud cluster] +CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=3 +CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=3 +CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=3 +``` +Please follow [Confluent_Cloud_Setup] to setup a confluent cloud cluster. +Integration tests are having ITest suffix. Use following command to run integration tests: +```bash +mvn -e -Dgpg.skip -Dmaven.javadoc.skip=true -Dcodesnippet.skip=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Drevapi.skip=true -pl ,azure-cosmos-kafka-connect test package -Pkafka-integration +``` + +### Configure Confluent Platform, Cosmos DB and validate Kafka Connectors + +- [Confluent Platform Setup] +- [Confluent Cloud Setup] +- [Setting up an Azure Cosmos DB Instance] +- [Kafka Connect Cosmos DB Sink Connector] +- [Kafka Connect Cosmos DB Source Connector] diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/doc/configuration-reference.md b/sdk/cosmos/azure-cosmos-kafka-connect/doc/configuration-reference.md index 9e5b224b60a12..e48b3d4b80f8f 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/doc/configuration-reference.md +++ b/sdk/cosmos/azure-cosmos-kafka-connect/doc/configuration-reference.md @@ -1,6 +1,6 @@ ## Configuration Reference: -## Generic Configuration +## Generic Configurations | Config Property Name | Default | Description | |:------------------------------------------------------------------|:-----------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| @@ -33,7 +33,7 @@ | `azure.cosmos.throughputControl.globalControl.renewIntervalInMS` | `-1` | This controls how often the client is going to update the throughput usage of itself and adjust its 
own throughput share based on the throughput usage of other clients. Default is 5s, the allowed min value is 5s. | | `azure.cosmos.throughputControl.globalControl.expireIntervalInMS` | `-1` | This controls how quickly we will detect the client has been offline and hence allow its throughput share to be taken by other clients. Default is 11s, the allowed min value is 2 * renewIntervalInMS + 1. | -## Source Connector Configuration +## Source Connector Configurations | Config Property Name | Default | Description | |:--------------------------------------------------|:-------------------------|:-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | `azure.cosmos.source.database.name` | None | Cosmos DB database name. | @@ -49,7 +49,7 @@ | `azure.cosmos.source.messageKey.enabled` | `true` | Whether to set the kafka record message key. | | `azure.cosmos.source.messageKey.field` | `id` | The field to use as the message key. | -## Sink Connector Configuration +## Sink Connector Configurations | Config Property Name | Default | Description | |:-----------------------------------------------------------|:--------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | `azure.cosmos.sink.database.name` | `""` | Cosmos DB database name. 
| diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/docker/resources/sink.example.json b/sdk/cosmos/azure-cosmos-kafka-connect/src/docker/resources/sink.example.json new file mode 100644 index 0000000000000..782d9fc45f8cd --- /dev/null +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/docker/resources/sink.example.json @@ -0,0 +1,17 @@ +{ + "name": "cosmosdb-sink-connector-v2", + "config": { + "connector.class": "com.azure.cosmos.kafka.connect.CosmosSinkConnector", + "tasks.max": "5", + "topics": "GreenTaxiRecords", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + "key.converter": "org.apache.kafka.connect.json.JsonConverter", + "key.converter.schemas.enable": "false", + "azure.cosmos.account.endpoint":"{endpoint}", + "azure.cosmos.account.key":"{masterKey}", + "azure.cosmos.applicationName": "{applicationName}", + "azure.cosmos.sink.database.name":"{databaseName}", + "azure.cosmos.sink.containers.topicMap":"{topic}#{container}" + } +} diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/docker/resources/source.example.json b/sdk/cosmos/azure-cosmos-kafka-connect/src/docker/resources/source.example.json new file mode 100644 index 0000000000000..3fbb1901f338e --- /dev/null +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/docker/resources/source.example.json @@ -0,0 +1,20 @@ +{ + "name": "cosmosdb-source-connector-v2", + "config": { + "connector.class": "com.azure.cosmos.kafka.connect.CosmosSourceConnector", + "tasks.max": "5", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + "key.converter": "org.apache.kafka.connect.json.JsonConverter", + "key.converter.schemas.enable": "false", + "azure.cosmos.account.endpoint":"{endpoint}", + "azure.cosmos.account.key":"{masterKey}", + "azure.cosmos.application.name": "{applicationName}", + "azure.cosmos.source.database.name":"{database}", + "azure.cosmos.source.containers.includedList":"{container}", + "azure.cosmos.source.changeFeed.maxItemCountHint":"500", + "azure.cosmos.source.containers.topicMap":"{topic}#{container}", + "azure.cosmos.source.metadata.storage.type":"Cosmos", + "azure.cosmos.source.metadata.storage.name":"{metadataContainerName}" + } +} diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/docker/startup.ps1 b/sdk/cosmos/azure-cosmos-kafka-connect/src/docker/startup.ps1 index b496a72d47562..161f9ada50e17 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/docker/startup.ps1 +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/docker/startup.ps1 @@ -10,8 +10,8 @@ New-Item -Path "$PSScriptRoot" -ItemType "directory" -Name "connectors" -Force | cd $PSScriptRoot/../.. Write-Host "Rebuilding Cosmos DB connectors..." 
-mvn clean package -DskipTests -Dmaven.javadoc.skip -copy target\*-jar-with-dependencies.jar $PSScriptRoot/connectors +mvn --% clean package -DskipTests -Dmaven.javadoc.skip +Get-ChildItem -Path $PSScriptRoot/../../../target -Filter "azure-cosmos-kafka-connect-*.jar" | Where-Object { $_.Name -notlike "azure-cosmos-kafka-connect-*-sources.jar" } | Copy-Item -Destination $PSScriptRoot/connectors cd $PSScriptRoot Write-Host "Adding custom Insert UUID SMT" diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/docker/startup.sh b/sdk/cosmos/azure-cosmos-kafka-connect/src/docker/startup.sh index 1f5dbd0566482..7e46abf71a23f 100755 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/docker/startup.sh +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/docker/startup.sh @@ -9,7 +9,7 @@ cd ../../ echo "Rebuilding Cosmos DB connectors..." mvn clean package -DskipTests=true -Dmaven.javadoc.skip=true -cp target/*-jar-with-dependencies.jar src/docker/connectors +find target/ . -name 'azure-cosmos-kafka-connect-*.jar' ! -name 'azure-cosmos-kafka-connect-*-sources.jar' -exec cp {} src/docker/connectors \; cd src/docker echo "Adding custom Insert UUID SMT" @@ -24,4 +24,4 @@ echo "Building Cosmos DB Kafka Connect Docker image" docker build . -t cosmosdb-kafka-connect:latest echo "Starting Docker Compose..." -docker-compose up -d \ No newline at end of file +docker-compose up -d