Skip to content

Commit

Permalink
[Feature][Connector-V2][Elasticsearce]improve restore split(apache#2553)
Browse files Browse the repository at this point in the history
  • Loading branch information
iture123 committed Oct 16, 2022
1 parent d39c5cb commit ed2ba50
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ public SourceSplitEnumerator<ElasticsearchSourceSplit, ElasticsearchSourceState>
}

@Override
public SourceSplitEnumerator<ElasticsearchSourceSplit, ElasticsearchSourceState> restoreEnumerator(SourceSplitEnumerator.Context<ElasticsearchSourceSplit> enumeratorContext, ElasticsearchSourceState checkpointState) throws Exception {
return new ElasticsearchSourceSplitEnumerator(enumeratorContext, pluginConfig);
public SourceSplitEnumerator<ElasticsearchSourceSplit, ElasticsearchSourceState> restoreEnumerator(SourceSplitEnumerator.Context<ElasticsearchSourceSplit> enumeratorContext, ElasticsearchSourceState sourceState) {
return new ElasticsearchSourceSplitEnumerator(enumeratorContext, sourceState, pluginConfig);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,34 +20,28 @@
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.SourceIndexInfo;

import lombok.ToString;

@ToString
public class ElasticsearchSourceSplit implements SourceSplit {

private static final long serialVersionUID = -1L;

private String splitId;

private SourceIndexInfo sourceIndexInfo;
private int splitId;

public ElasticsearchSourceSplit(SourceIndexInfo sourceIndexInfo, int splitId) {
this.sourceIndexInfo = sourceIndexInfo;
public ElasticsearchSourceSplit(String splitId, SourceIndexInfo sourceIndexInfo) {
this.splitId = splitId;
}

@Override
public String splitId() {
return String.valueOf(splitId);
this.sourceIndexInfo = sourceIndexInfo;
}

public SourceIndexInfo getSourceIndexInfo() {
return sourceIndexInfo;
}

public void setSourceIndexInfo(SourceIndexInfo sourceIndexInfo) {
this.sourceIndexInfo = sourceIndexInfo;
}

public int getSplitId() {
@Override
public String splitId() {
return splitId;
}

public void setSplitId(int splitId) {
this.splitId = splitId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,47 @@

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

@Slf4j
public class ElasticsearchSourceSplitEnumerator implements SourceSplitEnumerator<ElasticsearchSourceSplit, ElasticsearchSourceState> {

private SourceSplitEnumerator.Context<ElasticsearchSourceSplit> enumeratorContext;
private SourceSplitEnumerator.Context<ElasticsearchSourceSplit> context;

private Config pluginConfig;

private EsRestClient esRestClient;

public ElasticsearchSourceSplitEnumerator(SourceSplitEnumerator.Context<ElasticsearchSourceSplit> enumeratorContext, Config pluginConfig) {
this.enumeratorContext = enumeratorContext;
private final Object stateLock = new Object();

private Map<Integer, List<ElasticsearchSourceSplit>> pendingSplit;

private volatile boolean shouldEnumerate;

public ElasticsearchSourceSplitEnumerator(SourceSplitEnumerator.Context<ElasticsearchSourceSplit> context, Config pluginConfig) {
this(context, null, pluginConfig);
}

public ElasticsearchSourceSplitEnumerator(SourceSplitEnumerator.Context<ElasticsearchSourceSplit> context, ElasticsearchSourceState sourceState, Config pluginConfig) {
this.context = context;
this.pluginConfig = pluginConfig;
this.pendingSplit = new HashMap<>();
this.shouldEnumerate = sourceState == null;
if (sourceState != null) {
this.shouldEnumerate = sourceState.isShouldEnumerate();
this.pendingSplit.putAll(sourceState.getPendingSplit());
}
}

@Override
Expand All @@ -51,32 +75,59 @@ public void open() {
}

@Override
public void run() throws Exception {
public void run() {
Set<Integer> readers = context.registeredReaders();
if (shouldEnumerate) {
List<ElasticsearchSourceSplit> newSplits = getElasticsearchSplit();

synchronized (stateLock) {
addPendingSplit(newSplits);
shouldEnumerate = false;
}

}
assignSplit(readers);
}

@Override
public void close() throws IOException {
esRestClient.close();
log.debug("No more splits to assign." +
" Sending NoMoreSplitsEvent to reader {}.", readers);
readers.forEach(context::signalNoMoreSplits);
}

@Override
public void addSplitsBack(List<ElasticsearchSourceSplit> splits, int subtaskId) {

private void addPendingSplit(Collection<ElasticsearchSourceSplit> splits) {
int readerCount = context.currentParallelism();
for (ElasticsearchSourceSplit split : splits) {
int ownerReader = getSplitOwner(split.splitId(), readerCount);
log.info("Assigning {} to {} reader.", split, ownerReader);
pendingSplit.computeIfAbsent(ownerReader, r -> new ArrayList<>())
.add(split);
}
}

@Override
public int currentUnassignedSplitSize() {
return 0;
private static int getSplitOwner(String tp, int numReaders) {
return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
}

@Override
public void handleSplitRequest(int subtaskId) {

private void assignSplit(Collection<Integer> readers) {
log.debug("Assign pendingSplits to readers {}", readers);

for (int reader : readers) {
List<ElasticsearchSourceSplit> assignmentForReader = pendingSplit.remove(reader);
if (assignmentForReader != null && !assignmentForReader.isEmpty()) {
log.info("Assign splits {} to reader {}",
assignmentForReader, reader);
try {
context.assignSplit(reader, assignmentForReader);
} catch (Exception e) {
log.error("Failed to assign splits {} to reader {}",
assignmentForReader, reader, e);
pendingSplit.put(reader, assignmentForReader);
}
}
}
}

@Override
public void registerReader(int subtaskId) {
private List<ElasticsearchSourceSplit> getElasticsearchSplit() {
List<ElasticsearchSourceSplit> splits = new ArrayList<>();
String scrolllTime = SourceConfigDeaultConstant.SCROLLL_TIME;
if (pluginConfig.hasPath(SourceConfig.SCROLL_TIME)) {
scrolllTime = pluginConfig.getString(SourceConfig.SCROLL_TIME);
Expand All @@ -89,24 +140,47 @@ public void registerReader(int subtaskId) {
List<IndexDocsCount> indexDocsCounts = esRestClient.getIndexDocsCount(pluginConfig.getString(SourceConfig.INDEX));
indexDocsCounts = indexDocsCounts.stream().filter(x -> x.getDocsCount() != null && x.getDocsCount() > 0)
.sorted(Comparator.comparingLong(IndexDocsCount::getDocsCount)).collect(Collectors.toList());
List<ElasticsearchSourceSplit> splits = new ArrayList<>();
int parallelism = enumeratorContext.currentParallelism();
List<String> sources = pluginConfig.getStringList(SourceConfig.SOURCE);

for (int i = 0; i < indexDocsCounts.size(); i++) {
IndexDocsCount indexDocsCount = indexDocsCounts.get(i);
if (i % parallelism == subtaskId) {
splits.add(new ElasticsearchSourceSplit(new SourceIndexInfo(indexDocsCount.getIndex(), sources, scrolllTime, scrollSize), subtaskId));
}
for (IndexDocsCount indexDocsCount : indexDocsCounts) {
splits.add(new ElasticsearchSourceSplit(String.valueOf(indexDocsCount.getIndex().hashCode()), new SourceIndexInfo(indexDocsCount.getIndex(), sources, scrolllTime, scrollSize)));
}
return splits;
}

@Override
public void close() throws IOException {
esRestClient.close();
}

@Override
public void addSplitsBack(List<ElasticsearchSourceSplit> splits, int subtaskId) {

enumeratorContext.assignSplit(subtaskId, splits);
enumeratorContext.signalNoMoreSplits(subtaskId);
}

@Override
public int currentUnassignedSplitSize() {
return 0;
}

@Override
public void handleSplitRequest(int subtaskId) {
throw new UnsupportedOperationException("Unsupported handleSplitRequest: " + subtaskId);
}

@Override
public void registerReader(int subtaskId) {
log.debug("Register reader {} to IoTDBSourceSplitEnumerator.",
subtaskId);
if (!pendingSplit.isEmpty()) {
assignSplit(Collections.singletonList(subtaskId));
}
}

@Override
public ElasticsearchSourceState snapshotState(long checkpointId) throws Exception {
return null;
synchronized (stateLock) {
return new ElasticsearchSourceState(shouldEnumerate, pendingSplit);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,16 @@

package org.apache.seatunnel.connectors.seatunnel.elasticsearch.source;

import lombok.AllArgsConstructor;
import lombok.Getter;

import java.io.Serializable;
import java.util.List;
import java.util.Map;

@AllArgsConstructor
@Getter
public class ElasticsearchSourceState implements Serializable {
private boolean shouldEnumerate;
private Map<Integer, List<ElasticsearchSourceSplit>> pendingSplit;
}

0 comments on commit ed2ba50

Please sign in to comment.