Skip to content

Commit

Permalink
Implementation of batch insert
Browse files Browse the repository at this point in the history
Batch insert is mostly used for operation with
one bucket / one Tarantool node in a transaction.
In this case batch insert is more efficient
then inserting tuple-by-tuple.
Right now CRUD cannot provide batch insert with full consistency.
CRUD offers batch insert with partial consistency. That means
that full consistency can be provided only on single replicaset
using `box` transactions.

Part of #193
  • Loading branch information
AnaNek committed Feb 4, 2022
1 parent 46c78c0 commit e9132cb
Show file tree
Hide file tree
Showing 8 changed files with 790 additions and 0 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

### Added

* Batch insert operation `crud.batch_insert()`/`crud.batch_insert_object()`
with partial consistency

### Changed

### Fixed
Expand Down
80 changes: 80 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,86 @@ crud.insert_object('customers', {
...
```

### Batch insert

Right now CRUD cannot provide batch insert with full consistency.
CRUD offers batch insert with partial consistency. That means
that full consistency can be provided only on single replicaset
using `box` transactions.

```lua
-- Batch insert tuples
local result, err = crud.batch_insert(space_name, tuples, opts)
-- Batch insert objects
local result, err = crud.batch_insert_object(space_name, objects, opts)
```

where:

* `space_name` (`string`) - name of the space to insert an object
* `tuples` / `objects` (`table`) - array of tuples/objects to insert
* `opts`:
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
* `fields` (`?table`) - field names for getting only a subset of fields

Returns metadata and array contains inserted rows, array of errors (
one error corresponds to one replicaset for which the error occurred).
Error object can contain `tuple` field. This field contains the tuple
for which the error occurred.

**Example:**

```lua
crud.batch_insert('customers', {
{1, box.NULL, 'Elizabeth', 23},
{2, box.NULL, 'Anastasia', 22},
})
---
- metadata:
- {'name': 'id', 'type': 'unsigned'}
- {'name': 'bucket_id', 'type': 'unsigned'}
- {'name': 'name', 'type': 'string'}
- {'name': 'age', 'type': 'number'}
rows:
- [1, 477, 'Elizabeth', 23]
- [2, 401, 'Anastasia', 22]
...
crud.batch_insert_object('customers', {
{id = 3, name = 'Elizabeth', age = 24},
{id = 10, name = 'Anastasia', age = 21},
})
---
- metadata:
- {'name': 'id', 'type': 'unsigned'}
- {'name': 'bucket_id', 'type': 'unsigned'}
- {'name': 'name', 'type': 'string'}
- {'name': 'age', 'type': 'number'}
rows:
- [3, 2804, 'Elizabeth', 24]
- [10, 569, 'Anastasia', 21]

-- Partial success
local res, errs = crud.batch_insert_object('customers', {
{id = 22, name = 'Alex', age = 34},
{id = 3, name = 'Anastasia', age = 22},
{id = 5, name = 'Sergey', age = 25},
})
---
res
- metadata:
- {'name': 'id', 'type': 'unsigned'}
- {'name': 'bucket_id', 'type': 'unsigned'}
- {'name': 'name', 'type': 'string'}
- {'name': 'age', 'type': 'number'}
rows:
- [5, 1172, 'Sergey', 25],
- [22, 655, 'Alex', 34],

errs
- [err = 'Duplicate key exists ...', ..., tuple = [3, 2804, 'Anastasia', 22]]
...
```

### Get

```lua
Expand Down
10 changes: 10 additions & 0 deletions crud.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
-- @module crud

local insert = require('crud.insert')
local batch_insert = require('crud.batch_insert')
local replace = require('crud.replace')
local get = require('crud.get')
local update = require('crud.update')
Expand All @@ -28,6 +29,14 @@ crud.insert = insert.tuple
-- @function insert_object
crud.insert_object = insert.object

-- @refer batch_insert.tuples_batch
-- @function batch_insert
crud.batch_insert = batch_insert.tuples_batch

-- @refer batch_insert.objects_batch
-- @function batch_insert_object
crud.batch_insert_object = batch_insert.objects_batch

-- @refer get.call
-- @function get
crud.get = get.call
Expand Down Expand Up @@ -105,6 +114,7 @@ function crud.init_storage()
end

insert.init()
batch_insert.init()
get.init()
replace.init()
update.init()
Expand Down
185 changes: 185 additions & 0 deletions crud/batch_insert.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
local checks = require('checks')
local errors = require('errors')
local vshard = require('vshard')

local call = require('crud.common.call')
local utils = require('crud.common.utils')
local sharding = require('crud.common.sharding')
local dev_checks = require('crud.common.dev_checks')
local schema = require('crud.common.schema')

local BatchInsertError = errors.new_class('BatchInsertError', {capture_stack = false})

local batch_insert = {}

local BATCH_INSERT_FUNC_NAME = '_crud.batch_insert_on_storage'

local function batch_insert_on_storage(space_name, batch, opts)
dev_checks('string', 'table', {
fields = '?table',
})

opts = opts or {}

local space = box.space[space_name]
if space == nil then
return nil, BatchInsertError:new("Space %q doesn't exist", space_name)
end

local inserted_tuples = {}

box.begin()
for _, tuple in ipairs(batch) do
local insert_result = schema.wrap_box_space_func_result(space, 'insert', {tuple}, {
field_names = opts.fields,
})

table.insert(inserted_tuples, insert_result.res)
if insert_result.err ~= nil then
box.commit()
return nil, {
err = insert_result.err,
tuple = tuple,
}
end
end
box.commit()

return inserted_tuples
end

function batch_insert.init()
_G._crud.batch_insert_on_storage = batch_insert_on_storage
end

-- returns result, err, need_reload
-- need_reload indicates if reloading schema could help
-- see crud.common.schema.wrap_func_reload()
local function call_batch_insert_on_router(space_name, tuples, opts)
dev_checks('string', 'table', {
timeout = '?number',
fields = '?table',
})

opts = opts or {}

local space = utils.get_space(space_name, vshard.router.routeall())
if space == nil then
return nil, {BatchInsertError:new("Space %q doesn't exist", space_name)}, true
end

local batches_by_replicasets, err = sharding.split_tuples_by_replicaset(tuples, space)
if err ~= nil then
return nil, {err}, true
end

local batch_insert_on_storage_opts = {
fields = opts.fields,
}

local call_opts = {
timeout = opts.timeout,
is_async = true,
}

local futures_by_replicasets = {}
for replicaset, batch in pairs(batches_by_replicasets) do
local func_args = {
space_name,
batch,
batch_insert_on_storage_opts,
}

local future = replicaset:call(BATCH_INSERT_FUNC_NAME, func_args, call_opts)
futures_by_replicasets[replicaset.uuid] = future
end

local results, errs = call.batch(
futures_by_replicasets,
BATCH_INSERT_FUNC_NAME,
opts.timeout
)

local rows = {}
for _, result in pairs(results) do
rows = utils.table_extend(rows, result[1])
end

if next(rows) == nil then
return nil, errs
end

local res, err = utils.format_result(rows, space, opts.fields)
if err ~= nil then
return nil, {err}
end

return res, errs
end

--- Batch inserts tuples to the specified space
--
-- @function tuples_batch
--
-- @param string space_name
-- A space name
--
-- @param table tuples
-- Tuples
--
-- @tparam ?table opts
-- Options of batch_insert.tuples_batch
--
-- @return[1] tuples
-- @treturn[2] nil
-- @treturn[2] table of tables Error description

function batch_insert.tuples_batch(space_name, tuples, opts)
checks('string', 'table', {
timeout = '?number',
fields = '?table',
})

return schema.wrap_func_reload(call_batch_insert_on_router, space_name, tuples, opts)
end

--- Batch inserts objects to the specified space
--
-- @function objects_batch
--
-- @param string space_name
-- A space name
--
-- @param table objs
-- Objects
--
-- @tparam ?table opts
-- Options of batch_insert.tuples_batch
--
-- @return[1] objects
-- @treturn[2] nil
-- @treturn[2] table of tables Error description

function batch_insert.objects_batch(space_name, objs, opts)
checks('string', 'table', {
timeout = '?number',
fields = '?table',
})

local tuples = {}
for _, obj in ipairs(objs) do

local tuple, err = utils.flatten_obj_reload(space_name, obj)
if err ~= nil then
local err_obj = BatchInsertError:new("Failed to flatten object: %s", err)
err_obj.tuple = obj
return nil, {err_obj}
end

table.insert(tuples, tuple)
end

return batch_insert.tuples_batch(space_name, tuples, opts)
end

return batch_insert
34 changes: 34 additions & 0 deletions crud/common/call.lua
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,40 @@ local function wrap_vshard_err(err, func_name, replicaset_uuid, bucket_id)
))
end

function call.batch(futures_by_replicasets, func_name, timeout)
dev_checks('table', 'string', '?number')

local timeout = timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT
local errs

local results = {}
local deadline = fiber_clock() + timeout
for replicaset_uuid, future in pairs(futures_by_replicasets) do
local wait_timeout = deadline - fiber_clock()
if wait_timeout < 0 then
wait_timeout = 0
end

local result, err = future:wait_result(wait_timeout)
if err == nil and result[1] == nil then
err = result[2]
end

if err ~= nil then
local err_obj = wrap_vshard_err(err.err or err, func_name, replicaset_uuid)
err_obj.tuple = err.tuple

errs = errs or {}
table.insert(errs, err_obj)
else
table.insert(results, result)
end

end

return results, errs
end

function call.map(func_name, func_args, opts)
dev_checks('string', '?table', {
mode = 'string',
Expand Down
23 changes: 23 additions & 0 deletions crud/common/sharding.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ local vshard = require('vshard')
local errors = require('errors')

local BucketIDError = errors.new_class("BucketIDError", {capture_stack = false})
local GetReplicasetsError = errors.new_class("GetReplicasetsError", {capture_stack = false})

local utils = require('crud.common.utils')
local sharding_key_module = require('crud.common.sharding_key')
Expand Down Expand Up @@ -65,4 +66,26 @@ function sharding.tuple_set_and_return_bucket_id(tuple, space, specified_bucket_
return bucket_id
end

function sharding.split_tuples_by_replicaset(tuples, space)
local batches = {}

for _, tuple in ipairs(tuples) do
local bucket_id, err = sharding.tuple_set_and_return_bucket_id(tuple, space)
if err ~= nil then
return nil, BucketIDError:new("Failed to get bucket ID: %s", err)
end

local replicaset, err = vshard.router.route(bucket_id)
if replicaset == nil then
return nil, GetReplicasetsError:new("Failed to get replicaset for bucket_id %s: %s", bucket_id, err.err)
end

local tuples_by_replicaset = batches[replicaset] or {}
table.insert(tuples_by_replicaset, tuple)
batches[replicaset] = tuples_by_replicaset
end

return batches
end

return sharding
Loading

0 comments on commit e9132cb

Please sign in to comment.