feat(metrics): add actor input and output row number #3391
5ea69ec to 4feb853
Codecov Report
@@ Coverage Diff @@
## main #3391 +/- ##
==========================================
+ Coverage 73.81% 73.83% +0.01%
==========================================
Files 765 765
Lines 105357 105450 +93
==========================================
+ Hits 77767 77855 +88
- Misses 27590 27595 +5
Doesn't stream_actor_row_count already do what you want?
We hope to get the actor input and output rates directly from the dashboard.
I may be overly picky and am holding this PR to a fairly high standard rather than treating it as research code. Just a reminder: if you need to move quickly on your project, you can always fork and develop on your own branch instead of getting everything carefully reviewed and merged into main, which might be time-consuming for you. Anyway, feel free to ping me and the other developers involved in the scaling work to get things reviewed. For me, a large PR might take about three days to a week to review, depending on my workload at the time. Small PRs like this one can be reviewed much more quickly.
src/stream/src/executor/merge.rs
Outdated
actor_id: u32,
status: OperatorInfoStatus,
upstreams: Vec<Receiver<Message>>,
metrics: Arc<StreamingMetrics>,
I think you can also map the stream and record the metrics at:
SelectReceivers::new(self.actor_id, status, upstreams, self.metrics.clone());
// Channels that're blocked by the barrier to align.
select_all.boxed()
instead of here. SelectReceivers only multiplexes the stream, so the metrics should not be coupled with that part.
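The suggestion above can be sketched roughly as follows. This is a std-only analogy, not the code from the PR: a synchronous `Iterator` stands in for the async message stream, `Message` is a simplified hypothetical enum, and `with_row_count` is an invented helper name. It only illustrates recording the row count in a wrapper around the multiplexed output instead of inside the multiplexer itself.

```rust
use std::cell::Cell;

/// Hypothetical, simplified stand-in for the executor `Message` type.
#[derive(Debug, Clone, PartialEq)]
enum Message {
    /// A data chunk carrying `cardinality` rows.
    Chunk { cardinality: usize },
    /// A barrier, which carries no rows.
    Barrier,
}

/// Wraps `upstream` and adds the cardinality of every chunk to `rows`.
/// The component that produced `upstream` needs no knowledge of metrics.
fn with_row_count<'a, I>(upstream: I, rows: &'a Cell<u64>) -> impl Iterator<Item = Message> + 'a
where
    I: Iterator<Item = Message> + 'a,
{
    upstream.inspect(move |msg| {
        if let Message::Chunk { cardinality } = msg {
            rows.set(rows.get() + *cardinality as u64);
        }
    })
}

/// Runs a small fixed input through the wrapper and returns the row total.
fn demo_row_count() -> u64 {
    let rows = Cell::new(0u64);
    let input = vec![
        Message::Chunk { cardinality: 3 },
        Message::Barrier,
        Message::Chunk { cardinality: 4 },
    ];
    // Every message is forwarded unchanged; only the counter is updated.
    let forwarded: Vec<Message> = with_row_count(input.into_iter(), &rows).collect();
    assert_eq!(forwarded.len(), 3);
    rows.get()
}

fn main() {
    println!("rows seen: {}", demo_row_count());
}
```

In the async setting the same idea would presumably be expressed with a stream adapter applied to the boxed output, which is an assumption about how the reviewer's advice maps onto the real code.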
Will do
The true-rate metrics and the input/output row counts are used for auto-scaling. I plan to add more metrics after this PR, and I will get in touch with the other developers working on auto-scaling.
@@ -23,7 +23,7 @@ use crate::executor::error::StreamExecutorError;
use crate::executor::monitor::StreamingMetrics;
use crate::executor::{ExecutorInfo, Message, MessageStream};
use crate::task::ActorId;

const ENABLE_EXECUTOR_ROW_COUNT: bool = false;
Better to add docs to this global constant:
/// Set to true to enable per-executor row count metrics. This will produce a lot of time series and might affect Prometheus performance. If you only need actor input and output row data, see stream_actor_in_record_cnt and stream_actor_out_record_cnt instead.
@@ -162,6 +164,12 @@ impl DispatchExecutorInner {
    async fn dispatch(&mut self, msg: Message) -> Result<()> {
        match msg {
            Message::Chunk(chunk) => {
                let actor_id_str = self.actor_id.to_string();
Pre-calculating means putting this in the struct body, so that the string is generated only once over the actor's lifetime.
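A minimal sketch of that idea, assuming a heavily simplified struct: the real `DispatchExecutorInner` has more fields, and the `new` constructor and `labels` helper here are hypothetical names used only for illustration. The label string is built once at construction and borrowed on every chunk.

```rust
/// Hypothetical, simplified dispatcher; the real struct has more fields.
struct DispatchExecutorInner {
    actor_id: u32,
    /// Label value computed once in `new` and reused for every chunk.
    actor_id_str: String,
}

impl DispatchExecutorInner {
    fn new(actor_id: u32) -> Self {
        Self {
            actor_id,
            // The only place where the id is formatted into a string.
            actor_id_str: actor_id.to_string(),
        }
    }

    /// Returns the label values to pass to a metrics call.
    /// This borrows the cached string, so nothing is allocated per chunk.
    fn labels(&self) -> [&str; 1] {
        [self.actor_id_str.as_str()]
    }
}

fn main() {
    let inner = DispatchExecutorInner::new(42);
    println!("actor {} uses labels {:?}", inner.actor_id, inner.labels());
}
```

Compared with calling `to_string()` inside `dispatch`, this trades one extra field for the removal of a heap allocation on every chunk.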
metrics
    .actor_in_record_cnt
    .with_label_values(&[&actor_id_str])
    .inc_by(chunk.cardinality().try_into().unwrap());
What's that try_into().unwrap() thing? Do you mean as u64?
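For context, a small sketch of the two conversions being compared. The function names are hypothetical, and it assumes `cardinality()` returns a `usize`, as the `as u64` casts elsewhere in this diff suggest. On the usual 32-bit and 64-bit targets a `usize` always fits in a `u64`, so both forms yield the same value; the checked form only behaves differently for narrowing conversions, shown here with `u64` to `u32` for contrast.

```rust
use std::convert::TryFrom;

/// Plain cast: never panics. For usize -> u64 on 32-bit and 64-bit
/// targets it is lossless.
fn rows_as_u64(cardinality: usize) -> u64 {
    cardinality as u64
}

/// Checked conversion: same value on success, but it would panic
/// rather than truncate if the value did not fit.
fn rows_checked(cardinality: usize) -> u64 {
    u64::try_from(cardinality).unwrap()
}

/// For contrast, a narrowing conversion where the two approaches differ:
/// the checked form reports failure, while `as` silently truncates.
fn narrow_checked(value: u64) -> Option<u32> {
    u32::try_from(value).ok()
}

fn main() {
    println!("{} {}", rows_as_u64(1024), rows_checked(1024));
    println!("{:?} vs {}", narrow_checked(u64::MAX), u64::MAX as u32);
}
```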
Rest LGTM. Please fix the comments before merging. The new merging process is to add the mergify/can-merge label instead of clicking the merge button.
LGTM. BTW, this PR is used for auto-scaling. Since auto-scaling is an important feature for our "ease of use" claims, this is expected to stay in the repo.
if ENABLE_EXECUTOR_ROW_COUNT {
    metrics
        .executor_row_count
        .with_label_values(&[&actor_id_string, &executor_id_string])
        .inc_by(chunk.cardinality() as u64);
}
The sampling method can also be used for executor_row_count. Maybe implement it in the next PR.
if ENABLE_EXECUTOR_ROW_COUNT {
    if let Message::Chunk(chunk) = &message {
        if chunk.cardinality() > 0 {
            metrics
                .executor_row_count
                .with_label_values(&[&actor_id_string, &executor_id_string])
                .inc_by(chunk.cardinality() as u64);
        }
And here
if let Ok(Message::Chunk(chunk)) = &msg {
    metrics
        .actor_in_record_cnt
        .with_label_values(&[&actor_id_str])
and maybe here
Of course, as long as the code looks good enough :)
ff5fd6f to efbda6a
I believe not all comments are resolved. Please help resolve them before merging :)
I hereby agree to the terms of the Singularity Data, Inc. Contributor License Agreement.
What's changed and what's your intention?
Add actor input and output row-count metrics by collecting message sizes from DispatchExecutor, ReceiverExecutor, and MergerExecutor.
Checklist
./risedev check (or alias, ./risedev c)