From c881667e9569fc1c97bd70d0ed761cd1301922bb Mon Sep 17 00:00:00 2001 From: vernedeng Date: Thu, 15 Aug 2024 16:09:59 +0800 Subject: [PATCH] [INLONG-10791][Sort] Support inlong dirty sink --- inlong-sort/sort-flink/base/pom.xml | 7 + .../dirty/sink/sdk/InlongSdkDirtySink.java | 178 ++++++++++++++++++ .../sink/sdk/InlongSdkDirtySinkFactory.java | 126 +++++++++++++ .../base/dirty/sink/sdk/InlongSdkOptions.java | 51 +++++ .../org.apache.flink.table.factories.Factory | 3 +- 5 files changed, 364 insertions(+), 1 deletion(-) create mode 100644 inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java create mode 100644 inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java create mode 100644 inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkOptions.java diff --git a/inlong-sort/sort-flink/base/pom.xml b/inlong-sort/sort-flink/base/pom.xml index a98220f178e..12b10fe19d7 100644 --- a/inlong-sort/sort-flink/base/pom.xml +++ b/inlong-sort/sort-flink/base/pom.xml @@ -56,6 +56,13 @@ ${project.version} provided + + + org.apache.inlong + dataproxy-sdk + ${project.version} + compile + diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java new file mode 100644 index 00000000000..27f6fea73ff --- /dev/null +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.base.dirty.sink.sdk; + +import org.apache.inlong.sdk.dataproxy.DefaultMessageSender; +import org.apache.inlong.sdk.dataproxy.MessageSender; +import org.apache.inlong.sdk.dataproxy.ProxyClientConfig; +import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback; +import org.apache.inlong.sdk.dataproxy.common.SendResult; +import org.apache.inlong.sort.base.dirty.DirtyData; +import org.apache.inlong.sort.base.dirty.DirtyType; +import org.apache.inlong.sort.base.dirty.sink.DirtySink; +import org.apache.inlong.sort.base.dirty.utils.FormatUtils; +import org.apache.inlong.sort.base.util.LabelUtils; + +import com.google.common.base.Preconditions; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.formats.json.RowDataToJsonConverters; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Map; +import java.util.StringJoiner; + +@Slf4j +public class InlongSdkDirtySink implements DirtySink { + + private final InlongSdkOptions options; + private final DataType physicalRowDataType; + private final String inlongGroupId; + private final String inlongStreamId; + private final SendMessageCallback callback; + + private transient DateTimeFormatter dateTimeFormatter; + private transient RowData.FieldGetter[] fieldGetters; + private transient RowDataToJsonConverters.RowDataToJsonConverter converter; + private transient MessageSender sender; + + public InlongSdkDirtySink(InlongSdkOptions options, DataType physicalRowDataType) { + this.options = options; + this.physicalRowDataType = physicalRowDataType; + this.inlongGroupId = options.getInlongGroupId(); + this.inlongStreamId = options.getInlongStreamId(); + this.callback = new LogCallBack(); + } + + @Override + public void invoke(DirtyData dirtyData) throws Exception { + try { + Map labelMap = LabelUtils.parseLabels(dirtyData.getLabels()); + String groupId = Preconditions.checkNotNull(labelMap.get("groupId")); + String streamId = Preconditions.checkNotNull(labelMap.get("streamId")); + + String message = join(groupId, streamId, + dirtyData.getDirtyType(), dirtyData.getLabels(), formatData(dirtyData, labelMap)); + sender.asyncSendMessage(inlongGroupId, inlongStreamId, message.getBytes(), callback); + } catch (Throwable t) { + log.error("failed to send dirty message to inlong sdk", t); + } + } + + @Override + public void open(Configuration configuration) throws Exception { + converter = FormatUtils.parseRowDataToJsonConverter(physicalRowDataType.getLogicalType()); + fieldGetters = FormatUtils.parseFieldGetters(physicalRowDataType.getLogicalType()); + dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + + // init sender + ProxyClientConfig proxyClientConfig = + new ProxyClientConfig(options.getInlongManagerAddr(), options.getInlongGroupId(), + options.getInlongManagerAuthId(), options.getInlongManagerAuthKey()); + sender = DefaultMessageSender.generateSenderByClusterId(proxyClientConfig); + } + + @Override + public void close() throws Exception { + if (sender != null) { + sender.close(); + } + } + + private String join( + String inlongGroup, + String inlongStream, + DirtyType type, + String label, + String formattedData) { + + String now = LocalDateTime.now().format(dateTimeFormatter); + + StringJoiner joiner = new StringJoiner(options.getCsvFieldDelimiter()); + return joiner.add(inlongGroup + "." + inlongStream) + .add(now) + .add(type.name()) + .add(label) + .add(formattedData).toString(); + } + + private String formatData(DirtyData dirtyData, Map labels) throws JsonProcessingException { + String value; + T data = dirtyData.getData(); + if (data instanceof RowData) { + value = formatData((RowData) data, dirtyData.getRowType(), labels); + } else { + value = data.toString(); + } + return value; + } + + private String formatData(RowData data, LogicalType rowType, + Map labels) throws JsonProcessingException { + String value; + switch (options.getFormat()) { + case "csv": + RowData.FieldGetter[] getters = fieldGetters; + if (rowType != null) { + getters = FormatUtils.parseFieldGetters(rowType); + } + value = FormatUtils.csvFormat(data, getters, labels, options.getCsvFieldDelimiter()); + break; + case "json": + RowDataToJsonConverters.RowDataToJsonConverter jsonConverter = converter; + if (rowType != null) { + jsonConverter = FormatUtils.parseRowDataToJsonConverter(rowType); + } + value = FormatUtils.jsonFormat(data, jsonConverter, labels); + break; + default: + throw new UnsupportedOperationException( + String.format("Unsupported format for: %s", options.getFormat())); + } + return value; + } + + class LogCallBack implements SendMessageCallback { + + @Override + public void onMessageAck(SendResult result) { + if (result == SendResult.OK) { + return; + } + log.error("failed to send inlong dirty message, response={}", result); + + if (!options.isIgnoreSideOutputErrors()) { + throw new RuntimeException("writing dirty message to inlong sdk failed, response=" + result); + } + } + + @Override + public void onException(Throwable e) { + log.error("failed to send inlong dirty message", e); + + if (!options.isIgnoreSideOutputErrors()) { + throw new RuntimeException("writing dirty message to inlong sdk failed", e); + } + } + } +} diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java new file mode 100644 index 00000000000..000836b6675 --- /dev/null +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.base.dirty.sink.sdk; + +import org.apache.inlong.sort.base.dirty.sink.DirtySink; +import org.apache.inlong.sort.base.dirty.sink.DirtySinkFactory; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.factories.DynamicTableFactory; +import org.apache.flink.table.factories.FactoryUtil; + +import java.util.HashSet; +import java.util.Set; + +import static org.apache.inlong.sort.base.Constants.DIRTY_IDENTIFIER; +import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FORMAT; +import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_IGNORE_ERRORS; +import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_LOG_ENABLE; + +public class InlongSdkDirtySinkFactory implements DirtySinkFactory { + + private static final String IDENTIFIER = "inlong-sdk"; + + private static final ConfigOption DIRTY_SIDE_OUTPUT_INLONG_MANAGER = + ConfigOptions.key("dirty.side-output.inlong-sdk.inlong-manager-addr") + .stringType() + .noDefaultValue() + .withDescription("The inlong manager addr to init inlong sdk"); + + private static final ConfigOption DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID = + ConfigOptions.key("dirty.side-output.inlong-sdk.inlong-auth-id") + .stringType() + .noDefaultValue() + .withDescription("The inlong manager auth id to init inlong sdk"); + + private static final ConfigOption DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY = + ConfigOptions.key("dirty.side-output.inlong-sdk.inlong-auth-key") + .stringType() + .noDefaultValue() + .withDescription("The inlong manager auth id to init inlong sdk"); + + private static final ConfigOption DIRTY_SIDE_OUTPUT_INLONG_GROUP = + ConfigOptions.key("dirty.side-output.inlong-sdk.inlong-group-id") + .stringType() + .noDefaultValue() + .withDescription("The inlong group id of dirty sink"); + + private static final ConfigOption DIRTY_SIDE_OUTPUT_INLONG_STREAM = + ConfigOptions.key("dirty.side-output.inlong-sdk.inlong-stream-id") + .stringType() + .noDefaultValue() + .withDescription("The inlong stream id of dirty sink"); + + @Override + public DirtySink createDirtySink(DynamicTableFactory.Context context) { + ReadableConfig config = Configuration.fromMap(context.getCatalogTable().getOptions()); + FactoryUtil.validateFactoryOptions(this, config); + validate(config); + return new InlongSdkDirtySink<>(getOptions(config), + context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType()); + } + + private void validate(ReadableConfig config) { + String identifier = config.getOptional(DIRTY_IDENTIFIER).orElse(null); + if (identifier == null || identifier.trim().isEmpty()) { + throw new ValidationException( + "The option 'dirty.identifier' is not allowed to be empty."); + } + } + + private InlongSdkOptions getOptions(ReadableConfig config) { + return InlongSdkOptions.builder() + .inlongManagerAddr(config.get(DIRTY_SIDE_OUTPUT_INLONG_MANAGER)) + .inlongGroupId(config.get(DIRTY_SIDE_OUTPUT_INLONG_GROUP)) + .inlongStreamId(config.get(DIRTY_SIDE_OUTPUT_INLONG_STREAM)) + .inlongManagerAuthKey(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY)) + .inlongManagerAuthId(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID)) + .ignoreSideOutputErrors(config.getOptional(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS).orElse(true)) + .enableDirtyLog(true) + .build(); + } + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set> requiredOptions() { + final Set> options = new HashSet<>(); + options.add(DIRTY_SIDE_OUTPUT_INLONG_MANAGER); + options.add(DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID); + options.add(DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY); + options.add(DIRTY_SIDE_OUTPUT_INLONG_GROUP); + options.add(DIRTY_SIDE_OUTPUT_INLONG_STREAM); + return options; + } + + @Override + public Set> optionalOptions() { + final Set> options = new HashSet<>(); + options.add(DIRTY_SIDE_OUTPUT_FORMAT); + options.add(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS); + options.add(DIRTY_SIDE_OUTPUT_LOG_ENABLE); + return options; + } +} diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkOptions.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkOptions.java new file mode 100644 index 00000000000..0692d78580b --- /dev/null +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkOptions.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.base.dirty.sink.sdk; + +import lombok.Builder; +import lombok.Data; +import lombok.Getter; + +import java.io.Serializable; + +@Data +@Builder +@Getter +public class InlongSdkOptions implements Serializable { + + private static final String DEFAULT_FORMAT = "csv"; + + private static final String DEFAULT_CSV_FIELD_DELIMITER = ","; + private static final String DEFAULT_CSV_LINE_DELIMITER = "\n"; + + private static final String DEFAULT_KV_FIELD_DELIMITER = "&"; + private static final String DEFAULT_KV_ENTRY_DELIMITER = "="; + + private String inlongGroupId; + private String inlongStreamId; + private String inlongManagerAddr; + private String inlongManagerAuthKey; + private String inlongManagerAuthId; + private String format = DEFAULT_FORMAT; + private boolean ignoreSideOutputErrors; + private boolean enableDirtyLog; + private String csvFieldDelimiter = DEFAULT_CSV_FIELD_DELIMITER; + private String csvLineDelimiter = DEFAULT_CSV_LINE_DELIMITER; + private String kvFieldDelimiter = DEFAULT_KV_FIELD_DELIMITER; + private String kvEntryDelimiter = DEFAULT_KV_ENTRY_DELIMITER; +} diff --git a/inlong-sort/sort-flink/base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-flink/base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index a83e4d025c3..f4effff2534 100644 --- a/inlong-sort/sort-flink/base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/inlong-sort/sort-flink/base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -14,4 +14,5 @@ # limitations under the License. org.apache.inlong.sort.base.dirty.sink.log.LogDirtySinkFactory -org.apache.inlong.sort.base.dirty.sink.s3.S3DirtySinkFactory \ No newline at end of file +org.apache.inlong.sort.base.dirty.sink.s3.S3DirtySinkFactory +org.apache.inlong.sort.base.dirty.sink.sdk.InlongSdkDirtySinkFactory \ No newline at end of file