diff --git a/lib/jetstream/api/object.ex b/lib/jetstream/api/object.ex new file mode 100644 index 0000000..29e890d --- /dev/null +++ b/lib/jetstream/api/object.ex @@ -0,0 +1,61 @@ +defmodule Jetstream.API.Object do + @moduledoc """ + API for interacting with the JetStream Object Store + + Learn more about Object Store: https://docs.nats.io/nats-concepts/jetstream/obj_store + """ + alias Jetstream.API.{Stream} + + @stream_prefix "OBJ_" + @subject_prefix "$O." + + def create_bucket(conn, bucket_name, params \\ []) do + stream = %Stream{ + name: stream_name(bucket_name), + subjects: stream_subjects(bucket_name), + description: Keyword.get(params, :description), + max_msgs_per_subject: Keyword.get(params, :history, 1), + discard: :new, + deny_delete: true, + allow_rollup_hdrs: true, + max_age: Keyword.get(params, :ttl, 0), + max_bytes: Keyword.get(params, :max_bucket_size, -1), + max_msg_size: Keyword.get(params, :max_value_size, -1), + num_replicas: Keyword.get(params, :replicas, 1), + storage: Keyword.get(params, :storage, :file), + placement: Keyword.get(params, :placement), + duplicate_window: adjust_duplicate_window(Keyword.get(params, :ttl, 0)) + } + + Stream.create(conn, stream) + end + + def delete_bucket(conn, bucket_name) do + Stream.delete(conn, stream_name(bucket_name)) + end + + defp stream_name(bucket_name) do + "#{@stream_prefix}#{bucket_name}" + end + + defp stream_subjects(bucket_name) do + [ + chunk_stream_subject(bucket_name), + meta_stream_subject(bucket_name) + ] + end + + defp chunk_stream_subject(bucket_name) do + "#{@subject_prefix}#{bucket_name}.C.>" + end + + defp meta_stream_subject(bucket_name) do + "#{@subject_prefix}#{bucket_name}.M.>" + end + + @two_minutes_in_nanoseconds 1_200_000_000 + # The `duplicate_window` can't be greater than the `max_age`. The default `duplicate_window` + # is 2 minutes. We'll keep the 2 minute window UNLESS the ttl is less than 2 minutes + defp adjust_duplicate_window(ttl) when ttl > 0 and ttl < @two_minutes_in_nanoseconds, do: ttl + defp adjust_duplicate_window(_ttl), do: @two_minutes_in_nanoseconds +end diff --git a/test/jetstream/api/object_test.exs b/test/jetstream/api/object_test.exs new file mode 100644 index 0000000..2c15e4a --- /dev/null +++ b/test/jetstream/api/object_test.exs @@ -0,0 +1,23 @@ +defmodule Jetstream.API.ObjectTest do + use Jetstream.ConnCase, min_server_version: "2.6.2" + alias Jetstream.API.Object + + @moduletag with_gnat: :gnat + + describe "create_bucket/3" do + test "create/delete a bucket" do + assert {:ok, %{config: config}} = Object.create_bucket(:gnat, "MY-STORE") + assert config.name == "OBJ_MY-STORE" + assert config.subjects == [ + "$O.MY-STORE.C.>", + "$O.MY-STORE.M.>", + ] + assert config.max_age == 0 + assert config.max_bytes == -1 + assert config.storage == :file + assert config.allow_rollup_hdrs == true + + assert :ok = Object.delete_bucket(:gnat, "MY-STORE") + end + end +end