-
Notifications
You must be signed in to change notification settings - Fork 0
/
txn_proxy.lua
68 lines (62 loc) · 2.73 KB
/
txn_proxy.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
-- A fiber can't use multiple transactions simultaneously;
-- i.e. [fiber] --? [transaction] in UML parlance.
--
-- This module provides a simple transaction proxy facility
-- to control multiple transactions at once. A proxy executes
-- statements in a worker fiber in order to overcome
-- "one transaction per fiber" limitation.
--
-- Ex:
-- proxy = require('txn_proxy').new()
-- proxy:begin()
-- proxy('box.space.test:replace{1, 42}')
-- proxy:commit() -- or proxy:rollback()
local ffi = require('ffi')
local yaml = require('yaml')
local fiber = require('fiber')
local console = require('console')
-- Metatable attached to table results returned by a proxy call; the
-- `__serialize = 'array'` field is a hint for serializers.
local array_mt = { __serialize = 'array' }
-- Counter used to assign an id to proxies created without an explicit one.
local id_gen = 0
-- NOTE(review): this is a global, not a local -- presumably on purpose so
-- that scripts using the module can toggle it. When it is truthy, proxy
-- calls do not print the executed statement and its result.
stfu = false
-- Build a transaction-control method (begin/commit/rollback).
-- The returned method sends `stmt`, optionally followed by a space and the
-- caller-supplied `code` string, to the proxy as a single chunk.
local function txn_method(stmt)
    return function(self, code)
        return self(stmt .. (code and ' ' .. code or ''))
    end
end

-- Build a one-argument space method (get/select/replace/insert/delete).
-- The generated code refers to a variable named `s`, which has to be
-- resolvable where the worker evaluates the code -- presumably a global
-- space object defined by the caller (TODO confirm).
-- `require('my')` is a project module resolved lazily, at call time; its
-- `tostring` presumably renders a Lua value as Lua source text (confirm).
local function space_method(op)
    return function(self, key)
        return self('s:' .. op .. '(' .. require('my').tostring(key) .. ')')
    end
end

-- Build a two-argument space method (update/upsert); see space_method.
local function space_method2(op)
    return function(self, key, opts)
        local dump = require('my').tostring
        return self('s:' .. op .. '(' .. dump(key) .. ', ' .. dump(opts) .. ')')
    end
end

-- Metatable of a transaction proxy.
--
-- A proxy object carries three fields: `c1` (channel the code strings are
-- put into), `c2` (channel the replies are taken from) and `id` (used in
-- the printed trace).
local mt = {
    -- proxy(code_str): hand `code_str` to the worker and return the decoded
    -- reply. Table replies get `array_mt` as their metatable.
    -- Raises an error if the proxy has been closed.
    __call = function(self, code_str)
        -- A put that reports failure means nobody will ever answer; fail
        -- loudly instead of feeding a nil reply to the YAML decoder.
        if not self.c1:put(code_str) then
            error('txn_proxy: tx#' .. tostring(self.id) .. ' is closed')
        end
        local reply = self.c2:get()
        if reply == nil then
            error('txn_proxy: tx#' .. tostring(self.id) ..
                  ' was closed while waiting for a reply')
        end
        local res = yaml.decode(reply)
        -- Trace the statement and its result unless the global `stfu` is set.
        if not stfu then print(yaml.encode{'tx#' .. self.id, code_str, res}) end
        return type(res) == 'table' and setmetatable(res, array_mt) or res
    end,
    __index = {
        begin = txn_method('box.begin()'),
        commit = txn_method('box.commit()'),
        rollback = txn_method('box.rollback()'),
        get = space_method('get'),
        select = space_method('select'),
        replace = space_method('replace'),
        insert = space_method('insert'),
        delete = space_method('delete'),
        update = space_method2('update'),
        upsert = space_method2('upsert'),
        -- Close both channels of the proxy.
        close = function(self) self.c1:close(); self.c2:close() end
    }
}
-- Worker loop of a proxy.
-- Repeatedly takes a code string from `c1`, evaluates it with
-- `console.eval` and puts the outcome into `c2`. The loop ends as soon as
-- `c1:get()` yields a false value (nil or false).
local function fiber_main(c1, c2)
    while true do
        local request = c1:get()
        if not request then
            return
        end
        c2:put(console.eval(request))
    end
end
-- Create a transaction proxy.
-- Two channels are created and a worker fiber running `fiber_main` is
-- started on them. When `id` is missing (nil or false) the next value of
-- the module-level counter is used instead.
-- Returns a table with fields `c1`, `c2`, `id` and `__gc`, with `mt` as
-- its metatable.
local function new_txn_proxy(id)
    local c1 = fiber.channel()
    local c2 = fiber.channel()
    fiber.create(fiber_main, c1, c2)
    if not id then
        id_gen = id_gen + 1
        id = id_gen
    end
    -- A small cdata object with a finalizer, stored in the proxy: when it
    -- is collected the finalizer closes both channels.
    local gc_sentinel = ffi.gc(ffi.new('char[1]'), function()
        c1:close()
        c2:close()
    end)
    local proxy = {
        c1 = c1, c2 = c2, id = id,
        __gc = gc_sentinel
    }
    return setmetatable(proxy, mt)
end
-- Module interface: `new([id])` creates a proxy; when `id` is omitted a
-- sequential one is assigned.
return { new = new_txn_proxy }