Skip to content

Commit

Permalink
small fix
Browse files Browse the repository at this point in the history
  • Loading branch information
baisui1981 committed May 27, 2023
1 parent 55afc50 commit fdd8ca5
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -175,6 +176,13 @@ protected boolean isRedirectable(String method) {
httpPut.setHeader(entry.getKey(), String.valueOf(entry.getValue()));
}
}

// 2023/05/21 百岁 add for sequencde update
Optional<String> seqColName = this.options.getSeqColName();
if (seqColName.isPresent()) {
httpPut.setHeader("function_column.sequence_col", seqColName.get());
}

httpPut.setHeader("Expect", "100-continue");
httpPut.setHeader("label", label);
httpPut.setHeader("two_phase_commit", "false");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ public void init() {
}
writerManager = new DorisWriterManager(options);
rowCodec = DorisCodecFactory.createCodec(options);

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import org.apache.commons.collections.MapUtils;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

public class Keys implements Serializable {
Expand Down Expand Up @@ -50,6 +52,11 @@ public String getToken() {
private static final String FLUSH_QUEUE_LENGTH = "flushQueueLength";
private static final String LOAD_PROPS = "loadProps";

/**
* baisui add: https://doris.apache.org/zh-CN/docs/dev/data-operate/update-delete/sequence-column-manual
*/
public static final String COL_SEQUENCE_NAME = "sequence_col_name";

private static final String DEFAULT_LABEL_PREFIX = "datax_doris_writer_";

private static final long DEFAULT_MAX_BATCH_SIZE = 90 * 1024 * 1024; //default 90M
Expand Down Expand Up @@ -129,6 +136,15 @@ public Map<String, Object> getLoadProps() {
return options.getMap(LOAD_PROPS);
}

public Optional<String> getSeqColName() {

Map<String, Object> props = getLoadProps();
if (MapUtils.isEmpty(props)) {
return Optional.empty();
}
return Optional.ofNullable((String) props.get(COL_SEQUENCE_NAME));
}

public int getMaxRetries() {
return MAX_RETRIES;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -880,12 +880,15 @@ public static IDataSourceFactoryGetter getWriterDataSourceFactoryGetter(Configur
public static IDataSourceFactoryGetter getReaderDataSourceFactoryGetter(Configuration config, IJobContainerContext containerContext) {
return getDataSourceFactoryGetter(config, containerContext, (res) -> {

IDataxProcessor processor = DataxProcessor.load(null, res.resType, res.getDataXName());
IDataxReader reader = null;
if ((reader = processor.getReader(null)) instanceof IDataSourceFactoryGetter) {
return reader;
if (res.resType != StoreResourceType.DataFlow) {
IDataxProcessor processor = DataxProcessor.load(null, res.resType, res.getDataXName());
IDataxReader reader = null;
if ((reader = processor.getReader(null)) instanceof IDataSourceFactoryGetter) {
return reader;
}
}


final DBIdentity dbFactoryId = DBIdentity.parseId(config.getString(DataxUtils.DATASOURCE_FACTORY_IDENTITY));
return new IDataSourceFactoryGetter() {
@Override
Expand Down

0 comments on commit fdd8ca5

Please sign in to comment.