Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
vernedeng committed Nov 8, 2023
1 parent a0cd9ca commit 8bc404b
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,17 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-connector-base</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<artifactId>flink-connector-base</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,10 @@
*/
public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T>
implements
CheckpointedFunction {
CheckpointedFunction {

private static final Logger LOG = LoggerFactory.getLogger(FlinkTubeMQConsumer.class);
private static final String TUBE_OFFSET_STATE = "tube-offset-state";
private static final String SPLIT_COMMA = ",";
private static final String SPLIT_COLON = ":";

/**
* The address of TubeMQ master, format eg: 127.0.0.1:8715,127.0.0.2:8715.
Expand All @@ -82,9 +80,9 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T>
private final String topic;

/**
* The tubemq consumers use this tid set to filter records reading from server.
* The tubemq consumers use this streamId set to filter records reading from server.
*/
private final TreeSet<String> tidSet;
private final TreeSet<String> streamIdSet;

/**
* The consumer group name.
Expand Down Expand Up @@ -130,7 +128,7 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T>
/**
* The current offsets of partitions which are stored in {@link #offsetsState}
* once a checkpoint is triggered.
*
* <p>
* NOTE: The offsets are populated in the main thread and saved in the
* checkpoint thread. Its usage must be guarded by the checkpoint lock.</p>
*/
Expand All @@ -147,33 +145,33 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T>
/**
* Build a TubeMQ source function
*
* @param masterAddress the master address of TubeMQ
* @param topic the topic name
* @param tidSet the topic's filter condition items
* @param consumerGroup the consumer group name
* @param masterAddress the master address of TubeMQ
* @param topic the topic name
* @param streamIdSet the topic's filter condition items
* @param consumerGroup the consumer group name
* @param deserializationSchema the deserialize schema
* @param configuration the configure
* @param sessionKey the tube session key
* @param configuration the configure
* @param sessionKey the tube session key
*/
public FlinkTubeMQConsumer(
String masterAddress,
String topic,
TreeSet<String> tidSet,
TreeSet<String> streamIdSet,
String consumerGroup,
DeserializationSchema<T> deserializationSchema,
Configuration configuration,
String sessionKey,
Boolean innerFormat) {
checkNotNull(masterAddress, "The master address must not be null.");
checkNotNull(topic, "The topic must not be null.");
checkNotNull(tidSet, "The tid set must not be null.");
checkNotNull(streamIdSet, "The streamId set must not be null.");
checkNotNull(consumerGroup, "The consumer group must not be null.");
checkNotNull(deserializationSchema, "The deserialization schema must not be null.");
checkNotNull(configuration, "The configuration must not be null.");

this.masterAddress = masterAddress;
this.topic = topic;
this.tidSet = tidSet;
this.streamIdSet = streamIdSet;
this.consumerGroup = consumerGroup;
this.deserializationSchema = deserializationSchema;
this.sessionKey = sessionKey;
Expand Down Expand Up @@ -210,6 +208,7 @@ public void initializeState(FunctionInitializationContext context) throws Except
@Override
public void open(Configuration parameters) throws Exception {

deserializationSchema.open(null);
ConsumerConfig consumerConfig = new ConsumerConfig(masterAddress, consumerGroup);
consumerConfig.setConsumePosition(consumeFromMax
? ConsumePosition.CONSUMER_FROM_LATEST_OFFSET
Expand All @@ -220,7 +219,7 @@ public void open(Configuration parameters) throws Exception {
final int numTasks = getRuntimeContext().getNumberOfParallelSubtasks();
messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig);
messagePullConsumer.subscribe(topic, tidSet);
messagePullConsumer.subscribe(topic, streamIdSet);
String jobId = getRuntimeContext().getJobId().toString();
messagePullConsumer.completeSubscribe(sessionKey.concat(jobId), numTasks, true, currentOffsets);

Expand Down Expand Up @@ -305,7 +304,9 @@ private Instant getRecords(Instant lastConsumeInstant, List<Message> messageList
rowDataList.forEach(data -> records.add((T) data));
}
}

return lastConsumeInstant;

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,31 @@

package org.apache.inlong.sort.tubemq.table;

import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricsCollector;
import org.apache.inlong.sort.base.metric.SourceMetricData;
import org.apache.inlong.tubemq.corebase.Message;

import com.google.common.base.Objects;
import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class DynamicTubeMQDeserializationSchema implements DeserializationSchema<RowData> {

private static final Logger LOG = LoggerFactory.getLogger(DynamicTubeMQDeserializationSchema.class);
/**
* data buffer message
*/
Expand All @@ -53,15 +62,28 @@ public class DynamicTubeMQDeserializationSchema implements DeserializationSchema
*/
private final boolean ignoreErrors;

private SourceMetricData sourceMetricData;

private MetricOption metricOption;

public DynamicTubeMQDeserializationSchema(
DeserializationSchema<RowData> schema,
MetadataConverter[] metadataConverters,
TypeInformation<RowData> producedTypeInfo,
boolean ignoreErrors) {
boolean ignoreErrors,
MetricOption metricOption) {
this.deserializationSchema = schema;
this.metadataConverters = metadataConverters;
this.producedTypeInfo = producedTypeInfo;
this.ignoreErrors = ignoreErrors;
this.metricOption = metricOption;
}

@Override
public void open(InitializationContext context) {
if (metricOption != null) {
sourceMetricData = new SourceMetricData(metricOption);
}
}

@Override
Expand All @@ -71,7 +93,10 @@ public RowData deserialize(byte[] bytes) throws IOException {

@Override
public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
deserializationSchema.deserialize(message, out);
List<RowData> rows = new ArrayList<>();
deserializationSchema.deserialize(message,
new MetricsCollector<>(new ListCollector<>(rows), sourceMetricData));
rows.forEach(out::collect);
}

@Override
Expand All @@ -95,7 +120,7 @@ public boolean equals(Object o) {
DynamicTubeMQDeserializationSchema that = (DynamicTubeMQDeserializationSchema) o;
return ignoreErrors == that.ignoreErrors
&& Objects.equal(Arrays.stream(metadataConverters).collect(Collectors.toList()),
Arrays.stream(that.metadataConverters).collect(Collectors.toList()))
Arrays.stream(that.metadataConverters).collect(Collectors.toList()))
&& Objects.equal(deserializationSchema, that.deserializationSchema)
&& Objects.equal(producedTypeInfo, that.producedTypeInfo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,25 @@
package org.apache.inlong.sort.tubemq.table;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.format.Format;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
import org.apache.flink.table.factories.SerializationFormatFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.RowKind;

Expand All @@ -41,6 +46,9 @@
import java.util.TreeSet;

import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.BOOTSTRAP_FROM_MAX;
import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.CONSUME_GROUP;
import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.KEY_FORMAT;
Expand Down Expand Up @@ -68,13 +76,18 @@ private static DecodingFormat<DeserializationSchema<RowData>> getValueDecodingFo
.orElseGet(() -> helper.discoverDecodingFormat(DeserializationFormatFactory.class, FORMAT));
}

private static EncodingFormat<SerializationSchema<RowData>> getValueEncodingFormat(
TableFactoryHelper helper) {
return helper.discoverOptionalEncodingFormat(SerializationFormatFactory.class, FORMAT)
.orElseGet(() -> helper.discoverEncodingFormat(SerializationFormatFactory.class, FORMAT));
}

private static void validatePKConstraints(
ObjectIdentifier tableName, CatalogTable catalogTable, Format format) {
if (catalogTable.getSchema().getPrimaryKey().isPresent()
&& format.getChangelogMode().containsOnly(RowKind.INSERT)) {
Configuration options = Configuration.fromMap(catalogTable.getOptions());
String formatName = options.getOptional(FORMAT).orElse(options.get(FORMAT));
innerFormat = INNERFORMATTYPE.equals(formatName);
throw new ValidationException(String.format(
"The TubeMQ table '%s' with '%s' format doesn't support defining PRIMARY KEY constraint"
+ " on the table, because it can't guarantee the semantic of primary key.",
Expand Down Expand Up @@ -110,10 +123,15 @@ public DynamicTableSource createDynamicTableSource(Context context) {
helper.validateExcept(INNERFORMATTYPE);

validatePKConstraints(context.getObjectIdentifier(), context.getCatalogTable(), valueDecodingFormat);
innerFormat = INNERFORMATTYPE.equals(tableOptions.get(FORMAT));

final Configuration properties = getTubeMQProperties(context.getCatalogTable().getOptions());

final DataType physicalDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
final DataType physicalDataType = context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();

String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null);
String auditHostAndPorts = tableOptions.get(INLONG_AUDIT);
String auditKeys = tableOptions.get(AUDIT_KEYS);

return createTubeMQTableSource(
physicalDataType,
Expand All @@ -123,31 +141,40 @@ public DynamicTableSource createDynamicTableSource(Context context) {
TubeMQOptions.getTiSet(tableOptions),
TubeMQOptions.getConsumerGroup(tableOptions),
TubeMQOptions.getSessionKey(tableOptions),
properties);
properties,
inlongMetric,
auditHostAndPorts,
auditKeys);
}

protected TubeMQTableSource createTubeMQTableSource(
DataType physicalDataType,
DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
String topic,
String url,
TreeSet<String> tid,
TreeSet<String> streamId,
String consumerGroup,
String sessionKey,
Configuration properties) {
Configuration properties,
String inlongMetric,
String auditHostAndPorts,
String auditKeys) {
return new TubeMQTableSource(
physicalDataType,
valueDecodingFormat,
url,
topic,
tid,
streamId,
consumerGroup,
sessionKey,
properties,
null,
null,
false,
innerFormat);
innerFormat,
inlongMetric,
auditHostAndPorts,
auditKeys);
}

@Override
Expand All @@ -172,6 +199,10 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(SESSION_KEY);
options.add(BOOTSTRAP_FROM_MAX);
options.add(TOPIC_PATTERN);
options.add(AUDIT_KEYS);
options.add(INLONG_METRIC);
options.add(INLONG_AUDIT);
return options;
}

}
Loading

0 comments on commit 8bc404b

Please sign in to comment.