diff --git a/docs/en/connector-v2/sink/Console.md b/docs/en/connector-v2/sink/Console.md index fd7623d7d38..55df281b275 100644 --- a/docs/en/connector-v2/sink/Console.md +++ b/docs/en/connector-v2/sink/Console.md @@ -14,14 +14,24 @@ Used to send data to Console. Both support streaming and batch mode. ## Options -| name | type | required | default value | -|----------------|------|----------|---------------| -| common-options | | no | - | +| name | type | required | default value | +|--------------------|---------|----------|---------------| +| common-options | | no | - | +| log.print.data | boolean | no | true | +| log.print.delay.ms | int | no | 0 | ### common options Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details +### log.print.data + +Flag to determine whether data should be printed in the logs. The default value is `true`. + +### log.print.delay.ms + +Delay in milliseconds between printing each data item to the logs. The default value is `0`. 
+ ## Example simple: diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java index bc80c664288..d076cd5367b 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java @@ -51,6 +51,12 @@ public interface EnvCommonOptions { .withDescription( "The interval (in milliseconds) between two consecutive checkpoints."); + Option CHECKPOINT_TIMEOUT = + Options.key("checkpoint.timeout") + .longType() + .noDefaultValue() + .withDescription("The timeout (in milliseconds) for a checkpoint."); + Option JARS = Options.key("jars") .stringType() diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java index 3a90b82e83b..09310f080c5 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java @@ -30,6 +30,7 @@ public static OptionRule getEnvOptionRules() { CommonOptions.PARALLELISM, EnvCommonOptions.JARS, EnvCommonOptions.CHECKPOINT_INTERVAL, + EnvCommonOptions.CHECKPOINT_TIMEOUT, EnvCommonOptions.CUSTOM_PARAMETERS) .build(); } diff --git a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java index 036a5d802f4..49957b99e21 100644 --- a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java +++ b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java @@ -19,6 +19,7 @@ import 
org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; @@ -30,13 +31,20 @@ import com.google.auto.service.AutoService; import lombok.NoArgsConstructor; +import static org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkFactory.LOG_PRINT_DATA; +import static org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkFactory.LOG_PRINT_DELAY; + @NoArgsConstructor @AutoService(SeaTunnelSink.class) public class ConsoleSink extends AbstractSimpleSink { private SeaTunnelRowType seaTunnelRowType; + private boolean isPrintData = true; + private int delayMs = 0; - public ConsoleSink(SeaTunnelRowType seaTunnelRowType) { + public ConsoleSink(SeaTunnelRowType seaTunnelRowType, ReadonlyConfig options) { this.seaTunnelRowType = seaTunnelRowType; + this.isPrintData = options.get(LOG_PRINT_DATA); + this.delayMs = options.get(LOG_PRINT_DELAY); } @Override @@ -51,7 +59,7 @@ public SeaTunnelDataType getConsumedType() { @Override public AbstractSinkWriter createWriter(SinkWriter.Context context) { - return new ConsoleSinkWriter(seaTunnelRowType, context); + return new ConsoleSinkWriter(seaTunnelRowType, context, isPrintData, delayMs); } @Override @@ -60,5 +68,8 @@ public String getPluginName() { } @Override - public void prepare(Config pluginConfig) {} + public void prepare(Config pluginConfig) { + this.isPrintData = ReadonlyConfig.fromConfig(pluginConfig).get(LOG_PRINT_DATA); + this.delayMs = ReadonlyConfig.fromConfig(pluginConfig).get(LOG_PRINT_DELAY); + } } diff --git a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java 
index 1e0450d66c7..5a66493aee5 100644 --- a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java +++ b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java @@ -17,6 +17,9 @@ package org.apache.seatunnel.connectors.seatunnel.console.sink; +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; @@ -27,6 +30,21 @@ @AutoService(Factory.class) public class ConsoleSinkFactory implements TableSinkFactory { + + public static final Option LOG_PRINT_DATA = + Options.key("log.print.data") + .booleanType() + .defaultValue(true) + .withDescription( + "Flag to determine whether data should be printed in the logs."); + + public static final Option LOG_PRINT_DELAY = + Options.key("log.print.delay.ms") + .intType() + .defaultValue(0) + .withDescription( + "Delay in milliseconds between printing each data item to the logs."); + @Override public String factoryIdentifier() { return "Console"; @@ -39,7 +57,10 @@ public OptionRule optionRule() { @Override public TableSink createSink(TableFactoryContext context) { + ReadonlyConfig options = context.getOptions(); return () -> - new ConsoleSink(context.getCatalogTable().getTableSchema().toPhysicalRowDataType()); + new ConsoleSink( + context.getCatalogTable().getTableSchema().toPhysicalRowDataType(), + options); } } diff --git a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java 
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java index 422a85b40d1..8b60386d7e6 100644 --- a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java +++ b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java @@ -25,6 +25,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.utils.JsonUtils; +import org.apache.seatunnel.common.utils.SeaTunnelException; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; import org.apache.commons.lang3.StringUtils; @@ -44,9 +45,18 @@ public class ConsoleSinkWriter extends AbstractSinkWriter { private final SinkWriter.Context context; private final DataTypeChangeEventHandler dataTypeChangeEventHandler; - public ConsoleSinkWriter(SeaTunnelRowType seaTunnelRowType, SinkWriter.Context context) { + boolean isPrintData = true; + int delayMs = 0; + + public ConsoleSinkWriter( + SeaTunnelRowType seaTunnelRowType, + SinkWriter.Context context, + boolean isPrintData, + int delayMs) { this.seaTunnelRowType = seaTunnelRowType; this.context = context; + this.isPrintData = isPrintData; + this.delayMs = delayMs; this.dataTypeChangeEventHandler = new DataTypeChangeEventDispatcher(); log.info("output rowType: {}", fieldsInfo(seaTunnelRowType)); } @@ -67,13 +77,23 @@ public void write(SeaTunnelRow element) { for (int i = 0; i < fieldTypes.length; i++) { arr[i] = fieldToString(fieldTypes[i], fields[i]); } - log.info( - "subtaskIndex={} rowIndex={}: SeaTunnelRow#tableId={} SeaTunnelRow#kind={} : {}", - context.getIndexOfSubtask(), - rowCounter.incrementAndGet(), - element.getTableId(), - element.getRowKind(), - StringUtils.join(arr, ", ")); + if (isPrintData) { + log.info( + 
"subtaskIndex={} rowIndex={}: SeaTunnelRow#tableId={} SeaTunnelRow#kind={} : {}", + context.getIndexOfSubtask(), + rowCounter.incrementAndGet(), + element.getTableId(), + element.getRowKind(), + StringUtils.join(arr, ", ")); + } + if (delayMs > 0) { + try { + Thread.sleep(delayMs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SeaTunnelException(e); + } + } } @Override diff --git a/seatunnel-connectors-v2/connector-console/src/test/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriterIT.java b/seatunnel-connectors-v2/connector-console/src/test/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriterIT.java index 0220c889629..e03c00c4959 100644 --- a/seatunnel-connectors-v2/connector-console/src/test/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriterIT.java +++ b/seatunnel-connectors-v2/connector-console/src/test/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriterIT.java @@ -48,7 +48,7 @@ void setUp() { String[] fieldNames = {}; SeaTunnelDataType[] fieldTypes = {}; SeaTunnelRowType seaTunnelRowType = new SeaTunnelRowType(fieldNames, fieldTypes); - consoleSinkWriter = new ConsoleSinkWriter(seaTunnelRowType, null); + consoleSinkWriter = new ConsoleSinkWriter(seaTunnelRowType, null, true, 0); } private Object fieldToStringTest(SeaTunnelDataType dataType, Object value) { diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java index 7fb75064a4c..34aa7ee4f2d 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java +++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java @@ -265,7 +265,10 @@ private void setCheckpoint() { } } - if (config.hasPath(ConfigKeyName.CHECKPOINT_TIMEOUT)) { + if (config.hasPath(EnvCommonOptions.CHECKPOINT_TIMEOUT.key())) { + long timeout = config.getLong(EnvCommonOptions.CHECKPOINT_TIMEOUT.key()); + checkpointConfig.setCheckpointTimeout(timeout); + } else if (config.hasPath(ConfigKeyName.CHECKPOINT_TIMEOUT)) { long timeout = config.getLong(ConfigKeyName.CHECKPOINT_TIMEOUT); checkpointConfig.setCheckpointTimeout(timeout); } diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java index 4b5bef07cb0..583a1cf3e5c 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java @@ -265,7 +265,10 @@ private void setCheckpoint() { } } - if (config.hasPath(ConfigKeyName.CHECKPOINT_TIMEOUT)) { + if (config.hasPath(EnvCommonOptions.CHECKPOINT_TIMEOUT.key())) { + long timeout = config.getLong(EnvCommonOptions.CHECKPOINT_TIMEOUT.key()); + checkpointConfig.setCheckpointTimeout(timeout); + } else if (config.hasPath(ConfigKeyName.CHECKPOINT_TIMEOUT)) { long timeout = config.getLong(ConfigKeyName.CHECKPOINT_TIMEOUT); checkpointConfig.setCheckpointTimeout(timeout); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java index 9f35f62fd60..c07f10fb1c9 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java @@ -20,7 +20,7 @@ public enum CheckpointCloseReason { PIPELINE_END("Pipeline turn to end state."), CHECKPOINT_EXPIRED( - "Checkpoint expired before completing. Please increase checkpoint timeout in the seatunnel.yaml"), + "Checkpoint expired before completing. Please increase checkpoint timeout in the seatunnel.yaml or jobConfig env."), CHECKPOINT_COORDINATOR_COMPLETED("CheckpointCoordinator completed."), CHECKPOINT_COORDINATOR_SHUTDOWN("CheckpointCoordinator shutdown."), CHECKPOINT_COORDINATOR_RESET("CheckpointCoordinator reset."), diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java index 5137f23b7ba..9302c6f1893 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java @@ -278,6 +278,11 @@ private CheckpointConfig createJobCheckpointConfig( Long.parseLong( jobEnv.get(EnvCommonOptions.CHECKPOINT_INTERVAL.key()).toString())); } + if (jobEnv.containsKey(EnvCommonOptions.CHECKPOINT_TIMEOUT.key())) { + jobCheckpointConfig.setCheckpointTimeout( + Long.parseLong( + jobEnv.get(EnvCommonOptions.CHECKPOINT_TIMEOUT.key()).toString())); + } return jobCheckpointConfig; } diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointTimeOutTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointTimeOutTest.java new file mode 100644 index 00000000000..ed2b4dbb6d3 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointTimeOutTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.engine.server.checkpoint; + +import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture; +import org.apache.seatunnel.engine.core.dag.logical.LogicalDag; +import org.apache.seatunnel.engine.core.job.JobImmutableInformation; +import org.apache.seatunnel.engine.core.job.JobStatus; +import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest; +import org.apache.seatunnel.engine.server.TestUtils; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import com.hazelcast.internal.serialization.Data; + +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +import static org.awaitility.Awaitility.await; + +public class CheckpointTimeOutTest extends AbstractSeaTunnelServerTest { + + public static String CONF_PATH = "stream_fake_to_console_checkpointTimeOut.conf"; + public static long JOB_ID = System.currentTimeMillis(); + + @Test + public void testJobLevelCheckpointTimeOut() { + startJob(JOB_ID, CONF_PATH); + + await().atMost(120000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Assertions.assertTrue( + server.getCoordinatorService() + .getJobStatus(JOB_ID) + .equals(JobStatus.RUNNING)); + }); + + await().atMost(120000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Assertions.assertTrue( + server.getCoordinatorService() + .getJobStatus(JOB_ID) + .equals(JobStatus.FAILED)); + }); + } + + private void startJob(Long jobid, String path) { + LogicalDag testLogicalDag = TestUtils.createTestLogicalPlan(path, jobid.toString(), jobid); + + JobImmutableInformation jobImmutableInformation = + new JobImmutableInformation( + jobid, + "Test", + false, + nodeEngine.getSerializationService().toData(testLogicalDag), + testLogicalDag.getJobConfig(), + Collections.emptyList()); + + Data data = nodeEngine.getSerializationService().toData(jobImmutableInformation); + + PassiveCompletableFuture voidPassiveCompletableFuture = + server.getCoordinatorService().submitJob(jobid, 
data); + voidPassiveCompletableFuture.join(); + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console_checkpointTimeOut.conf b/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console_checkpointTimeOut.conf new file mode 100644 index 00000000000..2d541ac2acd --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console_checkpointTimeOut.conf @@ -0,0 +1,54 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set flink configuration here + execution.parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 1000 + checkpoint.timeout = 100 +} + +source { + # This is an example source plugin **only for testing and demonstrating the source plugin feature** + FakeSource { + result_table_name = "fake1" + row.num = 1000 + split.num = 100 + split.read-interval = 3000 + parallelism = 1 + schema = { + fields { + name = "string" + age = "int" + } + } + parallelism = 1 + } +} + +transform { +} + +sink { + console { + log.print.delay.ms=5000 + } +} \ No newline at end of file