Skip to content

Commit

Permalink
[INLONG-10791][Sort] Support inlong dirty sink
Browse files Browse the repository at this point in the history
  • Loading branch information
vernedeng committed Aug 15, 2024
1 parent 8efeb14 commit c881667
Show file tree
Hide file tree
Showing 5 changed files with 364 additions and 1 deletion.
7 changes: 7 additions & 0 deletions inlong-sort/sort-flink/base/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>dataproxy-sdk</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>

<profiles>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.base.dirty.sink.sdk;

import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
import org.apache.inlong.sdk.dataproxy.MessageSender;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sort.base.dirty.DirtyData;
import org.apache.inlong.sort.base.dirty.DirtyType;
import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.base.dirty.utils.FormatUtils;
import org.apache.inlong.sort.base.util.LabelUtils;

import com.google.common.base.Preconditions;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.json.RowDataToJsonConverters;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.StringJoiner;

@Slf4j
public class InlongSdkDirtySink<T> implements DirtySink<T> {

private final InlongSdkOptions options;
private final DataType physicalRowDataType;
private final String inlongGroupId;
private final String inlongStreamId;
private final SendMessageCallback callback;

private transient DateTimeFormatter dateTimeFormatter;
private transient RowData.FieldGetter[] fieldGetters;
private transient RowDataToJsonConverters.RowDataToJsonConverter converter;
private transient MessageSender sender;

public InlongSdkDirtySink(InlongSdkOptions options, DataType physicalRowDataType) {
this.options = options;
this.physicalRowDataType = physicalRowDataType;
this.inlongGroupId = options.getInlongGroupId();
this.inlongStreamId = options.getInlongStreamId();
this.callback = new LogCallBack();
}

@Override
public void invoke(DirtyData<T> dirtyData) throws Exception {
try {
Map<String, String> labelMap = LabelUtils.parseLabels(dirtyData.getLabels());
String groupId = Preconditions.checkNotNull(labelMap.get("groupId"));
String streamId = Preconditions.checkNotNull(labelMap.get("streamId"));

String message = join(groupId, streamId,
dirtyData.getDirtyType(), dirtyData.getLabels(), formatData(dirtyData, labelMap));
sender.asyncSendMessage(inlongGroupId, inlongStreamId, message.getBytes(), callback);
} catch (Throwable t) {
log.error("failed to send dirty message to inlong sdk", t);
}
}

@Override
public void open(Configuration configuration) throws Exception {
converter = FormatUtils.parseRowDataToJsonConverter(physicalRowDataType.getLogicalType());
fieldGetters = FormatUtils.parseFieldGetters(physicalRowDataType.getLogicalType());
dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

// init sender
ProxyClientConfig proxyClientConfig =
new ProxyClientConfig(options.getInlongManagerAddr(), options.getInlongGroupId(),
options.getInlongManagerAuthId(), options.getInlongManagerAuthKey());
sender = DefaultMessageSender.generateSenderByClusterId(proxyClientConfig);
}

@Override
public void close() throws Exception {
if (sender != null) {
sender.close();
}
}

private String join(
String inlongGroup,
String inlongStream,
DirtyType type,
String label,
String formattedData) {

String now = LocalDateTime.now().format(dateTimeFormatter);

StringJoiner joiner = new StringJoiner(options.getCsvFieldDelimiter());
return joiner.add(inlongGroup + "." + inlongStream)
.add(now)
.add(type.name())
.add(label)
.add(formattedData).toString();
}

private String formatData(DirtyData<T> dirtyData, Map<String, String> labels) throws JsonProcessingException {
String value;
T data = dirtyData.getData();
if (data instanceof RowData) {
value = formatData((RowData) data, dirtyData.getRowType(), labels);
} else {
value = data.toString();
}
return value;
}

private String formatData(RowData data, LogicalType rowType,
Map<String, String> labels) throws JsonProcessingException {
String value;
switch (options.getFormat()) {
case "csv":
RowData.FieldGetter[] getters = fieldGetters;
if (rowType != null) {
getters = FormatUtils.parseFieldGetters(rowType);
}
value = FormatUtils.csvFormat(data, getters, labels, options.getCsvFieldDelimiter());
break;
case "json":
RowDataToJsonConverters.RowDataToJsonConverter jsonConverter = converter;
if (rowType != null) {
jsonConverter = FormatUtils.parseRowDataToJsonConverter(rowType);
}
value = FormatUtils.jsonFormat(data, jsonConverter, labels);
break;
default:
throw new UnsupportedOperationException(
String.format("Unsupported format for: %s", options.getFormat()));
}
return value;
}

class LogCallBack implements SendMessageCallback {

@Override
public void onMessageAck(SendResult result) {
if (result == SendResult.OK) {
return;
}
log.error("failed to send inlong dirty message, response={}", result);

if (!options.isIgnoreSideOutputErrors()) {
throw new RuntimeException("writing dirty message to inlong sdk failed, response=" + result);
}
}

@Override
public void onException(Throwable e) {
log.error("failed to send inlong dirty message", e);

if (!options.isIgnoreSideOutputErrors()) {
throw new RuntimeException("writing dirty message to inlong sdk failed", e);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.base.dirty.sink.sdk;

import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.base.dirty.sink.DirtySinkFactory;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;

import java.util.HashSet;
import java.util.Set;

import static org.apache.inlong.sort.base.Constants.DIRTY_IDENTIFIER;
import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FORMAT;
import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_IGNORE_ERRORS;
import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_LOG_ENABLE;

public class InlongSdkDirtySinkFactory implements DirtySinkFactory {

private static final String IDENTIFIER = "inlong-sdk";

private static final ConfigOption<String> DIRTY_SIDE_OUTPUT_INLONG_MANAGER =
ConfigOptions.key("dirty.side-output.inlong-sdk.inlong-manager-addr")
.stringType()
.noDefaultValue()
.withDescription("The inlong manager addr to init inlong sdk");

private static final ConfigOption<String> DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID =
ConfigOptions.key("dirty.side-output.inlong-sdk.inlong-auth-id")
.stringType()
.noDefaultValue()
.withDescription("The inlong manager auth id to init inlong sdk");

private static final ConfigOption<String> DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY =
ConfigOptions.key("dirty.side-output.inlong-sdk.inlong-auth-key")
.stringType()
.noDefaultValue()
.withDescription("The inlong manager auth id to init inlong sdk");

private static final ConfigOption<String> DIRTY_SIDE_OUTPUT_INLONG_GROUP =
ConfigOptions.key("dirty.side-output.inlong-sdk.inlong-group-id")
.stringType()
.noDefaultValue()
.withDescription("The inlong group id of dirty sink");

private static final ConfigOption<String> DIRTY_SIDE_OUTPUT_INLONG_STREAM =
ConfigOptions.key("dirty.side-output.inlong-sdk.inlong-stream-id")
.stringType()
.noDefaultValue()
.withDescription("The inlong stream id of dirty sink");

@Override
public <T> DirtySink<T> createDirtySink(DynamicTableFactory.Context context) {
ReadableConfig config = Configuration.fromMap(context.getCatalogTable().getOptions());
FactoryUtil.validateFactoryOptions(this, config);
validate(config);
return new InlongSdkDirtySink<>(getOptions(config),
context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType());
}

private void validate(ReadableConfig config) {
String identifier = config.getOptional(DIRTY_IDENTIFIER).orElse(null);
if (identifier == null || identifier.trim().isEmpty()) {
throw new ValidationException(
"The option 'dirty.identifier' is not allowed to be empty.");
}
}

private InlongSdkOptions getOptions(ReadableConfig config) {
return InlongSdkOptions.builder()
.inlongManagerAddr(config.get(DIRTY_SIDE_OUTPUT_INLONG_MANAGER))
.inlongGroupId(config.get(DIRTY_SIDE_OUTPUT_INLONG_GROUP))
.inlongStreamId(config.get(DIRTY_SIDE_OUTPUT_INLONG_STREAM))
.inlongManagerAuthKey(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY))
.inlongManagerAuthId(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID))
.ignoreSideOutputErrors(config.getOptional(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS).orElse(true))
.enableDirtyLog(true)
.build();
}

@Override
public String factoryIdentifier() {
return IDENTIFIER;
}

@Override
public Set<ConfigOption<?>> requiredOptions() {
final Set<ConfigOption<?>> options = new HashSet<>();
options.add(DIRTY_SIDE_OUTPUT_INLONG_MANAGER);
options.add(DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID);
options.add(DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY);
options.add(DIRTY_SIDE_OUTPUT_INLONG_GROUP);
options.add(DIRTY_SIDE_OUTPUT_INLONG_STREAM);
return options;
}

@Override
public Set<ConfigOption<?>> optionalOptions() {
final Set<ConfigOption<?>> options = new HashSet<>();
options.add(DIRTY_SIDE_OUTPUT_FORMAT);
options.add(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS);
options.add(DIRTY_SIDE_OUTPUT_LOG_ENABLE);
return options;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.base.dirty.sink.sdk;

import lombok.Builder;
import lombok.Data;
import lombok.Getter;

import java.io.Serializable;

@Data
@Builder
@Getter
public class InlongSdkOptions implements Serializable {

private static final String DEFAULT_FORMAT = "csv";

private static final String DEFAULT_CSV_FIELD_DELIMITER = ",";
private static final String DEFAULT_CSV_LINE_DELIMITER = "\n";

private static final String DEFAULT_KV_FIELD_DELIMITER = "&";
private static final String DEFAULT_KV_ENTRY_DELIMITER = "=";

private String inlongGroupId;
private String inlongStreamId;
private String inlongManagerAddr;
private String inlongManagerAuthKey;
private String inlongManagerAuthId;
private String format = DEFAULT_FORMAT;
private boolean ignoreSideOutputErrors;
private boolean enableDirtyLog;
private String csvFieldDelimiter = DEFAULT_CSV_FIELD_DELIMITER;
private String csvLineDelimiter = DEFAULT_CSV_LINE_DELIMITER;
private String kvFieldDelimiter = DEFAULT_KV_FIELD_DELIMITER;
private String kvEntryDelimiter = DEFAULT_KV_ENTRY_DELIMITER;
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@
# limitations under the License.

org.apache.inlong.sort.base.dirty.sink.log.LogDirtySinkFactory
org.apache.inlong.sort.base.dirty.sink.s3.S3DirtySinkFactory
org.apache.inlong.sort.base.dirty.sink.s3.S3DirtySinkFactory
org.apache.inlong.sort.base.dirty.sink.sdk.InlongSdkDirtySinkFactory

0 comments on commit c881667

Please sign in to comment.