/**
* The current offsets of partitions which are stored in {@link #offsetsState}
* once a checkpoint is triggered.
- *
+ *
* NOTE: The offsets are populated in the main thread and saved in the
* checkpoint thread. Its usage must be guarded by the checkpoint lock.
*/
@@ -147,18 +145,18 @@ public class FlinkTubeMQConsumer extends RichParallelSourceFunction
/**
* Build a TubeMQ source function
*
- * @param masterAddress the master address of TubeMQ
- * @param topic the topic name
- * @param tidSet the topic's filter condition items
- * @param consumerGroup the consumer group name
+ * @param masterAddress the master address of TubeMQ
+ * @param topic the topic name
+ * @param streamIdSet the topic's filter condition items
+ * @param consumerGroup the consumer group name
* @param deserializationSchema the deserialize schema
- * @param configuration the configure
- * @param sessionKey the tube session key
+ * @param configuration the configuration
+ * @param sessionKey the tube session key
*/
public FlinkTubeMQConsumer(
String masterAddress,
String topic,
- TreeSet tidSet,
+ TreeSet streamIdSet,
String consumerGroup,
DeserializationSchema deserializationSchema,
Configuration configuration,
@@ -166,14 +164,14 @@ public FlinkTubeMQConsumer(
Boolean innerFormat) {
checkNotNull(masterAddress, "The master address must not be null.");
checkNotNull(topic, "The topic must not be null.");
- checkNotNull(tidSet, "The tid set must not be null.");
+ checkNotNull(streamIdSet, "The streamId set must not be null.");
checkNotNull(consumerGroup, "The consumer group must not be null.");
checkNotNull(deserializationSchema, "The deserialization schema must not be null.");
checkNotNull(configuration, "The configuration must not be null.");
this.masterAddress = masterAddress;
this.topic = topic;
- this.tidSet = tidSet;
+ this.streamIdSet = streamIdSet;
this.consumerGroup = consumerGroup;
this.deserializationSchema = deserializationSchema;
this.sessionKey = sessionKey;
@@ -210,6 +208,7 @@ public void initializeState(FunctionInitializationContext context) throws Except
@Override
public void open(Configuration parameters) throws Exception {
+ deserializationSchema.open(null);
ConsumerConfig consumerConfig = new ConsumerConfig(masterAddress, consumerGroup);
consumerConfig.setConsumePosition(consumeFromMax
? ConsumePosition.CONSUMER_FROM_LATEST_OFFSET
@@ -220,7 +219,7 @@ public void open(Configuration parameters) throws Exception {
final int numTasks = getRuntimeContext().getNumberOfParallelSubtasks();
messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig);
- messagePullConsumer.subscribe(topic, tidSet);
+ messagePullConsumer.subscribe(topic, streamIdSet);
String jobId = getRuntimeContext().getJobId().toString();
messagePullConsumer.completeSubscribe(sessionKey.concat(jobId), numTasks, true, currentOffsets);
@@ -305,7 +304,9 @@ private Instant getRecords(Instant lastConsumeInstant, List messageList
rowDataList.forEach(data -> records.add((T) data));
}
}
+
return lastConsumeInstant;
+
}
@Override
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
index f5880f1a784..bb90c12774e 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
@@ -17,22 +17,31 @@
package org.apache.inlong.sort.tubemq.table;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricsCollector;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
import org.apache.inlong.tubemq.corebase.Message;
import com.google.common.base.Objects;
+import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import java.util.stream.Collectors;
public class DynamicTubeMQDeserializationSchema implements DeserializationSchema {
+ private static final Logger LOG = LoggerFactory.getLogger(DynamicTubeMQDeserializationSchema.class);
/**
* data buffer message
*/
@@ -53,15 +62,28 @@ public class DynamicTubeMQDeserializationSchema implements DeserializationSchema
*/
private final boolean ignoreErrors;
+ private SourceMetricData sourceMetricData;
+
+ private MetricOption metricOption;
+
public DynamicTubeMQDeserializationSchema(
DeserializationSchema schema,
MetadataConverter[] metadataConverters,
TypeInformation producedTypeInfo,
- boolean ignoreErrors) {
+ boolean ignoreErrors,
+ MetricOption metricOption) {
this.deserializationSchema = schema;
this.metadataConverters = metadataConverters;
this.producedTypeInfo = producedTypeInfo;
this.ignoreErrors = ignoreErrors;
+ this.metricOption = metricOption;
+ }
+
+ @Override
+ public void open(InitializationContext context) {
+ if (metricOption != null) {
+ sourceMetricData = new SourceMetricData(metricOption);
+ }
}
@Override
@@ -71,7 +93,10 @@ public RowData deserialize(byte[] bytes) throws IOException {
@Override
public void deserialize(byte[] message, Collector out) throws IOException {
- deserializationSchema.deserialize(message, out);
+ List rows = new ArrayList<>();
+ deserializationSchema.deserialize(message,
+ new MetricsCollector<>(new ListCollector<>(rows), sourceMetricData));
+ rows.forEach(out::collect);
}
@Override
@@ -95,7 +120,7 @@ public boolean equals(Object o) {
DynamicTubeMQDeserializationSchema that = (DynamicTubeMQDeserializationSchema) o;
return ignoreErrors == that.ignoreErrors
&& Objects.equal(Arrays.stream(metadataConverters).collect(Collectors.toList()),
- Arrays.stream(that.metadataConverters).collect(Collectors.toList()))
+ Arrays.stream(that.metadataConverters).collect(Collectors.toList()))
&& Objects.equal(deserializationSchema, that.deserializationSchema)
&& Objects.equal(producedTypeInfo, that.producedTypeInfo);
}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
index 43fb3e198e2..b27d02dc4c2 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sort.tubemq.table;
import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
@@ -25,13 +26,17 @@
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.format.Format;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
+import org.apache.flink.table.factories.SerializationFormatFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.RowKind;
@@ -41,6 +46,9 @@
import java.util.TreeSet;
import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.BOOTSTRAP_FROM_MAX;
import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.CONSUME_GROUP;
import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.KEY_FORMAT;
@@ -68,13 +76,18 @@ private static DecodingFormat> getValueDecodingFo
.orElseGet(() -> helper.discoverDecodingFormat(DeserializationFormatFactory.class, FORMAT));
}
+ private static EncodingFormat> getValueEncodingFormat(
+ TableFactoryHelper helper) {
+ return helper.discoverOptionalEncodingFormat(SerializationFormatFactory.class, FORMAT)
+ .orElseGet(() -> helper.discoverEncodingFormat(SerializationFormatFactory.class, FORMAT));
+ }
+
private static void validatePKConstraints(
ObjectIdentifier tableName, CatalogTable catalogTable, Format format) {
if (catalogTable.getSchema().getPrimaryKey().isPresent()
&& format.getChangelogMode().containsOnly(RowKind.INSERT)) {
Configuration options = Configuration.fromMap(catalogTable.getOptions());
String formatName = options.getOptional(FORMAT).orElse(options.get(FORMAT));
- innerFormat = INNERFORMATTYPE.equals(formatName);
throw new ValidationException(String.format(
"The TubeMQ table '%s' with '%s' format doesn't support defining PRIMARY KEY constraint"
+ " on the table, because it can't guarantee the semantic of primary key.",
@@ -110,10 +123,15 @@ public DynamicTableSource createDynamicTableSource(Context context) {
helper.validateExcept(INNERFORMATTYPE);
validatePKConstraints(context.getObjectIdentifier(), context.getCatalogTable(), valueDecodingFormat);
+ innerFormat = INNERFORMATTYPE.equals(tableOptions.get(FORMAT));
final Configuration properties = getTubeMQProperties(context.getCatalogTable().getOptions());
- final DataType physicalDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
+ final DataType physicalDataType = context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
+
+ String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null);
+ String auditHostAndPorts = tableOptions.get(INLONG_AUDIT);
+ String auditKeys = tableOptions.get(AUDIT_KEYS);
return createTubeMQTableSource(
physicalDataType,
@@ -123,7 +141,10 @@ public DynamicTableSource createDynamicTableSource(Context context) {
TubeMQOptions.getTiSet(tableOptions),
TubeMQOptions.getConsumerGroup(tableOptions),
TubeMQOptions.getSessionKey(tableOptions),
- properties);
+ properties,
+ inlongMetric,
+ auditHostAndPorts,
+ auditKeys);
}
protected TubeMQTableSource createTubeMQTableSource(
@@ -131,23 +152,29 @@ protected TubeMQTableSource createTubeMQTableSource(
DecodingFormat> valueDecodingFormat,
String topic,
String url,
- TreeSet tid,
+ TreeSet streamId,
String consumerGroup,
String sessionKey,
- Configuration properties) {
+ Configuration properties,
+ String inlongMetric,
+ String auditHostAndPorts,
+ String auditKeys) {
return new TubeMQTableSource(
physicalDataType,
valueDecodingFormat,
url,
topic,
- tid,
+ streamId,
consumerGroup,
sessionKey,
properties,
null,
null,
false,
- innerFormat);
+ innerFormat,
+ inlongMetric,
+ auditHostAndPorts,
+ auditKeys);
}
@Override
@@ -172,6 +199,10 @@ public Set> optionalOptions() {
options.add(SESSION_KEY);
options.add(BOOTSTRAP_FROM_MAX);
options.add(TOPIC_PATTERN);
+ options.add(AUDIT_KEYS);
+ options.add(INLONG_METRIC);
+ options.add(INLONG_AUDIT);
return options;
}
+
}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
index c2642fd3513..42aec9cc9ad 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
@@ -17,6 +17,7 @@
package org.apache.inlong.sort.tubemq.table;
+import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.tubemq.FlinkTubeMQConsumer;
import org.apache.inlong.sort.tubemq.table.DynamicTubeMQDeserializationSchema.MetadataConverter;
import org.apache.inlong.tubemq.corebase.Message;
@@ -40,6 +41,8 @@
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
@@ -62,6 +65,8 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada
private static final String VALUE_METADATA_PREFIX = "value.";
+ private static final Logger LOG = LoggerFactory.getLogger(TubeMQTableSource.class);
+
// --------------------------------------------------------------------------------------------
// Mutable attributes
// --------------------------------------------------------------------------------------------
@@ -84,9 +89,9 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada
*/
private final String topic;
/**
- * The TubeMQ tid filter collection.
+ * The TubeMQ streamId filter collection.
*/
- private final TreeSet tidSet;
+ private final TreeSet streamIdSet;
/**
* The TubeMQ consumer group name.
*/
@@ -120,6 +125,11 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada
* Metadata that is appended at the end of a physical source row.
*/
protected List metadataKeys;
+
+ private String inlongMetric;
+ private String auditHostAndPorts;
+ private String auditKeys;
+
/**
* Watermark strategy that is used to generate per-partition watermark.
*/
@@ -127,17 +137,18 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada
private WatermarkStrategy watermarkStrategy;
public TubeMQTableSource(DataType physicalDataType,
- DecodingFormat> valueDecodingFormat,
- String masterAddress, String topic,
- TreeSet tidSet, String consumerGroup, String sessionKey,
- Configuration configuration, @Nullable WatermarkStrategy watermarkStrategy,
- Optional proctimeAttribute, Boolean ignoreErrors, Boolean innerFormat) {
+ DecodingFormat> valueDecodingFormat,
+ String masterAddress, String topic,
+ TreeSet streamIdSet, String consumerGroup, String sessionKey,
+ Configuration configuration, @Nullable WatermarkStrategy watermarkStrategy,
+ Optional proctimeAttribute, Boolean ignoreErrors, Boolean innerFormat,
+ String inlongMetric, String auditHostAndPorts, String auditKeys) {
Preconditions.checkNotNull(physicalDataType, "Physical data type must not be null.");
Preconditions.checkNotNull(valueDecodingFormat, "The deserialization schema must not be null.");
Preconditions.checkNotNull(masterAddress, "The master address must not be null.");
Preconditions.checkNotNull(topic, "The topic must not be null.");
- Preconditions.checkNotNull(tidSet, "The tid set must not be null.");
+ Preconditions.checkNotNull(streamIdSet, "The streamId set must not be null.");
Preconditions.checkNotNull(consumerGroup, "The consumer group must not be null.");
Preconditions.checkNotNull(configuration, "The configuration must not be null.");
@@ -147,7 +158,7 @@ public TubeMQTableSource(DataType physicalDataType,
this.valueDecodingFormat = valueDecodingFormat;
this.masterAddress = masterAddress;
this.topic = topic;
- this.tidSet = tidSet;
+ this.streamIdSet = streamIdSet;
this.consumerGroup = consumerGroup;
this.sessionKey = sessionKey;
this.configuration = configuration;
@@ -155,6 +166,9 @@ public TubeMQTableSource(DataType physicalDataType,
this.proctimeAttribute = proctimeAttribute;
this.ignoreErrors = ignoreErrors;
this.innerFormat = innerFormat;
+ this.inlongMetric = inlongMetric;
+ this.auditHostAndPorts = auditHostAndPorts;
+ this.auditKeys = auditKeys;
}
@Override
@@ -167,6 +181,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
final LogicalType physicalType = physicalDataType.getLogicalType();
final int physicalFieldCount = LogicalTypeChecks.getFieldCount(physicalType);
final IntStream physicalFields = IntStream.range(0, physicalFieldCount);
+
final DeserializationSchema deserialization = createDeserialization(context,
valueDecodingFormat, physicalFields.toArray(), null);
@@ -182,8 +197,9 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
public DynamicTableSource copy() {
return new TubeMQTableSource(
physicalDataType, valueDecodingFormat, masterAddress,
- topic, tidSet, consumerGroup, sessionKey, configuration,
- watermarkStrategy, proctimeAttribute, ignoreErrors, innerFormat);
+ topic, streamIdSet, consumerGroup, sessionKey, configuration,
+ watermarkStrategy, proctimeAttribute, ignoreErrors, innerFormat,
+ inlongMetric, auditHostAndPorts, auditKeys);
}
@Override
@@ -247,7 +263,7 @@ public boolean equals(Object o) {
&& Objects.equals(valueDecodingFormat, that.valueDecodingFormat)
&& Objects.equals(masterAddress, that.masterAddress)
&& Objects.equals(topic, that.topic)
- && Objects.equals(String.valueOf(tidSet), String.valueOf(that.tidSet))
+ && Objects.equals(String.valueOf(streamIdSet), String.valueOf(that.streamIdSet))
&& Objects.equals(consumerGroup, that.consumerGroup)
&& Objects.equals(proctimeAttribute, that.proctimeAttribute)
&& Objects.equals(watermarkStrategy, that.watermarkStrategy);
@@ -260,7 +276,7 @@ public int hashCode() {
valueDecodingFormat,
masterAddress,
topic,
- tidSet,
+ streamIdSet,
consumerGroup,
configuration,
watermarkStrategy,
@@ -273,7 +289,7 @@ public int hashCode() {
@Nullable
private DeserializationSchema createDeserialization(
- DynamicTableSource.Context context,
+ Context context,
@Nullable DecodingFormat> format,
int[] projection,
@Nullable String prefix) {
@@ -299,10 +315,17 @@ protected FlinkTubeMQConsumer createTubeMQConsumer(
.orElseThrow(IllegalStateException::new))
.map(m -> m.converter)
.toArray(MetadataConverter[]::new);
+
+ MetricOption metricOption = MetricOption.builder()
+ .withInlongLabels(inlongMetric)
+ .withAuditAddress(auditHostAndPorts)
+ .withAuditKeys(auditKeys)
+ .build();
+
final DeserializationSchema tubeMQDeserializer = new DynamicTubeMQDeserializationSchema(
- deserialization, metadataConverters, producedTypeInfo, ignoreErrors);
+ deserialization, metadataConverters, producedTypeInfo, ignoreErrors, metricOption);
- final FlinkTubeMQConsumer tubeMQConsumer = new FlinkTubeMQConsumer(masterAddress, topic, tidSet,
+ final FlinkTubeMQConsumer tubeMQConsumer = new FlinkTubeMQConsumer(masterAddress, topic, streamIdSet,
consumerGroup, tubeMQDeserializer, configuration, sessionKey, innerFormat);
return tubeMQConsumer;
}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml
index aec6e19919c..7e6b6398c88 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml
@@ -51,6 +51,12 @@
org.apache.inlong
sort-connector-base
${project.version}
+
+
+ flink-connector-base
+ org.apache.flink
+
+