Skip to content

Commit

Permalink
Merge pull request #7 from annatel/refacto
Browse files Browse the repository at this point in the history
breaking change: queuetopia configuration
  • Loading branch information
Laetitia Ohayon authored Dec 10, 2020
2 parents a8ae451 + 23bc25d commit 972ebd6
Show file tree
Hide file tree
Showing 10 changed files with 150 additions and 54 deletions.
17 changes: 14 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ The package can be installed by adding `queuetopia` to your list of dependencies
```elixir
def deps do
[
{:queuetopia, "~> 0.6.1"}
{:queuetopia, "~> 1.0.0"}
]
end
```
Expand Down Expand Up @@ -117,7 +117,7 @@ defmodule MyApp do

def start(_type, _args) do
children = [
{MyApp.MailQueue, [[polling_interval: 1_000]]}
MyApp.MailQueue
]
Supervisor.start_link(children, strategy: :one_for_one)
end
Expand All @@ -127,7 +127,18 @@ end
Or, it can be started directly like this:

```elixir
MyApp.MailQueue.start_link([poll_interval: 1_000])
MyApp.MailQueue.start_link()
```

The configuration can be set as below:

```elixir
# config/config.exs
config :my_app, MyApp.MailQueue,
poll_interval: 60 * 1_000,
repoll_after_job_performed?: true,
disable?: true

```

Note that the polling interval is optionnal.
Expand Down
44 changes: 28 additions & 16 deletions lib/queuetopia.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,17 @@ defmodule Queuetopia do
defmodule MyApp.MailQueue do
use Queuetopia,
repo: MyApp.Repo,
performer: MyApp.MailQueue.Performer
otp_app: :my_app,
performer: MyApp.MailQueue.Performer,
repo: MyApp.Repo
end
# config/config.exs
config :my_app, MyApp.MailQueue,
poll_interval: 60 * 1_000,
repoll_after_job_performed?: true,
disable?: true
"""

@doc """
Expand All @@ -52,36 +60,40 @@ defmodule Queuetopia do
@behaviour Queuetopia

use Supervisor

alias Queuetopia.Jobs.Job

@typedoc "Option values used by the `start*` functions"
@type option :: {:poll_interval, non_neg_integer()}

@otp_app Keyword.fetch!(opts, :otp_app)
@repo Keyword.fetch!(opts, :repo)
@performer Keyword.fetch!(opts, :performer) |> to_string()
@scope __MODULE__ |> to_string()

@default_poll_interval 60 * 1_000

defp config(otp_app, queue) when is_atom(otp_app) and is_atom(queue) do
config = Application.get_env(otp_app, queue, [])
[otp_app: otp_app] ++ config
end

@doc """
Starts the Queuetopia supervisor process.
The :poll_interval can also be given in order to config the polling interval of the scheduler.
"""
@spec start_link([option()]) :: Supervisor.on_start()
def start_link(opts \\ []) do
poll_interval = Keyword.get(opts, :poll_interval, @default_poll_interval)
repoll_after_job_performed? = Keyword.get(opts, :repoll_after_job_performed?, false)

Supervisor.start_link(
__MODULE__,
[
repo: @repo,
poll_interval: poll_interval,
repoll_after_job_performed?: repoll_after_job_performed?
],
name: __MODULE__
)
config = config(@otp_app, __MODULE__)
poll_interval = Keyword.get(config, :poll_interval, @default_poll_interval)
repoll_after_job_performed? = Keyword.get(config, :repoll_after_job_performed?, false)
disable? = Keyword.get(config, :disable?, false)

opts = [
repo: @repo,
poll_interval: poll_interval,
repoll_after_job_performed?: repoll_after_job_performed?
]

if disable?, do: :ignore, else: Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
end

@impl true
Expand Down
19 changes: 19 additions & 0 deletions lib/queuetopia/test/assertions.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
defmodule Queuetopia.Test.Assertions do
import ExUnit.Assertions
alias Queuetopia.Jobs.Job

@doc """
Asserts the job has juste been created
"""

def assert_job_created(queue, %{} = params) do
repo = queue.repo()

job =
Job
|> Ecto.Query.last()
|> repo.one()

refute is_nil(job) && assert(^params = job)
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule Queuetopia.MixProject do
use Mix.Project

@source_url "https://github.com/annatel/queuetopia"
@version "0.6.3"
@version "1.0.0"

def project do
[
Expand Down
5 changes: 3 additions & 2 deletions test/queuetopia/jobs/jobs_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ defmodule Queuetopia.JobsTest do
_ = Factory.insert(:lock, queue: queue, scope: scope_1)

assert [] = Jobs.list_available_pending_queues(TestRepo, scope_1)
assert [queue] = Jobs.list_available_pending_queues(TestRepo, scope_2)
assert [^queue] = Jobs.list_available_pending_queues(TestRepo, scope_2)
end
end

Expand Down Expand Up @@ -77,7 +77,8 @@ defmodule Queuetopia.JobsTest do
assert %Lock{locked_until: locked_until, locked_at: locked_at} =
TestRepo.get_by(Lock, scope: scope, queue: queue)

assert locked_until = DateTime.add(locked_at, 1_000, :second)
assert locked_until ==
locked_at |> DateTime.add(2_000, :millisecond) |> DateTime.truncate(:second)
end

test "when the queue is already locked" do
Expand Down
59 changes: 36 additions & 23 deletions test/queuetopia/scheduler_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,18 @@ defmodule Queuetopia.SchedulerTest do
alias Queuetopia.TestRepo
alias Queuetopia.TestQueuetopia

setup do
Application.put_env(:queuetopia, TestQueuetopia, poll_interval: 50)
:ok
end

test "poll only available queues" do
scope = TestQueuetopia.scope()

%Job{queue: queue} = Factory.insert(:slow_job, params: %{"duration" => 100}, scope: scope)
lock = Factory.insert(:lock, scope: scope, queue: queue)

start_supervised!({TestQueuetopia, [poll_interval: 50]})
start_supervised!(TestQueuetopia)

refute_receive :started, 50

Expand All @@ -30,7 +35,7 @@ defmodule Queuetopia.SchedulerTest do

assert scope != scope_2

start_supervised!({TestQueuetopia, [poll_interval: 50]})
start_supervised!(TestQueuetopia)

refute_receive :started, 50
refute_receive :started, 50
Expand All @@ -47,7 +52,7 @@ defmodule Queuetopia.SchedulerTest do
timeout: 5_000
)

start_supervised!({TestQueuetopia, [poll_interval: 50]})
start_supervised!(TestQueuetopia)

assert_receive :started, 70

Expand All @@ -73,7 +78,7 @@ defmodule Queuetopia.SchedulerTest do
scope: scope
)

start_supervised!({TestQueuetopia, [poll_interval: 50]})
start_supervised!(TestQueuetopia)

assert_receive :started, 80

Expand Down Expand Up @@ -104,7 +109,8 @@ defmodule Queuetopia.SchedulerTest do
scope: scope
)

start_supervised!({TestQueuetopia, [poll_interval: 500]})
Application.put_env(:queuetopia, TestQueuetopia, poll_interval: 500)
start_supervised!(TestQueuetopia)

assert_receive :started, 500
assert_receive :timeout, 3_000
Expand All @@ -119,7 +125,7 @@ defmodule Queuetopia.SchedulerTest do
%{queue: fast_queue} = Factory.insert(:success_job)
_ = Factory.insert(:success_job, scope: scope, queue: fast_queue)

start_supervised!({TestQueuetopia, [poll_interval: 50]})
start_supervised!(TestQueuetopia)

assert_receive :ok, 200

Expand Down Expand Up @@ -156,7 +162,7 @@ defmodule Queuetopia.SchedulerTest do

%{queue: other_queue} = Factory.insert(:success_job, scope: scope)

start_supervised!({TestQueuetopia, [poll_interval: 50]})
start_supervised!(TestQueuetopia)

assert_receive :ok, 200
assert_receive :fail, 200
Expand Down Expand Up @@ -191,7 +197,7 @@ defmodule Queuetopia.SchedulerTest do

%{queue: other_queue} = Factory.insert(:success_job, scope: scope)

start_supervised!({TestQueuetopia, [poll_interval: 50]})
start_supervised!(TestQueuetopia)

assert_receive :ok, 200
assert_receive :started, 200
Expand All @@ -218,7 +224,7 @@ defmodule Queuetopia.SchedulerTest do

%{queue: success_queue} = Factory.insert(:success_job, scope: scope)

start_supervised!({TestQueuetopia, [poll_interval: 50]})
start_supervised!(TestQueuetopia)

assert_receive :ok, 200
assert_receive :raise, 200
Expand Down Expand Up @@ -246,7 +252,7 @@ defmodule Queuetopia.SchedulerTest do

%{id: id, queue: queue} = Factory.insert(:failure_job, scope: scope)

start_supervised!({TestQueuetopia, [poll_interval: 50]})
start_supervised!(TestQueuetopia)

assert_receive :fail, 200
refute_receive :toto, 50
Expand Down Expand Up @@ -275,7 +281,7 @@ defmodule Queuetopia.SchedulerTest do
scope: scope
)

start_supervised!({TestQueuetopia, [poll_interval: 50]})
start_supervised!(TestQueuetopia)

assert_receive :started, 200
refute_receive :ok, 300
Expand All @@ -296,7 +302,7 @@ defmodule Queuetopia.SchedulerTest do

%{id: id, queue: queue} = Factory.insert(:raising_job, scope: scope, timeout: 50)

start_supervised!({TestQueuetopia, [poll_interval: 50]})
start_supervised!(TestQueuetopia)

assert_receive :raise, 200
refute_receive :toto, 50
Expand All @@ -320,7 +326,7 @@ defmodule Queuetopia.SchedulerTest do

%{id: id, queue: queue} = Factory.insert(:failure_job, scope: scope)

start_supervised!({TestQueuetopia, [poll_interval: 50]})
start_supervised!(TestQueuetopia)

assert_receive :fail, 100
refute_receive :toto, 50
Expand Down Expand Up @@ -351,7 +357,7 @@ defmodule Queuetopia.SchedulerTest do
scope: scope
)

start_supervised!({TestQueuetopia, [poll_interval: 50]})
start_supervised!(TestQueuetopia)

assert_receive :started, 100
refute_receive :toto, 200
Expand Down Expand Up @@ -385,7 +391,7 @@ defmodule Queuetopia.SchedulerTest do
error: nil
} = Factory.insert(:raising_job, scope: scope)

start_supervised!({TestQueuetopia, [poll_interval: 50]})
start_supervised!(TestQueuetopia)

assert_receive :raise, 100
refute_receive :toto, 50
Expand Down Expand Up @@ -414,7 +420,12 @@ defmodule Queuetopia.SchedulerTest do
%{queue: queue} = Factory.insert(:success_job, scope: scope)
_ = Factory.insert(:success_job, scope: scope, queue: queue)

start_supervised!({TestQueuetopia, [poll_interval: 500, repoll_after_job_performed?: true]})
Application.put_env(:queuetopia, TestQueuetopia,
poll_interval: 500,
repoll_after_job_performed?: true
)

start_supervised!(TestQueuetopia)

assert_receive :ok, 1_000

Expand All @@ -426,28 +437,30 @@ defmodule Queuetopia.SchedulerTest do

Factory.insert(:failure_job, scope: scope)

start_supervised!({TestQueuetopia, [poll_interval: 500, repoll_after_job_performed?: true]})
Application.put_env(:queuetopia, TestQueuetopia,
poll_interval: 500,
repoll_after_job_performed?: true
)

start_supervised!(TestQueuetopia)

assert_receive :fail, 1_000
assert_receive :fail, 1_000
end
end

test "send_poll/1 sends the poll messages, only if the process inbox is empty" do
start_supervised!({TestQueuetopia, [poll_interval: 5_000]})
Application.put_env(:queuetopia, TestQueuetopia, poll_interval: 5_000)

scheduler_pid = Process.whereis(TestQueuetopia.Scheduler)
start_supervised!(TestQueuetopia)

{:messages, messages} = Process.info(scheduler_pid, :messages)
assert length(messages) == 0
scheduler_pid = Process.whereis(TestQueuetopia.Scheduler)

Queuetopia.Scheduler.send_poll(scheduler_pid)

{:messages, messages} = Process.info(scheduler_pid, :messages)
assert length(messages) == 1

Queuetopia.Scheduler.send_poll(scheduler_pid)

{:messages, messages} = Process.info(scheduler_pid, :messages)
assert length(messages) == 1

Expand Down
26 changes: 26 additions & 0 deletions test/queuetopia/test/assertions_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
defmodule Queuetopia.Test.AssertionsTest do
use Queuetopia.DataCase
import Queuetopia.Test.Assertions

describe "assert_job_created/0" do
test "when the job has just been created" do
scope = Queuetopia.TestQueuetopia.scope()
queue = [scope] |> Module.safe_concat()

job = Factory.insert(:job, scope: scope)

assert_job_created(queue, Map.from_struct(job))
end

test "when the job has not been created" do
assert_raise ExUnit.AssertionError, fn ->
scope = Queuetopia.TestQueuetopia.scope()
queue = [scope] |> Module.safe_concat()

job_params = Factory.params_for(:job)

assert_job_created(queue, job_params)
end
end
end
end
Loading

0 comments on commit 972ebd6

Please sign in to comment.