Skip to content

Commit

Permalink
Merge pull request #1111 from novaugust/me/async-stream
Browse files Browse the repository at this point in the history
Improve usage of Tasks
  • Loading branch information
rrrene authored Jan 17, 2024
2 parents 4694882 + 91dc1d6 commit 41d7f20
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 55 deletions.
3 changes: 2 additions & 1 deletion lib/credo/check.ex
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,8 @@ defmodule Credo.Check do
source_files
|> Task.async_stream(fn source -> run_on_source_file(exec, source, params) end,
max_concurrency: exec.max_concurrent_check_runs,
timeout: :infinity
timeout: :infinity,
ordered: false
)
|> Stream.run()

Expand Down
14 changes: 10 additions & 4 deletions lib/credo/check/consistency/collector.ex
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,11 @@ defmodule Credo.Check.Consistency.Collector do
) do
frequencies_per_source_file =
source_files
|> Enum.map(&Task.async(fn -> {&1, collector.collect_matches(&1, params)} end))
|> Enum.map(&Task.await(&1, :infinity))
|> Task.async_stream(&{&1, collector.collect_matches(&1, params)},
timeout: :infinity,
ordered: false
)
|> Enum.map(fn {:ok, frequencies} -> frequencies end)

frequencies = total_frequencies(frequencies_per_source_file)

Expand All @@ -167,8 +170,11 @@ defmodule Credo.Check.Consistency.Collector do
result =
frequencies_per_source_file
|> source_files_with_issues(most_frequent_match)
|> Enum.map(&Task.async(fn -> issue_formatter.(most_frequent_match, &1, params) end))
|> Enum.flat_map(&Task.await(&1, :infinity))
|> Task.async_stream(&issue_formatter.(most_frequent_match, &1, params),
timeout: :infinity,
ordered: false
)
|> Enum.flat_map(fn {:ok, issue} -> issue end)

result
else
Expand Down
33 changes: 12 additions & 21 deletions lib/credo/check/design/duplicated_code.ex
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,12 @@ defmodule Credo.Check.Design.DuplicatedCode do
defp append_issues_via_issue_service(found_hashes, source_files, nodes_threshold, params, exec)
when is_map(found_hashes) do
found_hashes
|> Enum.map(
&Task.async(fn ->
do_append_issues_via_issue_service(
&1,
source_files,
nodes_threshold,
params,
exec
)
end)
|> Task.async_stream(
&do_append_issues_via_issue_service(&1, source_files, nodes_threshold, params, exec),
timeout: :infinity,
ordered: false
)
|> Enum.map(&Task.await(&1, :infinity))
|> Stream.run()
end

defp do_append_issues_via_issue_service(
Expand Down Expand Up @@ -94,14 +88,14 @@ defmodule Credo.Check.Design.DuplicatedCode do
end

defp duplicate_nodes(source_files, mass_threshold) do
chunked_nodes =
nodes =
source_files
|> Enum.chunk_every(30)
|> Enum.map(&Task.async(fn -> calculate_hashes_for_chunk(&1, mass_threshold) end))
|> Enum.map(&Task.await(&1, :infinity))

nodes =
Enum.reduce(chunked_nodes, %{}, fn current_hashes, existing_hashes ->
|> Task.async_stream(&calculate_hashes_for_chunk(&1, mass_threshold),
timeout: :infinity,
ordered: false
)
|> Enum.reduce(%{}, fn {:ok, current_hashes}, existing_hashes ->
Map.merge(existing_hashes, current_hashes, fn _hash, node_items1, node_items2 ->
node_items1 ++ node_items2
end)
Expand Down Expand Up @@ -203,10 +197,7 @@ defmodule Credo.Check.Design.DuplicatedCode do
else
hash = ast |> Credo.Code.remove_metadata() |> to_hash
node_item = %{node: ast, filename: filename, mass: nil}
node_items = Map.get(existing_hashes, hash, [])

updated_hashes = Map.put(existing_hashes, hash, node_items ++ [node_item])

updated_hashes = Map.update(existing_hashes, hash, [node_item], &[node_item | &1])
{ast, updated_hashes}
end
end
Expand Down
7 changes: 1 addition & 6 deletions lib/credo/check/runner.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,7 @@ defmodule Credo.Check.Runner do
|> fix_deprecated_notation_for_checks_without_params()

check_tuples
|> Task.async_stream(
fn check_tuple ->
run_check(exec, check_tuple)
end,
timeout: :infinity
)
|> Task.async_stream(&run_check(exec, &1), timeout: :infinity, ordered: false)
|> Stream.run()

:ok
Expand Down
7 changes: 2 additions & 5 deletions lib/credo/cli/output/summary.ex
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,10 @@ defmodule Credo.CLI.Output.Summary do
Credo.Code.prewalk(source_file, &scope_count_traverse/2, 0)
end

defp scope_count([]), do: 0

defp scope_count(source_files) when is_list(source_files) do
source_files
|> Enum.map(&Task.async(fn -> scope_count(&1) end))
|> Enum.map(&Task.await/1)
|> Enum.reduce(&(&1 + &2))
|> Task.async_stream(&scope_count/1, ordered: false)
|> Enum.reduce(0, fn {:ok, n}, sum -> n + sum end)
end

@def_ops [:defmodule, :def, :defp, :defmacro]
Expand Down
24 changes: 6 additions & 18 deletions lib/credo/sources.ex
Original file line number Diff line number Diff line change
Expand Up @@ -170,24 +170,12 @@ defmodule Credo.Sources do
end

defp read_files(filenames, parse_timeout) do
tasks = Enum.map(filenames, &Task.async(fn -> to_source_file(&1) end))

task_dictionary =
tasks
|> Enum.zip(filenames)
|> Enum.into(%{})

tasks_with_results = Task.yield_many(tasks, parse_timeout)

results =
Enum.map(tasks_with_results, fn {task, res} ->
# Shutdown the tasks that did not reply nor exit
{task, res || Task.shutdown(task, :brutal_kill)}
end)

Enum.map(results, fn
{_task, {:ok, value}} -> value
{task, nil} -> SourceFile.timed_out(task_dictionary[task])
filenames
|> Task.async_stream(&to_source_file/1, timeout: parse_timeout, on_timeout: :kill_task)
|> Stream.zip(filenames)
|> Enum.map(fn
{{:exit, :timeout}, filename} -> SourceFile.timed_out(filename)
{{:ok, value}, _} -> value
end)
end

Expand Down

0 comments on commit 41d7f20

Please sign in to comment.