Commit 1cd62fd

add doc

sunxiaojian committed Jul 24, 2023
1 parent b22978e commit 1cd62fd
Showing 5 changed files with 66 additions and 8 deletions.
22 changes: 22 additions & 0 deletions docs/en/seatunnel-engine/checkpoint-storage.md
@@ -171,3 +171,25 @@ seatunnel:
```

#### Kafka

Kafka is not only a message queue but can also serve as a storage engine. In particular, a compacted Kafka topic works as a key-value store: log compaction keeps only the latest record for each key, and a record with a null value (a tombstone) marks the key for deletion.

```yaml
seatunnel:
  engine:
    checkpoint:
      interval: 6000
      timeout: 7000
      max-concurrent: 1
      tolerable-failure: 2
      storage:
        type: kafka
        max-retained: 3
        plugin-config:
          bootstrap.servers: localhost:9092
          storage.compact.topic.replication.factor: 1
          storage.compact.topic.partition: 12
          checkpoint.storage.compact.topic: checkpoint-storage-config
```
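For readers new to compacted topics, here is a minimal sketch of the key-value semantics the storage relies on, using only the plain kafka-clients producer API. The topic name matches the example config above, the key format is illustrative, and it assumes the topic was created with `cleanup.policy=compact` (presumably what the `storage.compact.topic.*` options configure):

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import java.nio.charset.StandardCharsets;
import java.util.Properties;

public class CompactTopicSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", ByteArraySerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());

        String topic = "checkpoint-storage-config"; // from the config above
        byte[] key = "job-1/checkpoint-5".getBytes(StandardCharsets.UTF_8); // illustrative key

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            // Upsert: after compaction, only the latest value per key survives
            producer.send(new ProducerRecord<>(topic, key,
                    "checkpoint-state".getBytes(StandardCharsets.UTF_8))).get();
            // Delete: a null value is a tombstone; compaction removes the key
            producer.send(new ProducerRecord<>(topic, key, null)).get();
        }
    }
}
```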

4 changes: 3 additions & 1 deletion seatunnel-dist/release-docs/LICENSE
@@ -275,7 +275,8 @@ The text of each license is the standard Apache 2.0 license.
(Apache-2.0) listenablefuture (com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava https://mvnrepository.com/artifact/com.google.guava/listenablefuture/9999.0-empty-to-avoid-conflict-with-guava)
(Apache-2.0) accessors-smart (net.minidev:accessors-smart:2.4.7 - https://mvnrepository.com/artifact/net.minidev/accessors-smart)
(Apache-2.0) json-smart (net.minidev:json-smart:2.4.7 - https://mvnrepository.com/artifact/net.minidev/json-smart)

(Apache-2.0) kafka-clients (org.apache.kafka:kafka-clients:3.4.1 - https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients)
(Apache-2.0) lz4-java (org.lz4:lz4-java:1.8.0 - https://mvnrepository.com/artifact/org.lz4/lz4-java)
========================================================================
MOZILLA PUBLIC LICENSE License
========================================================================
@@ -294,6 +295,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
(New BSD license) Protocol Buffer Java API (com.google.protobuf:protobuf-java:2.5.0 - http://code.google.com/p/protobuf)
(BSD 3-Clause) Scala Library (org.scala-lang:scala-library:2.11.12 - http://www.scala-lang.org/)
(BSD 3-Clause) ASM (org.ow2.asm:asm:9.1 - https://mvnrepository.com/artifact/org.ow2.asm/asm/)
(BSD 2-Clause) zstd-jni (com.github.luben:zstd-jni:1.5.2-1 - https://mvnrepository.com/artifact/com.github.luben/zstd-jni)
========================================================================
CDDL License
========================================================================
@@ -262,14 +262,18 @@ private void poll(

     @Override
     public String storeCheckPoint(PipelineState state) throws CheckpointStorageException {
-        String checkPointUniqueName = generateUniqueKeyForKafka(state);
+        String checkpointName = getCheckPointName(state);
+        String checkPointUniqueName = generateUniqueKeyForKafka(state.getJobId(), checkpointName);
         try {
             byte[] key = serializer.serialize(checkPointUniqueName);
             byte[] value = serializeCheckPointData(state);
             // Sync send data
             this.producer
                     .send(new ProducerRecord<>(kafkaConfiguration.getStorageTopic(), key, value))
                     .get();
+            // update cache
+            cache.put(state.getJobId(), checkpointName, state);
+
         } catch (IOException e) {
             throw new CheckpointStorageException(
                     "Failed to serialize checkpoint data, state is: " + state, e);
@@ -367,8 +371,11 @@ public void deleteCheckpoint(String jobId) {
         try {
             String storeUniqueKey = generateUniqueKeyForKafka(jobId, checkpointName);
             byte[] key = serializer.serialize(storeUniqueKey);
-            producer.send(
-                    new ProducerRecord<>(kafkaConfiguration.getStorageTopic(), key, null));
+            // send a tombstone record synchronously to delete the checkpoint
+            producer.send(new ProducerRecord<>(kafkaConfiguration.getStorageTopic(), key, null))
+                    .get();
+            // update cache
+            cache.remove(jobId, checkpointName);
         } catch (Exception e) {
             log.error("Failed to delete checkpoint for job {}", jobId, e);
         }
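A note on the pattern in these hunks: `KafkaProducer.send` is asynchronous, and broker-side failures surface only through the returned `Future`, which is why the writes block on `.get()` and why the delete paths below can observe an `ExecutionException`. A minimal sketch of the idiom, with an illustrative helper name:

```java
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.concurrent.ExecutionException;

final class SyncSendSketch {
    // Blocks until the broker acks (or rejects) the record, so a failed
    // write raises ExecutionException instead of being silently dropped.
    static RecordMetadata sendSync(
            Producer<byte[], byte[]> producer, ProducerRecord<byte[], byte[]> record)
            throws ExecutionException, InterruptedException {
        return producer.send(record).get();
    }
}
```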
@@ -416,9 +423,20 @@ public void deleteCheckpoint(String jobId, String pipelineId, String checkpointI
         try {
             String storeUniqueKey = generateUniqueKeyForKafka(jobId, checkpointName);
             byte[] key = serializer.serialize(storeUniqueKey);
-            producer.send(
-                    new ProducerRecord<>(kafkaConfiguration.getStorageTopic(), key, null));
+            // send a tombstone record synchronously to delete the checkpoint
+            producer.send(new ProducerRecord<>(kafkaConfiguration.getStorageTopic(), key, null))
+                    .get();
+            // update cache
+            cache.remove(jobId, checkpointName);
         } catch (Exception e) {
+            if (e instanceof ExecutionException) {
+                throw new CheckpointStorageException(
+                        String.format(
+                                "Failed to delete checkpoint %s for job %s, pipeline %s",
+                                checkpointId, jobId, pipelineId),
+                        e);
+            }
             log.error(
                     "Failed to delete checkpoint {} for job {}, pipeline {}",
                     checkpointId,
@@ -446,9 +464,20 @@ public void deleteCheckpoint(String jobId, String pipelineId, List<String> check
         try {
             String storeUniqueKey = generateUniqueKeyForKafka(jobId, checkpointName);
             byte[] key = serializer.serialize(storeUniqueKey);
-            producer.send(
-                    new ProducerRecord<>(kafkaConfiguration.getStorageTopic(), key, null));
+            // send a tombstone record synchronously to delete the checkpoint
+            producer.send(new ProducerRecord<>(kafkaConfiguration.getStorageTopic(), key, null))
+                    .get();
+            // update cache
+            cache.remove(jobId, checkpointName);
         } catch (Exception e) {
+            if (e instanceof ExecutionException) {
+                throw new CheckpointStorageException(
+                        String.format(
+                                "Failed to delete checkpoint %s for job %s, pipeline %s",
+                                checkpointId, jobId, pipelineId),
+                        e);
+            }
             log.error(
                     "Failed to delete checkpoint {} for job {}, pipeline {}",
                     checkpointId,
@@ -20,17 +20,18 @@
 package org.apache.seatunnel.engine.checkpoint.storage.kafka.common;

 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;

 public class Table<R, C, V> {

-    private Map<R, Map<C, V>> table = new HashMap<>();
+    private ConcurrentMap<R, ConcurrentMap<C, V>> table = new ConcurrentHashMap<>();

     public V put(R row, C column, V value) {
-        Map<C, V> columns = table.get(row);
+        ConcurrentMap<C, V> columns = table.get(row);
         if (columns == null) {
-            columns = new ConcurrentHashMap<>();
+            columns = new ConcurrentHashMap<>();
             table.put(row, columns);
         }
         return columns.put(column, value);
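One caveat on the Table change: switching to ConcurrentHashMap makes each map thread-safe, but the get/check/put sequence above is still a check-then-act race, so two threads inserting into the same row concurrently can each create a column map and one write can be lost. A race-free sketch of the same class (not what this commit ships):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class Table<R, C, V> {

    private final ConcurrentMap<R, ConcurrentMap<C, V>> table = new ConcurrentHashMap<>();

    public V put(R row, C column, V value) {
        // computeIfAbsent is atomic on ConcurrentHashMap, so concurrent callers
        // inserting into the same row always share a single column map
        return table.computeIfAbsent(row, r -> new ConcurrentHashMap<>()).put(column, value);
    }
}
```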
4 changes: 4 additions & 0 deletions tools/dependencies/known-dependencies.txt
@@ -38,3 +38,7 @@ listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar
accessors-smart-2.4.7.jar
asm-9.1.jar
json-smart-2.4.7.jar
kafka-clients-3.4.1.jar
lz4-java-1.8.0.jar
snappy-java-1.1.8.4.jar
zstd-jni-1.5.2-1.jar
