Add boundedness to source reader context, so that we can control when to stop the reader as it produces data #1885

Merged
43 changes: 34 additions & 9 deletions pom.xml
@@ -174,6 +174,8 @@
<skipIT>true</skipIT>
<elasticsearch>7</elasticsearch>
<slf4j.version>1.7.25</slf4j.version>
<guava.version>19.0</guava.version>
<auto-service.version>1.0.1</auto-service.version>
</properties>

<dependencyManagement>
@@ -607,20 +609,43 @@
<version>${scala.version}</version>
</dependency>

<!-- logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>com.google.auto.service</groupId>
<artifactId>auto-service</artifactId>
<version>${auto-service.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
</dependencies>

<build>

<finalName>${project.artifactId}-${project.version}-${scala.version}</finalName>
4 changes: 0 additions & 4 deletions seatunnel-api/pom.xml
@@ -38,9 +38,5 @@
<artifactId>seatunnel-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
</dependencies>
</project>
Boundedness.java
@@ -17,6 +17,10 @@

package org.apache.seatunnel.api.source;

/**
* Used to define the boundedness of a source. In batch mode, the source is {@link Boundedness#BOUNDED}.
* In streaming mode, the source is {@link Boundedness#UNBOUNDED}.
*/
public enum Boundedness {
/**
* A BOUNDED stream is a stream with finite records.
Collector.java
@@ -17,6 +17,11 @@

package org.apache.seatunnel.api.source;

/**
* A {@link Collector} is used to collect data from {@link SourceReader}.
*
* @param <T> data type.
*/
public interface Collector<T> {

void collect(T record);
SeaTunnelSource.java
@@ -27,6 +27,7 @@
*
* @param <T> The type of records produced by the source.
* @param <SplitT> The type of splits handled by the source.
* @param <StateT> The type of checkpoint states.
*/
public interface SeaTunnelSource<T, SplitT extends SourceSplit, StateT> extends Serializable {

@@ -37,14 +38,48 @@ public interface SeaTunnelSource<T, SplitT extends SourceSplit, StateT> extends
*/
Boundedness getBoundedness();

/**
* Create the source reader, which is used to produce data.
*
* @param readerContext reader context.
* @return source reader.
* @throws Exception when creating the reader fails.
*/
SourceReader<T, SplitT> createReader(SourceReader.Context readerContext) throws Exception;

/**
* Create the split serializer, used to serialize/deserialize the splits generated by {@link SourceSplitEnumerator}.
*
* @return split serializer.
*/
Serializer<SplitT> getSplitSerializer();

SourceSplitEnumerator<SplitT, StateT> createEnumerator(SourceSplitEnumerator.Context<SplitT> enumeratorContext) throws Exception;
/**
* Create the source split enumerator, used to generate splits. This method will be called only once when starting a source.
*
* @param enumeratorContext enumerator context.
* @return source split enumerator.
* @throws Exception when creating the enumerator fails.
*/
SourceSplitEnumerator<SplitT, StateT> createEnumerator(SourceSplitEnumerator.Context<SplitT> enumeratorContext)
throws Exception;

SourceSplitEnumerator<SplitT, StateT> restoreEnumerator(SourceSplitEnumerator.Context<SplitT> enumeratorContext, StateT checkpointState) throws Exception;
/**
* Create the source split enumerator, used to generate splits. This method will be called when restoring from a checkpoint.
*
* @param enumeratorContext enumerator context.
* @param checkpointState checkpoint state.
* @return source split enumerator.
* @throws Exception when creating the enumerator fails.
*/
SourceSplitEnumerator<SplitT, StateT> restoreEnumerator(SourceSplitEnumerator.Context<SplitT> enumeratorContext,
StateT checkpointState) throws Exception;

/**
* Create enumerator state serializer, used to serialize/deserialize checkpoint state.
*
* @return enumerator state serializer.
*/
Serializer<StateT> getEnumeratorStateSerializer();

}
SourceEvent.java
@@ -20,7 +20,7 @@
import java.io.Serializable;

/**
* An base class for the events passed between the SourceReaders and Enumerators.
* A base class for the events passed between the {@link SourceReader} and {@link SourceSplitEnumerator}.
*/
public interface SourceEvent extends Serializable {
}
SourceReader.java
@@ -22,8 +22,17 @@
import java.io.IOException;
import java.util.List;

/**
* The {@link SourceReader} is used to generate source records, and it will run on the worker.
*
* @param <T> record type.
* @param <SplitT> source split type.
*/
public interface SourceReader<T, SplitT extends SourceSplit> extends AutoCloseable, CheckpointListener {

/**
* Open the source reader.
*/
void open();

/**
@@ -33,10 +42,28 @@ public interface SourceReader<T, SplitT extends SourceSplit> extends AutoCloseab
@Override
void close() throws IOException;

/**
* Generate the next batch of records.
*
* @param output output collector.
* @throws Exception if error occurs.
*/
void pollNext(Collector<T> output) throws Exception;

/**
* Get the current split checkpoint state by checkpointId.
*
* @param checkpointId checkpoint Id.
* @return split checkpoint state.
* @throws Exception if error occurs.
*/
List<SplitT> snapshotState(long checkpointId) throws Exception;

/**
* Add the split checkpoint state to the reader.
*
* @param splits split checkpoint state.
*/
void addSplits(List<SplitT> splits);

/**
@@ -48,6 +75,11 @@ public interface SourceReader<T, SplitT extends SourceSplit> extends AutoCloseab
*/
void handleNoMoreSplits();

/**
* Handle the source event from {@link SourceSplitEnumerator}.
*
* @param sourceEvent source event.
*/
default void handleSourceEvent(SourceEvent sourceEvent) {
}

@@ -59,7 +91,12 @@ interface Context {
int getIndexOfSubtask();

/**
* Indicator that the input has reached the end of data.
* @return boundedness of this reader.
*/
Boundedness getBoundedness();

/**
* Indicator that the input has reached the end of data. The reader will then be cancelled.
*/
void signalNoMoreElement();
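
To make the intent of the new `Context#getBoundedness()` and `signalNoMoreElement()` concrete, here is a minimal, hypothetical reader sketch. None of these class names are part of this PR; it assumes `SourceSplit` declares `splitId()` and that `CheckpointListener` only requires `notifyCheckpointComplete`. Once a bounded reader has drained its assigned splits, it signals the engine so the reader can be stopped.

```java
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplit;

import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;

// Hypothetical connector code, only to illustrate the new Context methods.
public class MySourceReader implements SourceReader<String, MySourceReader.MySplit> {

    // Trivial split type for this sketch.
    public static class MySplit implements SourceSplit {
        private final String id;
        public MySplit(String id) { this.id = id; }
        @Override
        public String splitId() { return id; }
    }

    private final SourceReader.Context context;
    private final Deque<MySplit> splits = new ConcurrentLinkedDeque<>();
    private volatile boolean noMoreSplits = false;

    public MySourceReader(SourceReader.Context context) {
        this.context = context;
    }

    @Override
    public void open() { }

    @Override
    public void close() { }

    @Override
    public void pollNext(Collector<String> output) throws Exception {
        MySplit split = splits.poll();
        if (split != null) {
            // Emit one record per split in this toy example.
            output.collect("record-from-" + split.splitId());
        }
        // The new part: in batch mode, tell the engine we are done,
        // so it can stop (cancel) this reader.
        if (noMoreSplits && splits.isEmpty()
                && Boundedness.BOUNDED.equals(context.getBoundedness())) {
            context.signalNoMoreElement();
        }
    }

    @Override
    public List<MySplit> snapshotState(long checkpointId) {
        return new ArrayList<>(splits);
    }

    @Override
    public void addSplits(List<MySplit> splits) {
        this.splits.addAll(splits);
    }

    @Override
    public void handleNoMoreSplits() {
        noMoreSplits = true;
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) { }
}
```

In an unbounded (streaming) job the same reader would simply keep polling, because `context.getBoundedness()` would return `UNBOUNDED` and `signalNoMoreElement()` would never be called.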

SourceSplitEnumerator.java
@@ -24,6 +24,12 @@
import java.util.List;
import java.util.Set;

/**
* The {@link SourceSplitEnumerator} is responsible for enumerating the splits of a source. It will run on the master.
*
* @param <SplitT> source split type
* @param <StateT> source split state type
*/
public interface SourceSplitEnumerator<SplitT extends SourceSplit, StateT> extends AutoCloseable, CheckpointListener {

void open();
@@ -52,6 +58,12 @@ public interface SourceSplitEnumerator<SplitT extends SourceSplit, StateT> exten

StateT snapshotState(long checkpointId) throws Exception;

/**
* Handle the source event from {@link SourceReader}.
*
* @param subtaskId The id of the subtask the source event comes from.
* @param sourceEvent source event.
*/
default void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
}
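
Because `SourceEvent` is just a `Serializable` marker, a connector can define its own events for reader-to-enumerator coordination. A hypothetical example (the name is illustrative, not part of this PR):

```java
import org.apache.seatunnel.api.source.SourceEvent;

// Hypothetical event a reader could send once it has finished all of its assigned splits.
public class SplitsFinishedEvent implements SourceEvent {

    private static final long serialVersionUID = 1L;

    private final int finishedSplitCount;

    public SplitsFinishedEvent(int finishedSplitCount) {
        this.finishedSplitCount = finishedSplitCount;
    }

    public int getFinishedSplitCount() {
        return finishedSplitCount;
    }
}
```

An enumerator's `handleSourceEvent(int, SourceEvent)` above would then branch on `instanceof SplitsFinishedEvent` to decide, for example, whether that subtask should be given more splits.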

State.java
@@ -17,5 +17,7 @@

package org.apache.seatunnel.api.state;

public interface State {
import java.io.Serializable;

public interface State extends Serializable {
}
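
With `State` now extending `Serializable`, a connector state type can be written straight into a checkpoint (or through the `SerializationUtils` helper shown below). A minimal, hypothetical state class:

```java
import org.apache.seatunnel.api.state.State;

// Hypothetical connector state; implementing State now implies Serializable,
// so it can be serialized without a custom serializer.
public class OffsetState implements State {

    private static final long serialVersionUID = 1L;

    private final long offset;

    public OffsetState(long offset) {
        this.offset = offset;
    }

    public long getOffset() {
        return offset;
    }
}
```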
Factory.java
@@ -17,6 +17,9 @@

package org.apache.seatunnel.api.table.factory;

/**
* This is the SPI interface.
*/
public interface Factory {

/**
TableSinkFactory.java
@@ -19,6 +19,15 @@

import org.apache.seatunnel.api.table.connector.TableSink;

/**
* This is an SPI interface, used to create {@link TableSink}. Each plugin needs to have its own implementation.
* todo: this interface is not used yet; we directly use {@link org.apache.seatunnel.api.sink.SeaTunnelSink} as the SPI interface.
*
* @param <IN> row type
* @param <StateT> state type
* @param <CommitInfoT> commit info type
* @param <AggregatedCommitInfoT> aggregated commit info type
*/
public interface TableSinkFactory<IN, StateT, CommitInfoT, AggregatedCommitInfoT> extends Factory {

TableSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> createSink(TableFactoryContext context);
TableSourceFactory.java
@@ -20,6 +20,10 @@
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.connector.TableSource;

/**
* This is an SPI interface, used to create {@link TableSource}. Each plugin needs to have its own implementation.
* todo: this interface is not used yet; we directly use {@link org.apache.seatunnel.api.source.SeaTunnelSource} as the SPI interface.
*/
public interface TableSourceFactory extends Factory {

<T, SplitT extends SourceSplit, StateT> TableSource<T, SplitT, StateT> createSource(TableFactoryContext context);
SerializationUtils.java
@@ -26,24 +26,24 @@ public class SerializationUtils {

public static String objectToString(Serializable obj) {
if (obj != null) {
return Base64.encodeBase64String(org.apache.commons.lang3.SerializationUtils.serialize(obj));
return Base64.encodeBase64String(SerializationUtils.serialize(obj));
}
return null;
}

public static <T extends Serializable> T stringToObject(String str) {
if (StringUtils.isNotEmpty(str)) {
return org.apache.commons.lang3.SerializationUtils.deserialize(Base64.decodeBase64(str));
return SerializationUtils.deserialize(Base64.decodeBase64(str));
}
return null;
}

public static <T extends Serializable> byte[] serialize(T obj) {
return org.apache.commons.lang3.SerializationUtils.serialize(obj);
return SerializationUtils.serialize(obj);
}

public static <T extends Serializable> T deserialize(byte[] bytes) {
return org.apache.commons.lang3.SerializationUtils.deserialize(bytes);
return SerializationUtils.deserialize(bytes);
}

}
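
A short usage sketch of the helper above, assuming the hypothetical `OffsetState` from the State example earlier (any `Serializable` object works):

```java
// Round-trip an object through the Base64 string helpers.
OffsetState state = new OffsetState(42L);
String encoded = SerializationUtils.objectToString(state);
OffsetState decoded = SerializationUtils.stringToObject(encoded);

// Or through raw bytes.
byte[] bytes = SerializationUtils.serialize(state);
OffsetState restored = SerializationUtils.deserialize(bytes);
```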
ConsoleSink.java
@@ -22,8 +22,11 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.console.state.ConsoleState;

import com.google.auto.service.AutoService;

import java.util.List;

@AutoService(SeaTunnelSink.class)
public class ConsoleSink implements SeaTunnelSink<SeaTunnelRow, ConsoleState, ConsoleCommitInfo, ConsoleAggregatedCommitInfo> {

@Override
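
The `@AutoService(SeaTunnelSink.class)` annotation, backed by the `auto-service` dependency added to the root pom, generates the `META-INF/services/org.apache.seatunnel.api.sink.SeaTunnelSink` entry at compile time, so the sink becomes discoverable through Java's standard `ServiceLoader`. A rough discovery sketch; the plugin-discovery code itself is not part of this PR, and loading sinks via a plain `ServiceLoader` call is an assumption here:

```java
import org.apache.seatunnel.api.sink.SeaTunnelSink;

import java.util.ServiceLoader;

public class SinkDiscoveryDemo {
    public static void main(String[] args) {
        // Lists every SeaTunnelSink registered via META-INF/services,
        // e.g. the ConsoleSink above once @AutoService has generated its entry.
        for (SeaTunnelSink<?, ?, ?, ?> sink : ServiceLoader.load(SeaTunnelSink.class)) {
            System.out.println("Discovered sink: " + sink.getClass().getName());
        }
    }
}
```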

This file was deleted.
