Skip to content

Commit

Permalink
fix: various fixes around bulk action upgrades, atomics
Browse files Browse the repository at this point in the history
  • Loading branch information
zachdaniel committed Apr 26, 2024
1 parent 41c951d commit 726c6c3
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 61 deletions.
30 changes: 18 additions & 12 deletions lib/ash/actions/create/create.ex
Original file line number Diff line number Diff line change
Expand Up @@ -358,18 +358,24 @@ defmodule Ash.Actions.Create do
)

true ->
changeset.resource
|> Ash.DataLayer.create(changeset)
|> Helpers.rollback_if_in_transaction(
changeset.resource,
changeset
)
|> add_tenant(changeset)
|> manage_relationships(domain, changeset,
actor: opts[:actor],
authorize?: opts[:authorize?],
upsert?: opts[:upsert?]
)
case Ash.Changeset.handle_allow_nil_atomics(changeset, opts[:actor]) do
%Ash.Changeset{valid?: true} = changeset ->
changeset.resource
|> Ash.DataLayer.create(changeset)
|> Helpers.rollback_if_in_transaction(
changeset.resource,
changeset
)
|> add_tenant(changeset)
|> manage_relationships(domain, changeset,
actor: opts[:actor],
authorize?: opts[:authorize?],
upsert?: opts[:upsert?]
)

%Ash.Changeset{} = changeset ->
{:error, changeset}
end
end
|> case do
{:ok, result, instructions} ->
Expand Down
47 changes: 38 additions & 9 deletions lib/ash/actions/update/bulk.ex
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,11 @@ defmodule Ash.Actions.Update.Bulk do
),
action,
input,
Keyword.merge(opts, resource: query.resource, inputs_was_stream?: false),
Keyword.merge(opts,
resource: query.resource,
strategy: [:stream],
inputs_was_stream?: false
),
reason
)
end
Expand Down Expand Up @@ -397,7 +401,7 @@ defmodule Ash.Actions.Update.Bulk do
{query, atomic_changeset} <-
add_changeset_filters(query, atomic_changeset),
%Ash.Changeset{valid?: true} = atomic_changeset <-
Ash.Changeset.hydrate_atomic_refs(atomic_changeset, opts[:actor], eager?: true),
Ash.Changeset.handle_allow_nil_atomics(atomic_changeset, opts[:actor]),
{:ok, data_layer_query} <-
Ash.Query.data_layer_query(query) do
case Ash.DataLayer.update_query(
Expand Down Expand Up @@ -550,6 +554,18 @@ defmodule Ash.Actions.Update.Bulk do
end
end
else
%Ash.Changeset{valid?: false, errors: error} ->
if Ash.DataLayer.in_transaction?(atomic_changeset.resource) do
Ash.DataLayer.rollback(atomic_changeset.resource, Ash.Error.to_error_class(error))
else
%Ash.BulkResult{
status: :error,
error_count: 1,
notifications: [],
errors: [Ash.Error.to_error_class(error)]
}
end

{:error, %Ash.Error.Forbidden.InitialDataRequired{}} ->
case Ash.Actions.Read.Stream.stream_strategy(
query,
Expand All @@ -576,12 +592,23 @@ defmodule Ash.Actions.Update.Bulk do
}

_strategy ->
input_stream =
if atomic_changeset.context[:data_layer][:use_atomic_update_data?] do
[atomic_changeset.data]
else
query
end

run(
atomic_changeset.domain,
query,
input_stream,
atomic_changeset.action,
input,
Keyword.put(opts, :strategy, [:stream]),
Keyword.merge(opts,
authorize_query?: false,
strategy: [:stream],
inputs_was_stream?: false
),
"authorization requires initial data"
)
end
Expand Down Expand Up @@ -690,11 +717,13 @@ defmodule Ash.Actions.Update.Bulk do
conditional_after_batch_hooks =
all_changes
|> Stream.with_index()
|> Enum.filter(fn {%{change: {module, change_opts}}, _index} ->
function_exported?(module, :after_batch, 3) &&
module.batch_callbacks?(query, change_opts, context)
_ ->
false
|> Enum.filter(fn
{%{change: {module, change_opts}}, _index} ->
function_exported?(module, :after_batch, 3) &&
module.batch_callbacks?(query, change_opts, context)

_ ->
false
end)
|> Enum.reduce(%{}, fn {%{where: where}, index}, acc ->
{:atomic, condition} =
Expand Down
47 changes: 27 additions & 20 deletions lib/ash/actions/update/update.ex
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ defmodule Ash.Actions.Update do
params,
Keyword.merge(opts,
strategy: [:atomic],
resource: atomic_changeset.resource,
authorize_query?: false,
return_records?: true,
atomic_changeset: atomic_changeset,
Expand Down Expand Up @@ -433,27 +434,33 @@ defmodule Ash.Actions.Update do
changeset =
Ash.Changeset.set_defaults(changeset, :update, true)

changeset.resource
|> Ash.DataLayer.update(changeset)
|> case do
{:ok, data} ->
{:ok, %{data | __metadata__: changeset.data.__metadata__}}

{:error, :no_rollback, error} ->
{:error, :no_rollback, error}

{:error, error} ->
{:error, error}
case Ash.Changeset.handle_allow_nil_atomics(changeset, opts[:actor]) do
%Ash.Changeset{valid?: true} = changeset ->
changeset.resource
|> Ash.DataLayer.update(changeset)
|> case do
{:ok, data} ->
{:ok, %{data | __metadata__: changeset.data.__metadata__}}

{:error, :no_rollback, error} ->
{:error, :no_rollback, error}

{:error, error} ->
{:error, error}
end
|> Helpers.rollback_if_in_transaction(
changeset.resource,
changeset
)
|> add_tenant(changeset)
|> manage_relationships(domain, changeset,
actor: opts[:actor],
authorize?: opts[:authorize?]
)

%Ash.Changeset{valid?: false} = changeset ->
{:error, changeset}
end
|> Helpers.rollback_if_in_transaction(
changeset.resource,
changeset
)
|> add_tenant(changeset)
|> manage_relationships(domain, changeset,
actor: opts[:actor],
authorize?: opts[:authorize?]
)

true ->
{:ok, changeset.data}
Expand Down
51 changes: 31 additions & 20 deletions lib/ash/changeset/changeset.ex
Original file line number Diff line number Diff line change
Expand Up @@ -1393,26 +1393,6 @@ defmodule Ash.Changeset do

case Ash.Type.cast_atomic(attribute.type, value, attribute.constraints) do
{:atomic, value} ->
value =
if attribute.allow_nil? do
value
else
expr(
if is_nil(^value) do
error(
^Ash.Error.Changes.Required,
%{
field: ^attribute.name,
type: ^:attribute,
resource: ^changeset.resource
}
)
else
^value
end
)
end

value = set_error_field(value, attribute.name)

%{changeset | atomics: Keyword.put(changeset.atomics, key, value)}
Expand All @@ -1426,6 +1406,37 @@ defmodule Ash.Changeset do
end
end

@doc false
def handle_allow_nil_atomics(changeset, actor) do
changeset.atomics
|> Enum.reduce(changeset, fn {key, value}, changeset ->
attribute = Ash.Resource.Info.attribute(changeset.resource, key)

value =
if attribute.allow_nil? do
value
else
expr(
if is_nil(^value) do
error(
^Ash.Error.Changes.Required,
%{
field: ^attribute.name,
type: ^:attribute,
resource: ^changeset.resource
}
)
else
^value
end
)
end

%{changeset | atomics: Keyword.put(changeset.atomics, key, value)}
end)
|> Ash.Changeset.hydrate_atomic_refs(actor, eager?: true)
end

@doc """
Set the result of the action. This will prevent running the underlying datalayer behavior
"""
Expand Down

0 comments on commit 726c6c3

Please sign in to comment.