Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restructure remote node processes and allow for multiple connections #434

Merged
merged 2 commits into from
Jul 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions lib/livebook/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@ defmodule Livebook.Application do
LivebookWeb.Telemetry,
# Start the PubSub system
{Phoenix.PubSub, name: Livebook.PubSub},
# Start the our own :standard_error handler (standard error -> group leader)
# This way we can run multiple embedded runtimes without worrying
# about restoring :standard_error to a valid process when terminating
{Livebook.Runtime.ErlDist.IOForwardGL, name: :standard_error},
# Start the supervisor dynamically managing sessions
Livebook.SessionSupervisor,
# Start the server responsible for associating files with sessions
Expand All @@ -29,10 +25,6 @@ defmodule Livebook.Application do
LivebookWeb.Endpoint
]

# Similarly as with :standard_error, we register our backend
# within the Livebook node, specifically for the embedded runtime
Logger.add_backend(Livebook.Runtime.ErlDist.LoggerGLBackend)

opts = [strategy: :one_for_one, name: Livebook.Supervisor]

with {:ok, _} = result <- Supervisor.start_link(children, opts) do
Expand Down
41 changes: 20 additions & 21 deletions lib/livebook/runtime/attached.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,27 @@ defmodule Livebook.Runtime.Attached do
# The node can be an ordinary Elixir runtime,
# a Mix project shell, a running release or anything else.

defstruct [:node, :cookie]
defstruct [:node, :cookie, :server_pid]

@type t :: %__MODULE__{
node: node(),
cookie: atom()
cookie: atom(),
server_pid: pid()
}

@doc """
Checks if the given node is available for use and initializes
it with Livebook-specific modules and processes.
"""
@spec init(node(), atom()) :: {:ok, t()} | {:error, :unreachable | :already_in_use}
@spec init(node(), atom()) :: {:ok, t()} | {:error, :unreachable}
def init(node, cookie \\ Node.get_cookie()) do
# Set cookie for connecting to this specific node
Node.set_cookie(node, cookie)

case Node.ping(node) do
:pong ->
case Livebook.Runtime.ErlDist.initialize(node) do
:ok ->
{:ok, %__MODULE__{node: node, cookie: cookie}}

{:error, :already_in_use} ->
{:error, :already_in_use}
end
server_pid = Livebook.Runtime.ErlDist.initialize(node)
{:ok, %__MODULE__{node: node, cookie: cookie, server_pid: server_pid}}

:pang ->
{:error, :unreachable}
Expand All @@ -45,12 +41,12 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.Attached do
alias Livebook.Runtime.ErlDist

def connect(runtime) do
ErlDist.Manager.set_owner(runtime.node, self())
Process.monitor({ErlDist.Manager, runtime.node})
ErlDist.RuntimeServer.set_owner(runtime.server_pid, self())
Process.monitor(runtime.server_pid)
end

def disconnect(runtime) do
ErlDist.Manager.stop(runtime.node)
ErlDist.RuntimeServer.stop(runtime.server_pid)
end

def evaluate_code(
Expand All @@ -61,8 +57,8 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.Attached do
prev_evaluation_ref,
opts \\ []
) do
ErlDist.Manager.evaluate_code(
runtime.node,
ErlDist.RuntimeServer.evaluate_code(
runtime.server_pid,
code,
container_ref,
evaluation_ref,
Expand All @@ -72,16 +68,16 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.Attached do
end

def forget_evaluation(runtime, container_ref, evaluation_ref) do
ErlDist.Manager.forget_evaluation(runtime.node, container_ref, evaluation_ref)
ErlDist.RuntimeServer.forget_evaluation(runtime.server_pid, container_ref, evaluation_ref)
end

def drop_container(runtime, container_ref) do
ErlDist.Manager.drop_container(runtime.node, container_ref)
ErlDist.RuntimeServer.drop_container(runtime.server_pid, container_ref)
end

def request_completion_items(runtime, send_to, ref, hint, container_ref, evaluation_ref) do
ErlDist.Manager.request_completion_items(
runtime.node,
ErlDist.RuntimeServer.request_completion_items(
runtime.server_pid,
send_to,
ref,
hint,
Expand All @@ -90,7 +86,10 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.Attached do
)
end

def duplicate(_runtime) do
{:error, "attached runtime is connected to a specific VM and cannot be duplicated"}
def duplicate(runtime) do
case Livebook.Runtime.Attached.init(runtime.node, runtime.cookie) do
{:ok, runtime} -> {:ok, runtime}
{:error, :unreachable} -> {:error, "node #{inspect(runtime.node)} is unreachable"}
end
end
end
26 changes: 13 additions & 13 deletions lib/livebook/runtime/elixir_standalone.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
defmodule Livebook.Runtime.ElixirStandalone do
defstruct [:node, :primary_pid]
defstruct [:node, :server_pid]

# A runtime backed by a standalone Elixir node managed by Livebook.
#
Expand All @@ -13,7 +13,7 @@ defmodule Livebook.Runtime.ElixirStandalone do

@type t :: %__MODULE__{
node: node(),
primary_pid: pid()
server_pid: pid()
}

@doc """
Expand All @@ -38,10 +38,10 @@ defmodule Livebook.Runtime.ElixirStandalone do

with {:ok, elixir_path} <- find_elixir_executable(),
port = start_elixir_node(elixir_path, child_node, child_node_eval_string(), argv),
{:ok, primary_pid} <- parent_init_sequence(child_node, port) do
{:ok, server_pid} <- parent_init_sequence(child_node, port) do
runtime = %__MODULE__{
node: child_node,
primary_pid: primary_pid
server_pid: server_pid
}

{:ok, runtime}
Expand Down Expand Up @@ -69,12 +69,12 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.ElixirStandalone do
alias Livebook.Runtime.ErlDist

def connect(runtime) do
ErlDist.Manager.set_owner(runtime.node, self())
Process.monitor({ErlDist.Manager, runtime.node})
ErlDist.RuntimeServer.set_owner(runtime.server_pid, self())
Process.monitor(runtime.server_pid)
end

def disconnect(runtime) do
ErlDist.Manager.stop(runtime.node)
ErlDist.RuntimeServer.stop(runtime.server_pid)
end

def evaluate_code(
Expand All @@ -85,8 +85,8 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.ElixirStandalone do
prev_evaluation_ref,
opts \\ []
) do
ErlDist.Manager.evaluate_code(
runtime.node,
ErlDist.RuntimeServer.evaluate_code(
runtime.server_pid,
code,
container_ref,
evaluation_ref,
Expand All @@ -96,16 +96,16 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.ElixirStandalone do
end

def forget_evaluation(runtime, container_ref, evaluation_ref) do
ErlDist.Manager.forget_evaluation(runtime.node, container_ref, evaluation_ref)
ErlDist.RuntimeServer.forget_evaluation(runtime.server_pid, container_ref, evaluation_ref)
end

def drop_container(runtime, container_ref) do
ErlDist.Manager.drop_container(runtime.node, container_ref)
ErlDist.RuntimeServer.drop_container(runtime.server_pid, container_ref)
end

def request_completion_items(runtime, send_to, ref, hint, container_ref, evaluation_ref) do
ErlDist.Manager.request_completion_items(
runtime.node,
ErlDist.RuntimeServer.request_completion_items(
runtime.server_pid,
send_to,
ref,
hint,
Expand Down
39 changes: 15 additions & 24 deletions lib/livebook/runtime/embedded.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ defmodule Livebook.Runtime.Embedded do
# where there is no option of starting a separate
# Elixir runtime.

defstruct [:node, :manager_pid]
defstruct [:node, :server_pid]

@type t :: %__MODULE__{
node: node(),
manager_pid: pid()
server_pid: pid()
}

alias Livebook.Runtime.ErlDist
Expand All @@ -20,7 +20,7 @@ defmodule Livebook.Runtime.Embedded do
Initializes new runtime by starting the necessary
processes within the current node.
"""
@spec init() :: {:ok, t()} | {:error, :failure}
@spec init() :: {:ok, t()}
def init() do
# As we run in the Livebook node, all the necessary modules
# are in place, so we just start the manager process.
Expand All @@ -33,30 +33,22 @@ defmodule Livebook.Runtime.Embedded do
# We tell manager to not override :standard_error,
# as we already do it for the Livebook application globally
# (see Livebook.Application.start/2).
case ErlDist.Manager.start(
anonymous: true,
cleanup_on_termination: false,
register_standard_error_proxy: false
) do
{:ok, pid} ->
{:ok, %__MODULE__{node: node(), manager_pid: pid}}

_ ->
{:error, :failure}
end
server_pid = ErlDist.initialize(node(), unload_modules_on_termination: false)
{:ok, %__MODULE__{node: node(), server_pid: server_pid}}
end
end

defimpl Livebook.Runtime, for: Livebook.Runtime.Embedded do
alias Livebook.Runtime.ErlDist

def connect(runtime) do
ErlDist.Manager.set_owner(runtime.manager_pid, self())
Process.monitor(runtime.manager_pid)
ErlDist.RuntimeServer.set_owner(runtime.server_pid, self())
Process.monitor(runtime.server_pid)
end

def disconnect(runtime) do
ErlDist.Manager.stop(runtime.manager_pid)
ErlDist.RuntimeServer.stop(runtime.server_pid)
end

def evaluate_code(
Expand All @@ -67,8 +59,8 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.Embedded do
prev_evaluation_ref,
opts \\ []
) do
ErlDist.Manager.evaluate_code(
runtime.manager_pid,
ErlDist.RuntimeServer.evaluate_code(
runtime.server_pid,
code,
container_ref,
evaluation_ref,
Expand All @@ -78,16 +70,16 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.Embedded do
end

def forget_evaluation(runtime, container_ref, evaluation_ref) do
ErlDist.Manager.forget_evaluation(runtime.manager_pid, container_ref, evaluation_ref)
ErlDist.RuntimeServer.forget_evaluation(runtime.server_pid, container_ref, evaluation_ref)
end

def drop_container(runtime, container_ref) do
ErlDist.Manager.drop_container(runtime.manager_pid, container_ref)
ErlDist.RuntimeServer.drop_container(runtime.server_pid, container_ref)
end

def request_completion_items(runtime, send_to, ref, hint, container_ref, evaluation_ref) do
ErlDist.Manager.request_completion_items(
runtime.manager_pid,
ErlDist.RuntimeServer.request_completion_items(
runtime.server_pid,
send_to,
ref,
hint,
Expand All @@ -97,7 +89,6 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.Embedded do
end

def duplicate(_runtime) do
{:error,
"embedded runtime is connected to the Livebook application VM and cannot be duplicated"}
Livebook.Runtime.Embedded.init()
end
end
49 changes: 31 additions & 18 deletions lib/livebook/runtime/erl_dist.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ defmodule Livebook.Runtime.ErlDist do
# code evaluation may take place in a separate Elixir runtime,
# which also makes it easy to terminate the whole
# evaluation environment without stopping Livebook.
# This is what both `Runtime.ElixirStandalone` and `Runtime.Attached` do
# and this module contains the shared functionality they need.
# This is what `Runtime.ElixirStandalone`, `Runtime.MixStandalone`
# and `Runtime.Attached` do, so this module contains the shared
# functionality they need.
#
# To work with a separate node, we have to inject the necessary
# Livebook modules there and also start the relevant processes
# related to evaluation. Fortunately Erlang allows us to send modules
# binary representation to the other node and load them dynamically.
#
# For further details see `Livebook.Runtime.ErlDist.NodeManager`.

# Modules to load into the connected node.
@required_modules [
Expand All @@ -23,29 +26,31 @@ defmodule Livebook.Runtime.ErlDist do
Livebook.Evaluator.DefaultFormatter,
Livebook.Completion,
Livebook.Runtime.ErlDist,
Livebook.Runtime.ErlDist.Manager,
Livebook.Runtime.ErlDist.NodeManager,
Livebook.Runtime.ErlDist.RuntimeServer,
Livebook.Runtime.ErlDist.EvaluatorSupervisor,
Livebook.Runtime.ErlDist.IOForwardGL,
Livebook.Runtime.ErlDist.LoggerGLBackend
]

@doc """
Loads the necessary modules into the given node
and starts the primary Livebook remote process.
Starts a runtime server on the given node.

The initialization may be invoked only once on the given
node until its disconnected.
If necessary, the required modules are loaded
into the given node and the node manager process
is started with `node_manager_opts`.
"""
@spec initialize(node()) :: :ok | {:error, :already_in_use}
def initialize(node) do
if initialized?(node) do
{:error, :already_in_use}
else
@spec initialize(node(), keyword()) :: pid()
def initialize(node, node_manager_opts \\ []) do
unless modules_loaded?(node) do
load_required_modules(node)
start_manager(node)
end

:ok
unless node_manager_started?(node) do
start_node_manager(node, node_manager_opts)
end

start_runtime_server(node)
end

defp load_required_modules(node) do
Expand All @@ -55,12 +60,20 @@ defmodule Livebook.Runtime.ErlDist do
end
end

defp start_manager(node) do
:rpc.call(node, Livebook.Runtime.ErlDist.Manager, :start, [])
defp start_node_manager(node, opts) do
:rpc.call(node, Livebook.Runtime.ErlDist.NodeManager, :start, [opts])
end

defp start_runtime_server(node) do
Livebook.Runtime.ErlDist.NodeManager.start_runtime_server(node)
end

defp modules_loaded?(node) do
:rpc.call(node, Code, :ensure_loaded?, [Livebook.Runtime.ErlDist.NodeManager])
end

defp initialized?(node) do
case :rpc.call(node, Process, :whereis, [Livebook.Runtime.ErlDist.Manager]) do
defp node_manager_started?(node) do
case :rpc.call(node, Process, :whereis, [Livebook.Runtime.ErlDist.NodeManager]) do
nil -> false
_pid -> true
end
Expand Down
Loading