Skip to content

Commit

Permalink
Merge pull request #83 from mmmries/del_object
Browse files Browse the repository at this point in the history
support deleting an object from a bucket
  • Loading branch information
mmmries authored Jul 4, 2023
2 parents 707c87a + 8d72c61 commit 4d20b0e
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 7 deletions.
26 changes: 23 additions & 3 deletions lib/jetstream/api/object.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,17 @@ defmodule Jetstream.API.Object do
Stream.delete(conn, stream_name(bucket_name))
end

def delete_object(conn, bucket_name, object_name) do
with {:ok, meta} <- info(conn, bucket_name, object_name),
meta <- %Meta{meta | deleted: true},
topic <- meta_stream_topic(bucket_name, object_name),
{:ok, body} <- Jason.encode(meta),
{:ok, _msg} <- Gnat.request(conn, topic, body, headers: [{"Nats-Rollup", "sub"}]) do
filter = chunk_stream_topic(meta)
Stream.purge(conn, stream_name(bucket_name), %{filter: filter})
end
end

def get_object(conn, bucket_name, object_name, chunk_fun) do
with {:ok, %{config: _stream}} <- Stream.info(conn, stream_name(bucket_name)),
{:ok, meta} <- info(conn, bucket_name, object_name) do
Expand All @@ -58,7 +69,7 @@ defmodule Jetstream.API.Object do
end
end

def list_objects(conn, bucket_name) do
def list_objects(conn, bucket_name, options \\ []) do
with {:ok, %{config: stream}} <- Stream.info(conn, stream_name(bucket_name)),
topic <- Util.reply_inbox(),
{:ok, sub} <- Gnat.sub(conn, self(), topic),
Expand All @@ -77,7 +88,13 @@ defmodule Jetstream.API.Object do
:ok = Gnat.unsub(conn, sub)
:ok = Consumer.delete(conn, stream.name, consumer.name)

{:ok, messages}
show_deleted = Keyword.get(options, :show_deleted, false)

if show_deleted do
{:ok, messages}
else
{:ok, Enum.reject(messages, &(&1.deleted == true))}
end
end
end

Expand Down Expand Up @@ -150,19 +167,22 @@ defmodule Jetstream.API.Object do
defp adjust_duplicate_window(_ttl), do: @two_minutes_in_nanoseconds

defp json_to_meta(json) do
raw = Jason.decode!(json)

%{
"bucket" => bucket,
"chunks" => chunks,
"digest" => digest,
"name" => name,
"nuid" => nuid,
"size" => size
} = Jason.decode!(json)
} = raw

%Meta{
bucket: bucket,
chunks: chunks,
digest: digest,
deleted: Map.get(raw, "deleted", false),
name: name,
nuid: nuid,
size: size
Expand Down
22 changes: 21 additions & 1 deletion lib/jetstream/api/object/meta.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,25 @@
defmodule Jetstream.API.Object.Meta do
@derive Jason.Encoder
@enforce_keys [:bucket, :chunks, :digest, :name, :nuid, :size]
defstruct [:bucket, :chunks, :digest, :name, :nuid, :size]
defstruct bucket: nil,
chunks: nil,
deleted: false,
digest: nil,
name: nil,
nuid: nil,
size: nil
end

defimpl Jason.Encoder, for: Jetstream.API.Object.Meta do
alias Jetstream.API.Object.Meta

def encode(%Meta{deleted: true} = meta, opts) do
Map.take(meta, [:bucket, :chunks, :deleted, :digest, :name, :nuid, :size])
|> Jason.Encode.map(opts)
end

def encode(meta, opts) do
Map.take(meta, [:bucket, :chunks, :digest, :name, :nuid, :size])
|> Jason.Encode.map(opts)
end
end
27 changes: 27 additions & 0 deletions lib/jetstream/api/stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,25 @@ defmodule Jetstream.API.Stream do
end
end

@doc """
Purges some of the messages in a stream.
## Examples
iex> Jetstream.API.Stream.create(:gnat, %Jetstream.API.Stream{name: "stream", subjects: ["sub1", "sub2"]})
iex> Jetstream.API.Stream.purge(:gnat, "stream", %{filter: "sub1"})
:ok
"""
@spec purge(conn :: Gnat.t(), stream_name :: binary()) :: :ok | {:error, any()}
def purge(conn, stream_name, method) when is_binary(stream_name) do
with :ok <- validate_purge_method(method),
body <- Jason.encode!(method),
{:ok, _response} <- request(conn, "$JS.API.STREAM.PURGE.#{stream_name}", body) do
:ok
end
end

@doc """
Information about config and state of a Stream.
Expand Down Expand Up @@ -487,4 +506,12 @@ defmodule Jetstream.API.Stream do
{:error, "To get a message you must use only one of `seq` or `last_by_subj`"}
end
end

defp validate_purge_method(%{filter: subject}) when is_binary(subject) do
:ok
end

defp validate_purge_method(_) do
{:error, "When purging, you must pass a %{filter: subject}"}
end
end
35 changes: 32 additions & 3 deletions test/jetstream/api/object_test.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
defmodule Jetstream.API.ObjectTest do
use Jetstream.ConnCase, min_server_version: "2.6.2"
alias Jetstream.API.Object
alias Jetstream.API.{Object, Stream}
import Jetstream.API.Util, only: [nuid: 0]

@moduletag with_gnat: :gnat
Expand Down Expand Up @@ -37,6 +37,25 @@ defmodule Jetstream.API.ObjectTest do
end
end

describe "delete_object/3" do
test "delete an object" do
bucket = nuid()
assert {:ok, %{config: _config}} = Object.create_bucket(:gnat, bucket)
{:ok, _} = put_filepath(@readme_path, bucket, "README.md")
{:ok, _} = put_filepath(@readme_path, bucket, "OTHER.md")
assert :ok = Object.delete_object(:gnat, bucket, "README.md")

assert {:ok, objects} = Object.list_objects(:gnat, bucket)
assert Enum.count(objects) == 1
assert Enum.map(objects, & &1.name) == ["OTHER.md"]
assert {:ok, objects} = Object.list_objects(:gnat, bucket, show_deleted: true)
assert Enum.count(objects) == 2
assert Enum.map(objects, & &1.name) |> Enum.sort() == ["OTHER.md", "README.md"]

assert :ok = Object.delete_bucket(:gnat, bucket)
end
end

describe "get_object/4" do
test "retrieves and object chunk-by-chunk" do
nuid = Jetstream.API.Util.nuid()
Expand All @@ -51,7 +70,7 @@ defmodule Jetstream.API.ObjectTest do

assert_received :got_chunk

Object.delete_bucket(:gnat, nuid)
:ok = Object.delete_bucket(:gnat, nuid)
end
end

Expand All @@ -73,6 +92,7 @@ defmodule Jetstream.API.ObjectTest do
bucket = nuid()
assert {:ok, %{config: _config}} = Object.create_bucket(:gnat, bucket)
assert {:ok, []} = Object.list_objects(:gnat, bucket)
assert :ok = Object.delete_bucket(:gnat, bucket)
end

test "list a bucket with two files" do
Expand All @@ -88,6 +108,8 @@ defmodule Jetstream.API.ObjectTest do
assert readme.name == "README.md"
assert readme.size == something.size
assert readme.digest == something.digest

assert :ok = Object.delete_bucket(:gnat, bucket)
end
end

Expand Down Expand Up @@ -128,11 +150,18 @@ defmodule Jetstream.API.ObjectTest do
Process.put(:buffer, Process.get(:buffer) <> chunk)
end)

{:ok, %{state: state}} = Stream.info(:gnat, "OBJ_#{bucket}")
assert state.bytes > 2 * 1024 * 1024

file_contents = Process.get(:buffer)

assert byte_size(file_contents) == meta.size
assert :crypto.hash(:sha256, file_contents) == sha
assert :ok = Object.delete_bucket(:gnat, bucket)

assert :ok = Object.delete_object(:gnat, bucket, "big")
{:ok, %{state: state}} = Stream.info(:gnat, "OBJ_#{bucket}")
assert state.bytes < 1024
:ok = Object.delete_bucket(:gnat, bucket)
end

# create a random 2MB binary file
Expand Down

0 comments on commit 4d20b0e

Please sign in to comment.