diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java index f5b30be5370..e1e773ff061 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java @@ -20,7 +20,6 @@ import org.apache.seatunnel.api.sink.MultiTableResourceManager; import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter; -import org.apache.seatunnel.api.sink.event.WriterCloseEvent; import org.apache.seatunnel.api.table.event.SchemaChangeEvent; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.tracing.MDCTracer; @@ -318,10 +317,6 @@ public void close() throws IOException { (identifier, sinkWriter) -> { try { sinkWriter.close(); - sinkWritersContext - .get(identifier) - .getEventListener() - .onEvent(new WriterCloseEvent()); } catch (Throwable e) { if (firstE[0] == null) { firstE[0] = e; diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java index cf056df4612..295328d8210 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java @@ -25,6 +25,7 @@ import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.sink.SinkWriter.Context; import org.apache.seatunnel.api.sink.SupportResourceShare; +import org.apache.seatunnel.api.sink.event.WriterCloseEvent; import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.TablePath; @@ -174,6 +175,7 @@ private Address getCommitterTaskAddress() throws ExecutionException, Interrupted public void close() throws IOException { super.close(); writer.close(); + writerContext.getEventListener().onEvent(new WriterCloseEvent()); try { if (resourceManager != null) { resourceManager.close();