Skip to content

Commit

Permalink
Add t:Broadway.Message.acknowledger/0 + improve docs (#323)
Browse files Browse the repository at this point in the history
  • Loading branch information
whatyouhide authored Apr 5, 2023
1 parent b1f3640 commit 96d6334
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 19 deletions.
5 changes: 4 additions & 1 deletion lib/broadway/acknowledger.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ defmodule Broadway.Acknowledger do
different producers. Broadway will use this information
to correctly identify the acknowledger and pass it among
with the messages so you can properly communicate with
the source of the data for acknowledgement.
the source of the data for acknowledgement. `ack_ref` is
part of `t:Broadway.Message.acknowledger/0`.
* `successful` is the list of messages that were
successfully processed and published.
Expand All @@ -51,6 +52,8 @@ defmodule Broadway.Acknowledger do
Note that `options` are different for every acknowledger, as the acknowledger
is what specifies what are the supported options. Check the documentation for the
acknowledger you're using to see the supported options.
`ack_ref` and `ack_data` are part of `t:Broadway.Message.acknowledger/0`.
"""
@callback configure(ack_ref :: term, ack_data :: term, options :: keyword) ::
{:ok, new_ack_data :: term}
Expand Down
29 changes: 21 additions & 8 deletions lib/broadway/caller_acknowledger.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,22 @@ defmodule Broadway.CallerAcknowledger do
@moduledoc """
A simple acknowledger that sends a message back to a caller.
It must be stored as:
acknowledger: Broadway.CallerAcknowledger.init({pid, ref}, term)
The first parameter is a tuple with the pid to receive the messages
and a unique identifier (usually a reference). The second parameter,
If you want to use this acknowledger in messages produced by your
`Broadway.Producer`, you can get its configuration by calling
the `init/0` function. For example, you can use it in
`Broadway.test_message/3`:
some_ref = make_ref()
Broadway.test_message(
MyPipeline,
"some data",
acknowledger: Broadway.CallerAcknowledger.init({self(), some_ref}, :ignored)
)
The first parameter is a tuple with the PID to receive the messages
and a unique identifier (usually a reference). Such unique identifier
is then included in the messages sent to the PID. The second parameter,
which is per message, is ignored.
It sends a message in the format:
Expand All @@ -25,9 +35,12 @@ defmodule Broadway.CallerAcknowledger do

@doc """
Returns the acknowledger metadata.
See the module documentation.
"""
def init({pid, ref}, term) do
{__MODULE__, {pid, ref}, term}
@spec init({pid, ref :: term}, ignored_term :: term) :: Broadway.Message.acknowledger()
def init({pid, ref} = _pid_and_ref, ignored_term) when is_pid(pid) do
{__MODULE__, {pid, ref}, ignored_term}
end

@impl true
Expand Down
32 changes: 30 additions & 2 deletions lib/broadway/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,44 @@ defmodule Broadway.Message do
or internally by one of the built-in stages of Broadway.
Instead of modifying the struct directly, you should use the functions
provided by this module to manipulate messages.
provided by this module to manipulate messages. However, if you are implementing
a `Broadway.Producer` of your own, see the [`%Broadway.Message{}`](`__struct__/0`)
documentation to see what fields you should set.
"""

alias __MODULE__, as: Message
alias Broadway.{Acknowledger, NoopAcknowledger}

@typedoc """
The acknowledger of the message.
This tuple contains:
* A module implementing the `Broadway.Acknowledger` behaviour.
* An ack reference that is passed to the `c:Broadway.Acknowledger.ack/3`
callback. See `c:Broadway.Acknowledger.ack/3` for more information.
* An arbitrary term that is passed to the optional
`c:Broadway.Acknowledger.configure/3` callback.
"""
@typedoc since: "1.1.0"
@type acknowledger :: {module, ack_ref :: term, data :: term}

@typedoc """
The Broadway message struct.
Most of these fields are manipulated by Broadway itself. You can
*read* the `:metadata` field, and you can use the functions in this
module to update most of the other fields. If you are implementing
your own producer, see the `Broadway.Producer` documentation
for more information on how to create and manipulate message structs.
"""
@type t :: %Message{
data: term,
metadata: %{optional(atom) => term},
acknowledger: {module, ack_ref :: term, data :: term},
acknowledger: acknowledger,
batcher: atom,
batch_key: term,
batch_mode: :bulk | :flush,
Expand Down
10 changes: 7 additions & 3 deletions lib/broadway/noop_acknowledger.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ defmodule Broadway.NoopAcknowledger do
@moduledoc """
An acknowledger that does nothing.
It must be initialized as:
If you want to use this acknowledger in messages produced by your
`Broadway.Producer`, you can get its configuration by calling
the `init/0` function. For example, you can use it in
`Broadway.test_message/3`:
acknowledger: Broadway.NoopAcknowledger.init()
Broadway.test_message(MyPipeline, "some data", acknowledger: Broadway.NoopAcknowledger.init())
Set automatically on messages that have been acked immediately
Broadway sets this acknowledger automatically on messages that have been acked
via `Broadway.Message.ack_immediately/1`.
"""

Expand All @@ -15,6 +18,7 @@ defmodule Broadway.NoopAcknowledger do
@doc """
Returns the acknowledger metadata.
"""
@spec init() :: Broadway.Message.acknowledger()
def init do
{__MODULE__, _ack_ref = nil, _data = nil}
end
Expand Down
27 changes: 27 additions & 0 deletions lib/broadway/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,33 @@ defmodule Broadway.Producer do
A `Broadway.Producer` can implement two optional Broadway callbacks,
`c:prepare_for_start/2` and `c:prepare_for_draining/1`, which are useful
for booting up and shutting down Broadway topologies respectively.
## Producing Broadway messages
You should generally modify `Broadway.Message` structs by using the functions
in the `Broadway.Message` module. However, if you are implementing your
own producer, you **can manipulate** some of the struct's fields directly.
These fields are:
* `:data` (required) - the data of the message. Even though the function
`Broadway.Message.put_data/2` exists, when creating a `%Broadway.Message{}`
struct from scratch you will have to pass in the `:data` field directly.
* `:acknowledger` (required) - the acknowledger of the message, of type
`t:Broadway.Message.acknowledger/0`.
* `:metadata` (optional) - metadata about the message that your producer
can attach to the message. This is useful when you want to add some metadata
to messages, and document it for users to use in their pipelines.
For example, a producer could create a message by doing something like this:
%Broadway.Message{
data: "some data here",
acknowledger: Broadway.NoopAcknowledger.init()
}
"""

@doc """
Expand Down
3 changes: 2 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ defmodule Broadway.MixProject do
deps: deps(),
docs: docs(),
package: package(),
test_coverage: [tool: ExCoveralls]
test_coverage: [tool: ExCoveralls],
preferred_cli_env: [docs: :docs]
]
end

Expand Down
8 changes: 4 additions & 4 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
%{
"certifi": {:hex, :certifi, "2.9.0", "6f2a475689dd47f19fb74334859d460a2dc4e3252a3324bd2111b8f0429e7e21", [:rebar3], [], "hexpm", "266da46bdb06d6c6d35fde799bcb28d36d985d424ad7c08b5bb48f5b5cdd4641"},
"earmark_parser": {:hex, :earmark_parser, "1.4.29", "149d50dcb3a93d9f3d6f3ecf18c918fb5a2d3c001b5d3305c926cddfbd33355b", [:mix], [], "hexpm", "4902af1b3eb139016aed210888748db8070b8125c2342ce3dcae4f38dcc63503"},
"ex_doc": {:hex, :ex_doc, "0.29.1", "b1c652fa5f92ee9cf15c75271168027f92039b3877094290a75abcaac82a9f77", [:mix], [{:earmark_parser, "~> 1.4.19", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "b7745fa6374a36daf484e2a2012274950e084815b936b1319aeebcf7809574f6"},
"earmark_parser": {:hex, :earmark_parser, "1.4.31", "a93921cdc6b9b869f519213d5bc79d9e218ba768d7270d46fdcf1c01bacff9e2", [:mix], [], "hexpm", "317d367ee0335ef037a87e46c91a2269fef6306413f731e8ec11fc45a7efd059"},
"ex_doc": {:hex, :ex_doc, "0.29.4", "6257ecbb20c7396b1fe5accd55b7b0d23f44b6aa18017b415cb4c2b91d997729", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "2c6699a737ae46cb61e4ed012af931b57b699643b24dabe2400a8168414bc4f5"},
"excoveralls": {:hex, :excoveralls, "0.14.4", "295498f1ae47bdc6dce59af9a585c381e1aefc63298d48172efaaa90c3d251db", [:mix], [{:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "e3ab02f2df4c1c7a519728a6f0a747e71d7d6e846020aae338173619217931c1"},
"gen_stage": {:hex, :gen_stage, "1.0.0", "51c8ae56ff54f9a2a604ca583798c210ad245f415115453b773b621c49776df5", [:mix], [], "hexpm", "1d9fc978db5305ac54e6f5fec7adf80cd893b1000cf78271564c516aa2af7706"},
"hackney": {:hex, :hackney, "1.18.1", "f48bf88f521f2a229fc7bae88cf4f85adc9cd9bcf23b5dc8eb6a1788c662c4f6", [:rebar3], [{:certifi, "~>2.9.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~>6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~>1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.3.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~>1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "a4ecdaff44297e9b5894ae499e9a070ea1888c84afdd1fd9b7b2bc384950128e"},
"idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"},
"jason": {:hex, :jason, "1.3.0", "fa6b82a934feb176263ad2df0dbd91bf633d4a46ebfdffea0c8ae82953714946", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "53fc1f51255390e0ec7e50f9cb41e751c260d065dcba2bf0d08dc51a4002c2ac"},
"makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"},
"makeup_elixir": {:hex, :makeup_elixir, "0.16.0", "f8c570a0d33f8039513fbccaf7108c5d750f47d8defd44088371191b76492b0b", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "28b2cbdc13960a46ae9a8858c4bebdec3c9a6d7b4b9e7f4ed1502f8159f338e7"},
"makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"},
"metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"},
"mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"},
"nimble_options": {:hex, :nimble_options, "0.5.2", "42703307b924880f8c08d97719da7472673391905f528259915782bb346e0a1b", [:mix], [], "hexpm", "4da7f904b915fd71db549bcdc25f8d56f378ef7ae07dc1d372cbe72ba950dce0"},
"nimble_parsec": {:hex, :nimble_parsec, "1.2.3", "244836e6e3f1200c7f30cb56733fd808744eca61fd182f731eac4af635cc6d0b", [:mix], [], "hexpm", "c8d789e39b9131acf7b99291e93dae60ab48ef14a7ee9d58c6964f59efb570b0"},
"nimble_parsec": {:hex, :nimble_parsec, "1.3.0", "9e18a119d9efc3370a3ef2a937bf0b24c088d9c4bf0ba9d7c3751d49d347d035", [:mix], [], "hexpm", "7977f183127a7cbe9346981e2f480dc04c55ffddaef746bd58debd566070eef8"},
"parse_trans": {:hex, :parse_trans, "3.3.1", "16328ab840cc09919bd10dab29e431da3af9e9e7e7e6f0089dd5a2d2820011d8", [:rebar3], [], "hexpm", "07cd9577885f56362d414e8c4c4e6bdf10d43a8767abb92d24cbe8b24c54888b"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.6", "cf344f5692c82d2cd7554f5ec8fd961548d4fd09e7d22f5b62482e5aeaebd4b0", [:make, :mix, :rebar3], [], "hexpm", "bdb0d2471f453c88ff3908e7686f86f9be327d065cc1ec16fa4540197ea04680"},
"telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"},
Expand Down

0 comments on commit 96d6334

Please sign in to comment.