From 8d72c6165512e48ef6daeac1dab46b8c057246d4 Mon Sep 17 00:00:00 2001 From: Michael Ries Date: Tue, 4 Jul 2023 20:46:52 +0100 Subject: [PATCH] support deleting an object from a bucket --- lib/jetstream/api/object.ex | 26 +++++++++++++++++++--- lib/jetstream/api/object/meta.ex | 22 ++++++++++++++++++- lib/jetstream/api/stream.ex | 27 +++++++++++++++++++++++ test/jetstream/api/object_test.exs | 35 +++++++++++++++++++++++++++--- 4 files changed, 103 insertions(+), 7 deletions(-) diff --git a/lib/jetstream/api/object.ex b/lib/jetstream/api/object.ex index 7b22ee7..a5bb77d 100644 --- a/lib/jetstream/api/object.ex +++ b/lib/jetstream/api/object.ex @@ -35,6 +35,17 @@ defmodule Jetstream.API.Object do Stream.delete(conn, stream_name(bucket_name)) end + def delete_object(conn, bucket_name, object_name) do + with {:ok, meta} <- info(conn, bucket_name, object_name), + meta <- %Meta{meta | deleted: true}, + topic <- meta_stream_topic(bucket_name, object_name), + {:ok, body} <- Jason.encode(meta), + {:ok, _msg} <- Gnat.request(conn, topic, body, headers: [{"Nats-Rollup", "sub"}]) do + filter = chunk_stream_topic(meta) + Stream.purge(conn, stream_name(bucket_name), %{filter: filter}) + end + end + def get_object(conn, bucket_name, object_name, chunk_fun) do with {:ok, %{config: _stream}} <- Stream.info(conn, stream_name(bucket_name)), {:ok, meta} <- info(conn, bucket_name, object_name) do @@ -58,7 +69,7 @@ defmodule Jetstream.API.Object do end end - def list_objects(conn, bucket_name) do + def list_objects(conn, bucket_name, options \\ []) do with {:ok, %{config: stream}} <- Stream.info(conn, stream_name(bucket_name)), topic <- Util.reply_inbox(), {:ok, sub} <- Gnat.sub(conn, self(), topic), @@ -77,7 +88,13 @@ defmodule Jetstream.API.Object do :ok = Gnat.unsub(conn, sub) :ok = Consumer.delete(conn, stream.name, consumer.name) - {:ok, messages} + show_deleted = Keyword.get(options, :show_deleted, false) + + if show_deleted do + {:ok, messages} + else + {:ok, Enum.reject(messages, &(&1.deleted == true))} + end end end @@ -150,6 +167,8 @@ defmodule Jetstream.API.Object do defp adjust_duplicate_window(_ttl), do: @two_minutes_in_nanoseconds defp json_to_meta(json) do + raw = Jason.decode!(json) + %{ "bucket" => bucket, "chunks" => chunks, @@ -157,12 +176,13 @@ defmodule Jetstream.API.Object do "name" => name, "nuid" => nuid, "size" => size - } = Jason.decode!(json) + } = raw %Meta{ bucket: bucket, chunks: chunks, digest: digest, + deleted: Map.get(raw, "deleted", false), name: name, nuid: nuid, size: size diff --git a/lib/jetstream/api/object/meta.ex b/lib/jetstream/api/object/meta.ex index 7123fd3..5aca9cf 100644 --- a/lib/jetstream/api/object/meta.ex +++ b/lib/jetstream/api/object/meta.ex @@ -1,5 +1,25 @@ defmodule Jetstream.API.Object.Meta do @derive Jason.Encoder @enforce_keys [:bucket, :chunks, :digest, :name, :nuid, :size] - defstruct [:bucket, :chunks, :digest, :name, :nuid, :size] + defstruct bucket: nil, + chunks: nil, + deleted: false, + digest: nil, + name: nil, + nuid: nil, + size: nil +end + +defimpl Jason.Encoder, for: Jetstream.API.Object.Meta do + alias Jetstream.API.Object.Meta + + def encode(%Meta{deleted: true} = meta, opts) do + Map.take(meta, [:bucket, :chunks, :deleted, :digest, :name, :nuid, :size]) + |> Jason.Encode.map(opts) + end + + def encode(meta, opts) do + Map.take(meta, [:bucket, :chunks, :digest, :name, :nuid, :size]) + |> Jason.Encode.map(opts) + end end diff --git a/lib/jetstream/api/stream.ex b/lib/jetstream/api/stream.ex index ec0a25e..f926012 100644 --- a/lib/jetstream/api/stream.ex +++ b/lib/jetstream/api/stream.ex @@ -323,6 +323,25 @@ defmodule Jetstream.API.Stream do end end + @doc """ + Purges some of the messages in a stream. + + ## Examples + + iex> Jetstream.API.Stream.create(:gnat, %Jetstream.API.Stream{name: "stream", subjects: ["sub1", "sub2"]}) + iex> Jetstream.API.Stream.purge(:gnat, "stream", %{filter: "sub1"}) + :ok + + """ + @spec purge(conn :: Gnat.t(), stream_name :: binary()) :: :ok | {:error, any()} + def purge(conn, stream_name, method) when is_binary(stream_name) do + with :ok <- validate_purge_method(method), + body <- Jason.encode!(method), + {:ok, _response} <- request(conn, "$JS.API.STREAM.PURGE.#{stream_name}", body) do + :ok + end + end + @doc """ Information about config and state of a Stream. @@ -487,4 +506,12 @@ defmodule Jetstream.API.Stream do {:error, "To get a message you must use only one of `seq` or `last_by_subj`"} end end + + defp validate_purge_method(%{filter: subject}) when is_binary(subject) do + :ok + end + + defp validate_purge_method(_) do + {:error, "When purging, you must pass a %{filter: subject}"} + end end diff --git a/test/jetstream/api/object_test.exs b/test/jetstream/api/object_test.exs index cf5229f..4013ab3 100644 --- a/test/jetstream/api/object_test.exs +++ b/test/jetstream/api/object_test.exs @@ -1,6 +1,6 @@ defmodule Jetstream.API.ObjectTest do use Jetstream.ConnCase, min_server_version: "2.6.2" - alias Jetstream.API.Object + alias Jetstream.API.{Object, Stream} import Jetstream.API.Util, only: [nuid: 0] @moduletag with_gnat: :gnat @@ -37,6 +37,25 @@ defmodule Jetstream.API.ObjectTest do end end + describe "delete_object/3" do + test "delete an object" do + bucket = nuid() + assert {:ok, %{config: _config}} = Object.create_bucket(:gnat, bucket) + {:ok, _} = put_filepath(@readme_path, bucket, "README.md") + {:ok, _} = put_filepath(@readme_path, bucket, "OTHER.md") + assert :ok = Object.delete_object(:gnat, bucket, "README.md") + + assert {:ok, objects} = Object.list_objects(:gnat, bucket) + assert Enum.count(objects) == 1 + assert Enum.map(objects, & &1.name) == ["OTHER.md"] + assert {:ok, objects} = Object.list_objects(:gnat, bucket, show_deleted: true) + assert Enum.count(objects) == 2 + assert Enum.map(objects, & &1.name) |> Enum.sort() == ["OTHER.md", "README.md"] + + assert :ok = Object.delete_bucket(:gnat, bucket) + end + end + describe "get_object/4" do test "retrieves and object chunk-by-chunk" do nuid = Jetstream.API.Util.nuid() @@ -51,7 +70,7 @@ defmodule Jetstream.API.ObjectTest do assert_received :got_chunk - Object.delete_bucket(:gnat, nuid) + :ok = Object.delete_bucket(:gnat, nuid) end end @@ -73,6 +92,7 @@ defmodule Jetstream.API.ObjectTest do bucket = nuid() assert {:ok, %{config: _config}} = Object.create_bucket(:gnat, bucket) assert {:ok, []} = Object.list_objects(:gnat, bucket) + assert :ok = Object.delete_bucket(:gnat, bucket) end test "list a bucket with two files" do @@ -88,6 +108,8 @@ defmodule Jetstream.API.ObjectTest do assert readme.name == "README.md" assert readme.size == something.size assert readme.digest == something.digest + + assert :ok = Object.delete_bucket(:gnat, bucket) end end @@ -128,11 +150,18 @@ defmodule Jetstream.API.ObjectTest do Process.put(:buffer, Process.get(:buffer) <> chunk) end) + {:ok, %{state: state}} = Stream.info(:gnat, "OBJ_#{bucket}") + assert state.bytes > 2 * 1024 * 1024 + file_contents = Process.get(:buffer) assert byte_size(file_contents) == meta.size assert :crypto.hash(:sha256, file_contents) == sha - assert :ok = Object.delete_bucket(:gnat, bucket) + + assert :ok = Object.delete_object(:gnat, bucket, "big") + {:ok, %{state: state}} = Stream.info(:gnat, "OBJ_#{bucket}") + assert state.bytes < 1024 + :ok = Object.delete_bucket(:gnat, bucket) end # create a random 2MB binary file