From e1e5d4cf99762def5fbd25f2cc6c8a25b6c3daa1 Mon Sep 17 00:00:00 2001 From: liuli Date: Wed, 31 May 2023 18:45:00 +0800 Subject: [PATCH] [Bugfix][CDC Base] Solving the ConcurrentModificationException caused by snapshotState being modified concurrently. --- .../source/enumerator/IncrementalSourceEnumerator.java | 9 +++++++-- .../cdc/sqlserver/source/source/offset/LsnOffset.java | 4 ++-- .../engine/server/task/SourceSplitEnumeratorTask.java | 1 + 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java index fa0dddddb36..86f7ac42def 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java @@ -109,7 +109,9 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { (CompletedSnapshotSplitsReportEvent) sourceEvent; List completedSplitWatermarks = reportEvent.getCompletedSnapshotSplitWatermarks(); - splitAssigner.onCompletedSplits(completedSplitWatermarks); + synchronized (context) { + splitAssigner.onCompletedSplits(completedSplitWatermarks); + } // send acknowledge event CompletedSnapshotSplitsAckEvent ackEvent = @@ -153,7 +155,10 @@ private void assignSplits() { continue; } - Optional split = splitAssigner.getNext(); + Optional split; + synchronized (context) { + split = splitAssigner.getNext(); + } if (split.isPresent()) { final SourceSplitBase sourceSplit = split.get(); context.assignSplit(nextAwaiting, sourceSplit); diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/offset/LsnOffset.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/offset/LsnOffset.java index 61a7f5a4c16..277c054578b 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/offset/LsnOffset.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/offset/LsnOffset.java @@ -61,8 +61,8 @@ public Lsn getCommitLsn() { return Lsn.valueOf(offset.get(SourceInfo.COMMIT_LSN_KEY)); } - public Long getEventSerialNo() { - return Long.valueOf(offset.get(SourceInfo.EVENT_SERIAL_NO_KEY)); + public Object getEventSerialNo() { + return offset.get(SourceInfo.EVENT_SERIAL_NO_KEY); } public int compareTo(Offset o) { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java index 18ba0a78e69..28b17003893 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java @@ -142,6 +142,7 @@ public void triggerBarrier(Barrier barrier) throws Exception { final long barrierId = barrier.getId(); Serializable snapshotState = null; byte[] serialize = null; + // Do not modify this lock object, as it is also used in the SourceSplitEnumerator. synchronized (enumeratorContext) { if (barrier.snapshot()) { snapshotState = enumerator.snapshotState(barrierId);