-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Fix][Connector-V2] Fixed clickhouse connectors cannot stop under multiple parallelism #7921
Conversation
1、When the ClickHouse connector is set to multi parallelism, the task extraction is completed but cannot be stopped normally [(#7897)](#7897) 2、Added E2E test cases for this issue [(#7897)](#7897) 3、Local developers want to observe **Job Progress Information** in a timely manner, Need to modify the following configuration.The configuration in config is invalid ``` seatunnel engine/seatunnel-engineer-common/src/main/resources/seatunnely.yaml ```
1、When the ClickHouse connector is set to multi parallelism, the task extraction is completed but cannot be stopped normally [(#7897)](#7897) 2、Added E2E test cases for this issue [(#7897)](#7897) 3、Local developers want to observe **Job Progress Information** in a timely manner, Need to modify the following configuration.The configuration in config is invalid ``` seatunnel engine/seatunnel-engineer-common/src/main/resources/seatunnely.yaml ```
1、When the ClickHouse connector is set to multi parallelism, the task extraction is completed but cannot be stopped normally [(#7897)](#7897) 2、Added E2E test cases for this issue [(#7897)](#7897) 3、Local developers want to observe **Job Progress Information** in a timely manner, Need to modify the following configuration.The configuration in config is invalid ``` seatunnel engine/seatunnel-engineer-common/src/main/resources/seatunnely.yaml ```
This reverts commit 42e5919.
This reverts commit baa7cb8.
@@ -20,6 +20,7 @@ seatunnel: | |||
backup-count: 1 | |||
queue-type: blockingqueue | |||
print-execution-info-interval: 60 | |||
print-job-metrics-info-interval: 60 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Default value of print-job-metrics-info-interval
is 60. This is valid without change
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know that the default value is also used if it is not changed here, but here is a parameter to let you know where you can change it.
Otherwise, if you run an example in the project and want to change the parameters, the results will not work
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Default value of
print-job-metrics-info-interval
is 60. This is valid without change
I think this parameter can be added here, at least adding the default value will not cause any other bad effects
But it worked for me, at first I wanted to change the configuration, I didn't know where to change, I searched a lot of places in the project, and it didn't take effect after the change, and then it was someone else who pointed out that I knew to change here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
get
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can configure it in your local environment. But do not commit it. Please revert it.
@@ -101,6 +101,13 @@ public void testClickhouse(TestContainer container) throws Exception { | |||
clearSinkTable(); | |||
} | |||
|
|||
@TestTemplate | |||
public void testSourceParallelism(TestContainer container) throws Exception { | |||
LOG.info("=========Multi parallelism testing begins==========="); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LOG.info("=========Multi parallelism testing begins==========="); |
@@ -20,6 +20,7 @@ seatunnel: | |||
backup-count: 1 | |||
queue-type: blockingqueue | |||
print-execution-info-interval: 60 | |||
print-job-metrics-info-interval: 60 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can configure it in your local environment. But do not commit it. Please revert it.
if (assigned < 0) { | ||
assigned = subtaskId; | ||
context.assignSplit(subtaskId, new ClickhouseSourceSplit()); | ||
} else { | ||
context.signalNoMoreSplits(subtaskId); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (assigned < 0) { | |
assigned = subtaskId; | |
context.assignSplit(subtaskId, new ClickhouseSourceSplit()); | |
} else { | |
context.signalNoMoreSplits(subtaskId); | |
} | |
if (assigned < 0) { | |
assigned = subtaskId; | |
context.assignSplit(subtaskId, new ClickhouseSourceSplit()); | |
context.signalNoMoreSplits(subtaskId); | |
} else { | |
context.signalNoMoreSplits(subtaskId); | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Write it like this?
if (assigned < 0) {
assigned = subtaskId;
context.assignSplit(subtaskId, new ClickhouseSourceSplit());
}
context.signalNoMoreSplits(subtaskId);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Write it like this?
if (assigned < 0) { assigned = subtaskId; context.assignSplit(subtaskId, new ClickhouseSourceSplit()); } context.signalNoMoreSplits(subtaskId);
+1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, I didn't notice this
this.readerContext.signalNoMoreElement(); | ||
this.splits.clear(); | ||
} else if (noMoreSplit | ||
&& splits.isEmpty() | ||
&& Boundedness.BOUNDED.equals(readerContext.getBoundedness())) { | ||
log.info("Closed the bounded ClickHouse source"); | ||
this.readerContext.signalNoMoreElement(); | ||
this.splits.clear(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this.readerContext.signalNoMoreElement(); | |
this.splits.clear(); | |
} else if (noMoreSplit | |
&& splits.isEmpty() | |
&& Boundedness.BOUNDED.equals(readerContext.getBoundedness())) { | |
log.info("Closed the bounded ClickHouse source"); | |
this.readerContext.signalNoMoreElement(); | |
this.splits.clear(); | |
} | |
this.splits.clear(); | |
} | |
if (noMoreSplit | |
&& splits.isEmpty() | |
&& Boundedness.BOUNDED.equals(readerContext.getBoundedness())) { | |
log.info("Closed the bounded ClickHouse source"); | |
this.readerContext.signalNoMoreElement(); | |
this.splits.clear(); | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Boundedness.BOUNDED.equals(readerContext.getBoundedness())
This can be removed I think because clickhouse only supports batch mode reading
cc @Hisoka-X
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Keep it for now. The purpose of keeping it is to unify the processing logic of most connectors, to facilitate reference for other developers and reduce the chance of bugs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Keep it for now. The purpose of keeping it is to unify the processing logic of most connectors, to facilitate reference for other developers and reduce the chance of bugs.
What if the user uses processing mode to write STREAMING? It's unnecessary code and I think it's better to get rid of it Developers adding features are certainly familiar with their read patterns
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if the user uses processing mode to write STREAMING?
Sorry, I don get it. Could you share more details? You meaning set job.mode
to STREAMING
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if the user uses processing mode to write STREAMING?
Sorry, I don get it. Could you share more details? You meaning set
job.mode
toSTREAMING
?
env {
job.mode = "STREAMING"
parallelism = 2
}
source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
Clickhouse {
host = "127.0.0.1:8123"
database = "default"
sql = "select * from students"
username = "default"
password = ""
}
# If you would like to get more information about how to configure seatunnel and see full list of source plugins,
# please go to https://seatunnel.apache.org/docs/connector-v2/source/ClickhouseSource
}
sink {
Console {
}
# If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
# please go to https://seatunnel.apache.org/docs/connector-v2/sink
}
So for example in this case the user will never exit the program, essentially no matter how you configure the read mode clickhouse only supports batch mode, right
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If users set to STREAMING, but source only supported BATCH mode. The job will execute as BATCH mode. It depends on
Line 160 in a8d0d4c
return Boundedness.BOUNDED; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Understood, but an extra judgment is a redundant operation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add checkpoint lock when read spilt. Please refer
Line 76 in 50113e7
synchronized (output.getCheckpointLock()) { |
cc @zhilinli123 as well. |
I don't quite understand where this should be added and where it needs to be used. Currently, I haven't found any places where it needs to be used |
Waiting for the start of the current pr merger modify this code: #7529 |
…tiple parallelism Adjust the bug regarding the Clickhouse connector not being able to stop properly
} | ||
this.readerContext.signalNoMoreElement(); | ||
this.splits.clear(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extract clickhouse Close the task method
this.readerContext.signalNoMoreElement();
this.splits.clear();
} | ||
if (noMoreSplit && splits.isEmpty()) { | ||
log.info("Closed the bounded ClickHouse source"); | ||
this.readerContext.signalNoMoreElement(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
@@ -77,6 +77,9 @@ public void registerReader(int subtaskId) { | |||
if (assigned < 0) { | |||
assigned = subtaskId; | |||
context.assignSplit(subtaskId, new ClickhouseSourceSplit()); | |||
context.signalNoMoreSplits(subtaskId); | |||
} else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (assigned < 0) {
assigned = subtaskId;
context.assignSplit(subtaskId, new ClickhouseSourceSplit());
}
context.signalNoMoreSplits(subtaskId);
…tiple parallelism Optimize code structure
…tiple parallelism format code
Wait for CI pass |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM if ci passes. Thanks @YOMO-Lee and @zhilinli123 @corgy-w for review!
+1 |
waiting for ci passed |
Fixed the bug where clickhouse connectors cannot stop properly under multiple parallelism