Skip to content

Commit

Permalink
Persistent storage (#937)
Browse files Browse the repository at this point in the history
Start moving filesystems as an initial implementation.
  • Loading branch information
Qizot authored Jan 30, 2022
1 parent 849acd8 commit 86e4034
Show file tree
Hide file tree
Showing 11 changed files with 274 additions and 93 deletions.
4 changes: 0 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,6 @@ The following environment variables configure Livebook:
"attached:NODE:COOKIE" (Attached node) or "embedded" (Embedded).
Defaults to "standalone".

* LIVEBOOK_FILE_SYSTEM_1, LIVEBOOK_FILE_SYSTEM_2, ... - configures additional
file systems. Each variable should hold a configuration string, which must
be of the form: "s3 BUCKET_URL ACCESS_KEY_ID SECRET_ACCESS_KEY".

* LIVEBOOK_IP - sets the ip address to start the web application on.
Must be a valid IPv4 or IPv6 address.

Expand Down
3 changes: 3 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ config :mime, :types, %{
"text/plain" => ["livemd"]
}

# Sets the default storage backend
config :livebook, :storage, Livebook.Storage.Ets

# Sets the default authentication mode to token
config :livebook, :authentication_mode, :token

Expand Down
3 changes: 1 addition & 2 deletions lib/livebook.ex
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,8 @@ defmodule Livebook do
|> Livebook.FileSystem.Utils.ensure_dir_path()

local_file_system = Livebook.FileSystem.Local.new(default_path: root_path)
configured_file_systems = Livebook.Config.file_systems!("LIVEBOOK_FILE_SYSTEM_")

config :livebook, :file_systems, [local_file_system | configured_file_systems]
config :livebook, :default_file_systems, [local_file_system]

autosave_path =
if config_env() == :test do
Expand Down
5 changes: 4 additions & 1 deletion lib/livebook/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ defmodule Livebook.Application do
LivebookWeb.Telemetry,
# Start the PubSub system
{Phoenix.PubSub, name: Livebook.PubSub},
# Start the storage module
Livebook.Storage.current(),
# Periodid measurement of system resources
Livebook.SystemResources,
# Start the tracker server on this node
Expand All @@ -31,7 +33,8 @@ defmodule Livebook.Application do
# Start the Endpoint (http/https)
# We skip the access url as we do our own logging below
{LivebookWeb.Endpoint, log_access_url: false}
] ++ app_specs()
] ++
app_specs()

opts = [strategy: :one_for_one, name: Livebook.Supervisor]

Expand Down
81 changes: 28 additions & 53 deletions lib/livebook/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -39,27 +39,37 @@ defmodule Livebook.Config do
"""
@spec file_systems() :: list(FileSystem.t())
def file_systems() do
Application.fetch_env!(:livebook, :file_systems)
Application.fetch_env!(:livebook, :default_file_systems) ++
Enum.map(storage().all(:filesystem), &storage_to_fs/1)
end

@doc """
Appends a new file system to the configured ones.
"""
@spec append_file_system(FileSystem.t()) :: list(FileSystem.t())
def append_file_system(file_system) do
file_systems = Enum.uniq(file_systems() ++ [file_system])
Application.put_env(:livebook, :file_systems, file_systems, persistent: true)
file_systems
def append_file_system(%FileSystem.S3{} = file_system) do
attributes =
file_system
|> FileSystem.S3.to_config()
|> Map.to_list()

storage().insert(:filesystem, generate_filesystem_id(), [{:type, "s3"} | attributes])

file_systems()
end

@doc """
Removes the given file system from the configured ones.
"""
@spec remove_file_system(FileSystem.t()) :: list(FileSystem.t())
def remove_file_system(file_system) do
file_systems = List.delete(file_systems(), file_system)
Application.put_env(:livebook, :file_systems, file_systems, persistent: true)
file_systems
storage().all(:filesystem)
|> Enum.find(&(storage_to_fs(&1) == file_system))
|> case do
%{id: id} -> storage().delete(:filesystem, id)
end

file_systems()
end

@doc """
Expand Down Expand Up @@ -316,59 +326,24 @@ defmodule Livebook.Config do
}
end

@doc """
Parses file systems list.
Appends subsequent numbers to the given env prefix (starting from 1)
and parses the env variables until `nil` is encountered.
"""
def file_systems!(env_prefix) do
Stream.iterate(1, &(&1 + 1))
|> Stream.map(fn n ->
env = env_prefix <> Integer.to_string(n)
System.get_env(env)
end)
|> Stream.take_while(& &1)
|> Enum.map(&parse_file_system!/1)
defp storage() do
Livebook.Storage.current()
end

defp parse_file_system!(string) do
case string do
"s3 " <> config ->
FileSystem.S3.from_config_string(config)
defp storage_to_fs(%{type: "s3"} = config) do
case FileSystem.S3.from_config(config) do
{:ok, fs} ->
fs

_ ->
{:error, message} ->
abort!(
~s{unrecognised file system, expected "s3 BUCKET_URL ACCESS_KEY_ID SECRET_ACCESS_KEY", got: #{inspect(string)}}
~s{unrecognised file system, expected "s3 BUCKET_URL ACCESS_KEY_ID SECRET_ACCESS_KEY", got: #{inspect(message)}}
)
end
|> case do
{:ok, file_system} -> file_system
{:error, message} -> abort!(message)
end
end

@doc """
Returns environment variables configuration corresponding
to the given file systems.
The first (default) file system is ignored.
"""
def file_systems_as_env(file_systems)

def file_systems_as_env([_ | additional_file_systems]) do
additional_file_systems
|> Enum.with_index(1)
|> Enum.map(fn {file_system, n} ->
config = file_system_to_config_string(file_system)
["LIVEBOOK_FILE_SYSTEM_", Integer.to_string(n), "=", ?", config, ?"]
end)
|> Enum.intersperse(" ")
|> IO.iodata_to_binary()
end

defp file_system_to_config_string(%FileSystem.S3{} = file_system) do
["s3 ", FileSystem.S3.to_config_string(file_system)]
defp generate_filesystem_id() do
:crypto.strong_rand_bytes(6) |> Base.url_encode64()
end

@doc """
Expand Down
34 changes: 15 additions & 19 deletions lib/livebook/file_system/s3.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,31 +26,27 @@ defmodule Livebook.FileSystem.S3 do
end

@doc """
Parses file system from a configuration string.
The expected format is `"BUCKET_URL ACCESS_KEY_ID SECRET_ACCESS_KEY"`.
## Examples
Livebook.FileSystem.S3.from_config_string("https://s3.eu-central-1.amazonaws.com/mybucket myaccesskeyid mysecret")
Parses file system from a configuration map.
"""
@spec from_config_string(String.t()) :: {:ok, t()} | {:error, String.t()}
def from_config_string(string) do
case String.split(string) do
[bucket_url, access_key_id, secret_access_key] ->
@spec from_config(map()) :: {:ok, t()} | {:error, String.t()}
def from_config(config) do
case config do
%{
bucket_url: bucket_url,
access_key_id: access_key_id,
secret_access_key: secret_access_key
} ->
{:ok, new(bucket_url, access_key_id, secret_access_key)}

args ->
{:error, "S3 filesystem configuration expects 3 arguments, but got #{length(args)}"}
_config ->
{:error,
"S3 filesystem config is expected to have 3 arguments: 'bucket_url', 'access_key_id' and 'secret_access_key', but got #{inspect(config)}"}
end
end

@doc """
Formats the given file system into an equivalent configuration string.
"""
@spec to_config_string(t()) :: String.t()
def to_config_string(file_system) do
"#{file_system.bucket_url} #{file_system.access_key_id} #{file_system.secret_access_key}"
@spec to_config(t()) :: map()
def to_config(%__MODULE__{} = s3) do
Map.take(s3, [:bucket_url, :access_key_id, :secret_access_key])
end
end

Expand Down
45 changes: 45 additions & 0 deletions lib/livebook/storage.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
defmodule Livebook.Storage do
@moduledoc """
Behaviour defining an interface for storing arbitrary data in
[Entity-Attribute-Value](https://en.wikipedia.org/wiki/Entity%E2%80%93attribute%E2%80%93value_model) fashion.
"""

@type namespace :: atom()
@type entity_id :: binary()
@type attribute :: atom()
@type value :: binary()
@type timestamp :: non_neg_integer()

@type entity :: %{required(:id) => entity_id(), optional(attribute()) => value()}

@doc """
Returns a map identified by `entity_id` in `namespace`.
fetch(:filesystem, "rand-id")
#=> {:ok, %{id: "rand-id", type: "s3", bucket_url: "/...", secret: "abc", access_key: "xyz"}}
"""
@callback fetch(namespace(), entity_id()) :: {:ok, entity()} | :error

@doc """
Returns all values in namespace.
all(:filesystem)
[%{id: "rand-id", type: "s3", bucket_url: "/...", secret: "abc", access_key: "xyz"}]
"""
@callback all(namespace()) :: [entity()]

@doc """
Inserts given list of attribute-value paris to a entity belonging to specified namespace.
"""
@callback insert(namespace(), entity_id(), [{attribute(), value()}]) :: :ok

@doc """
Deletes an entity of given id from given namespace.
"""
@callback delete(namespace(), entity_id()) :: :ok

@spec current() :: module()
def current(), do: Application.fetch_env!(:livebook, :storage)
end
100 changes: 100 additions & 0 deletions lib/livebook/storage/ets.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
defmodule Livebook.Storage.Ets do
@moduledoc """
Ets implementation of `Livebook.Storage` behaviour.
The module is supposed to be started just once as it
is responsible for managing a named ets table.
`insert` and `delete` operations are supposed to be called using a GenServer
while all the lookups can be performed by directly accessing the named table.
"""
@behaviour Livebook.Storage

@table_name __MODULE__

use GenServer

@impl Livebook.Storage
def fetch(namespace, entity_id) do
@table_name
|> :ets.lookup({namespace, entity_id})
|> case do
[] ->
:error

entries ->
entries
|> Enum.map(fn {_key, attr, val, _timestamp} -> {attr, val} end)
|> Map.new()
|> Map.put(:id, entity_id)
|> then(&{:ok, &1})
end
end

@impl Livebook.Storage
def all(namespace) do
@table_name
|> :ets.match({{namespace, :"$1"}, :"$2", :"$3", :_})
|> Enum.group_by(
fn [entity_id, _attr, _val] -> entity_id end,
fn [_id, attr, val] -> {attr, val} end
)
|> Enum.map(fn {entity_id, attributes} ->
attributes
|> Map.new()
|> Map.put(:id, entity_id)
end)
end

@impl Livebook.Storage
def insert(namespace, entity_id, attributes) do
GenServer.call(__MODULE__, {:insert, namespace, entity_id, attributes})
end

@impl Livebook.Storage
def delete(namespace, entity_id) do
GenServer.call(__MODULE__, {:delete, namespace, entity_id})
end

@spec start_link(keyword()) :: GenServer.on_start()
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end

@impl GenServer
def init(_opts) do
table = :ets.new(@table_name, [:named_table, :protected, :duplicate_bag])

{:ok, %{table: table}}
end

@impl GenServer
def handle_call({:insert, namespace, entity_id, attributes}, _from, %{table: table} = state) do
match_head = {{namespace, entity_id}, :"$1", :_, :_}

guards =
Enum.map(attributes, fn {key, _val} ->
{:==, :"$1", key}
end)

:ets.select_delete(table, [{match_head, guards, [true]}])

timestamp = System.os_time(:millisecond)

attributes =
Enum.map(attributes, fn {attr, val} ->
{{namespace, entity_id}, attr, val, timestamp}
end)

:ets.insert(table, attributes)

{:reply, :ok, state}
end

@impl GenServer
def handle_call({:delete, namespace, entity_id}, _from, %{table: table} = state) do
:ets.delete(table, {namespace, entity_id})

{:reply, :ok, state}
end
end
2 changes: 1 addition & 1 deletion lib/livebook_cli/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ defmodule LivebookCLI.Server do
|> Livebook.FileSystem.Utils.ensure_dir_path()

local_file_system = Livebook.FileSystem.Local.new(default_path: root_path)
opts_to_config(opts, [{:livebook, :file_systems, [local_file_system]} | config])
opts_to_config(opts, [{:livebook, :default_file_systems, [local_file_system]} | config])
end

defp opts_to_config([{:sname, sname} | opts], config) do
Expand Down
Loading

0 comments on commit 86e4034

Please sign in to comment.