Route tuner and assessor commands to 2 separate queues #891
chicm-ms
commented
Mar 20, 2019
- Route tuner and assessor commands to 2 separate queues: Tuner and assessor compete for the same message queue, assessor may starve tuner. #841
- Allow the tuner to leverage intermediate results when a trial is early stopped: Enable early stopped trials data for tuners. #843
Optimize MetisTuner (microsoft#811)
pull code
* Install dependencies for PAI/k8s IT * updates
Fix integration test dependencies (microsoft#822)
pull code
pull code
docs/en_US/ExperimentConfig.md
Outdated
__gpuNum__ specifies the gpu number to run the tuner process. The value of this field should be a positive number.
Note: users could only specify one way to set tuner, for example, set {tunerName, optimizationMode} or {tunerCommand, tunerCwd}, and could not set them both.
* __includeIntermeidateResults__
If __includeIntermeidateResults__ is true, the last intermediate results of the trials early stopped by assessor are sent to tuner as final results. The default value of __includeIntermeidateResults__ is false.
I am a little confused by this name.
This name is what we have for now, as the result of a discussion in Teams; it was proposed by Mao and Chi. If we can find a better one, it can still be changed before release.
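For readers unsure what the option does, here is a minimal sketch of the behavior the doc text describes. The helper `result_for_tuner` and its parameters are hypothetical and are not part of NNI; the assumption that the tuner receives nothing for an early-stopped trial when the option is false is my reading of the description, not something this diff confirms.

```python
from typing import List, Optional


def result_for_tuner(intermediates: List[float],
                     final: Optional[float],
                     early_stopped: bool,
                     include_intermediate_results: bool) -> Optional[float]:
    """Return the value a tuner would receive for one trial (hypothetical helper)."""
    if final is not None:
        # The trial finished normally: its final result is reported as usual.
        return final
    if early_stopped and include_intermediate_results and intermediates:
        # Early stopped by the assessor: promote the last intermediate result.
        return intermediates[-1]
    # Assumed default (option is false): nothing is reported for this trial.
    return None
```

With the option on, a trial stopped after reporting `[0.1, 0.2]` would hand `0.2` to the tuner; with it off, the same trial yields `None` in this sketch.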
command, data = command_queue.get(timeout=3)
try:
    self.process_command(command, data)
except Exception as e:
Shall we only catch the expected exceptions?
process_command calls Tuner or Assessor code, so we cannot know which exception types it will raise. If we do not catch them, they will be raised to nowhere, because this code path runs in a worker thread.
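To illustrate the point about worker threads: in Python, an uncaught exception only ends the thread that raised it, and the main thread never sees it. The sketch below is not the dispatcher code; `command_loop`, the `None` sentinel, and the `errors` queue are assumptions made for the example. It shows a broad `except Exception` handing failures back to the main thread.

```python
import queue
import threading


def command_loop(command_queue, process_command, errors):
    """Worker-thread loop: run each command, never let an exception escape."""
    while True:
        item = command_queue.get()
        if item is None:  # sentinel used by this sketch to stop the worker
            break
        command, data = item
        try:
            process_command(command, data)
        except Exception as e:  # user code may raise anything
            # Hand the failure to the main thread instead of losing it.
            errors.put((command, e))


def run_demo():
    commands, errors = queue.Queue(), queue.Queue()
    handled = []

    def process_command(command, data):
        # Stand-in for Tuner/Assessor code that we do not control.
        if command == "bad":
            raise ValueError("user code failed")
        handled.append((command, data))

    worker = threading.Thread(target=command_loop,
                              args=(commands, process_command, errors))
    worker.start()
    for item in [("bad", None), ("good", 1), None]:
        commands.put(item)
    worker.join()

    failures = []
    while not errors.empty():
        failures.append(errors.get())
    return handled, failures
```

The failing command does not stop the loop: the later `"good"` command is still processed, and the main thread can inspect `failures` afterwards.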
@@ -77,7 +77,7 @@ def run(dispatch_type):
 for dispatcher_name in dipsatcher_list:
     try:
         # sleep 5 seconds here, to make sure previous stopped exp has enough time to exit to avoid port conflict
-        time.sleep(5)
+        time.sleep(6)
Why change this? Shall we update the comment at the same time? "# sleep 5 seconds here, to make sure previous stopped exp has enough time to exit to avoid port conflict"
Thanks, updated the comments.
The reason for this change: after the tuner/assessor queue change, if there are no commands in the queues, the dispatcher may still wait up to 3 seconds after it receives the TERMINATE command. So a normal experiment may take a little longer to end.
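The extra wait can be reproduced with a small sketch. It assumes the worker re-checks a stop flag only between blocking `get(timeout=...)` calls; the names `command_loop`, `stopping`, and `waiting` are made up for the example and are not the dispatcher's.

```python
import queue
import threading
import time


def command_loop(command_queue, stopping, waiting, timeout):
    """Poll the queue; the stop flag is only re-checked after each get()."""
    while not stopping.is_set():
        waiting.set()  # tell the main thread we are about to block
        try:
            command_queue.get(timeout=timeout)
        except queue.Empty:
            pass  # nothing arrived; loop around and re-check the flag


def measure_shutdown_delay(timeout):
    """Return how long the worker takes to exit after the stop flag is set."""
    commands = queue.Queue()
    stopping, waiting = threading.Event(), threading.Event()
    worker = threading.Thread(target=command_loop,
                              args=(commands, stopping, waiting, timeout))
    worker.start()
    waiting.wait()  # the worker is now entering get() on an empty queue
    start = time.monotonic()
    stopping.set()
    worker.join()
    return time.monotonic() - start
```

With an empty queue, `measure_shutdown_delay(3)` would come back after roughly 3 seconds, which is the worst case the longer sleep and test timeout have to cover.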
@@ -106,7 +106,7 @@ describe('core/ipcInterface.terminate', (): void => {
         assert.ok(!procError);
         deferred.resolve();
     },
-    2000);
+    5000);
why change this?
Same reason as above: to accommodate the queue change.
if command == CommandType.TrialEnd or (command == CommandType.ReportMetricData and data['type'] == 'PERIODICAL'):
    self.assessor_command_queue.put((command, data))
else:
    self.default_command_queue.put((command, data))
Please make sure the queue put is thread-safe, because both the main thread and the assessor thread will put records into this queue.
OK, merge for integration test, will check this later.
Just checked:
Queue is thread safe.
https://docs.python.org/3/library/queue.html
The queue module implements multi-producer, multi-consumer queues. It is especially useful in threaded programming when information must be exchanged safely between multiple threads. The Queue class in this module implements all the required locking semantics. It depends on the availability of thread support in Python; see the threading module.
Reference: https://github.com/python/cpython/blob/3.7/Lib/queue.py, where the put method acquires the self.not_full lock.
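A quick way to sanity-check this is to have several threads put into the two queues at once and confirm nothing is lost. The sketch below mirrors the routing condition from the diff, but the `Router` class, the string constants standing in for `CommandType`, and the producer counts are assumptions made for the example.

```python
import queue
import threading

# String stand-ins for the CommandType members used in the diff.
TRIAL_END = "TrialEnd"
REPORT_METRIC_DATA = "ReportMetricData"


class Router:
    """Routes commands to an assessor queue or a default queue."""

    def __init__(self):
        self.default_command_queue = queue.Queue()
        self.assessor_command_queue = queue.Queue()

    def enqueue(self, command, data):
        is_periodical = (command == REPORT_METRIC_DATA
                         and data["type"] == "PERIODICAL")
        if command == TRIAL_END or is_periodical:
            self.assessor_command_queue.put((command, data))
        else:
            self.default_command_queue.put((command, data))


def run_demo(producers=4, per_producer=250):
    """Put from several threads at once; return the final size of each queue."""
    router = Router()

    def produce(worker_id):
        for i in range(per_producer):
            seq = (worker_id, i)
            router.enqueue(REPORT_METRIC_DATA, {"type": "PERIODICAL", "seq": seq})
            router.enqueue(REPORT_METRIC_DATA, {"type": "FINAL", "seq": seq})

    threads = [threading.Thread(target=produce, args=(n,))
               for n in range(producers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    # qsize() is exact here because every producer has finished.
    return (router.assessor_command_queue.qsize(),
            router.default_command_queue.qsize())
```

Each producer adds one item per iteration to each queue, so with the default arguments both queues should end up holding `producers * per_producer` items, with no extra locking around `put`.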
Looks good. One more thing: according to our discussion, the trial end event will also be sent to the tuner. We can implement this in the next PR.
OK