-
Notifications
You must be signed in to change notification settings - Fork 3.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[dask] [python-package] Search for available ports when setting up network (fixes #3753) #3766
[dask] [python-package] Search for available ports when setting up network (fixes #3753) #3766
Conversation
lightgbm_ports = set() | ||
worker_ip_to_port = {} | ||
for worker_address in worker_addresses: | ||
port = client.submit( | ||
func=_find_open_port, | ||
workers=[worker_address], | ||
worker_ip=urlparse(worker_address).hostname, | ||
local_listen_port=local_listen_port, | ||
ports_to_skip=lightgbm_ports | ||
).result() | ||
lightgbm_ports.add(port) | ||
worker_ip_to_port[worker_address] = port |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I want to draw reviewers' attention to this block. This setup checks for ports to use sequentially. So it will first run _find_open_port()
on worker 1's machine, then worker 2's machine, then work 3's, etc.
That means there is a small startup time introduced. I saw that this process took about 5 seconds per worker when I was running a cluster on AWS Fargate. So that means you might expect around 1 minute of startup time for a cluster with 20 workers.
I don't think this is too bad, and I think it's a fairly clean way to ensure that if you have multiple work processes on the same machine, they're assigned different ports in the network parameters
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is probably overkill, but if you wanted to speed it up, maybe could group by IP address and apply the search in parallel across separate nodes (IP addresses)? So each the search for ports gathers as many ports as there are workers on that server.
def _find_ports_for_workers(client: Client, worker_addresses: Iterable[str], local_listen_port: int) -> Dict[str, int]:
worker_ip_to_port = {}
# group workers by ip addr
worker_hostname_map = defaultdict(list())
for worker_address in worker_addresses:
hostname = urlparse(worker_address).hostname
worker_hostname_map[hostname].append(worker_address)
# run search for ports on groups of workers
lgbm_machines = list()
for hostname in worker_hostname_map:
machines_on_node = client.submit(
func=_port_search_for_ip,
worker_ip=hostname,
desired_ports=len(worker_hostname_map[hostname]),
local_listen_port=local_listen_port
)
# add more futures
lgbm_machines.append(machines_on_node)
# wait for search across nodes to complete.
_ = wait(lgbm_machines)
lgbm_machines = lgbm_machines.results()
def _port_search_for_ip(worker_ip: str, local_listen_port: int, n_desired_ports: int) -> Iterable[str]:
max_tries = 1000
out_ports = list()
found_all_ports = False
for i in range(max_tries):
out_port = local_listen_port + i
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind((worker_ip, out_port))
s.listen()
out_ports.append(out_port)
if len(out_ports) == n_desired_ports:
found_all_ports = True
break
except OSError:
continue
if not found_all_ports:
raise RuntimeError()
return [hostname + ':' + str(x) for x in out_ports]
Otherwise, the contraints of lightgbm + dask are that: 1) lightgbm requires that each worker knows about all of the other workers while at the same time 2) prior to distributing work to each worker, we know which workers can listen for lightgbm traffic on which ports. Seems like your fix nails this!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is probably overkill, but if you wanted to speed it up, maybe could group by IP address and apply the search in parallel across separate nodes (IP addresses)? So each the search for ports gathers as many ports as there are workers on that server
That's a great idea, I like it! I think I'll push it to a later PR though, since it would take me some time to test and I really want to unblock you for #3708
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
GH needs more emojis but if I could, I'd give you the prayer "bless up" hands
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
haha thank you
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just documented this in #3768, thanks for taking the time to type out that possible implementation. Super helpful.
hmmm, never saw this error before. Spotted on the
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jameslamb Just to clarify: I have literally zero experience with Dask. Sorry.
However, all your thoughts sound reasonable to me, so I think I can approve this PR.
Just please take a look at some general Python comments I left.
Co-authored-by: Nikita Titov <nekit94-08@mail.ru>
…M into fix/dask-network-setup
@StrikerRUS it's ok, I understand! I'm not sure if any other maintainers have Dask experience. If there's ever a Dask PR that you feel is large or controversial, we can probably get some reviewing help from our friends at XGBoost or I can ask others in the Dask community. |
Asking for a help from outside reviewers or maybe even find someone interested and experienced in LightGBM and Dask to collaborate on improvements in Dask-package on a regular basis supporting you will be just awesome. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jameslamb Thanks for addressing comments!
I've moved my test setup out into its own repo: https://github.com/jameslamb/lightgbm-dask-testing. I'm going to test this on |
happy to say this worked on |
This pull request has been automatically locked since there has not been any recent activity since it was closed. To start a new related discussion, open a new issue at https://github.com/microsoft/LightGBM/issues including a reference to this. |
See #3753 for additional details.
The Problem
By default, LightGBM's Python package is built to use TCP sockets for distributed learning (
-DUSE_SOCKET=1
). When performing distributed learning, LightGBM requires a list of machines and ports, as is documented here.lightgbm.dask
currently creates this list by assigning a fixed range of ports. Assuming that you pass parameterslocal_listen_port=12400
, for example, the first worker will open a socket{first_worker_ip}: 12400
, the second will open one{second_worker_ip}:12401
, and so on.This approach is fragile, since it makes the assumption that the ports you request will definitely be available. It can lead to an error like this:
This can happen either because of connections not quite cleaned up after previous LightGBM training runs (#3753 (comment)) or because of any other processes running on that machine which might already be using a port that LightGBM wants.
Changes in this PR
lightgbm.dask
will now search for open ports on each worker when constructing the network. It useslocal_listen_port
as a starting point, but doesn't assume that that port or any other specific port is available._train()
, before setting up the network. Onmaster
today, the code to set up the parameters is re-run in_train_part()
(once on each worker) even though it produces the same results.fit()
with a Dask model many times in a row with no sleeping in between...this is the primary way that the bug in [dask] [python-package] DaskLGBMRegressor training error: 'binding port 12402 failed' #3753 shows upNotes for reviewers
I tested this with both a
LocalCluster
with 3 workers and adask_cloudprovider.aws.FargateCluster
, using the steps described in #3753 (comment) and the notebooks in https://github.com/jameslamb/talks/tree/main/recent-developments-in-lightgbm.I think I'll set up a separate small repo with my testing setup so that this can be more easily re-used by contributors in the future.This can be tested with the setup in https://github.com/jameslamb/lightgbm-dask-testing.I captured the screenshot below to show some more evidence that this worked. I had the code print out its decision for ports on each worker, and I ran training 10 times in a row in a for loop like this:
You can see that on some runs, it chose 12400, 12401, and 12402. But on others, LightGBM chose to skip up a few ports. That's the "search until you find one" stuff working! The picture on the right side of the screenshot shows the individual training runs succeeding one after another.
Since this worked well on both
LocalCluster
(where all workers have the same IP address) andFargateCluster
(where each worker is a different physical machine, with a unique IP address), I feel confident in this fix.special thanks to @imatiach-msft and @ffineis for their help figuring this out!