-
Notifications
You must be signed in to change notification settings - Fork 0
/
kvs_partition.rb
72 lines (61 loc) · 2.11 KB
/
kvs_partition.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
require 'rubygems'
require 'bud'
class Kvs
include Bud
state do
channel :get, [:@addr, :sender, :key]
channel :put, [:@addr, :key, :value]
channel :get_res, [:@addr, :key, :value]
table :servers, [:part, :addr]
# server-only
table :db, [:key] => [:value]
end
bootstrap do
servers <= (0...SERVER_PORTS.length).map { |i| [i, "#{LOCALHOST}:#{SERVER_PORTS[i]}"] }
end
bloom :client do # "client" is aware of partitioning in this model.
get <~ (stdio * servers).pairs do |io, server|
[server.addr, ip_port, key_of_get(io.line)] if is_get(io.line) && server.part == partition_of_key(key_of_get(io.line))
end
put <~ (stdio * servers).pairs do |io, server|
[server.addr, key_of_put(io.line), value_of_put(io.line)] if is_put(io.line) && server.part == partition_of_key(key_of_put(io.line))
end
stdio <~ get_res { |res| ["#{res.key}: #{res.value}"] }
end
bloom :server do
get_res <~ (get * db).pairs(:key => :key) { |req, row| [req.sender, req.key, row.value] }
# Implicit ordering done by server
db <+- put { |req| [req.key, req.value] }
end
def partition_of_key(key)
key.hash % SERVER_PORTS.length
end
# GET expected input format: "GET key"
def is_get(input)
input.start_with?("GET ")
end
def key_of_get(input)
input["GET ".length, input.length]
end
# PUT expected input format: "PUT key:value"
def is_put(input)
input.start_with?("PUT ")
end
def key_of_put(input)
remainder = input["PUT ".length, input.length]
remainder.split(":")[0]
end
def value_of_put(input)
remainder = input["PUT ".length, input.length]
remainder.split(":")[1]
end
end
LOCALHOST = "127.0.0.1"
CLIENT_PORT = "10000"
SERVER_PORTS = ["10001", "10002", "10003"]
SERVER_PORTS.each do |port|
server = Kvs.new(:stdin => $stdin, :ip => LOCALHOST, :port => port.to_i)
server.run_bg
end
client = Kvs.new(:stdin => $stdin, :ip => LOCALHOST, :port => CLIENT_PORT.to_i)
client.run_fg