Skip to content

Commit

Permalink
Implementation of batch upsert
Browse files Browse the repository at this point in the history
Batch upsert is mostly used for operation with
one bucket / one Tarantool node in a transaction.
In this case batch upsert is more efficient
than upserting tuple-by-tuple.
Right now CRUD cannot provide batch upsert with full consistency.
CRUD offers batch upsert with partial consistency. That means
that full consistency can be provided only on single replicaset
using `box` transactions.

Closes #193
  • Loading branch information
AnaNek committed Feb 4, 2022
1 parent e9132cb commit e6ea1ce
Show file tree
Hide file tree
Showing 5 changed files with 855 additions and 2 deletions.
6 changes: 4 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

### Added

* Batch insert operation `crud.batch_insert()`/`crud.batch_insert_object()`
* Batch insert/upsert operation
`crud.batch_insert()`/`crud.batch_insert_object()`/
`crud.batch_upsert()`/`crud.batch_upsert_object()`
with partial consistency

### Changed

### Fixed
Expand Down
80 changes: 80 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,86 @@ crud.upsert_object('customers',
...
```

### Batch upsert

Right now CRUD cannot provide batch upsert with full consistency.
CRUD offers batch upsert with partial consistency. That means
that full consistency can be provided only on single replicaset
using `box` transactions.

```lua
-- Batch upsert tuples
local result, err = crud.batch_upsert(space_name, tuples, operations, opts)
-- Batch upsert objects
local result, err = crud.batch_upsert_object(space_name, objects, operations, opts)
```

where:

* `space_name` (`string`) - name of the space to upsert tuples/objects into
* `tuples` / `objects` (`table`) - array of tuples/objects to upsert
* `operations` (`table`) - update [operations](https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/#box-space-update) if there is an existing tuple which matches the key fields of tuple
* `opts`:
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
* `fields` (`?table`) - field names for getting only a subset of fields

Returns metadata and an array of empty arrays, plus an array of errors
(one error corresponds to one replicaset for which the error occurred).
Error object can contain `tuple` field. This field contains the tuple
for which the error occurred.

**Example:**

```lua
crud.batch_upsert('customers', {
{1, box.NULL, 'Elizabeth', 23},
{2, box.NULL, 'Anastasia', 22},
}, {{'+', 'age', 1}})
---
- metadata:
- {'name': 'id', 'type': 'unsigned'}
- {'name': 'bucket_id', 'type': 'unsigned'}
- {'name': 'name', 'type': 'string'}
- {'name': 'age', 'type': 'number'}
rows:
- []
- []
...
crud.batch_upsert_object('customers', {
{id = 3, name = 'Elizabeth', age = 24},
{id = 10, name = 'Anastasia', age = 21},
}, {{'+', 'age', 1}})
---
- metadata:
- {'name': 'id', 'type': 'unsigned'}
- {'name': 'bucket_id', 'type': 'unsigned'}
- {'name': 'name', 'type': 'string'}
- {'name': 'age', 'type': 'number'}
rows:
- []
- []

-- Partial success
local res, errs = crud.batch_upsert_object('customers', {
{id = 22, name = 'Alex', age = 34},
{id = 3, name = 'Anastasia', age = 22},
{id = 5, name = 'Sergey', age = 25},
}, {{'=', 'age', 'invalid type'}})
---
res
- metadata:
- {'name': 'id', 'type': 'unsigned'}
- {'name': 'bucket_id', 'type': 'unsigned'}
- {'name': 'name', 'type': 'string'}
- {'name': 'age', 'type': 'number'}
rows:
- [],
- [],

errs
- [err = 'Tuple field 4 (age) type does not match one required by operation ...', ..., tuple = [3, 2804, 'Anastasia', 22]]
...
```

### Select

Expand Down
10 changes: 10 additions & 0 deletions crud.lua
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ local replace = require('crud.replace')
local get = require('crud.get')
local update = require('crud.update')
local upsert = require('crud.upsert')
local batch_upsert = require('crud.batch_upsert')
local delete = require('crud.delete')
local select = require('crud.select')
local truncate = require('crud.truncate')
Expand Down Expand Up @@ -57,6 +58,14 @@ crud.update = update.call
-- @function upsert
crud.upsert = upsert.tuple

-- @refer batch_upsert.tuples_batch
-- @function batch_upsert
crud.batch_upsert = batch_upsert.tuples_batch

-- @refer batch_upsert.objects_batch
-- @function batch_upsert_object
crud.batch_upsert_object = batch_upsert.objects_batch

-- @refer upsert.object
-- @function upsert_object
crud.upsert_object = upsert.object
Expand Down Expand Up @@ -119,6 +128,7 @@ function crud.init_storage()
replace.init()
update.init()
upsert.init()
batch_upsert.init()
delete.init()
select.init()
truncate.init()
Expand Down
185 changes: 185 additions & 0 deletions crud/batch_upsert.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
local checks = require('checks')
local errors = require('errors')
local vshard = require('vshard')

local call = require('crud.common.call')
local utils = require('crud.common.utils')
local sharding = require('crud.common.sharding')
local dev_checks = require('crud.common.dev_checks')
local schema = require('crud.common.schema')

local BatchUpsertError = errors.new_class('BatchUpsertError', {capture_stack = false})

local batch_upsert = {}

local BATCH_UPSERT_FUNC_NAME = '_crud.batch_upsert_on_storage'

--- Upsert a batch of tuples on a storage inside a single `box` transaction.
--
-- Every tuple of the batch is upserted with the same operations.
-- The first failed upsert rolls the whole transaction back, so the batch
-- is applied to this storage either entirely or not at all.
--
-- @function batch_upsert_on_storage
--
-- @param string space_name
--  A space name
--
-- @param table batch
--  Array of tuples to upsert
--
-- @param table operations
--  Update operations passed to `upsert` together with every tuple
--
-- @return[1] table Array holding the `schema.wrap_box_space_func_result`
--  result of every upsert, in batch order
-- @treturn[2] nil
-- @treturn[2] table A BatchUpsertError if the space doesn't exist,
--  otherwise `{err = ..., tuple = ...}` describing the failed upsert
local function batch_upsert_on_storage(space_name, batch, operations)
    dev_checks('string', 'table', 'table')

    local space = box.space[space_name]
    if space == nil then
        return nil, BatchUpsertError:new("Space %q doesn't exist", space_name)
    end

    local inserted_tuples = {}

    box.begin()
    for _, tuple in ipairs(batch) do
        local insert_result = schema.wrap_box_space_func_result(space, 'upsert', {tuple, operations}, {})

        if insert_result.err ~= nil then
            -- Discard the upserts already made in this transaction: the caller
            -- gets only the error back, so committing them would leave
            -- unreported, partially applied data on the replicaset.
            box.rollback()
            return nil, {
                err = insert_result.err,
                tuple = tuple,
            }
        end

        table.insert(inserted_tuples, insert_result)
    end
    box.commit()

    return inserted_tuples
end

-- Publishes the storage-side handler as `_G._crud.batch_upsert_on_storage`,
-- which is the function name the router calls (see BATCH_UPSERT_FUNC_NAME).
-- NOTE(review): assumes the `_G._crud` table already exists when init() runs;
-- confirm it is created by the storage initialization code.
function batch_upsert.init()
_G._crud.batch_upsert_on_storage = batch_upsert_on_storage
end

-- Router-side implementation of batch upsert.
--
-- Splits the tuples by replicaset, asynchronously calls the storage function
-- on every involved replicaset and gathers the outcome with `call.batch`.
--
-- Returns result, errs, need_reload.
-- need_reload tells whether reloading the schema could help,
-- see crud.common.schema.wrap_func_reload()
local function call_batch_upsert_on_router(space_name, tuples, user_operations, opts)
    dev_checks('string', 'table', 'table', {
        timeout = '?number',
        fields = '?table',
    })

    opts = opts or {}

    local space = utils.get_space(space_name, vshard.router.routeall())
    if space == nil then
        return nil, {BatchUpsertError:new("Space %q doesn't exist", space_name)}, true
    end

    local space_format = space:format()

    -- User operations are sent as is unless Tarantool lacks fieldpaths
    -- support; in that case they go through utils.convert_operations first.
    local operations = user_operations
    if not utils.tarantool_supports_fieldpaths() then
        local converted, convert_err = utils.convert_operations(user_operations, space_format)
        if convert_err ~= nil then
            return nil, {BatchUpsertError:new("Wrong operations are specified: %s", convert_err)}, true
        end
        operations = converted
    end

    local batches_by_replicasets, split_err = sharding.split_tuples_by_replicaset(tuples, space)
    if split_err ~= nil then
        return nil, {split_err}, true
    end

    local call_opts = {
        is_async = true,
        timeout = opts.timeout,
    }

    -- One asynchronous call per replicaset, keyed by the replicaset uuid.
    local futures_by_replicasets = {}
    for replicaset, batch in pairs(batches_by_replicasets) do
        local pending = replicaset:call(
            BATCH_UPSERT_FUNC_NAME,
            {space_name, batch, operations},
            call_opts
        )
        futures_by_replicasets[replicaset.uuid] = pending
    end

    local results, errs = call.batch(futures_by_replicasets, BATCH_UPSERT_FUNC_NAME, opts.timeout)

    -- Merge the rows reported by every replicaset into one array.
    local rows = {}
    for _, replicaset_result in pairs(results) do
        rows = utils.table_extend(rows, replicaset_result[1])
    end

    -- Nothing was reported as upserted: only the errors are returned.
    if next(rows) == nil then
        return nil, errs
    end

    local formatted, format_err = utils.format_result(rows, space, opts.fields)
    if format_err ~= nil then
        return nil, {format_err}
    end

    return formatted, errs
end

--- Batch update or insert tuples to the specified space
--
-- Validates the arguments with `checks` and runs the router-side
-- implementation (`call_batch_upsert_on_router`) through
-- `schema.wrap_func_reload`.
--
-- @function tuples_batch
--
-- @param string space_name
-- A space name
--
-- @param table tuples
-- Tuples
--
-- @param table user_operations
-- Update operations sent to the storages together with the tuples
--
-- @tparam ?table opts
-- Options of batch_upsert.tuples_batch
--
-- @tparam ?number opts.timeout
-- Timeout passed to the replicaset calls and to `call.batch`
--
-- @tparam ?table opts.fields
-- Field names passed to `utils.format_result`
--
-- @return[1] tuples
-- @treturn[2] nil
-- @treturn[2] table of tables Error description

function batch_upsert.tuples_batch(space_name, tuples, user_operations, opts)
checks('string', 'table', 'table', {
timeout = '?number',
fields = '?table',
})

return schema.wrap_func_reload(call_batch_upsert_on_router, space_name, tuples, user_operations, opts)
end

--- Batch update or insert objects to the specified space
--
-- Every object is flattened to a tuple with `utils.flatten_obj_reload`;
-- the resulting tuples are then handed to `batch_upsert.tuples_batch`.
-- Flattening stops at the first object that fails.
--
-- @function objects_batch
--
-- @param string space_name
--  A space name
--
-- @param table objs
--  Objects
--
-- @param table user_operations
--  Update operations forwarded to batch_upsert.tuples_batch
--
-- @tparam ?table opts
--  Options of batch_upsert.tuples_batch
--
-- @return[1] objects
-- @treturn[2] nil
-- @treturn[2] table of tables Error description; on a flatten failure
--  the error's `tuple` field holds the offending object
function batch_upsert.objects_batch(space_name, objs, user_operations, opts)
    checks('string', 'table', 'table', {
        timeout = '?number',
        fields = '?table',
    })

    local tuples = {}
    for _, object in ipairs(objs) do
        local flat_tuple, flatten_err = utils.flatten_obj_reload(space_name, object)
        if flatten_err ~= nil then
            local failure = BatchUpsertError:new("Failed to flatten object: %s", flatten_err)
            failure.tuple = object
            return nil, {failure}
        end

        tuples[#tuples + 1] = flat_tuple
    end

    return batch_upsert.tuples_batch(space_name, tuples, user_operations, opts)
end

return batch_upsert
Loading

0 comments on commit e6ea1ce

Please sign in to comment.