From e9132cbeeb28f32a1463b290438f8894bb10ebc9 Mon Sep 17 00:00:00 2001
From: AnaNek
Date: Thu, 11 Nov 2021 12:32:26 +0300
Subject: [PATCH] Implementation of batch insert

Batch insert is mostly used for operations with one bucket /
one Tarantool node in a transaction. In this case batch insert
is more efficient than inserting tuple-by-tuple.

Right now CRUD cannot provide batch insert with full consistency.
CRUD offers batch insert with partial consistency. That means
that full consistency can be provided only on a single replicaset
using `box` transactions.

Part of #193
---
 CHANGELOG.md                               |   3 +
 README.md                                  |  80 ++++
 crud.lua                                   |  10 +
 crud/batch_insert.lua                      | 185 +++++++++
 crud/common/call.lua                       |  34 ++
 crud/common/sharding.lua                   |  23 ++
 crud/common/utils.lua                      |  13 +
 test/integration/batch_operations_test.lua | 442 +++++++++++++++++++++
 8 files changed, 790 insertions(+)
 create mode 100644 crud/batch_insert.lua
 create mode 100644 test/integration/batch_operations_test.lua

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6f606f62..530b68a2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
 
 ### Added
 
+* Batch insert operation `crud.batch_insert()`/`crud.batch_insert_object()`
+  with partial consistency
+
 ### Changed
 
 ### Fixed
diff --git a/README.md b/README.md
index 7ddb6085..ff3d0781 100644
--- a/README.md
+++ b/README.md
@@ -153,6 +153,86 @@ crud.insert_object('customers', {
 ...
 ```
 
+### Batch insert
+
+Right now CRUD cannot provide batch insert with full consistency.
+CRUD offers batch insert with partial consistency. That means
+that full consistency can be provided only on a single replicaset
+using `box` transactions.
+
+```lua
+-- Batch insert tuples
+local result, err = crud.batch_insert(space_name, tuples, opts)
+-- Batch insert objects
+local result, err = crud.batch_insert_object(space_name, objects, opts)
+```
+
+where:
+
+* `space_name` (`string`) - name of the space to insert tuples/objects into
+* `tuples` / `objects` (`table`) - array of tuples/objects to insert
+* `opts`:
+  * `timeout` (`?number`) - `vshard.call` timeout (in seconds)
+  * `fields` (`?table`) - field names for getting only a subset of fields
+
+Returns metadata and an array of the inserted rows, plus an array of
+errors (one error per replicaset on which an error occurred).
+An error object can contain a `tuple` field holding the tuple
+for which the error occurred.
+
+**Example:**
+
+```lua
+crud.batch_insert('customers', {
+    {1, box.NULL, 'Elizabeth', 23},
+    {2, box.NULL, 'Anastasia', 22},
+})
+---
+- metadata:
+  - {'name': 'id', 'type': 'unsigned'}
+  - {'name': 'bucket_id', 'type': 'unsigned'}
+  - {'name': 'name', 'type': 'string'}
+  - {'name': 'age', 'type': 'number'}
+  rows:
+  - [1, 477, 'Elizabeth', 23]
+  - [2, 401, 'Anastasia', 22]
+...
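+
+-- An illustrative sketch of handling the errors array (the tuple values
+-- here are example data, not a continuation of the session above):
+-- each entry of `errs` describes a failure on one replicaset; `err.err`
+-- is the error message and `err.tuple`, when present, is the rejected tuple.
+local res, errs = crud.batch_insert('customers', {
+    {1, box.NULL, 'Elizabeth', 23}, -- fails: a tuple with this id already exists
+    {4, box.NULL, 'Kirill', 31},
+})
+if errs ~= nil then
+    for _, err in ipairs(errs) do
+        require('log').error(err.err)
+    end
+end
+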
+crud.batch_insert_object('customers', { + {id = 3, name = 'Elizabeth', age = 24}, + {id = 10, name = 'Anastasia', age = 21}, +}) +--- +- metadata: + - {'name': 'id', 'type': 'unsigned'} + - {'name': 'bucket_id', 'type': 'unsigned'} + - {'name': 'name', 'type': 'string'} + - {'name': 'age', 'type': 'number'} + rows: + - [3, 2804, 'Elizabeth', 24] + - [10, 569, 'Anastasia', 21] + +-- Partial success +local res, errs = crud.batch_insert_object('customers', { + {id = 22, name = 'Alex', age = 34}, + {id = 3, name = 'Anastasia', age = 22}, + {id = 5, name = 'Sergey', age = 25}, +}) +--- +res +- metadata: + - {'name': 'id', 'type': 'unsigned'} + - {'name': 'bucket_id', 'type': 'unsigned'} + - {'name': 'name', 'type': 'string'} + - {'name': 'age', 'type': 'number'} + rows: + - [5, 1172, 'Sergey', 25], + - [22, 655, 'Alex', 34], + +errs +- [err = 'Duplicate key exists ...', ..., tuple = [3, 2804, 'Anastasia', 22]] +... +``` + ### Get ```lua diff --git a/crud.lua b/crud.lua index 2777013e..ac235a13 100644 --- a/crud.lua +++ b/crud.lua @@ -3,6 +3,7 @@ -- @module crud local insert = require('crud.insert') +local batch_insert = require('crud.batch_insert') local replace = require('crud.replace') local get = require('crud.get') local update = require('crud.update') @@ -28,6 +29,14 @@ crud.insert = insert.tuple -- @function insert_object crud.insert_object = insert.object +-- @refer batch_insert.tuples_batch +-- @function batch_insert +crud.batch_insert = batch_insert.tuples_batch + +-- @refer batch_insert.objects_batch +-- @function batch_insert_object +crud.batch_insert_object = batch_insert.objects_batch + -- @refer get.call -- @function get crud.get = get.call @@ -105,6 +114,7 @@ function crud.init_storage() end insert.init() + batch_insert.init() get.init() replace.init() update.init() diff --git a/crud/batch_insert.lua b/crud/batch_insert.lua new file mode 100644 index 00000000..5ecae2c2 --- /dev/null +++ b/crud/batch_insert.lua @@ -0,0 +1,185 @@ +local checks = require('checks') +local errors = require('errors') +local vshard = require('vshard') + +local call = require('crud.common.call') +local utils = require('crud.common.utils') +local sharding = require('crud.common.sharding') +local dev_checks = require('crud.common.dev_checks') +local schema = require('crud.common.schema') + +local BatchInsertError = errors.new_class('BatchInsertError', {capture_stack = false}) + +local batch_insert = {} + +local BATCH_INSERT_FUNC_NAME = '_crud.batch_insert_on_storage' + +local function batch_insert_on_storage(space_name, batch, opts) + dev_checks('string', 'table', { + fields = '?table', + }) + + opts = opts or {} + + local space = box.space[space_name] + if space == nil then + return nil, BatchInsertError:new("Space %q doesn't exist", space_name) + end + + local inserted_tuples = {} + + box.begin() + for _, tuple in ipairs(batch) do + local insert_result = schema.wrap_box_space_func_result(space, 'insert', {tuple}, { + field_names = opts.fields, + }) + + table.insert(inserted_tuples, insert_result.res) + if insert_result.err ~= nil then + box.commit() + return nil, { + err = insert_result.err, + tuple = tuple, + } + end + end + box.commit() + + return inserted_tuples +end + +function batch_insert.init() + _G._crud.batch_insert_on_storage = batch_insert_on_storage +end + +-- returns result, err, need_reload +-- need_reload indicates if reloading schema could help +-- see crud.common.schema.wrap_func_reload() +local function call_batch_insert_on_router(space_name, tuples, opts) + dev_checks('string', 
'table', { + timeout = '?number', + fields = '?table', + }) + + opts = opts or {} + + local space = utils.get_space(space_name, vshard.router.routeall()) + if space == nil then + return nil, {BatchInsertError:new("Space %q doesn't exist", space_name)}, true + end + + local batches_by_replicasets, err = sharding.split_tuples_by_replicaset(tuples, space) + if err ~= nil then + return nil, {err}, true + end + + local batch_insert_on_storage_opts = { + fields = opts.fields, + } + + local call_opts = { + timeout = opts.timeout, + is_async = true, + } + + local futures_by_replicasets = {} + for replicaset, batch in pairs(batches_by_replicasets) do + local func_args = { + space_name, + batch, + batch_insert_on_storage_opts, + } + + local future = replicaset:call(BATCH_INSERT_FUNC_NAME, func_args, call_opts) + futures_by_replicasets[replicaset.uuid] = future + end + + local results, errs = call.batch( + futures_by_replicasets, + BATCH_INSERT_FUNC_NAME, + opts.timeout + ) + + local rows = {} + for _, result in pairs(results) do + rows = utils.table_extend(rows, result[1]) + end + + if next(rows) == nil then + return nil, errs + end + + local res, err = utils.format_result(rows, space, opts.fields) + if err ~= nil then + return nil, {err} + end + + return res, errs +end + +--- Batch inserts tuples to the specified space +-- +-- @function tuples_batch +-- +-- @param string space_name +-- A space name +-- +-- @param table tuples +-- Tuples +-- +-- @tparam ?table opts +-- Options of batch_insert.tuples_batch +-- +-- @return[1] tuples +-- @treturn[2] nil +-- @treturn[2] table of tables Error description + +function batch_insert.tuples_batch(space_name, tuples, opts) + checks('string', 'table', { + timeout = '?number', + fields = '?table', + }) + + return schema.wrap_func_reload(call_batch_insert_on_router, space_name, tuples, opts) +end + +--- Batch inserts objects to the specified space +-- +-- @function objects_batch +-- +-- @param string space_name +-- A space name +-- +-- @param table objs +-- Objects +-- +-- @tparam ?table opts +-- Options of batch_insert.tuples_batch +-- +-- @return[1] objects +-- @treturn[2] nil +-- @treturn[2] table of tables Error description + +function batch_insert.objects_batch(space_name, objs, opts) + checks('string', 'table', { + timeout = '?number', + fields = '?table', + }) + + local tuples = {} + for _, obj in ipairs(objs) do + + local tuple, err = utils.flatten_obj_reload(space_name, obj) + if err ~= nil then + local err_obj = BatchInsertError:new("Failed to flatten object: %s", err) + err_obj.tuple = obj + return nil, {err_obj} + end + + table.insert(tuples, tuple) + end + + return batch_insert.tuples_batch(space_name, tuples, opts) +end + +return batch_insert diff --git a/crud/common/call.lua b/crud/common/call.lua index 65fc5e98..4f366490 100644 --- a/crud/common/call.lua +++ b/crud/common/call.lua @@ -64,6 +64,40 @@ local function wrap_vshard_err(err, func_name, replicaset_uuid, bucket_id) )) end +function call.batch(futures_by_replicasets, func_name, timeout) + dev_checks('table', 'string', '?number') + + local timeout = timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT + local errs + + local results = {} + local deadline = fiber_clock() + timeout + for replicaset_uuid, future in pairs(futures_by_replicasets) do + local wait_timeout = deadline - fiber_clock() + if wait_timeout < 0 then + wait_timeout = 0 + end + + local result, err = future:wait_result(wait_timeout) + if err == nil and result[1] == nil then + err = result[2] + end + + if err ~= nil then + local err_obj 
= wrap_vshard_err(err.err or err, func_name, replicaset_uuid) + err_obj.tuple = err.tuple + + errs = errs or {} + table.insert(errs, err_obj) + else + table.insert(results, result) + end + + end + + return results, errs +end + function call.map(func_name, func_args, opts) dev_checks('string', '?table', { mode = 'string', diff --git a/crud/common/sharding.lua b/crud/common/sharding.lua index c7802d2e..181e838c 100644 --- a/crud/common/sharding.lua +++ b/crud/common/sharding.lua @@ -2,6 +2,7 @@ local vshard = require('vshard') local errors = require('errors') local BucketIDError = errors.new_class("BucketIDError", {capture_stack = false}) +local GetReplicasetsError = errors.new_class("GetReplicasetsError", {capture_stack = false}) local utils = require('crud.common.utils') local sharding_key_module = require('crud.common.sharding_key') @@ -65,4 +66,26 @@ function sharding.tuple_set_and_return_bucket_id(tuple, space, specified_bucket_ return bucket_id end +function sharding.split_tuples_by_replicaset(tuples, space) + local batches = {} + + for _, tuple in ipairs(tuples) do + local bucket_id, err = sharding.tuple_set_and_return_bucket_id(tuple, space) + if err ~= nil then + return nil, BucketIDError:new("Failed to get bucket ID: %s", err) + end + + local replicaset, err = vshard.router.route(bucket_id) + if replicaset == nil then + return nil, GetReplicasetsError:new("Failed to get replicaset for bucket_id %s: %s", bucket_id, err.err) + end + + local tuples_by_replicaset = batches[replicaset] or {} + table.insert(tuples_by_replicaset, tuple) + batches[replicaset] = tuples_by_replicaset + end + + return batches +end + return sharding diff --git a/crud/common/utils.lua b/crud/common/utils.lua index d7a62941..6d7c5624 100644 --- a/crud/common/utils.lua +++ b/crud/common/utils.lua @@ -606,4 +606,17 @@ function utils.merge_options(opts_a, opts_b) return fun.chain(opts_a or {}, opts_b or {}):tomap() end +function utils.table_extend(list, values) + if values == nil then + return list + end + + list = list or {} + + return fun.reduce( + function(list, value) table.insert(list, value) return list end, + list, pairs(values) + ) +end + return utils diff --git a/test/integration/batch_operations_test.lua b/test/integration/batch_operations_test.lua new file mode 100644 index 00000000..33308d77 --- /dev/null +++ b/test/integration/batch_operations_test.lua @@ -0,0 +1,442 @@ +local fio = require('fio') + +local t = require('luatest') +local crud = require('crud') + +local helpers = require('test.helper') + +local pgroup = t.group('batch_operations', { + {engine = 'memtx'}, + {engine = 'vinyl'}, +}) + +pgroup.before_all(function(g) + g.cluster = helpers.Cluster:new({ + datadir = fio.tempdir(), + server_command = helpers.entrypoint('srv_simple_operations'), + use_vshard = true, + replicasets = helpers.get_test_replicasets(), + env = { + ['ENGINE'] = g.params.engine, + }, + }) + + g.cluster:start() +end) + +pgroup.after_all(function(g) helpers.stop_cluster(g.cluster) end) + +pgroup.before_each(function(g) + helpers.truncate_space_on_cluster(g.cluster, 'customers') +end) + +pgroup.test_non_existent_space = function(g) + -- batch_insert + local result, errs = g.cluster.main_server.net_box:call('crud.batch_insert', { + 'non_existent_space', + { + {1, box.NULL, 'Alex', 59}, + {2, box.NULL, 'Anna', 23}, + {3, box.NULL, 'Daria', 18} + } + }) + + t.assert_equals(result, nil) + t.assert_not_equals(errs, nil) + t.assert_equals(#errs, 1) + t.assert_str_contains(errs[1].err, 'Space "non_existent_space" doesn\'t exist') + 
+ -- batch_insert_object + local result, errs = g.cluster.main_server.net_box:call('crud.batch_insert_object', { + 'non_existent_space', + { + {id = 1, name = 'Fedor', age = 59}, + {id = 2, name = 'Anna', age = 23}, + {id = 3, name = 'Daria', age = 18} + } + }) + + t.assert_equals(result, nil) + t.assert_not_equals(errs, nil) + t.assert_equals(#errs, 1) + t.assert_str_contains(errs[1].err, 'Space "non_existent_space" doesn\'t exist') +end + +pgroup.test_batch_insert_object_get = function(g) + -- bad format + local result, errs = g.cluster.main_server.net_box:call('crud.batch_insert_object', { + 'customers', + { + {id = 1, name = 'Fedor', age = 59}, + {id = 2, name = 'Anna'}, + } + }) + + t.assert_equals(result, nil) + t.assert_not_equals(errs, nil) + t.assert_equals(#errs, 1) + t.assert_str_contains(errs[1].err, 'Field \"age\" isn\'t nullable') + t.assert_equals(errs[1].tuple, {id = 2, name = 'Anna'}) + + -- batch_insert_object + local result, errs = g.cluster.main_server.net_box:call('crud.batch_insert_object', { + 'customers', + { + {id = 1, name = 'Fedor', age = 59}, + {id = 2, name = 'Anna', age = 23}, + {id = 3, name = 'Daria', age = 18} + } + }) + + t.assert_equals(errs, nil) + t.assert_equals(result.metadata, { + {name = 'id', type = 'unsigned'}, + {name = 'bucket_id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + {name = 'age', type = 'number'}, + }) + + local objects = crud.unflatten_rows(result.rows, result.metadata) + table.sort(objects, function(obj1, obj2) return obj1.id < obj2.id end) + t.assert_equals(objects, { + {id = 1, name = 'Fedor', age = 59, bucket_id = 477}, + {id = 2, name = 'Anna', age = 23, bucket_id = 401}, + {id = 3, name = 'Daria', age = 18, bucket_id = 2804}, + }) + + -- get + -- primary key = 1 -> bucket_id = 477 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(1) + t.assert_equals(result, {1, 477, 'Fedor', 59}) + + -- primary key = 2 -> bucket_id = 401 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(2) + t.assert_equals(result, {2, 401, 'Anna', 23}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(3) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- batch_insert_object again + local result, errs = g.cluster.main_server.net_box:call('crud.batch_insert_object', { + 'customers', + { + {id = 22, name = 'Alex', age = 34}, + {id = 3, name = 'Anastasia', age = 22}, + {id = 5, name = 'Sergey', age = 25}, + } + }) + + t.assert_not_equals(errs, nil) + t.assert_equals(#errs, 1) + t.assert_str_contains(errs[1].err, 'Duplicate key exists') + t.assert_equals(errs[1].tuple, {3, 2804, 'Anastasia', 22}) + t.assert_equals(result.metadata, { + {name = 'id', type = 'unsigned'}, + {name = 'bucket_id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + {name = 'age', type = 'number'}, + }) + + local objects = crud.unflatten_rows(result.rows, result.metadata) + table.sort(objects, function(obj1, obj2) return obj1.id < obj2.id end) + t.assert_equals(objects, { + {id = 5, name = 'Sergey', age = 25, bucket_id = 1172}, + {id = 22, name = 'Alex', age = 34, bucket_id = 655}, + }) + + -- get + -- primary key = 22 -> bucket_id = 655 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(22) + t.assert_equals(result, {22, 655, 'Alex', 34}) + + -- primary key = 
5 -> bucket_id = 1172 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(5) + t.assert_equals(result, {5, 1172, 'Sergey', 25}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(3) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- batch_insert_object again + -- fails for both: s1-master s2-master + local result, errs = g.cluster.main_server.net_box:call('crud.batch_insert_object', { + 'customers', + { + {id = 2, name = 'Alex', age = 34}, + {id = 3, name = 'Anastasia', age = 22}, + {id = 10, name = 'Sergey', age = 25}, + } + }) + + t.assert_equals(result, nil) + t.assert_not_equals(errs, nil) + t.assert_equals(#errs, 2) + + table.sort(errs, function(err1, err2) return err1.tuple[1] < err2.tuple[1] end) + + t.assert_str_contains(errs[1].err, 'Duplicate key exists') + t.assert_equals(errs[1].tuple, {2, 401, 'Alex', 34}) + + t.assert_str_contains(errs[2].err, 'Duplicate key exists') + t.assert_equals(errs[2].tuple, {3, 2804, 'Anastasia', 22}) + + -- primary key = 2 -> bucket_id = 401 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(2) + t.assert_equals(result, {2, 401, 'Anna', 23}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(3) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- primary key = 10 -> bucket_id = 569 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(10) + t.assert_equals(result, nil) +end + +pgroup.test_batch_insert_get = function(g) + -- batch_insert + local result, errs = g.cluster.main_server.net_box:call('crud.batch_insert', { + 'customers', + { + {1, box.NULL, 'Fedor', 59}, + {2, box.NULL, 'Anna', 23}, + {3, box.NULL, 'Daria', 18} + } + }) + + t.assert_equals(errs, nil) + t.assert_equals(result.metadata, { + {name = 'id', type = 'unsigned'}, + {name = 'bucket_id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + {name = 'age', type = 'number'}, + }) + + local objects = crud.unflatten_rows(result.rows, result.metadata) + table.sort(objects, function(obj1, obj2) return obj1.id < obj2.id end) + t.assert_equals(objects, { + {id = 1, name = 'Fedor', age = 59, bucket_id = 477}, + {id = 2, name = 'Anna', age = 23, bucket_id = 401}, + {id = 3, name = 'Daria', age = 18, bucket_id = 2804}, + }) + + -- get + -- primary key = 1 -> bucket_id = 477 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(1) + t.assert_equals(result, {1, 477, 'Fedor', 59}) + + -- primary key = 2 -> bucket_id = 401 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(2) + t.assert_equals(result, {2, 401, 'Anna', 23}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(3) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- batch_insert again + local result, errs = g.cluster.main_server.net_box:call('crud.batch_insert', { + 'customers', + { + {22, box.NULL, 'Alex', 34}, + {3, box.NULL, 'Anastasia', 22}, + {5, box.NULL, 'Sergey', 25}, + } + }) + + t.assert_not_equals(errs, nil) + t.assert_equals(#errs, 1) + 
t.assert_str_contains(errs[1].err, 'Duplicate key exists') + t.assert_equals(errs[1].tuple, {3, 2804, 'Anastasia', 22}) + t.assert_equals(result.metadata, { + {name = 'id', type = 'unsigned'}, + {name = 'bucket_id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + {name = 'age', type = 'number'}, + }) + + local objects = crud.unflatten_rows(result.rows, result.metadata) + table.sort(objects, function(obj1, obj2) return obj1.id < obj2.id end) + t.assert_equals(objects, { + {id = 5, name = 'Sergey', age = 25, bucket_id = 1172}, + {id = 22, name = 'Alex', age = 34, bucket_id = 655}, + }) + + -- get + -- primary key = 22 -> bucket_id = 655 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(22) + t.assert_equals(result, {22, 655, 'Alex', 34}) + + -- primary key = 5 -> bucket_id = 1172 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(5) + t.assert_equals(result, {5, 1172, 'Sergey', 25}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(3) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- batch_insert again + -- fails for both: s1-master s2-master + local result, errs = g.cluster.main_server.net_box:call('crud.batch_insert', { + 'customers', + { + {2, box.NULL, 'Alex', 34}, + {3, box.NULL, 'Anastasia', 22}, + {10, box.NULL, 'Sergey', 25}, + } + }) + + t.assert_equals(result, nil) + t.assert_not_equals(errs, nil) + t.assert_equals(#errs, 2) + + table.sort(errs, function(err1, err2) return err1.tuple[1] < err2.tuple[1] end) + + t.assert_str_contains(errs[1].err, 'Duplicate key exists') + t.assert_equals(errs[1].tuple, {2, 401, 'Alex', 34}) + + t.assert_str_contains(errs[2].err, 'Duplicate key exists') + t.assert_equals(errs[2].tuple, {3, 2804, 'Anastasia', 22}) + + -- primary key = 2 -> bucket_id = 401 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(2) + t.assert_equals(result, {2, 401, 'Anna', 23}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(3) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- primary key = 10 -> bucket_id = 569 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(10) + t.assert_equals(result, nil) +end + +pgroup.test_batch_insert_partial_result = function(g) + -- bad fields format + local result, errs = g.cluster.main_server.net_box:call('crud.batch_insert', { + 'customers', + { + {15, box.NULL, 'Fedor', 59}, + {25, box.NULL, 'Anna', 23}, + }, + {fields = {'id', 'invalid'}}, + }) + + t.assert_equals(result, nil) + t.assert_not_equals(errs, nil) + t.assert_equals(#errs, 1) + t.assert_str_contains(errs[1].err, 'Space format doesn\'t contain field named "invalid"') + + -- batch_insert + local result, errs = g.cluster.main_server.net_box:call('crud.batch_insert', { + 'customers', + { + {1, box.NULL, 'Fedor', 59}, + {2, box.NULL, 'Anna', 23}, + {3, box.NULL, 'Daria', 18} + }, + {fields = {'id', 'name'}}, + }) + + t.assert_equals(errs, nil) + t.assert_equals(result.metadata, { + {name = 'id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + }) + + local objects = crud.unflatten_rows(result.rows, result.metadata) + table.sort(objects, function(obj1, obj2) return obj1.id < 
obj2.id end) + t.assert_equals(objects, {{id = 1, name = 'Fedor'}, {id = 2, name = 'Anna'}, {id = 3, name = 'Daria'}}) +end + +pgroup.test_batch_insert_object_partial_result = function(g) + -- bad fields format + local result, errs = g.cluster.main_server.net_box:call('crud.batch_insert_object', { + 'customers', + { + {id = 15, name = 'Fedor', age = 59}, + {id = 25, name = 'Anna', age = 23}, + }, + {fields = {'id', 'invalid'}}, + }) + + t.assert_equals(result, nil) + t.assert_not_equals(errs, nil) + t.assert_equals(#errs, 1) + t.assert_str_contains(errs[1].err, 'Space format doesn\'t contain field named "invalid"') + + -- batch_insert_object + local result, errs = g.cluster.main_server.net_box:call('crud.batch_insert_object', { + 'customers', + { + {id = 1, name = 'Fedor', age = 59}, + {id = 2, name = 'Anna', age = 23}, + {id = 3, name = 'Daria', age = 18} + }, + {fields = {'id', 'name'}}, + }) + + t.assert_equals(errs, nil) + t.assert_equals(result.metadata, { + {name = 'id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + }) + + local objects = crud.unflatten_rows(result.rows, result.metadata) + table.sort(objects, function(obj1, obj2) return obj1.id < obj2.id end) + t.assert_equals(objects, {{id = 1, name = 'Fedor'}, {id = 2, name = 'Anna'}, {id = 3, name = 'Daria'}}) +end + +pgroup.test_opts_not_damaged = function(g) + -- batch insert + local batch_insert_opts = {timeout = 1, fields = {'name', 'age'}} + local new_batch_insert_opts, err = g.cluster.main_server:eval([[ + local crud = require('crud') + + local batch_insert_opts = ... + + local _, err = crud.batch_insert('customers', { + {1, box.NULL, 'Alex', 59} + }, batch_insert_opts) + + return batch_insert_opts, err + ]], {batch_insert_opts}) + + t.assert_equals(err, nil) + t.assert_equals(new_batch_insert_opts, batch_insert_opts) + + -- batch insert_object + local batch_insert_opts = {timeout = 1, fields = {'name', 'age'}} + local new_batch_insert_opts, err = g.cluster.main_server:eval([[ + local crud = require('crud') + + local batch_insert_opts = ... + + local _, err = crud.batch_insert_object('customers', { + {id = 2, name = 'Fedor', age = 59} + }, batch_insert_opts) + + return batch_insert_opts, err + ]], {batch_insert_opts}) + + t.assert_equals(err, nil) + t.assert_equals(new_batch_insert_opts, batch_insert_opts) +end