From e6ea1cecde4e611a8cb82d61c929c53718c8707e Mon Sep 17 00:00:00 2001 From: AnaNek Date: Sat, 5 Feb 2022 00:13:11 +0300 Subject: [PATCH] Implementation of batch upsert Batch upsert is mostly used for operation with one bucket / one Tarantool node in a transaction. In this case batch upsert is more efficient then upserting tuple-by-tuple. Right now CRUD cannot provide batch upsert with full consistency. CRUD offers batch upsert with partial consistency. That means that full consistency can be provided only on single replicaset using `box` transactions. Closes #193 --- CHANGELOG.md | 6 +- README.md | 80 +++ crud.lua | 10 + crud/batch_upsert.lua | 185 +++++++ test/integration/batch_operations_test.lua | 576 +++++++++++++++++++++ 5 files changed, 855 insertions(+), 2 deletions(-) create mode 100644 crud/batch_upsert.lua diff --git a/CHANGELOG.md b/CHANGELOG.md index 530b68a27..da9f7a390 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,9 +9,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Added -* Batch insert operation `crud.batch_insert()`/`crud.batch_insert_object()` +* Batch insert/upsert operation + `crud.batch_insert()`/`crud.batch_insert_object()`/ + `crud.batch_upsert()`/`crud.batch_upsert_object()` with partial consistency - + ### Changed ### Fixed diff --git a/README.md b/README.md index ff3d0781e..4892c599a 100644 --- a/README.md +++ b/README.md @@ -429,6 +429,86 @@ crud.upsert_object('customers', ... ``` +### Batch upsert + +Right now CRUD cannot provide batch upsert with full consistency. +CRUD offers batch upsert with partial consistency. That means +that full consistency can be provided only on single replicaset +using `box` transactions. + +```lua +-- Batch upsert tuples +local result, err = crud.batch_upsert(space_name, tuples, operations, opts) +-- Batch upsert objects +local result, err = crud.batch_upsert_object(space_name, objects, operations, opts) +``` + +where: + +* `space_name` (`string`) - name of the space to insert an object +* `tuples` / `objects` (`table`) - array of tuples/objects to insert +* `operations` (`table`) - update [operations](https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/#box-space-update) if there is an existing tuple which matches the key fields of tuple +* `opts`: + * `timeout` (`?number`) - `vshard.call` timeout (in seconds) + * `fields` (`?table`) - field names for getting only a subset of fields + +Returns metadata and array of empty arrays, array of errors ( +one error corresponds to one replicaset for which the error occurred). +Error object can contain `tuple` field. This field contains the tuple +for which the error occurred. + +**Example:** + +```lua +crud.batch_upsert('customers', { + {1, box.NULL, 'Elizabeth', 23}, + {2, box.NULL, 'Anastasia', 22}, +}, {{'+', 'age', 1}}) +--- +- metadata: + - {'name': 'id', 'type': 'unsigned'} + - {'name': 'bucket_id', 'type': 'unsigned'} + - {'name': 'name', 'type': 'string'} + - {'name': 'age', 'type': 'number'} + rows: + - [] + - [] +... +crud.batch_upsert_object('customers', { + {id = 3, name = 'Elizabeth', age = 24}, + {id = 10, name = 'Anastasia', age = 21}, +}, {{'+', 'age', 1}}) +--- +- metadata: + - {'name': 'id', 'type': 'unsigned'} + - {'name': 'bucket_id', 'type': 'unsigned'} + - {'name': 'name', 'type': 'string'} + - {'name': 'age', 'type': 'number'} + rows: + - [] + - [] + +-- Partial success +local res, errs = crud.batch_insert_object('customers', { + {id = 22, name = 'Alex', age = 34}, + {id = 3, name = 'Anastasia', age = 22}, + {id = 5, name = 'Sergey', age = 25}, +}, {{'=', 'age', 'invalid type'}}) +--- +res +- metadata: + - {'name': 'id', 'type': 'unsigned'} + - {'name': 'bucket_id', 'type': 'unsigned'} + - {'name': 'name', 'type': 'string'} + - {'name': 'age', 'type': 'number'} + rows: + - [], + - [], + +errs +- [err = 'Tuple field 4 (age) type does not match one required by operation ...', ..., tuple = [3, 2804, 'Anastasia', 22]] +... +``` ### Select diff --git a/crud.lua b/crud.lua index ac235a134..e5f55831e 100644 --- a/crud.lua +++ b/crud.lua @@ -8,6 +8,7 @@ local replace = require('crud.replace') local get = require('crud.get') local update = require('crud.update') local upsert = require('crud.upsert') +local batch_upsert = require('crud.batch_upsert') local delete = require('crud.delete') local select = require('crud.select') local truncate = require('crud.truncate') @@ -57,6 +58,14 @@ crud.update = update.call -- @function upsert crud.upsert = upsert.tuple +-- @refer batch_upsert.tuples_batch +-- @function batch_upsert +crud.batch_upsert = batch_upsert.tuples_batch + +-- @refer batch_upsert.objects_batch +-- @function batch_upsert_object +crud.batch_upsert_object = batch_upsert.objects_batch + -- @refer upsert.object -- @function upsert crud.upsert_object = upsert.object @@ -119,6 +128,7 @@ function crud.init_storage() replace.init() update.init() upsert.init() + batch_upsert.init() delete.init() select.init() truncate.init() diff --git a/crud/batch_upsert.lua b/crud/batch_upsert.lua new file mode 100644 index 000000000..40c189b8f --- /dev/null +++ b/crud/batch_upsert.lua @@ -0,0 +1,185 @@ +local checks = require('checks') +local errors = require('errors') +local vshard = require('vshard') + +local call = require('crud.common.call') +local utils = require('crud.common.utils') +local sharding = require('crud.common.sharding') +local dev_checks = require('crud.common.dev_checks') +local schema = require('crud.common.schema') + +local BatchUpsertError = errors.new_class('BatchUpsertError', {capture_stack = false}) + +local batch_upsert = {} + +local BATCH_UPSERT_FUNC_NAME = '_crud.batch_upsert_on_storage' + +local function batch_upsert_on_storage(space_name, batch, operations) + dev_checks('string', 'table', 'table') + + local space = box.space[space_name] + if space == nil then + return nil, BatchUpsertError:new("Space %q doesn't exist", space_name) + end + + local inserted_tuples = {} + + box.begin() + for _, tuple in ipairs(batch) do + local insert_result = schema.wrap_box_space_func_result(space, 'upsert', {tuple, operations}, {}) + + table.insert(inserted_tuples, insert_result) + if insert_result.err ~= nil then + box.commit() + return nil, { + err = insert_result.err, + tuple = tuple, + } + end + end + box.commit() + + return inserted_tuples +end + +function batch_upsert.init() + _G._crud.batch_upsert_on_storage = batch_upsert_on_storage +end + +-- returns result, err, need_reload +-- need_reload indicates if reloading schema could help +-- see crud.common.schema.wrap_func_reload() +local function call_batch_upsert_on_router(space_name, tuples, user_operations, opts) + dev_checks('string', 'table', 'table', { + timeout = '?number', + fields = '?table', + }) + + opts = opts or {} + + local space = utils.get_space(space_name, vshard.router.routeall()) + if space == nil then + return nil, {BatchUpsertError:new("Space %q doesn't exist", space_name)}, true + end + + local space_format = space:format() + local operations = user_operations + local err + if not utils.tarantool_supports_fieldpaths() then + operations, err = utils.convert_operations(user_operations, space_format) + if err ~= nil then + return nil, {BatchUpsertError:new("Wrong operations are specified: %s", err)}, true + end + end + + local batches_by_replicasets, err = sharding.split_tuples_by_replicaset(tuples, space) + if err ~= nil then + return nil, {err}, true + end + + local call_opts = { + timeout = opts.timeout, + is_async = true, + } + + local futures_by_replicasets = {} + for replicaset, batch in pairs(batches_by_replicasets) do + local func_args = { + space_name, + batch, + operations, + } + + local future = replicaset:call(BATCH_UPSERT_FUNC_NAME, func_args, call_opts) + futures_by_replicasets[replicaset.uuid] = future + end + + local results, errs = call.batch( + futures_by_replicasets, + BATCH_UPSERT_FUNC_NAME, + opts.timeout + ) + + local rows = {} + for _, result in pairs(results) do + rows = utils.table_extend(rows, result[1]) + end + + if next(rows) == nil then + return nil, errs + end + + local res, err = utils.format_result(rows, space, opts.fields) + if err ~= nil then + return nil, {err} + end + + return res, errs +end + +--- Batch update or insert tuples to the specified space +-- +-- @function tuples_batch +-- +-- @param string space_name +-- A space name +-- +-- @param table tuples +-- Tuples +-- +-- @tparam ?table opts +-- Options of batch_upsert.tuples_batch +-- +-- @return[1] tuples +-- @treturn[2] nil +-- @treturn[2] table of tables Error description + +function batch_upsert.tuples_batch(space_name, tuples, user_operations, opts) + checks('string', 'table', 'table', { + timeout = '?number', + fields = '?table', + }) + + return schema.wrap_func_reload(call_batch_upsert_on_router, space_name, tuples, user_operations, opts) +end + +--- Batch update or insert objects to the specified space +-- +-- @function objects_batch +-- +-- @param string space_name +-- A space name +-- +-- @param table objs +-- Objects +-- +-- @tparam ?table opts +-- Options of batch_upsert.tuples_batch +-- +-- @return[1] objects +-- @treturn[2] nil +-- @treturn[2] table of tables Error description + +function batch_upsert.objects_batch(space_name, objs, user_operations, opts) + checks('string', 'table', 'table', { + timeout = '?number', + fields = '?table', + }) + + local tuples = {} + for _, obj in ipairs(objs) do + + local tuple, err = utils.flatten_obj_reload(space_name, obj) + if err ~= nil then + local err_obj = BatchUpsertError:new("Failed to flatten object: %s", err) + err_obj.tuple = obj + return nil, {err_obj} + end + + table.insert(tuples, tuple) + end + + return batch_upsert.tuples_batch(space_name, tuples, user_operations, opts) +end + +return batch_upsert diff --git a/test/integration/batch_operations_test.lua b/test/integration/batch_operations_test.lua index 33308d779..d50641d13 100644 --- a/test/integration/batch_operations_test.lua +++ b/test/integration/batch_operations_test.lua @@ -60,6 +60,38 @@ pgroup.test_non_existent_space = function(g) t.assert_not_equals(errs, nil) t.assert_equals(#errs, 1) t.assert_str_contains(errs[1].err, 'Space "non_existent_space" doesn\'t exist') + + -- batch_upsert + local result, errs = g.cluster.main_server.net_box:call('crud.batch_upsert', { + 'non_existent_space', + { + {1, box.NULL, 'Alex', 59}, + {2, box.NULL, 'Anna', 23}, + {3, box.NULL, 'Daria', 18} + }, + {{'+', 'age', 1}} + }) + + t.assert_equals(result, nil) + t.assert_not_equals(errs, nil) + t.assert_equals(#errs, 1) + t.assert_str_contains(errs[1].err, 'Space "non_existent_space" doesn\'t exist') + + -- batch_upsert_object + local result, errs = g.cluster.main_server.net_box:call('crud.batch_upsert_object', { + 'non_existent_space', + { + {id = 1, name = 'Fedor', age = 59}, + {id = 2, name = 'Anna', age = 23}, + {id = 3, name = 'Daria', age = 18} + }, + {{'+', 'age', 1}} + }) + + t.assert_equals(result, nil) + t.assert_not_equals(errs, nil) + t.assert_equals(#errs, 1) + t.assert_str_contains(errs[1].err, 'Space "non_existent_space" doesn\'t exist') end pgroup.test_batch_insert_object_get = function(g) @@ -329,6 +361,424 @@ pgroup.test_batch_insert_get = function(g) t.assert_equals(result, nil) end +pgroup.test_batch_upsert_object_get = function(g) + -- bad format + local result, errs = g.cluster.main_server.net_box:call('crud.batch_upsert_object', { + 'customers', + { + {id = 1, name = 'Fedor', age = 59}, + {id = 2, name = 'Anna'}, + }, { + {'+', 'age', 25}, + {'=', 'name', 'Leo Tolstoy'}, + } + }) + + t.assert_equals(result, nil) + t.assert_not_equals(errs, nil) + t.assert_equals(#errs, 1) + t.assert_str_contains(errs[1].err, 'Field \"age\" isn\'t nullable') + t.assert_equals(errs[1].tuple, {id = 2, name = 'Anna'}) + + -- batch_upsert_object + local result, errs = g.cluster.main_server.net_box:call('crud.batch_upsert_object', { + 'customers', + { + {id = 1, name = 'Fedor', age = 59}, + {id = 2, name = 'Anna', age = 23}, + {id = 3, name = 'Daria', age = 18} + }, { + {'+', 'age', 25}, + {'=', 'name', 'Leo Tolstoy'}, + } + }) + + t.assert_equals(errs, nil) + t.assert_equals(result.metadata, { + {name = 'id', type = 'unsigned'}, + {name = 'bucket_id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + {name = 'age', type = 'number'}, + }) + t.assert_equals(result.rows, {{}, {}, {}}) + + -- get + -- primary key = 1 -> bucket_id = 477 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(1) + t.assert_equals(result, {1, 477, 'Fedor', 59}) + + -- primary key = 2 -> bucket_id = 401 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(2) + t.assert_equals(result, {2, 401, 'Anna', 23}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(3) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- batch_insert_object again + -- success with updating one record + local result, errs = g.cluster.main_server.net_box:call('crud.batch_upsert_object', { + 'customers', + { + {id = 1, name = 'Alex', age = 34}, + {id = 81, name = 'Anastasia', age = 22}, + {id = 92, name = 'Sergey', age = 25}, + }, { + {'+', 'age', 10}, + {'=', 'name', 'Leo Tolstoy'}, + } + }) + + t.assert_equals(errs, nil) + t.assert_equals(result.metadata, { + {name = 'id', type = 'unsigned'}, + {name = 'bucket_id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + {name = 'age', type = 'number'}, + }) + t.assert_equals(result.rows, {{}, {}, {}}) + + -- get + -- primary key = 1 -> bucket_id = 477 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(1) + t.assert_equals(result, {1, 477, 'Leo Tolstoy', 69}) + + -- primary key = 81 -> bucket_id = 2205 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(81) + t.assert_equals(result, {81, 2205, 'Anastasia', 22}) + + -- primary key = 92 -> bucket_id = 2040 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(92) + t.assert_equals(result, {92, 2040, 'Sergey', 25}) + + -- batch_insert_object again + -- success with updating all records + local result, errs = g.cluster.main_server.net_box:call('crud.batch_upsert_object', { + 'customers', + { + {id = 1, name = 'Alex', age = 34}, + {id = 81, name = 'Anastasia', age = 22}, + {id = 92, name = 'Sergey', age = 25}, + }, { + {'+', 'age', 1}, + {'=', 'name', 'Peter'}, + } + }) + + t.assert_equals(errs, nil) + t.assert_equals(result.metadata, { + {name = 'id', type = 'unsigned'}, + {name = 'bucket_id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + {name = 'age', type = 'number'}, + }) + t.assert_equals(result.rows, {{}, {}, {}}) + + -- get + -- primary key = 1 -> bucket_id = 477 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(1) + t.assert_equals(result, {1, 477, 'Peter', 70}) + + -- primary key = 81 -> bucket_id = 2205 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(81) + t.assert_equals(result, {81, 2205, 'Peter', 23}) + + -- primary key = 92 -> bucket_id = 2040 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(92) + t.assert_equals(result, {92, 2040, 'Peter', 26}) + + -- batch_upsert_object again + local result, errs = g.cluster.main_server.net_box:call('crud.batch_upsert_object', { + 'customers', + { + {id = 22, name = 'Alex', age = 34}, + {id = 3, name = 'Anastasia', age = 22}, + {id = 5, name = 'Sergey', age = 25}, + }, { + {'=', 'age', 'invalid type'}, + {'=', 'name', 'Leo Tolstoy'}, + } + }) + + t.assert_not_equals(errs, nil) + t.assert_equals(#errs, 1) + t.assert_str_contains(errs[1].err, 'Tuple field 4 (age) type does not match one required by operation') + t.assert_equals(errs[1].tuple, {3, 2804, 'Anastasia', 22}) + t.assert_equals(result.metadata, { + {name = 'id', type = 'unsigned'}, + {name = 'bucket_id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + {name = 'age', type = 'number'}, + }) + t.assert_equals(result.rows, {{}, {}}) + + -- get + -- primary key = 22 -> bucket_id = 655 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(22) + t.assert_equals(result, {22, 655, 'Alex', 34}) + + -- primary key = 5 -> bucket_id = 1172 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(5) + t.assert_equals(result, {5, 1172, 'Sergey', 25}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(3) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- batch_insert_object again + -- fails for both: s1-master s2-master + local result, errs = g.cluster.main_server.net_box:call('crud.batch_upsert_object', { + 'customers', + { + {id = 2, name = 'Alex', age = 34}, + {id = 3, name = 'Anastasia', age = 22}, + {id = 10, name = 'Sergey', age = 25}, + }, { + {'=', 'age', 'invalid type'}, + {'=', 'name', 'Leo Tolstoy'}, + } + }) + + t.assert_equals(result, nil) + t.assert_not_equals(errs, nil) + t.assert_equals(#errs, 2) + + table.sort(errs, function(err1, err2) return err1.tuple[1] < err2.tuple[1] end) + + t.assert_str_contains(errs[1].err, 'Tuple field 4 (age) type does not match one required by operation') + t.assert_equals(errs[1].tuple, {2, 401, 'Alex', 34}) + + t.assert_str_contains(errs[2].err, 'Tuple field 4 (age) type does not match one required by operation') + t.assert_equals(errs[2].tuple, {3, 2804, 'Anastasia', 22}) + + -- primary key = 2 -> bucket_id = 401 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(2) + t.assert_equals(result, {2, 401, 'Anna', 23}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(3) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- primary key = 10 -> bucket_id = 569 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(10) + t.assert_equals(result, nil) +end + +pgroup.test_batch_upsert_get = function(g) + -- batch_upsert + local result, errs = g.cluster.main_server.net_box:call('crud.batch_upsert', { + 'customers', + { + {1, box.NULL, 'Fedor', 59}, + {2, box.NULL, 'Anna', 23}, + {3, box.NULL, 'Daria', 18} + }, { + {'+', 'age', 25}, + {'=', 'name', 'Leo Tolstoy'}, + } + }) + + t.assert_equals(errs, nil) + t.assert_equals(result.metadata, { + {name = 'id', type = 'unsigned'}, + {name = 'bucket_id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + {name = 'age', type = 'number'}, + }) + t.assert_equals(result.rows, {{}, {}, {}}) + + -- get + -- primary key = 1 -> bucket_id = 477 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(1) + t.assert_equals(result, {1, 477, 'Fedor', 59}) + + -- primary key = 2 -> bucket_id = 401 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(2) + t.assert_equals(result, {2, 401, 'Anna', 23}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(3) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- batch_insert again + -- success with updating one record + local result, errs = g.cluster.main_server.net_box:call('crud.batch_upsert', { + 'customers', + { + {1, box.NULL, 'Alex', 34}, + {81, box.NULL, 'Anastasia', 22}, + {92, box.NULL, 'Sergey', 25}, + }, { + {'+', 'age', 10}, + {'=', 'name', 'Leo Tolstoy'}, + } + }) + + t.assert_equals(errs, nil) + t.assert_equals(result.metadata, { + {name = 'id', type = 'unsigned'}, + {name = 'bucket_id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + {name = 'age', type = 'number'}, + }) + t.assert_equals(result.rows, {{}, {}, {}}) + + -- get + -- primary key = 1 -> bucket_id = 477 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(1) + t.assert_equals(result, {1, 477, 'Leo Tolstoy', 69}) + + -- primary key = 81 -> bucket_id = 2205 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(81) + t.assert_equals(result, {81, 2205, 'Anastasia', 22}) + + -- primary key = 92 -> bucket_id = 2040 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(92) + t.assert_equals(result, {92, 2040, 'Sergey', 25}) + + -- batch_insert again + -- success with updating all records + local result, errs = g.cluster.main_server.net_box:call('crud.batch_upsert', { + 'customers', + { + {1, box.NULL, 'Alex', 34}, + {81, box.NULL, 'Anastasia', 22}, + {92, box.NULL, 'Sergey', 25}, + }, { + {'+', 'age', 1}, + {'=', 'name', 'Peter'}, + } + }) + + t.assert_equals(errs, nil) + t.assert_equals(result.metadata, { + {name = 'id', type = 'unsigned'}, + {name = 'bucket_id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + {name = 'age', type = 'number'}, + }) + t.assert_equals(result.rows, {{}, {}, {}}) + + -- get + -- primary key = 1 -> bucket_id = 477 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(1) + t.assert_equals(result, {1, 477, 'Peter', 70}) + + -- primary key = 81 -> bucket_id = 2205 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(81) + t.assert_equals(result, {81, 2205, 'Peter', 23}) + + -- primary key = 92 -> bucket_id = 2040 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(92) + t.assert_equals(result, {92, 2040, 'Peter', 26}) + + -- batch_upsert again + local result, errs = g.cluster.main_server.net_box:call('crud.batch_upsert', { + 'customers', + { + {22, box.NULL, 'Alex', 34}, + {3, box.NULL, 'Anastasia', 22}, + {5, box.NULL, 'Sergey', 25}, + }, { + {'=', 'age', 'invalid type'}, + {'=', 'name', 'Leo Tolstoy'}, + } + }) + + t.assert_not_equals(errs, nil) + t.assert_equals(#errs, 1) + t.assert_str_contains(errs[1].err, 'Tuple field 4 (age) type does not match one required by operation') + t.assert_equals(errs[1].tuple, {3, 2804, 'Anastasia', 22}) + t.assert_equals(result.metadata, { + {name = 'id', type = 'unsigned'}, + {name = 'bucket_id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + {name = 'age', type = 'number'}, + }) + t.assert_equals(result.rows, {{}, {}}) + + -- get + -- primary key = 22 -> bucket_id = 655 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(22) + t.assert_equals(result, {22, 655, 'Alex', 34}) + + -- primary key = 5 -> bucket_id = 1172 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(5) + t.assert_equals(result, {5, 1172, 'Sergey', 25}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(3) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- batch_insert again + -- fails for both: s1-master s2-master + local result, errs = g.cluster.main_server.net_box:call('crud.batch_upsert', { + 'customers', + { + {2, box.NULL, 'Alex', 34}, + {3, box.NULL, 'Anastasia', 22}, + {10, box.NULL, 'Sergey', 25}, + }, { + {'=', 'age', 'invalid type'}, + {'=', 'name', 'Leo Tolstoy'}, + } + }) + + t.assert_equals(result, nil) + t.assert_not_equals(errs, nil) + t.assert_equals(#errs, 2) + + table.sort(errs, function(err1, err2) return err1.tuple[1] < err2.tuple[1] end) + + t.assert_str_contains(errs[1].err, 'Tuple field 4 (age) type does not match one required by operation') + t.assert_equals(errs[1].tuple, {2, 401, 'Alex', 34}) + + t.assert_str_contains(errs[2].err, 'Tuple field 4 (age) type does not match one required by operation') + t.assert_equals(errs[2].tuple, {3, 2804, 'Anastasia', 22}) + + -- primary key = 2 -> bucket_id = 401 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(2) + t.assert_equals(result, {2, 401, 'Anna', 23}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(3) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- primary key = 10 -> bucket_id = 569 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(10) + t.assert_equals(result, nil) +end + pgroup.test_batch_insert_partial_result = function(g) -- bad fields format local result, errs = g.cluster.main_server.net_box:call('crud.batch_insert', { @@ -405,6 +855,92 @@ pgroup.test_batch_insert_object_partial_result = function(g) t.assert_equals(objects, {{id = 1, name = 'Fedor'}, {id = 2, name = 'Anna'}, {id = 3, name = 'Daria'}}) end +pgroup.test_batch_upsert_partial_result = function(g) + -- bad fields format + local result, errs = g.cluster.main_server.net_box:call('crud.batch_upsert', { + 'customers', + { + {15, box.NULL, 'Fedor', 59}, + {25, box.NULL, 'Anna', 23}, + }, + { + {'+', 'age', 1}, + {'=', 'name', 'Peter'}, + }, + {fields = {'id', 'invalid'}}, + }) + + t.assert_equals(result, nil) + t.assert_not_equals(errs, nil) + t.assert_equals(#errs, 1) + t.assert_str_contains(errs[1].err, 'Space format doesn\'t contain field named "invalid"') + + -- batch_upsert + local result, errs = g.cluster.main_server.net_box:call('crud.batch_upsert', { + 'customers', + { + {1, box.NULL, 'Fedor', 59}, + {2, box.NULL, 'Anna', 23}, + {3, box.NULL, 'Daria', 18} + }, + { + {'+', 'age', 1}, + {'=', 'name', 'Peter'}, + }, + {fields = {'id', 'name'}}, + }) + + t.assert_equals(errs, nil) + t.assert_equals(result.metadata, { + {name = 'id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + }) + t.assert_equals(result.rows, {{}, {}, {}}) +end + +pgroup.test_batch_upsert_object_partial_result = function(g) + -- bad fields format + local result, errs = g.cluster.main_server.net_box:call('crud.batch_upsert_object', { + 'customers', + { + {id = 15, name = 'Fedor', age = 59}, + {id = 25, name = 'Anna', age = 23}, + }, + { + {'+', 'age', 1}, + {'=', 'name', 'Peter'}, + }, + {fields = {'id', 'invalid'}}, + }) + + t.assert_equals(result, nil) + t.assert_not_equals(errs, nil) + t.assert_equals(#errs, 1) + t.assert_str_contains(errs[1].err, 'Space format doesn\'t contain field named "invalid"') + + -- batch_upsert_object + local result, errs = g.cluster.main_server.net_box:call('crud.batch_upsert_object', { + 'customers', + { + {id = 1, name = 'Fedor', age = 59}, + {id = 2, name = 'Anna', age = 23}, + {id = 3, name = 'Daria', age = 18} + }, + { + {'+', 'age', 1}, + {'=', 'name', 'Peter'}, + }, + {fields = {'id', 'name'}}, + }) + + t.assert_equals(errs, nil) + t.assert_equals(result.metadata, { + {name = 'id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + }) + t.assert_equals(result.rows, {{}, {}, {}}) +end + pgroup.test_opts_not_damaged = function(g) -- batch insert local batch_insert_opts = {timeout = 1, fields = {'name', 'age'}} @@ -439,4 +975,44 @@ pgroup.test_opts_not_damaged = function(g) t.assert_equals(err, nil) t.assert_equals(new_batch_insert_opts, batch_insert_opts) + + -- batch upsert + local batch_upsert_opts = {timeout = 1, fields = {'name', 'age'}} + local new_batch_upsert_opts, err = g.cluster.main_server:eval([[ + local crud = require('crud') + + local batch_upsert_opts = ... + + local _, err = crud.batch_upsert('customers', { + {1, box.NULL, 'Alex', 59} + }, { + {'+', 'age', 25}, + {'=', 'name', 'Leo Tolstoy'}, + }, batch_upsert_opts) + + return batch_upsert_opts, err + ]], {batch_upsert_opts}) + + t.assert_equals(err, nil) + t.assert_equals(new_batch_upsert_opts, batch_upsert_opts) + + -- batch upsert_object + local batch_upsert_opts = {timeout = 1, fields = {'name', 'age'}} + local new_batch_upsert_opts, err = g.cluster.main_server:eval([[ + local crud = require('crud') + + local batch_upsert_opts = ... + + local _, err = crud.batch_upsert_object('customers', { + {id = 2, name = 'Fedor', age = 59} + }, { + {'+', 'age', 25}, + {'=', 'name', 'Leo Tolstoy'}, + }, batch_upsert_opts) + + return batch_upsert_opts, err + ]], {batch_upsert_opts}) + + t.assert_equals(err, nil) + t.assert_equals(new_batch_upsert_opts, batch_upsert_opts) end