Skip to content

Commit

Permalink
Pause channel (#2370)
Browse files Browse the repository at this point in the history
- Add :paused field to channel model
- Add endpoints to pause and unpause a channel
- Update ChannelStatusServer to skip paused channels

----

* Refactor checking channel status changes

* Add unit test for the :paused status

* Add paused field to Channel

* Make ChannelStatusServer.poll callable from production

* Force a polling to ensure the pause/unpause is processed

* Paused channels should appear in the down channels list

* Fix CI failure

* Apply fixes from PR
  • Loading branch information
ismaelbej authored Oct 30, 2024
1 parent 2585977 commit c82e6c0
Show file tree
Hide file tree
Showing 12 changed files with 243 additions and 75 deletions.
41 changes: 32 additions & 9 deletions lib/ask/channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ defmodule Ask.Channel do
field :settings, :map
field :patterns, Ask.Ecto.Type.JSON, default: []
field :status, Ask.Ecto.Type.JSON, virtual: true
field :paused, :boolean, default: false

belongs_to :user, Ask.User
has_many :respondent_group_channels, Ask.RespondentGroupChannel, on_delete: :delete_all
many_to_many :projects, Ask.Project, join_through: Ask.ProjectChannel, on_replace: :delete
Expand All @@ -36,7 +38,7 @@ defmodule Ask.Channel do
"""
def changeset(struct, params \\ %{}) do
struct
|> cast(params, [:name, :type, :provider, :base_url, :settings, :user_id, :patterns])
|> cast(params, [:name, :type, :provider, :base_url, :settings, :user_id, :patterns, :paused])
|> validate_required([:name, :type, :provider, :settings, :user_id])
|> validate_patterns
|> assoc_constraint(:user)
Expand Down Expand Up @@ -84,18 +86,39 @@ defmodule Ask.Channel do
end

def with_status(channel) do
status = channel.id |> ChannelStatusServer.get_channel_status()

status =
case status do
:up -> %{status: "up"}
:unknown -> %{status: "unknown"}
down_or_error -> down_or_error
end
status = channel |> get_status()

%{channel | status: status}
end

def get_status(%{paused: true}) do
%{status: "paused"}
end

def get_status(channel) do
channel.id
|> ChannelStatusServer.get_channel_status()
|> case do
:up -> %{status: "up"}
:unknown -> %{status: "unknown"}
down_or_error -> down_or_error
end
end

def is_paused?(channel) do
channel.paused
end

def is_down?(channel) do
channel
|> get_status()
|> case do
%{status: "up"} -> false
%{status: "unknown"} -> false
_ -> true
end
end

defp validate_patterns(changeset) do
changeset
|> validate_patterns_not_empty
Expand Down
112 changes: 65 additions & 47 deletions lib/ask/runtime/channel_status_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ defmodule Ask.Runtime.ChannelStatusServer do
end

def poll(pid) do
send(pid, :poll)
send(pid, :poll_once)
end

def get_channel_status(channel_id) do
Expand All @@ -40,57 +40,23 @@ defmodule Ask.Runtime.ChannelStatusServer do
log_info("polling")

try do
Survey.running_channels()
|> Repo.preload(:user)
|> Enum.each(fn c ->
previous_status = get_status_from_state(c.id, state)
poll_channels(state)

spawn(fn ->
status = ChannelBroker.check_status(c.id)
timestamp = Timex.now()
{:noreply, state}
after
:timer.send_after(@poll_interval, :poll)
end
end

case status do
{:down, messages} ->
case previous_status do
%{status: :down} ->
nil

_ ->
AskWeb.Email.channel_down(c.user.email, c, messages) |> Ask.Mailer.deliver()

update_channel_status(c.id, %{
status: :down,
messages: messages,
name: c.name,
timestamp: timestamp
})
end

{:error, code} ->
case previous_status do
%{status: :error} ->
nil

_ ->
AskWeb.Email.channel_error(c.user.email, c, code) |> Ask.Mailer.deliver()

update_channel_status(c.id, %{
status: :error,
code: code,
name: c.name,
timestamp: timestamp
})
end

status ->
update_channel_status(c.id, status)
end
end)
end)
def handle_info(:poll_once, state) do
log_info("poll forced")

try do
poll_channels(state)

{:noreply, state}
after
:timer.send_after(@poll_interval, :poll)
nil
end
end

Expand All @@ -105,4 +71,56 @@ defmodule Ask.Runtime.ChannelStatusServer do
def log_info(message) do
Logger.info("ChannelStatusServer: #{message}")
end

defp poll_channels(state) do
Survey.running_channels()
|> Repo.preload(:user)
|> Enum.each(fn c ->

unless c.paused do
previous_status = get_status_from_state(c.id, state)

spawn(fn ->
status = ChannelBroker.check_status(c.id)
timestamp = Timex.now()

process_channel_status_change(status, previous_status, timestamp, c)
end)
end
end)
end

defp process_channel_status_change({:down, _messages}, %{status: :down}, _timestamp, _channel) do
nil
end

defp process_channel_status_change({:down, messages}, _previous_status, timestamp, channel) do
AskWeb.Email.channel_down(channel.user.email, channel, messages) |> Ask.Mailer.deliver()

update_channel_status(channel.id, %{
status: :down,
messages: messages,
name: channel.name,
timestamp: timestamp
})
end

defp process_channel_status_change({:error, _code}, %{status: :error}, _timestamp, _channel) do
nil
end

defp process_channel_status_change({:error, code}, _previous_status, timestamp, channel) do
AskWeb.Email.channel_error(channel.user.email, channel, code) |> Ask.Mailer.deliver()

update_channel_status(channel.id, %{
status: :error,
code: code,
name: channel.name,
timestamp: timestamp
})
end

defp process_channel_status_change(status, _previous_status, _timestamp, channel) do
update_channel_status(channel.id, status)
end
end
6 changes: 2 additions & 4 deletions lib/ask/runtime/survey_broker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule Ask.Runtime.SurveyBroker do
import Ecto

alias Ask.{
Channel,
Repo,
Logger,
Survey,
Expand Down Expand Up @@ -131,10 +132,7 @@ defmodule Ask.Runtime.SurveyBroker do

channel_is_down? =
channels
|> Enum.any?(fn c ->
status = c.id |> ChannelStatusServer.get_channel_status()
status != :up && status != :unknown
end)
|> Enum.any?(&(&1 |> Channel.is_paused?() || &1 |> Channel.is_down?()))

poll_survey(survey, now, channel_is_down?)
end
Expand Down
5 changes: 2 additions & 3 deletions lib/ask/survey.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ defmodule Ask.Survey do
PanelSurvey
}

alias Ask.Runtime.ChannelStatusServer
alias Ask.Ecto.Type.JSON
alias Ecto.Multi

Expand Down Expand Up @@ -525,8 +524,8 @@ defmodule Ask.Survey do

down_channels =
channels
|> Enum.map(&(&1.id |> ChannelStatusServer.get_channel_status()))
|> Enum.filter(&(&1 != :up && &1 != :unknown))
|> Enum.map(&(&1 |> Channel.get_status()))
|> Enum.filter(&(&1[:status] != "up" && &1[:status] != "unknown"))

%{survey | down_channels: down_channels}
end
Expand Down
36 changes: 35 additions & 1 deletion lib/ask_web/controllers/channel_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule AskWeb.ChannelController do
use AskWeb, :api_controller

alias Ask.{Channel, Project, Logger}
alias Ask.Runtime.ChannelBroker
alias Ask.Runtime.{ChannelBroker, ChannelStatusServer}

def index(conn, %{"project_id" => project_id}) do
channels =
Expand Down Expand Up @@ -106,4 +106,38 @@ defmodule AskWeb.ChannelController do

render(conn, "show.json", channel: channel |> Repo.preload([:projects, :user]))
end

def pause(conn, %{"channel_id" => id}) do
pause_channel(conn, id, true)
end

def unpause(conn, %{"channel_id" => id}) do
pause_channel(conn, id, false)
end

defp pause_channel(conn, id, paused) do
channel_params = %{"paused" => paused}

Channel
|> Repo.get!(id)
|> authorize_channel(conn)
|> Repo.preload([:projects, :user])
|> Channel.changeset(channel_params)
|> Repo.update()
|> case do
{:ok, channel} ->
ChannelBroker.on_channel_settings_change(channel.id, channel.settings)
ChannelStatusServer.poll(ChannelStatusServer.server_ref())

render(conn, "show.json", channel: channel |> Repo.preload(:projects))

{:error, changeset} ->
Logger.warn("Error when pausing channel: #{id}")

conn
|> put_status(:unprocessable_entity)
|> put_view(AskWeb.ChangesetView)
|> render("error.json", changeset: changeset)
end
end
end
6 changes: 5 additions & 1 deletion lib/ask_web/router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,11 @@ defmodule AskWeb.Router do
as: :update_archived_status
end

resources "/channels", ChannelController, except: [:new, :edit]
resources "/channels", ChannelController, except: [:new, :edit] do
post "/pause", ChannelController, :pause, as: :pause
post "/unpause", ChannelController, :unpause, as: :unpause
end

get "/audios/tts", AudioController, :tts
resources "/audios", AudioController, only: [:create, :show]
resources "/authorizations", OAuthClientController, only: [:index, :delete]
Expand Down
9 changes: 9 additions & 0 deletions priv/repo/migrations/20241007192540_pause_channel.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
defmodule Ask.Repo.Migrations.PauseChannel do
use Ecto.Migration

def change do
alter table(:channels) do
add :paused, :boolean, default: false
end
end
end
4 changes: 3 additions & 1 deletion priv/repo/structure.sql
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ CREATE TABLE `channels` (
`updated_at` datetime NOT NULL,
`base_url` varchar(255) DEFAULT NULL,
`patterns` text,
`paused` tinyint(1) DEFAULT '0',
PRIMARY KEY (`id`),
UNIQUE KEY `id` (`id`),
KEY `channels_user_id_index` (`user_id`),
Expand Down Expand Up @@ -1018,7 +1019,7 @@ CREATE TABLE `users` (
/*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;

-- Dump completed on 2024-04-22 22:06:45
-- Dump completed on 2024-10-07 19:43:26
INSERT INTO `schema_migrations` (version) VALUES (20160812145257);
INSERT INTO `schema_migrations` (version) VALUES (20160816183915);
INSERT INTO `schema_migrations` (version) VALUES (20160830200454);
Expand Down Expand Up @@ -1238,3 +1239,4 @@ INSERT INTO `schema_migrations` (version) VALUES (20230405111657);
INSERT INTO `schema_migrations` (version) VALUES (20230413101342);
INSERT INTO `schema_migrations` (version) VALUES (20230821100203);
INSERT INTO `schema_migrations` (version) VALUES (20240422175453);
INSERT INTO `schema_migrations` (version) VALUES (20241007192540);
30 changes: 30 additions & 0 deletions test/ask/runtime/channel_status_server_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,34 @@ defmodule Ask.Runtime.ChannelStatusServerTest do
ChannelStatusServer.poll(pid)
refute_receive [:email, ^email]
end

test "doesn't send email when a channel is down but status was previously :paused" do
{:ok, pid} = ChannelStatusServer.start_link()
Process.register(self(), :mail_target)
user = insert(:user)
survey = insert(:survey, state: :running)

channel =
TestChannel.create_channel(user, "test", TestChannel.settings(TestChannel.new(), 1, :down), %{paused: true})

setup_surveys_with_channels([survey], [channel])

ChannelStatusServer.poll(pid)
refute_receive [:email, _]
end

test "doesn't send email when :error is received but status was previously :paused" do
{:ok, pid} = ChannelStatusServer.start_link()
Process.register(self(), :mail_target)
user = insert(:user)
survey = insert(:survey, state: :running)

channel =
TestChannel.create_channel(user, "test", TestChannel.settings(TestChannel.new(), 1, :error), %{paused: true})

setup_surveys_with_channels([survey], [channel])

ChannelStatusServer.poll(pid)
refute_receive [:email, _]
end
end
Loading

0 comments on commit c82e6c0

Please sign in to comment.