Skip to content

Commit

Permalink
[Improve][Core] Move MultiTableSink to seatunnel-api module (apache#7243
Browse files Browse the repository at this point in the history
)

* [Improve][Core] Move MultiTableSink to seatunnel-api module

* [Improve][Core] Move MultiTableSink to seatunnel-api module
  • Loading branch information
Hisoka-X authored Jul 23, 2024
1 parent fe0c477 commit cc59499
Show file tree
Hide file tree
Showing 13 changed files with 19 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.common.multitablesink;
package org.apache.seatunnel.api.sink.multitablesink;

import lombok.AllArgsConstructor;
import lombok.Getter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.common.multitablesink;
package org.apache.seatunnel.api.sink.multitablesink;

import lombok.AllArgsConstructor;
import lombok.Getter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.common.multitablesink;
package org.apache.seatunnel.api.sink.multitablesink;

import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
Expand All @@ -28,6 +28,8 @@
import org.apache.seatunnel.api.table.factory.MultiTableFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;

import lombok.Getter;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
Expand All @@ -44,7 +46,7 @@ public class MultiTableSink
MultiTableCommitInfo,
MultiTableAggregatedCommitInfo> {

private final Map<String, SeaTunnelSink> sinks;
@Getter private final Map<String, SeaTunnelSink> sinks;
private final int replicaNum;

public MultiTableSink(MultiTableFactoryContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.common.multitablesink;
package org.apache.seatunnel.api.sink.multitablesink;

import org.apache.seatunnel.api.sink.MultiTableResourceManager;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.common.multitablesink;
package org.apache.seatunnel.api.sink.multitablesink;

import org.apache.seatunnel.api.sink.SinkCommitter;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.common.multitablesink;
package org.apache.seatunnel.api.sink.multitablesink;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.connector.TableSink;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.common.multitablesink;
package org.apache.seatunnel.api.sink.multitablesink;

import org.apache.seatunnel.api.sink.MultiTableResourceManager;
import org.apache.seatunnel.api.sink.SinkWriter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.common.multitablesink;
package org.apache.seatunnel.api.sink.multitablesink;

import lombok.AllArgsConstructor;
import lombok.Getter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.common.multitablesink;
package org.apache.seatunnel.api.sink.multitablesink;

import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.common.multitablesink;
package org.apache.seatunnel.api.sink.multitablesink;

import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.event.EventListener;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.common.multitablesink;
package org.apache.seatunnel.api.sink.multitablesink;

import lombok.EqualsAndHashCode;
import lombok.Getter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.env.ParsingMode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkFactory;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceOptions;
import org.apache.seatunnel.api.source.SourceSplit;
Expand Down Expand Up @@ -151,7 +152,7 @@ SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> createMultiTableSi
ClassLoader classLoader) {
try {
TableSinkFactory<IN, StateT, CommitInfoT, AggregatedCommitInfoT> factory =
discoverFactory(classLoader, TableSinkFactory.class, "MultiTableSink");
new MultiTableSinkFactory();
MultiTableFactoryContext context =
new MultiTableFactoryContext(options, classLoader, sinks);
ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.ReflectionUtils;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
Expand Down Expand Up @@ -375,13 +375,8 @@ public static void handleSaveMode(SeaTunnelSink sink) {
throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
}
}
} else if (sink.getClass()
.getName()
.equals(
"org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSink")) {
// TODO we should not use class name to judge the sink type
Map<String, SeaTunnelSink> sinks =
(Map<String, SeaTunnelSink>) ReflectionUtils.getField(sink, "sinks").get();
} else if (sink instanceof MultiTableSink) {
Map<String, SeaTunnelSink> sinks = ((MultiTableSink) sink).getSinks();
for (SeaTunnelSink seaTunnelSink : sinks.values()) {
handleSaveMode(seaTunnelSink);
}
Expand Down

0 comments on commit cc59499

Please sign in to comment.