Skip to content

Commit

Permalink
start support for object store
Browse files Browse the repository at this point in the history
  • Loading branch information
mmmries committed Jul 1, 2023
1 parent a890140 commit a80f3ef
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 0 deletions.
61 changes: 61 additions & 0 deletions lib/jetstream/api/object.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
defmodule Jetstream.API.Object do
@moduledoc """
API for interacting with the JetStream Object Store
Learn more about Object Store: https://docs.nats.io/nats-concepts/jetstream/obj_store
"""
alias Jetstream.API.{Stream}

@stream_prefix "OBJ_"
@subject_prefix "$O."

def create_bucket(conn, bucket_name, params \\ []) do
stream = %Stream{
name: stream_name(bucket_name),
subjects: stream_subjects(bucket_name),
description: Keyword.get(params, :description),
max_msgs_per_subject: Keyword.get(params, :history, 1),
discard: :new,
deny_delete: true,
allow_rollup_hdrs: true,
max_age: Keyword.get(params, :ttl, 0),
max_bytes: Keyword.get(params, :max_bucket_size, -1),
max_msg_size: Keyword.get(params, :max_value_size, -1),
num_replicas: Keyword.get(params, :replicas, 1),
storage: Keyword.get(params, :storage, :file),
placement: Keyword.get(params, :placement),
duplicate_window: adjust_duplicate_window(Keyword.get(params, :ttl, 0))
}

Stream.create(conn, stream)
end

def delete_bucket(conn, bucket_name) do
Stream.delete(conn, stream_name(bucket_name))
end

defp stream_name(bucket_name) do
"#{@stream_prefix}#{bucket_name}"
end

defp stream_subjects(bucket_name) do
[
chunk_stream_subject(bucket_name),
meta_stream_subject(bucket_name)
]
end

defp chunk_stream_subject(bucket_name) do
"#{@subject_prefix}#{bucket_name}.C.>"
end

defp meta_stream_subject(bucket_name) do
"#{@subject_prefix}#{bucket_name}.M.>"
end

@two_minutes_in_nanoseconds 1_200_000_000
# The `duplicate_window` can't be greater than the `max_age`. The default `duplicate_window`
# is 2 minutes. We'll keep the 2 minute window UNLESS the ttl is less than 2 minutes
defp adjust_duplicate_window(ttl) when ttl > 0 and ttl < @two_minutes_in_nanoseconds, do: ttl
defp adjust_duplicate_window(_ttl), do: @two_minutes_in_nanoseconds
end
23 changes: 23 additions & 0 deletions test/jetstream/api/object_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
defmodule Jetstream.API.ObjectTest do
use Jetstream.ConnCase, min_server_version: "2.6.2"
alias Jetstream.API.Object

@moduletag with_gnat: :gnat

describe "create_bucket/3" do
test "create/delete a bucket" do
assert {:ok, %{config: config}} = Object.create_bucket(:gnat, "MY-STORE")
assert config.name == "OBJ_MY-STORE"
assert config.subjects == [
"$O.MY-STORE.C.>",
"$O.MY-STORE.M.>",
]
assert config.max_age == 0
assert config.max_bytes == -1
assert config.storage == :file
assert config.allow_rollup_hdrs == true

assert :ok = Object.delete_bucket(:gnat, "MY-STORE")
end
end
end

0 comments on commit a80f3ef

Please sign in to comment.