From 96d6334d79f9b17cde783609b47e7df4791d7f4b Mon Sep 17 00:00:00 2001 From: Andrea Leopardi Date: Wed, 5 Apr 2023 19:55:35 +0200 Subject: [PATCH] Add t:Broadway.Message.acknowledger/0 + improve docs (#323) --- lib/broadway/acknowledger.ex | 5 ++++- lib/broadway/caller_acknowledger.ex | 29 ++++++++++++++++++-------- lib/broadway/message.ex | 32 +++++++++++++++++++++++++++-- lib/broadway/noop_acknowledger.ex | 10 ++++++--- lib/broadway/producer.ex | 27 ++++++++++++++++++++++++ mix.exs | 3 ++- mix.lock | 8 ++++---- 7 files changed, 95 insertions(+), 19 deletions(-) diff --git a/lib/broadway/acknowledger.ex b/lib/broadway/acknowledger.ex index d71971b..3a945da 100644 --- a/lib/broadway/acknowledger.ex +++ b/lib/broadway/acknowledger.ex @@ -28,7 +28,8 @@ defmodule Broadway.Acknowledger do different producers. Broadway will use this information to correctly identify the acknowledger and pass it among with the messages so you can properly communicate with - the source of the data for acknowledgement. + the source of the data for acknowledgement. `ack_ref` is + part of `t:Broadway.Message.acknowledger/0`. * `successful` is the list of messages that were successfully processed and published. @@ -51,6 +52,8 @@ defmodule Broadway.Acknowledger do Note that `options` are different for every acknowledger, as the acknowledger is what specifies what are the supported options. Check the documentation for the acknowledger you're using to see the supported options. + + `ack_ref` and `ack_data` are part of `t:Broadway.Message.acknowledger/0`. """ @callback configure(ack_ref :: term, ack_data :: term, options :: keyword) :: {:ok, new_ack_data :: term} diff --git a/lib/broadway/caller_acknowledger.ex b/lib/broadway/caller_acknowledger.ex index ebd9ee8..2388a85 100644 --- a/lib/broadway/caller_acknowledger.ex +++ b/lib/broadway/caller_acknowledger.ex @@ -2,12 +2,22 @@ defmodule Broadway.CallerAcknowledger do @moduledoc """ A simple acknowledger that sends a message back to a caller. - It must be stored as: - - acknowledger: Broadway.CallerAcknowledger.init({pid, ref}, term) - - The first parameter is a tuple with the pid to receive the messages - and a unique identifier (usually a reference). The second parameter, + If you want to use this acknowledger in messages produced by your + `Broadway.Producer`, you can get its configuration by calling + the `init/0` function. For example, you can use it in + `Broadway.test_message/3`: + + some_ref = make_ref() + + Broadway.test_message( + MyPipeline, + "some data", + acknowledger: Broadway.CallerAcknowledger.init({self(), some_ref}, :ignored) + ) + + The first parameter is a tuple with the PID to receive the messages + and a unique identifier (usually a reference). Such unique identifier + is then included in the messages sent to the PID. The second parameter, which is per message, is ignored. It sends a message in the format: @@ -25,9 +35,12 @@ defmodule Broadway.CallerAcknowledger do @doc """ Returns the acknowledger metadata. + + See the module documentation. """ - def init({pid, ref}, term) do - {__MODULE__, {pid, ref}, term} + @spec init({pid, ref :: term}, ignored_term :: term) :: Broadway.Message.acknowledger() + def init({pid, ref} = _pid_and_ref, ignored_term) when is_pid(pid) do + {__MODULE__, {pid, ref}, ignored_term} end @impl true diff --git a/lib/broadway/message.ex b/lib/broadway/message.ex index a1301c1..111b3bf 100644 --- a/lib/broadway/message.ex +++ b/lib/broadway/message.ex @@ -9,16 +9,44 @@ defmodule Broadway.Message do or internally by one of the built-in stages of Broadway. Instead of modifying the struct directly, you should use the functions - provided by this module to manipulate messages. + provided by this module to manipulate messages. However, if you are implementing + a `Broadway.Producer` of your own, see the [`%Broadway.Message{}`](`__struct__/0`) + documentation to see what fields you should set. """ alias __MODULE__, as: Message alias Broadway.{Acknowledger, NoopAcknowledger} + @typedoc """ + The acknowledger of the message. + + This tuple contains: + + * A module implementing the `Broadway.Acknowledger` behaviour. + + * An ack reference that is passed to the `c:Broadway.Acknowledger.ack/3` + callback. See `c:Broadway.Acknowledger.ack/3` for more information. + + * An arbitrary term that is passed to the optional + `c:Broadway.Acknowledger.configure/3` callback. + + """ + @typedoc since: "1.1.0" + @type acknowledger :: {module, ack_ref :: term, data :: term} + + @typedoc """ + The Broadway message struct. + + Most of these fields are manipulated by Broadway itself. You can + *read* the `:metadata` field, and you can use the functions in this + module to update most of the other fields. If you are implementing + your own producer, see the `Broadway.Producer` documentation + for more information on how to create and manipulate message structs. + """ @type t :: %Message{ data: term, metadata: %{optional(atom) => term}, - acknowledger: {module, ack_ref :: term, data :: term}, + acknowledger: acknowledger, batcher: atom, batch_key: term, batch_mode: :bulk | :flush, diff --git a/lib/broadway/noop_acknowledger.ex b/lib/broadway/noop_acknowledger.ex index 39807ed..791c69e 100644 --- a/lib/broadway/noop_acknowledger.ex +++ b/lib/broadway/noop_acknowledger.ex @@ -2,11 +2,14 @@ defmodule Broadway.NoopAcknowledger do @moduledoc """ An acknowledger that does nothing. - It must be initialized as: + If you want to use this acknowledger in messages produced by your + `Broadway.Producer`, you can get its configuration by calling + the `init/0` function. For example, you can use it in + `Broadway.test_message/3`: - acknowledger: Broadway.NoopAcknowledger.init() + Broadway.test_message(MyPipeline, "some data", acknowledger: Broadway.NoopAcknowledger.init()) - Set automatically on messages that have been acked immediately + Broadway sets this acknowledger automatically on messages that have been acked via `Broadway.Message.ack_immediately/1`. """ @@ -15,6 +18,7 @@ defmodule Broadway.NoopAcknowledger do @doc """ Returns the acknowledger metadata. """ + @spec init() :: Broadway.Message.acknowledger() def init do {__MODULE__, _ack_ref = nil, _data = nil} end diff --git a/lib/broadway/producer.ex b/lib/broadway/producer.ex index b1aca35..b0ef77b 100644 --- a/lib/broadway/producer.ex +++ b/lib/broadway/producer.ex @@ -40,6 +40,33 @@ defmodule Broadway.Producer do A `Broadway.Producer` can implement two optional Broadway callbacks, `c:prepare_for_start/2` and `c:prepare_for_draining/1`, which are useful for booting up and shutting down Broadway topologies respectively. + + ## Producing Broadway messages + + You should generally modify `Broadway.Message` structs by using the functions + in the `Broadway.Message` module. However, if you are implementing your + own producer, you **can manipulate** some of the struct's fields directly. + + These fields are: + + * `:data` (required) - the data of the message. Even though the function + `Broadway.Message.put_data/2` exists, when creating a `%Broadway.Message{}` + struct from scratch you will have to pass in the `:data` field directly. + + * `:acknowledger` (required) - the acknowledger of the message, of type + `t:Broadway.Message.acknowledger/0`. + + * `:metadata` (optional) - metadata about the message that your producer + can attach to the message. This is useful when you want to add some metadata + to messages, and document it for users to use in their pipelines. + + For example, a producer could create a message by doing something like this: + + %Broadway.Message{ + data: "some data here", + acknowledger: Broadway.NoopAcknowledger.init() + } + """ @doc """ diff --git a/mix.exs b/mix.exs index ad3a5ab..66edb7c 100644 --- a/mix.exs +++ b/mix.exs @@ -14,7 +14,8 @@ defmodule Broadway.MixProject do deps: deps(), docs: docs(), package: package(), - test_coverage: [tool: ExCoveralls] + test_coverage: [tool: ExCoveralls], + preferred_cli_env: [docs: :docs] ] end diff --git a/mix.lock b/mix.lock index c6ec84d..f30d150 100644 --- a/mix.lock +++ b/mix.lock @@ -1,19 +1,19 @@ %{ "certifi": {:hex, :certifi, "2.9.0", "6f2a475689dd47f19fb74334859d460a2dc4e3252a3324bd2111b8f0429e7e21", [:rebar3], [], "hexpm", "266da46bdb06d6c6d35fde799bcb28d36d985d424ad7c08b5bb48f5b5cdd4641"}, - "earmark_parser": {:hex, :earmark_parser, "1.4.29", "149d50dcb3a93d9f3d6f3ecf18c918fb5a2d3c001b5d3305c926cddfbd33355b", [:mix], [], "hexpm", "4902af1b3eb139016aed210888748db8070b8125c2342ce3dcae4f38dcc63503"}, - "ex_doc": {:hex, :ex_doc, "0.29.1", "b1c652fa5f92ee9cf15c75271168027f92039b3877094290a75abcaac82a9f77", [:mix], [{:earmark_parser, "~> 1.4.19", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "b7745fa6374a36daf484e2a2012274950e084815b936b1319aeebcf7809574f6"}, + "earmark_parser": {:hex, :earmark_parser, "1.4.31", "a93921cdc6b9b869f519213d5bc79d9e218ba768d7270d46fdcf1c01bacff9e2", [:mix], [], "hexpm", "317d367ee0335ef037a87e46c91a2269fef6306413f731e8ec11fc45a7efd059"}, + "ex_doc": {:hex, :ex_doc, "0.29.4", "6257ecbb20c7396b1fe5accd55b7b0d23f44b6aa18017b415cb4c2b91d997729", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "2c6699a737ae46cb61e4ed012af931b57b699643b24dabe2400a8168414bc4f5"}, "excoveralls": {:hex, :excoveralls, "0.14.4", "295498f1ae47bdc6dce59af9a585c381e1aefc63298d48172efaaa90c3d251db", [:mix], [{:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "e3ab02f2df4c1c7a519728a6f0a747e71d7d6e846020aae338173619217931c1"}, "gen_stage": {:hex, :gen_stage, "1.0.0", "51c8ae56ff54f9a2a604ca583798c210ad245f415115453b773b621c49776df5", [:mix], [], "hexpm", "1d9fc978db5305ac54e6f5fec7adf80cd893b1000cf78271564c516aa2af7706"}, "hackney": {:hex, :hackney, "1.18.1", "f48bf88f521f2a229fc7bae88cf4f85adc9cd9bcf23b5dc8eb6a1788c662c4f6", [:rebar3], [{:certifi, "~>2.9.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~>6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~>1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.3.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~>1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "a4ecdaff44297e9b5894ae499e9a070ea1888c84afdd1fd9b7b2bc384950128e"}, "idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"}, "jason": {:hex, :jason, "1.3.0", "fa6b82a934feb176263ad2df0dbd91bf633d4a46ebfdffea0c8ae82953714946", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "53fc1f51255390e0ec7e50f9cb41e751c260d065dcba2bf0d08dc51a4002c2ac"}, "makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"}, - "makeup_elixir": {:hex, :makeup_elixir, "0.16.0", "f8c570a0d33f8039513fbccaf7108c5d750f47d8defd44088371191b76492b0b", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "28b2cbdc13960a46ae9a8858c4bebdec3c9a6d7b4b9e7f4ed1502f8159f338e7"}, + "makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"}, "makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"}, "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"}, "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"}, "nimble_options": {:hex, :nimble_options, "0.5.2", "42703307b924880f8c08d97719da7472673391905f528259915782bb346e0a1b", [:mix], [], "hexpm", "4da7f904b915fd71db549bcdc25f8d56f378ef7ae07dc1d372cbe72ba950dce0"}, - "nimble_parsec": {:hex, :nimble_parsec, "1.2.3", "244836e6e3f1200c7f30cb56733fd808744eca61fd182f731eac4af635cc6d0b", [:mix], [], "hexpm", "c8d789e39b9131acf7b99291e93dae60ab48ef14a7ee9d58c6964f59efb570b0"}, + "nimble_parsec": {:hex, :nimble_parsec, "1.3.0", "9e18a119d9efc3370a3ef2a937bf0b24c088d9c4bf0ba9d7c3751d49d347d035", [:mix], [], "hexpm", "7977f183127a7cbe9346981e2f480dc04c55ffddaef746bd58debd566070eef8"}, "parse_trans": {:hex, :parse_trans, "3.3.1", "16328ab840cc09919bd10dab29e431da3af9e9e7e7e6f0089dd5a2d2820011d8", [:rebar3], [], "hexpm", "07cd9577885f56362d414e8c4c4e6bdf10d43a8767abb92d24cbe8b24c54888b"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.6", "cf344f5692c82d2cd7554f5ec8fd961548d4fd09e7d22f5b62482e5aeaebd4b0", [:make, :mix, :rebar3], [], "hexpm", "bdb0d2471f453c88ff3908e7686f86f9be327d065cc1ec16fa4540197ea04680"}, "telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"},