Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fleet_executor] Interceptor run op #37623

Merged
merged 4 commits into from
Nov 29, 2021

Conversation

wangxicoding
Copy link
Contributor

PR types

Others

PR changes

Others

Describe

Interceptor run op

@paddle-bot-old
Copy link

Thanks for your contribution!
Please wait for the result of CI firstly. See Paddle CI Manual for details.

while (IsInputReady() && CanWriteOutput()) {
// If there is no limit, source interceptor can be executed
// an unlimited number of times.
// Now source node can only run
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个annotation是不是没写完😂

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

额,写了的,不知道咋漏了

// buffer is using
if (out_buff.second.second != 0) return;
}
step_ = 0; // reset
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

咱们有RESET message呀,这个置0应该是消息驱动的吧

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

除了source和若干特殊interceptor,大部分interceptor都是不需要reset的。如果需要reset,也是需要像stop那样,否则还没跑完就给reset了。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个reset不应该放在carrier.start()里么,每一次外部调用fleet_executor.run()的时候,这个step就应该reset呀。这个step不是标明当前ministep里,跑了多少个micro step么。而且在运行过程中,reset source interceptor有什么特殊含义?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

那这个reset也得有一个数据流的流动,和当前的并不冲突。这一步的判断总归是要做的,只不过再加个reset的条件触发。否则刚执行完一个micro_step,来了个reset消息,还没执行完就给reset了。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是的,应该加一个条件,不过,为啥会刚执行完一个mirco step就会收到reset 🤨

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

那在开头加上一个step的reset吗,没有下一步了咋办O_o

// source compute node, should we add a new SourceInterceptor?
if (upstream.empty()) {
is_source_ = true;
PADDLE_ENFORCE_GT(node_->max_run_times(), 0,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为啥这儿有一个判断,这个东西不是上层指定的,他说跑0次就跑0次呗
而且,如果这个要判断,是不是只要有下游的interceptor都要判断一下,因为都需要至少跑一次才可以触发下游

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

source interceptor,in degree为0,没有上游依赖,需要指定跑多少次,跑0次肯定是错的,这个max_run_times其实也是不准确的,应该就是run_times。 其它的compute interceptor,并不需要知道跑多少次,上游给消息就跑,除非有特殊功能,是不需要max_run_times的。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reasonable

attrs["shape"] = framework::vectorize<int>({2, 3});
attrs["value"] = 1.0f;

auto zero_op = framework::OpRegistry::CreateOp("fill_constant", {},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为啥叫zero呀🤔,值不是1么

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

从别处copy的,木有改 ╮(╯_╰)╭

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😂,all right

framework::AttributeMap());

// NOTE: don't delete
return {zero_op.release(), op.release()};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dangling ptr++

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

没有和线程或者carrier绑定起来,析构了就访问非法地址了-。-

Copy link
Contributor

@FeixLiu FeixLiu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@wangxicoding wangxicoding merged commit 5b962bd into PaddlePaddle:develop Nov 29, 2021
@wangxicoding wangxicoding deleted the interceptor_run_op branch November 29, 2021 03:51
Zjq9409 pushed a commit to Zjq9409/Paddle that referenced this pull request Dec 10, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants