Skip to content

Commit

Permalink
Merge pull request #596 from tignear/feature/manual-connect-and-recon…
Browse files Browse the repository at this point in the history
…nect

feat(shard): manually shard connect and reconnect
  • Loading branch information
jb3 authored May 22, 2024
2 parents f956bb3 + af5103e commit eff410b
Show file tree
Hide file tree
Showing 5 changed files with 203 additions and 24 deletions.
2 changes: 2 additions & 0 deletions guides/intro/intro.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ Apart from the `token` field mentioned above, the following fields are also supp
should contain the total amount of shards that your bot is expected to have.
Useful for splitting a single bot across multiple servers, but see also [the
multi-node documentation](../advanced/multi_node.md).
- `:manual`: nostrum does not automatically spawn shards. You should use
`Nostrum.Shard.Supervisor.connect/2` to spawn shards instead.
- `gateway_intents` - a list of atoms representing gateway intents for Nostrum
to subscribe to from the Discord API. More information can be found in the
[gateway intents](./gateway_intents.md) documentation page.
Expand Down
30 changes: 27 additions & 3 deletions lib/nostrum/shard.ex
Original file line number Diff line number Diff line change
@@ -1,21 +1,45 @@
defmodule Nostrum.Shard do
@moduledoc false

use Supervisor
use Supervisor, restart: :transient

alias Nostrum.Shard.Session

def start_link([_, shard_num, _total] = opts) do
def start_link({:connect, [_, shard_num, _total]} = opts) do
Supervisor.start_link(__MODULE__, opts, name: :"Nostrum.Shard-#{shard_num}")
end

def start_link(
{:reconnect,
%{
shard_num: shard_num,
total_shards: _total_shards,
gateway: _gateway,
resume_gateway: _resume_gateway,
seq: _seq,
session: _session
}} =
opts
) do
Supervisor.start_link(__MODULE__, opts, name: :"Nostrum.Shard-#{shard_num}")
end

def start_link([_, _shard_num, _total] = opts) do
start_link({:connect, opts})
end

def init(opts) do
children = [
{Session, opts}
# TODO: Add per shard ratelimiter
# TODO: Add per shard cache
]

Supervisor.init(children, strategy: :one_for_all, max_restarts: 3, max_seconds: 60)
Supervisor.init(children,
strategy: :one_for_all,
max_restarts: 3,
max_seconds: 60,
auto_shutdown: :any_significant
)
end
end
88 changes: 86 additions & 2 deletions lib/nostrum/shard/session.ex
Original file line number Diff line number Diff line change
Expand Up @@ -107,17 +107,45 @@ defmodule Nostrum.Shard.Session do
:gen_statem.cast(pid, {:request_guild_members, payload})
end

def disconnect(pid) do
:gen_statem.call(pid, {:disconnect, nil})
end

def disconnect(pid, timeout) do
:gen_statem.call(pid, {:disconnect, nil}, timeout)
end

def get_ws_state(pid) do
:sys.get_state(pid)
end

# State machine API

def start_link({:connect, [_gateway, _shard_num, _total]} = opts, statem_opts) do
:gen_statem.start_link(__MODULE__, opts, statem_opts)
end

def start_link(
{:reconnect,
%{
shard_num: _shard_num,
total_shards: _total_shards,
gateway: _gateway,
resume_gateway: _resume_gateway,
seq: _seq,
session: _session
}} =
opts,
statem_opts
) do
:gen_statem.start_link(__MODULE__, opts, statem_opts)
end

def start_link([_gateway, _shard_num, _total] = shard_opts, statem_opts) do
:gen_statem.start_link(__MODULE__, shard_opts, statem_opts)
end

def init([gateway, shard_num, total]) do
def init({:connect, [gateway, shard_num, total]}) do
Logger.metadata(shard: shard_num)

state = %WSState{
Expand All @@ -131,14 +159,46 @@ defmodule Nostrum.Shard.Session do
{:ok, :disconnected, state, connect}
end

def init(
{:reconnect,
%{
shard_num: shard_num,
total_shards: total_shards,
gateway: gateway,
resume_gateway: resume_gateway,
seq: seq,
session: session
}}
) do
Logger.metadata(shard: shard_num)

state = %WSState{
conn_pid: self(),
shard_num: shard_num,
total_shards: total_shards,
gateway: gateway,
resume_gateway: resume_gateway,
session: session,
seq: seq
}

connect = {:next_event, :internal, :connect}
{:ok, :disconnected, state, connect}
end

def init([gateway, shard_num, total]) do
init({:connect, [gateway, shard_num, total]})
end

def callback_mode, do: [:state_functions, :state_enter]

def child_spec(opts) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, [opts, []]},
type: :worker,
restart: :permanent,
restart: :transient,
significant: true,
shutdown: 500
}
end
Expand Down Expand Up @@ -334,6 +394,30 @@ defmodule Nostrum.Shard.Session do
:keep_state_and_data
end

def connected({:call, from}, {:disconnect, nil}, %{
conn: conn,
shard_num: shard_num,
total_shards: total,
gateway: gateway,
resume_gateway: resume_gateway,
seq: seq,
session: session
}) do
:ok = :gun.close(conn)
:ok = :gun.flush(conn)

{:stop_and_reply, :normal,
{:reply, from,
%{
shard_num: shard_num,
total_shards: total,
gateway: gateway,
resume_gateway: resume_gateway,
session: session,
seq: seq
}}}
end

def connected(
:state_timeout,
:send_heartbeat = request,
Expand Down
106 changes: 87 additions & 19 deletions lib/nostrum/shard/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,26 @@ defmodule Nostrum.Shard.Supervisor do
handle yet, thus growing the message queue and the memory usage.
"""

use Supervisor
@typedoc """
Shard number(`shard_id`). Range is `0..total_shards-1`.
"""
@type shard_num :: non_neg_integer()
@typedoc """
Total shard count(`num_shards`).
"""
@type total_shards :: pos_integer()
@typedoc """
Represents gateway resume information.
"""
@type resume_information :: %{
shard_num: shard_num(),
total_shards: total_shards(),
gateway: String.t(),
resume_gateway: String.t() | nil,
session: String.t(),
seq: pos_integer()
}
use DynamicSupervisor

alias Nostrum.Error.CacheError
alias Nostrum.Shard
Expand Down Expand Up @@ -64,21 +83,33 @@ defmodule Nostrum.Shard.Supervisor do
end

def start_link(_args) do
{url, gateway_shard_count} = Util.gateway()
{_url, gateway_shard_count} = Util.gateway()

value = Application.get_env(:nostrum, :num_shards, :auto)
shard_range = cast_shard_range(gateway_shard_count, value)
on_start =
DynamicSupervisor.start_link(
__MODULE__,
nil,
name: __MODULE__
)

Supervisor.start_link(
__MODULE__,
[url, shard_range],
name: __MODULE__
)
case Application.get_env(:nostrum, :num_shards, :auto) do
:manual ->
on_start

value ->
{lowest, highest, total} = cast_shard_range(gateway_shard_count, value)

shard_range = lowest..highest

for num <- shard_range, do: connect(num - 1, total)
end

on_start
end

def update_status(status, game, stream, type) do
__MODULE__
|> Supervisor.which_children()
|> DynamicSupervisor.which_children()
|> Enum.filter(fn {_id, _pid, _type, [modules]} -> modules == Nostrum.Shard end)
|> Enum.map(fn {_id, pid, _type, _modules} -> Supervisor.which_children(pid) end)
|> List.flatten()
Expand All @@ -102,18 +133,55 @@ defmodule Nostrum.Shard.Supervisor do
end

@doc false
def init([url, {lowest, highest, total}]) do
shard_range = lowest..highest
children = for num <- shard_range, do: create_worker(url, num - 1, total)

Supervisor.init(children, strategy: :one_for_one, max_restarts: 3, max_seconds: 60)
def init(_) do
DynamicSupervisor.init(strategy: :one_for_one, max_restarts: 3, max_seconds: 60)
end

@doc false
def create_worker(gateway, shard_num, total) do
Supervisor.child_spec(
{Shard, [gateway, shard_num, total]},
id: shard_num
def create_worker(shard_num, total) do
{gateway, _gateway_shard_count} = Util.gateway()
{Shard, [gateway, shard_num, total]}
end

@doc """
Disconnects the shard with the given shard number from the Gateway.
This function returns `resume_information` given to `Nostrum.Shard.Supervisor.reconnect/1`.
"""
@spec disconnect(shard_num()) :: resume_information()
def disconnect(shard_num) do
:"Nostrum.Shard-#{shard_num}"
|> Supervisor.which_children()
|> Enum.find(fn {id, _pid, _type, _modules} -> id == Nostrum.Shard.Session end)
|> elem(1)
|> Session.disconnect()
end

@doc """
Spawns a shard with the specified number and connects it to the discord gateway.
"""
@spec connect(shard_num(), total_shards()) :: DynamicSupervisor.on_start_child()
def connect(shard_num, total_shards) do
DynamicSupervisor.start_child(__MODULE__, create_worker(shard_num, total_shards))
end

@doc """
Reconnect to the gateway using the given `resume_information`.
For more information about resume, please visit [the Discord Developer Portal](https://discord.com/developers/docs/topics/gateway#resuming).
"""
@spec reconnect(resume_information()) :: DynamicSupervisor.on_start_child()
def reconnect(
%{
shard_num: _shard_num,
total_shards: _total_shards,
gateway: _gateway,
resume_gateway: _resume_gateway,
seq: _seq,
session: _session
} = opts
) do
DynamicSupervisor.start_child(
__MODULE__,
{Shard, {:reconnect, opts}}
)
end
end
1 change: 1 addition & 0 deletions lib/nostrum/util.ex
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ defmodule Nostrum.Util do

case num do
{_lowest, _highest, total} -> total
:manual -> 0
nil -> 1
end
end
Expand Down

0 comments on commit eff410b

Please sign in to comment.