-
Notifications
You must be signed in to change notification settings - Fork 1
/
instance.ex
104 lines (89 loc) · 2.88 KB
/
instance.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
defmodule Virta.Instance do
  @moduledoc """
  A GenServer that owns one running instance of a graph.

  On start, every vertex of the graph is started as a linked `Task` running
  `module.loop/3`, where `module` is resolved from the vertex's `:module` key.
  The server state holds the graph and a vertex-to-pid lookup table. Messages
  passed to `execute/2` are relayed to the vertex processes, and any
  `{request_id, :output, output}` message the server receives is forwarded as
  `{request_id, output}` to the process that most recently called `execute/2`.
  """

  use GenServer

  # Client API

  @doc """
  Starts an instance for `graph`, linked to the calling process.
  """
  def start_link(graph) do
    GenServer.start_link(__MODULE__, {:ok, graph})
  end

  @doc """
  Sends messages to the processes of the given vertices.

  `data` is a map of `vertex => [message]`. Every message is sent, in order, to
  the process started for that vertex. The calling process is remembered as the
  recipient of later output. Returns `:ok`.
  """
  def execute(server, data) do
    GenServer.call(server, {:execute, data})
  end

  @doc """
  Returns the out-edges of the first vertex in the graph's topological order.
  """
  def inports(server) do
    GenServer.call(server, {:inports})
  end

  @doc """
  Returns the in-edges of the last vertex in the graph's topological order.
  """
  def outports(server) do
    GenServer.call(server, {:outports})
  end

  # Server callbacks

  @impl true
  def init({:ok, graph}) do
    instance = self()

    # Vertices are started in reverse topological order so that the pid of
    # every downstream vertex is already in the table when an upstream vertex
    # builds its outport arguments.
    lookup_table =
      graph
      |> Graph.topsort()
      |> Enum.reverse()
      |> Enum.reduce(%{}, fn node, started ->
        outport_args = get_outport_args(graph, node, started)

        {:ok, pid} =
          Task.start_link(node_module(node), :loop, [%{}, outport_args, instance])

        Map.put(started, node, pid)
      end)

    {:ok, %{graph: graph, lookup_table: lookup_table}}
  end

  @impl true
  def handle_call({:execute, data}, {caller, _tag}, %{lookup_table: lookup_table} = state) do
    Enum.each(data, fn {node, messages} ->
      target = Map.get(lookup_table, node)
      Enum.each(messages, &send(target, &1))
    end)

    # Remember who asked, so handle_info/2 knows where to forward output.
    {:reply, :ok, Map.put(state, :from, caller)}
  end

  def handle_call({:inports}, _from, %{graph: graph} = state) do
    first_vertex = graph |> Graph.topsort() |> List.first()
    {:reply, Graph.out_edges(graph, first_vertex), state}
  end

  def handle_call({:outports}, _from, %{graph: graph} = state) do
    last_vertex = graph |> Graph.topsort() |> List.last()
    {:reply, Graph.in_edges(graph, last_vertex), state}
  end

  @impl true
  def handle_info({request_id, :output, output}, state) do
    # Forward to the caller recorded by the most recent :execute call.
    send(Map.get(state, :from), {request_id, output})
    {:noreply, state}
  end

  # Private functions

  # Resolves the module named by a vertex's `:module` key under the `Elixir`
  # namespace. NOTE(review): Module.concat/2 creates atoms, so `:module`
  # values should come from trusted graph definitions only.
  defp node_module(node) do
    Module.concat("Elixir", Map.get(node, :module))
  end

  # Returns the result of the zero-arity function `name` when `module` exports
  # it, and `false` otherwise.
  defp module_flag(module, name) do
    {name, 0} in module.__info__(:functions) && apply(module, name, [])
  end

  # Builds the outport argument list handed to a vertex's `loop/3`:
  #   * `final` modules get the labels of their in-edges;
  #   * `workflow` modules get their in-edge labels tagged with `:ref` (the
  #     edge's `v2` vertex), followed by their out-edge labels tagged with `:pid`;
  #   * every other module gets its out-edge labels tagged with `:pid`.
  defp get_outport_args(graph, node, lookup_table) do
    module = node_module(node)

    cond do
      module_flag(module, :final) ->
        graph
        |> Graph.in_edges(node)
        |> Enum.map(&Map.get(&1, :label))

      module_flag(module, :workflow) ->
        incoming =
          graph
          |> Graph.in_edges(node)
          |> Enum.map(fn %{v2: to_node, label: label} ->
            Map.put(label, :ref, to_node)
          end)

        incoming ++ outgoing_args(graph, node, lookup_table)

      true ->
        outgoing_args(graph, node, lookup_table)
    end
  end

  # Labels of the vertex's out-edges, each tagged with the pid of the process
  # started for the edge's target vertex.
  defp outgoing_args(graph, node, lookup_table) do
    graph
    |> Graph.out_edges(node)
    |> Enum.map(fn %{v2: to_node, label: label} ->
      Map.put(label, :pid, Map.get(lookup_table, to_node))
    end)
  end
end