How to sink data from Confluent Cloud to Neo4j

The Neo4j connector isn't included in the Kafka Connect Docker image; in fact, none of the connectors are. This keeps the image lightweight.

The first step is to create a custom Kafka Connect Docker image that contains the Neo4j connector. We do this with a Dockerfile that references the base image and runs a command to install the Neo4j connector from Confluent Hub:

# start from the official Kafka Connect base image
FROM confluentinc/cp-kafka-connect:6.1.1
# install the Neo4j sink connector from Confluent Hub
RUN confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:1.0.9

The image is based on confluentinc/cp-kafka-connect. For more details, read Confluent's docs on extending images.

We can then build the image:

docker build . -t ccloud-neo4j:1.0.0
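
To confirm the connector made it into the image, we can list the plugin path inside a throwaway container (this assumes the default Confluent Hub install location, /usr/share/confluent-hub-components):

docker run --rm ccloud-neo4j:1.0.0 ls /usr/share/confluent-hub-components

A neo4j-kafka-connect-neo4j directory should appear in the listing.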

We then log in to Confluent Cloud, create a cluster, generate an API key/secret, and get the connection properties. This can be done via the UI or the CLI.
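
For reference, here's a rough sketch of the CLI route (command names and flags vary between CLI versions, and the cluster name, cloud, region, and cluster ID below are placeholders):

confluent login
confluent kafka cluster create ccloud-neo4j-demo --cloud gcp --region us-west1 --type basic
confluent api-key create --resource lkc-xxxxxx    # use the cluster ID returned above
confluent kafka cluster describe lkc-xxxxxx       # shows the bootstrap endpoint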

We can now spin up the Kafka Connect container that contains the Neo4j sink:

docker run -d \
  --name=kafka-connect \
  -p 8083:8083 \
  -e CONNECT_BOOTSTRAP_SERVERS="pkc-lgk0v.us-west1.gcp.confluent.cloud:9092" \
  -e CONNECT_GROUP_ID="ccloud-docker-connect" \
  -e CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
  -e CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
  -e CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
  -e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_REST_ADVERTISED_HOST_NAME="localhost" \
  -e CONNECT_PLUGIN_PATH=/usr/share/confluent-hub-components \
  -e CONNECT_REST_PORT=8083 \
  -e CONNECT_REST_ADVERTISED_HOST_NAME="localhost" \
  -e CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM="https" \
  -e CONNECT_SASL_MECHANISM="PLAIN" \
  -e CONNECT_REQUEST_TIMEOUT_MS="20000" \
  -e CONNECT_RETRY_BACKOFF_MS="500" \
  -e CONNECT_SECURITY_PROTOCOL="SASL_SSL" \
  -e CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM="https" \
  -e CONNECT_CONSUMER_SASL_MECHANISM="PLAIN" \
  -e CONNECT_CONSUMER_REQUEST_TIMEOUT_MS="20000" \
  -e CONNECT_CONSUMER_RETRY_BACKOFF_MS="500" \
  -e CONNECT_CONSUMER_SECURITY_PROTOCOL="SASL_SSL" \
  -e CONNECT_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"5ZR7JIDVI2N2Q5J3\" password=\"OnM75g61oRERtxnKE/7CVV44VwzPAMwI+Cb+bMYC1Tx0wIsL2IYEl4vM5d******\";" \
  -e CONNECT_CONSUMER_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"5ZR7JIDVI2N2Q5J3\" password=\"OnM75g61oRERtxnKE/7CVV44VwzPAMwI+Cb+bMYC1Tx0wIsL2IYEl4vM5d******\";" \
  -e CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY="All" \
  -e CONNECT_LOG4J_ROOT_LOGLEVEL=INFO \
  ccloud-neo4j:1.0.0

As you can see, there are quite a few properties. At the very least, you'll need to change CONNECT_BOOTSTRAP_SERVERS and the API key/secret in CONNECT_SASL_JAAS_CONFIG and CONNECT_CONSUMER_SASL_JAAS_CONFIG. For more detailed descriptions of the properties, take a look at the Kafka Connect configuration docs.
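
Before deploying the connector, it's worth checking that the worker came up and that the Neo4j plugin was loaded. A quick sanity check against the Connect REST API (jq is optional, purely for readability):

curl -s localhost:8083/ | jq .
curl -s localhost:8083/connector-plugins | jq '.[].class'

The second command should list streams.kafka.connect.sink.Neo4jSinkConnector among the available plugins.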

Once the Dockerized Kafka Connect instance is running, we can deploy the Neo4j sink connector:

http PUT localhost:8083/connectors/kafkaexample-neo4j/config <<< '
{
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "name": "kafkaexample-neo4j",
    "neo4j.database": "kafkaexample",
    "neo4j.authentication.basic.password": "V1ctoria",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.server.uri": "bolt://neo4j.woolford.io:7687",
    "neo4j.topic.cypher.pageview": "MERGE(u:User {user: event.user}) MERGE(p:Page {page: event.page}) MERGE(u)-[:VIEWED]->(p)",
    "topics": "pageview",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
}'
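
We can verify that the connector and its task are RUNNING with the same httpie client:

http GET localhost:8083/connectors/kafkaexample-neo4j/status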

Note the templated Cypher statement: as messages are produced, the connector writes them to the Neo4j database by injecting each message's values into the Cypher statement.
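
For example, suppose a record like this (hypothetical values, purely for illustration) lands on the pageview topic:

{"user": "alice", "page": "index.html"}

The connector binds each message to event, so the statement above merges a User node for alice, a Page node for index.html, and a VIEWED relationship between them. One way to eyeball the result, assuming cypher-shell is available on the machine you run it from:

cypher-shell -a bolt://neo4j.woolford.io:7687 -u neo4j -p 'V1ctoria' -d kafkaexample \
  "MATCH (u:User)-[:VIEWED]->(p:Page) RETURN u.user, p.page LIMIT 10"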
