Skip to content

Commit

Permalink
Merge pull request #12 from commanded/feature/error-callback
Browse files Browse the repository at this point in the history
Support Commanded's event handler `error/3` callback
  • Loading branch information
slashdotdash authored Jul 22, 2018
2 parents 5b3d81f + f6e00d7 commit 6c45cef
Show file tree
Hide file tree
Showing 12 changed files with 349 additions and 97 deletions.
14 changes: 1 addition & 13 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,20 +1,8 @@
# The directory Mix will write compiled artifacts to.
/_build

# If you run "mix test --cover", coverage assets end up here.
/cover

# The directory Mix downloads your dependencies sources to.
/deps

# Where 3rd-party dependencies like ExDoc output generated docs.
/doc

# Ignore .fetch files in case you like to edit your project deps locally.
/.fetch

# If the VM crashes, it generates a dump, let's ignore it too.
erl_crash.dump

# Also ignore archive artifacts (built via "mix archive.build").
*.ez
.DS_Store
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
language: elixir

elixir:
- 1.5.1
- 1.6.5

otp_release:
- 20.0
- 20.3

services:
- postgresql
Expand Down
52 changes: 49 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ MIT License
- [Schema Prefix](#schema-prefix)
- [Usage](#usage)
- [Supervision](#supervision)
- [`after_update` callback](#after_update-callback)
- [`error/3` callback](#error-callback)
- [`after_update/3` callback](#after_update-callback)
- [Rebuilding a projection](#rebuilding-a-projection)
- [Contributing](#contributing)
- [Need help?](#need-help)
Expand Down Expand Up @@ -173,13 +174,58 @@ defmodule MyApp.Projections.Supervisor do
end
```

### `after_update` callback
### `error/3` callback

The `Commanded.Projections.Ecto` macro defines a Commanded event handler which means you can take advantage of the [`error/3` callback function](https://github.com/commanded/commanded/blob/f12a677ea70484e4f52159509897cdcdbf5c53b2/guides/Events.md#error3-callback) to handle any errors returned from a `project` function. The function is passed the error returned by the event handler (e.g. `{:error, error}`), the event causing the error, and a context map containing state passed between retries. Use the context map to track any transient state you need to access between retried failures, such as the number of failed attempts.

You can return one of the following responses depending upon the error severity:

- `{:retry, context}` - retry the failed event, provide a context map containing any state passed to subsequent failures. This could be used to count the number of failures, stopping after too many.

- `{:retry, delay, context}` - retry the failed event, after sleeping for the requested delay (in milliseconds). Context is a map as described in `{:retry, context}` above.

- `:skip` - skip the failed event by acknowledging receipt.

- `{:stop, reason}` - stop the projector with the given reason.

#### Error handling example

Here's an example projector module where an error tagged tuple is explicitly returned from a `project` function, but you can also handle exceptions caused by faulty `Ecto.Multi` database operations in a similar manner since the errors are caught and returned as tagged tuples (e.g. `{:error, %Ecto.ConstraintError{}}`).

```elixir
defmodule MyApp.ExampleProjector do
use Commanded.Projections.Ecto, name: "MyApp.ExampleProjector"

require Logger

alias Commanded.Event.FailureContext

project %AnEvent{} do
{:error, :failed}
end

def error({:error, :failed} = error, %AnEvent{}, %FailureContext{}) do
:skip
end

def error({:error, %Ecto.ConstraintError{} = error}, _event, _failure_context) do
Logger.error(fn -> "Failed due to constraint error: " <> inspect(error) end)
:skip
end

def error({:error, _error} = error, _event, _failure_context) do
:skip
end
end
```

### `after_update/3` callback

You can define an `after_update/3` function in a projector to be called after each projected event. It receives the event, its associated metadata, and all changes from `Ecto.Multi` executed in the database transaction.

```elixir
defmodule MyApp.ExampleProjector do
use Commanded.Projections.Ecto, name: "example_projection"
use Commanded.Projections.Ecto, name: "MyApp.ExampleProjector"

project %AnEvent{name: name} do
Ecto.Multi.insert(multi, :example_projection, %ExampleProjection{name: name})
Expand Down
5 changes: 3 additions & 2 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ use Mix.Config
# Print only warnings and errors during test
config :logger, :console, level: :warn, format: "[$level] $message\n"

config :ex_unit,
capture_log: true
config :ex_unit, capture_log: true

config :commanded, event_store_adapter: Commanded.EventStore.Adapters.InMemory

config :commanded_ecto_projections,
ecto_repos: [Commanded.Projections.Repo],
Expand Down
70 changes: 45 additions & 25 deletions lib/projections/ecto.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,22 @@ defmodule Commanded.Projections.Ecto do

defmacro __using__(opts) do
opts = opts || []
schema_prefix = opts[:schema_prefix] ||
Application.get_env(:commanded_ecto_projections, :schema_prefix)

schema_prefix =
opts[:schema_prefix] || Application.get_env(:commanded_ecto_projections, :schema_prefix)

quote location: :keep do
@opts unquote(opts)
@repo @opts[:repo] ||
Application.get_env(:commanded_ecto_projections, :repo) ||
raise "Commanded Ecto projections expects :repo to be configured in environment"
@projection_name @opts[:name] || raise "#{inspect __MODULE__} expects :name to be given"
@repo @opts[:repo] || Application.get_env(:commanded_ecto_projections, :repo) ||
raise("Commanded Ecto projections expects :repo to be configured in environment")
@projection_name @opts[:name] || raise("#{inspect(__MODULE__)} expects :name to be given")
@schema_prefix unquote(schema_prefix)
@timeout @opts[:timeout] || :infinity

# pass through any other configuration to the event handler
# Pass through any other configuration to the event handler
@handler_opts Keyword.drop(@opts, [:repo, :schema_prefix, :timeout])

unquote __include_projection_version_schema__(schema_prefix)
unquote(__include_projection_version_schema__(schema_prefix))

use Ecto.Schema
use Commanded.Event.Handler, @handler_opts
Expand All @@ -49,40 +49,60 @@ defmodule Commanded.Projections.Ecto do
import unquote(__MODULE__)

def update_projection(event, %{event_number: event_number} = metadata, multi_fn) do
changeset =
ProjectionVersion.changeset(%ProjectionVersion{projection_name: @projection_name}, %{
last_seen_event_number: event_number
})

multi =
Ecto.Multi.new()
|> Ecto.Multi.run(:verify_projection_version, fn _ ->
version = case @repo.get(ProjectionVersion, @projection_name) do
nil -> @repo.insert!(%ProjectionVersion{projection_name: @projection_name, last_seen_event_number: 0})
version -> version
end

if version.last_seen_event_number == nil || version.last_seen_event_number < event_number do
version =
case @repo.get(ProjectionVersion, @projection_name) do
nil ->
@repo.insert!(%ProjectionVersion{
projection_name: @projection_name,
last_seen_event_number: 0
})

version ->
version
end

if version.last_seen_event_number == nil ||
version.last_seen_event_number < event_number do
{:ok, %{version: version}}
else
{:error, :already_seen_event}
end
end)
|> Ecto.Multi.update(
:projection_version,
ProjectionVersion.changeset(%ProjectionVersion{projection_name: @projection_name}, %{last_seen_event_number: event_number}),
[prefix: unquote(schema_prefix)]
changeset,
prefix: unquote(schema_prefix)
)

multi = apply(multi_fn, [multi])

case @repo.transaction(multi, timeout: @timeout, pool_timeout: @timeout) do
{:ok, changes} -> after_update(event, metadata, changes)
with %Ecto.Multi{} = multi <- apply(multi_fn, [multi]),
{:ok, changes} <- attempt_transaction(multi) do
after_update(event, metadata, changes)
else
{:error, :verify_projection_version, :already_seen_event, _changes} -> :ok
{:error, stage, reason, _changes} -> {:error, reason}
{:error, _stage, error, _changes} -> {:error, error}
{:error, error} -> {:error, error}
end
end

def after_update(_event, _metadata, _changes), do: :ok

defoverridable [
after_update: 3,
]
defoverridable after_update: 3

defp attempt_transaction(multi) do
try do
@repo.transaction(multi, timeout: @timeout, pool_timeout: @timeout)
rescue
e -> {:error, e}
end
end
end
end

Expand All @@ -99,7 +119,7 @@ defmodule Commanded.Projections.Ecto do
@schema_prefix unquote(prefix)

schema "projection_versions" do
field :last_seen_event_number, :integer
field(:last_seen_event_number, :integer)

timestamps()
end
Expand Down
39 changes: 23 additions & 16 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,19 @@ defmodule Commanded.Projections.Ecto.Mixfile do
app: :commanded_ecto_projections,
version: @version,
elixir: "~> 1.4",
elixirc_paths: elixirc_paths(Mix.env),
elixirc_paths: elixirc_paths(Mix.env()),
description: description(),
package: package(),
build_embedded: Mix.env == :prod,
start_permanent: Mix.env == :prod,
build_embedded: Mix.env() == :prod,
start_permanent: Mix.env() == :prod,
deps: deps(),
docs: docs(),
docs: docs()
]
end

def application do
[
extra_applications: extra_applications(Mix.env),
extra_applications: extra_applications(Mix.env())
]
end

Expand All @@ -30,11 +30,13 @@ defmodule Commanded.Projections.Ecto.Mixfile do
:logger,
:ecto,
:postgrex,
# :commanded
]
end

defp extra_applications(_) do
[
:logger,
:logger
]
end

Expand All @@ -44,37 +46,42 @@ defmodule Commanded.Projections.Ecto.Mixfile do
defp deps do
[
{:commanded, ">= 0.12.0", runtime: false},
{:ecto, "~> 2.1", runtime: false},
{:ecto, "~> 2.2", runtime: false},
{:ex_doc, ">= 0.0.0", only: :dev},
{:postgrex, "~> 0.13", only: :test},
{:mix_test_watch, "~> 0.4", only: :dev, runtime: false},
{:mix_test_watch, "~> 0.6", only: :dev, runtime: false}
]
end

defp description do
"""
Read model projections for Commanded using Ecto
"""
"""
Read model projections for Commanded using Ecto
"""
end

defp docs do
[
main: "Commanded.Projections.Ecto",
canonical: "http://hexdocs.pm/commanded_ecto_projections",
source_ref: "v#{@version}",
source_ref: "v#{@version}"
]
end

defp package do
[
files: [
"lib", "mix.exs", "README*", "LICENSE*",
"priv/repo/migrations",
"lib",
"mix.exs",
"README*",
"LICENSE*",
"priv/repo/migrations"
],
maintainers: ["Ben Smith"],
licenses: ["MIT"],
links: %{"GitHub" => "https://github.com/commanded/commanded-ecto-projections",
"Docs" => "https://hexdocs.pm/commanded_ecto_projections/"}
links: %{
"GitHub" => "https://github.com/commanded/commanded-ecto-projections",
"Docs" => "https://hexdocs.pm/commanded_ecto_projections/"
}
]
end
end
21 changes: 12 additions & 9 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
%{"commanded": {:hex, :commanded, "0.12.0", "3e5f4d5614f7a74378ef94b5585eb87ab618c2e743e978950021201fafa3cc47", [:mix], [{:poison, "~> 3.1", [hex: :poison, repo: "hexpm", optional: false]}, {:uuid, "~> 1.1", [hex: :uuid, repo: "hexpm", optional: false]}], "hexpm"},
%{
"commanded": {:hex, :commanded, "0.16.0", "10ab39952faf4af8bc0b077cce870f343e5e695b1e8cced4f425db66f651b048", [:mix], [{:phoenix_pubsub, "~> 1.0", [hex: :phoenix_pubsub, repo: "hexpm", optional: true]}, {:poison, "~> 3.1", [hex: :poison, repo: "hexpm", optional: false]}, {:uuid, "~> 1.1", [hex: :uuid, repo: "hexpm", optional: false]}], "hexpm"},
"connection": {:hex, :connection, "1.0.4", "a1cae72211f0eef17705aaededacac3eb30e6625b04a6117c1b2db6ace7d5976", [:mix], [], "hexpm"},
"db_connection": {:hex, :db_connection, "1.1.2", "2865c2a4bae0714e2213a0ce60a1b12d76a6efba0c51fbda59c9ab8d1accc7a8", [:mix], [{:connection, "~> 1.0.2", [hex: :connection, repo: "hexpm", optional: false]}, {:poolboy, "~> 1.5", [hex: :poolboy, repo: "hexpm", optional: true]}, {:sbroker, "~> 1.0", [hex: :sbroker, repo: "hexpm", optional: true]}], "hexpm"},
"decimal": {:hex, :decimal, "1.4.0", "fac965ce71a46aab53d3a6ce45662806bdd708a4a95a65cde8a12eb0124a1333", [:mix], [], "hexpm"},
"earmark": {:hex, :earmark, "1.2.3", "206eb2e2ac1a794aa5256f3982de7a76bf4579ff91cb28d0e17ea2c9491e46a4", [:mix], [], "hexpm"},
"ecto": {:hex, :ecto, "2.2.4", "defde3c8eca385bd86466d2e1491d19e77f9b79ad996dc8e89e4e107f3942f40", [:mix], [{:db_connection, "~> 1.1", [hex: :db_connection, repo: "hexpm", optional: true]}, {:decimal, "~> 1.2", [hex: :decimal, repo: "hexpm", optional: false]}, {:mariaex, "~> 0.8.0", [hex: :mariaex, repo: "hexpm", optional: true]}, {:poison, "~> 2.2 or ~> 3.0", [hex: :poison, repo: "hexpm", optional: true]}, {:poolboy, "~> 1.5", [hex: :poolboy, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.13.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:sbroker, "~> 1.0", [hex: :sbroker, repo: "hexpm", optional: true]}], "hexpm"},
"ex_doc": {:hex, :ex_doc, "0.16.4", "4bf6b82d4f0a643b500366ed7134896e8cccdbab4d1a7a35524951b25b1ec9f0", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, repo: "hexpm", optional: false]}], "hexpm"},
"db_connection": {:hex, :db_connection, "1.1.3", "89b30ca1ef0a3b469b1c779579590688561d586694a3ce8792985d4d7e575a61", [:mix], [{:connection, "~> 1.0.2", [hex: :connection, repo: "hexpm", optional: false]}, {:poolboy, "~> 1.5", [hex: :poolboy, repo: "hexpm", optional: true]}, {:sbroker, "~> 1.0", [hex: :sbroker, repo: "hexpm", optional: true]}], "hexpm"},
"decimal": {:hex, :decimal, "1.5.0", "b0433a36d0e2430e3d50291b1c65f53c37d56f83665b43d79963684865beab68", [:mix], [], "hexpm"},
"earmark": {:hex, :earmark, "1.2.5", "4d21980d5d2862a2e13ec3c49ad9ad783ffc7ca5769cf6ff891a4553fbaae761", [:mix], [], "hexpm"},
"ecto": {:hex, :ecto, "2.2.10", "e7366dc82f48f8dd78fcbf3ab50985ceeb11cb3dc93435147c6e13f2cda0992e", [:mix], [{:db_connection, "~> 1.1", [hex: :db_connection, repo: "hexpm", optional: true]}, {:decimal, "~> 1.2", [hex: :decimal, repo: "hexpm", optional: false]}, {:mariaex, "~> 0.8.0", [hex: :mariaex, repo: "hexpm", optional: true]}, {:poison, "~> 2.2 or ~> 3.0", [hex: :poison, repo: "hexpm", optional: true]}, {:poolboy, "~> 1.5", [hex: :poolboy, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.13.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:sbroker, "~> 1.0", [hex: :sbroker, repo: "hexpm", optional: true]}], "hexpm"},
"ex_doc": {:hex, :ex_doc, "0.18.4", "4406b8891cecf1352f49975c6d554e62e4341ceb41b9338949077b0d4a97b949", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, repo: "hexpm", optional: false]}], "hexpm"},
"file_system": {:hex, :file_system, "0.2.6", "fd4dc3af89b9ab1dc8ccbcc214a0e60c41f34be251d9307920748a14bf41f1d3", [:mix], [], "hexpm"},
"fs": {:hex, :fs, "0.9.2", "ed17036c26c3f70ac49781ed9220a50c36775c6ca2cf8182d123b6566e49ec59", [:rebar], [], "hexpm"},
"mix_test_watch": {:hex, :mix_test_watch, "0.5.0", "2c322d119a4795c3431380fca2bca5afa4dc07324bd3c0b9f6b2efbdd99f5ed3", [:mix], [{:fs, "~> 0.9.1", [hex: :fs, repo: "hexpm", optional: false]}], "hexpm"},
"mix_test_watch": {:hex, :mix_test_watch, "0.6.0", "5e206ed04860555a455de2983937efd3ce79f42bd8536fc6b900cc286f5bb830", [:mix], [{:file_system, "~> 0.2.1 or ~> 0.3", [hex: :file_system, repo: "hexpm", optional: false]}], "hexpm"},
"poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm"},
"poolboy": {:hex, :poolboy, "1.5.1", "6b46163901cfd0a1b43d692657ed9d7e599853b3b21b95ae5ae0a777cf9b6ca8", [:rebar], [], "hexpm"},
"postgrex": {:hex, :postgrex, "0.13.3", "c277cfb2a9c5034d445a722494c13359e361d344ef6f25d604c2353185682bfc", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:db_connection, "~> 1.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: false]}], "hexpm"},
"uuid": {:hex, :uuid, "1.1.7", "007afd58273bc0bc7f849c3bdc763e2f8124e83b957e515368c498b641f7ab69", [:mix], [], "hexpm"}}
"postgrex": {:hex, :postgrex, "0.13.5", "3d931aba29363e1443da167a4b12f06dcd171103c424de15e5f3fc2ba3e6d9c5", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:db_connection, "~> 1.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: false]}], "hexpm"},
"uuid": {:hex, :uuid, "1.1.8", "e22fc04499de0de3ed1116b770c7737779f226ceefa0badb3592e64d5cfb4eb9", [:mix], [], "hexpm"},
}
6 changes: 4 additions & 2 deletions test/projections/after_update_callback_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ defmodule Commanded.Projections.AfterUpdateCallbackTest do

alias Commanded.Projections.Repo

defmodule AnEvent, do: defstruct [name: "AnEvent", pid: nil]
defmodule AnEvent do
defstruct name: "AnEvent", pid: nil
end

defmodule Projection do
use Ecto.Schema

schema "projections" do
field :name, :string
field(:name, :string)
end
end

Expand Down
Loading

0 comments on commit 6c45cef

Please sign in to comment.