From 1a895f8d3d634a761aad99dd1467b853a1e2d43d Mon Sep 17 00:00:00 2001 From: Ilya Grishnov Date: Wed, 31 May 2023 15:23:40 +0300 Subject: [PATCH] schema: fix stale schema for metadata generation Fixes the use of a stale schema to generate metadata in `crud.update`, `crud.insert`, `crud.insert_*`, `crud.replace`, `crud.replace_*`, `crud.upsert`, `crud.upsert_*`, `crud.delete`, `crud.max`, `crud.min`, `crud.select` and `crud.get` operations. If the new `fetch_latest_metadata` option is used, the metadata is guaranteed to be up-to-date. Before receiving the space format, a mismatch check will be performed between the schema version on all involved storages and the schema version in the net_box connection of the router. In case of a mismatch, a schema reload will be triggered. Closes #236 --- CHANGELOG.md | 3 + README.md | 40 ++++ crud/borders.lua | 20 +- .../map_call_cases/base_postprocessor.lua | 10 +- .../map_call_cases/batch_postprocessor.lua | 5 + crud/common/schema.lua | 21 +- crud/common/stash.lua | 8 + crud/common/utils.lua | 92 +++++++- crud/delete.lua | 15 +- crud/get.lua | 15 +- crud/insert.lua | 16 +- crud/insert_many.lua | 30 ++- crud/replace.lua | 16 +- crud/replace_many.lua | 31 ++- crud/select.lua | 54 +++-- crud/select/compat/select.lua | 67 +++++- crud/select/compat/select_old.lua | 75 +++++-- crud/select/iterator.lua | 6 +- crud/select/merger.lua | 9 + crud/update.lua | 15 +- crud/upsert.lua | 16 +- crud/upsert_many.lua | 30 ++- doc/dev/schema.md | 3 + test/entrypoint/srv_simple_operations.lua | 35 ++- test/integration/simple_operations_test.lua | 209 +++++++++++++++++- 25 files changed, 753 insertions(+), 88 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e68de567..3c8705a1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. `upsert`, `upsert_object`, `upsert_many`, `upsert_object_many`, `update`, `delete`. (#267). 
+### Fixed +* Crud DML operations were using a stale schema for metadata generation (#236). + ## [1.1.1] - 24-03-23 ### Changed diff --git a/README.md b/README.md index 2bb9d0c8..68e90541 100644 --- a/README.md +++ b/README.md @@ -236,6 +236,10 @@ where: the sole responsibility of the developer * `noreturn` (`?boolean`) - suppress successfully processed tuple (first return value is `nil`). `false` by default + * `fetch_latest_metadata` (`?boolean`) - guarantees the + up-to-date metadata (space format) in first return value, otherwise + it may not take into account the latest migration of the data format. + Performance overhead is up to 15%. `false` by default Returns metadata and array contains one inserted row, error. @@ -303,6 +307,10 @@ where: the sole responsibility of the developer * `noreturn` (`?boolean`) - suppress successfully processed tuples (first return value is `nil`). `false` by default + * `fetch_latest_metadata` (`?boolean`) - guarantees the + up-to-date metadata (space format) in first return value, otherwise + it may not take into account the latest migration of the data format. + Performance overhead is up to 15%. `false` by default Returns metadata and array with inserted rows, array of errors. Each error object can contain field `operation_data`. @@ -441,6 +449,10 @@ where: * `vshard_router` (`?string|table`) - Cartridge vshard group name or vshard router instance. Set this parameter if your space is not a part of the default vshard cluster + * `fetch_latest_metadata` (`?boolean`) - guarantees the + up-to-date metadata (space format) in first return value, otherwise + it may not take into account the latest migration of the data format. + Performance overhead is up to 15%. `false` by default Returns metadata and array contains one row, error. @@ -480,6 +492,10 @@ where: a part of the default vshard cluster * `noreturn` (`?boolean`) - suppress successfully processed tuple (first return value is `nil`). 
`false` by default + * `fetch_latest_metadata` (`?boolean`) - guarantees the + up-to-date metadata (space format) in first return value, otherwise + it may not take into account the latest migration of the data format. + Performance overhead is up to 15%. `false` by default Returns metadata and array contains one updated row, error. @@ -518,6 +534,10 @@ where: a part of the default vshard cluster * `noreturn` (`?boolean`) - suppress successfully processed tuple (first return value is `nil`). `false` by default + * `fetch_latest_metadata` (`?boolean`) - guarantees the + up-to-date metadata (space format) in first return value, otherwise + it may not take into account the latest migration of the data format. + Performance overhead is up to 15%. `false` by default Returns metadata and array contains one deleted row (empty for vinyl), error. @@ -567,6 +587,10 @@ where: the sole responsibility of the developer * `noreturn` (`?boolean`) - suppress successfully processed tuple (first return value is `nil`). `false` by default + * `fetch_latest_metadata` (`?boolean`) - guarantees the + up-to-date metadata (space format) in first return value, otherwise + it may not take into account the latest migration of the data format. + Performance overhead is up to 15%. `false` by default Returns inserted or replaced rows and metadata or nil with error. @@ -634,6 +658,10 @@ where: the sole responsibility of the developer * `noreturn` (`?boolean`) - suppress successfully processed tuples (first return value is `nil`). `false` by default + * `fetch_latest_metadata` (`?boolean`) - guarantees the + up-to-date metadata (space format) in first return value, otherwise + it may not take into account the latest migration of the data format. + Performance overhead is up to 15%. `false` by default Returns metadata and array with inserted/replaced rows, array of errors. Each error object can contain field `operation_data`. 
@@ -772,6 +800,10 @@ where: a part of the default vshard cluster * `noreturn` (`?boolean`) - suppress successfully processed tuple (first return value is `nil`). `false` by default + * `fetch_latest_metadata` (`?boolean`) - guarantees the + up-to-date metadata (space format) in first return value, otherwise + it may not take into account the latest migration of the data format. + Performance overhead is up to 15%. `false` by default Returns metadata and empty array of rows or nil, error. @@ -835,6 +867,10 @@ where: a part of the default vshard cluster * `noreturn` (`?boolean`) - suppress successfully processed tuples (first return value is `nil`). `false` by default + * `fetch_latest_metadata` (`?boolean`) - guarantees the + up-to-date metadata (space format) in first return value, otherwise + it may not take into account the latest migration of the data format. + Performance overhead is up to 15%. `false` by default Returns metadata and array of errors. Each error object can contain field `operation_data`. @@ -977,6 +1013,10 @@ where: a part of the default vshard cluster * `yield_every` (`?number`) - number of tuples processed on storage to yield after, `yield_every` should be > 0, default value is 1000 + * `fetch_latest_metadata` (`?boolean`) - guarantees the + up-to-date metadata (space format) in first return value, otherwise + it may not take into account the latest migration of the data format. + Performance overhead is up to 15%. `false` by default Returns metadata and array of rows, error. 
diff --git a/crud/borders.lua b/crud/borders.lua index c716a930..68a579e1 100644 --- a/crud/borders.lua +++ b/crud/borders.lua @@ -16,8 +16,8 @@ local borders = {} local STAT_FUNC_NAME = '_crud.get_border_on_storage' -local function get_border_on_storage(border_name, space_name, index_id, field_names) - dev_checks('string', 'string', 'number', '?table') +local function get_border_on_storage(border_name, space_name, index_id, field_names, fetch_latest_metadata) + dev_checks('string', 'string', 'number', '?table', '?boolean') assert(border_name == 'min' or border_name == 'max') @@ -38,6 +38,7 @@ local function get_border_on_storage(border_name, space_name, index_id, field_na return schema.wrap_func_result(space, get_index_border, {index}, { add_space_schema_hash = true, field_names = field_names, + fetch_latest_metadata = fetch_latest_metadata, }) end @@ -71,9 +72,10 @@ local function call_get_border_on_router(vshard_router, border_name, space_name, timeout = '?number', fields = '?table', vshard_router = '?string|table', + fetch_latest_metadata = '?boolean', }) - local space, err = utils.get_space(space_name, vshard_router, opts.timeout) + local space, err, netbox_schema_version = utils.get_space(space_name, vshard_router, opts.timeout) if err ~= nil then return nil, BorderError:new("An error occurred during the operation: %s", err), const.NEED_SCHEMA_RELOAD end @@ -108,9 +110,9 @@ local function call_get_border_on_router(vshard_router, border_name, space_name, replicasets = replicasets, timeout = opts.timeout, } - local results, err = call.map(vshard_router, + local results, err, storages_info = call.map(vshard_router, STAT_FUNC_NAME, - {border_name, space_name, index.id, field_names}, + {border_name, space_name, index.id, field_names, opts.fetch_latest_metadata}, call_opts ) @@ -154,6 +156,14 @@ local function call_get_border_on_router(vshard_router, border_name, space_name, end end + if opts.fetch_latest_metadata == true then + -- This option is temporary and is 
related to [1], [2]. + -- [1] https://github.com/tarantool/crud/issues/236 + -- [2] https://github.com/tarantool/crud/issues/361 + space = utils.fetch_latest_metadata_when_map_storages(space, space_name, vshard_router, opts, + storages_info, netbox_schema_version) + end + local result = utils.format_result({res_tuple}, space, field_names) if opts.fields ~= nil then diff --git a/crud/common/map_call_cases/base_postprocessor.lua b/crud/common/map_call_cases/base_postprocessor.lua index 7502cf71..fa138da8 100644 --- a/crud/common/map_call_cases/base_postprocessor.lua +++ b/crud/common/map_call_cases/base_postprocessor.lua @@ -13,6 +13,7 @@ function BasePostprocessor:new(vshard_router) early_exit = false, errs = nil, vshard_router = vshard_router, + storage_info = {}, } setmetatable(postprocessor, self) @@ -52,6 +53,12 @@ function BasePostprocessor:collect(result_info, err_info) wrapper_args = '?table', }) + if result_info.value ~= nil and type(result_info.value[1]) == 'table' then + self.storage_info[result_info.key] = {} + local replica_schema_version = result_info.value[1].replica_schema_version + self.storage_info[result_info.key].replica_schema_version = replica_schema_version + end + local err = err_info.err if err == nil and result_info.value[1] == nil then err = result_info.value[2] @@ -78,8 +85,9 @@ end -- -- @return[1] table results -- @return[2] table errs +-- @return[3] table storage_info function BasePostprocessor:get() - return self.results, self.errs + return self.results, self.errs, self.storage_info end return BasePostprocessor diff --git a/crud/common/map_call_cases/batch_postprocessor.lua b/crud/common/map_call_cases/batch_postprocessor.lua index 73a23983..33ed79cf 100644 --- a/crud/common/map_call_cases/batch_postprocessor.lua +++ b/crud/common/map_call_cases/batch_postprocessor.lua @@ -39,6 +39,11 @@ function BatchPostprocessor:collect(result_info, err_info) wrapper_args = '?table', }) + if result_info.value ~= nil then + 
self.storage_info[result_info.key] = {} + self.storage_info[result_info.key].replica_schema_version = result_info.value[3] + end + local errs = {err_info.err} if err_info.err == nil then errs = result_info.value[2] diff --git a/crud/common/schema.lua b/crud/common/schema.lua index bfb74c9d..91ff2a67 100644 --- a/crud/common/schema.lua +++ b/crud/common/schema.lua @@ -51,7 +51,7 @@ end local reload_in_progress = {} local reload_schema_cond = {} -local function reload_schema(vshard_router) +function schema.reload_schema(vshard_router) local replicasets = vshard_router:routeall() local vshard_router_name = vshard_router.name @@ -96,7 +96,7 @@ function schema.wrap_func_reload(vshard_router, func, ...) break end - local ok, reload_schema_err = reload_schema(vshard_router) + local ok, reload_schema_err = schema.reload_schema(vshard_router) if not ok then log.warn("Failed to reload schema: %s", reload_schema_err) break @@ -104,6 +104,8 @@ function schema.wrap_func_reload(vshard_router, func, ...) 
i = i + 1 if i > const.RELOAD_RETRIES_NUM then + local warn_msg = "Number of attempts to reload schema has been ended: %s" + log.warn(warn_msg, const.RELOAD_RETRIES_NUM) break end end @@ -221,6 +223,17 @@ function schema.wrap_func_result(space, func, args, opts) end end + if opts.fetch_latest_metadata == true then + local replica_schema_version + if box.info.schema_version ~= nil then + replica_schema_version = box.info.schema_version + else + replica_schema_version = box.internal.schema_version() + end + result.replica_uuid = box.info().uuid + result.replica_schema_version = replica_schema_version + end + return result end @@ -235,7 +248,9 @@ function schema.wrap_box_space_func_result(space, box_space_func_name, box_space return space[box_space_func_name](space, unpack(box_space_func_args)) end - return schema.wrap_func_result(space, func, {space, box_space_func_name, box_space_func_args}, opts) + local res, err = schema.wrap_func_result(space, func, {space, box_space_func_name, box_space_func_args}, opts) + + return res, err end -- schema.result_needs_reload checks that schema reload can diff --git a/crud/common/stash.lua b/crud/common/stash.lua index 97c54c78..1eb52f5f 100644 --- a/crud/common/stash.lua +++ b/crud/common/stash.lua @@ -20,12 +20,20 @@ local stash = {} -- @tfield string stats_metrics_registry -- Stash for metrics rocks statistics registry. -- +-- @tfield string storages_info_on_select +-- Stash for storages info collected during select. +-- +-- @tfield string select_module_compat_info +-- Stash for select compatibility version registry. +-- stash.name = { cfg = '__crud_cfg', stats_internal = '__crud_stats_internal', stats_local_registry = '__crud_stats_local_registry', stats_metrics_registry = '__crud_stats_metrics_registry', ddl_triggers = '__crud_ddl_spaces_triggers', + storages_info_on_select = '__storages_info_on_select', + select_module_compat_info = '__select_module_compat_info', } --- Setup Tarantool Cartridge reload. 
diff --git a/crud/common/utils.lua b/crud/common/utils.lua index 45e1a4df..d028bb06 100644 --- a/crud/common/utils.lua +++ b/crud/common/utils.lua @@ -98,7 +98,19 @@ function utils.format_replicaset_error(replicaset_uuid, msg, ...) ) end -function utils.get_space(space_name, vshard_router, timeout) +local function get_replicaset_by_replica_uuid(replicasets, uuid) + for replicaset_uuid, replicaset in pairs(replicasets) do + for replica_uuid, _ in pairs(replicaset.replicas) do + if replica_uuid == uuid then + return replicasets[replicaset_uuid] + end + end + end + + return nil +end + +function utils.get_space(space_name, vshard_router, timeout, replica_uuid) local replicasets, replicaset timeout = timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT local deadline = fiber.clock() + timeout @@ -110,7 +122,16 @@ function utils.get_space(space_name, vshard_router, timeout) -- Try to get master with timeout. fiber.yield() replicasets = vshard_router:routeall() - replicaset = select(2, next(replicasets)) + if replica_uuid ~= nil then + -- Get the same replica on which the last DML operation was performed. + -- This approach is temporary and is related to [1], [2]. 
+ -- [1] https://github.com/tarantool/crud/issues/236 + -- [2] https://github.com/tarantool/crud/issues/361 + replicaset = get_replicaset_by_replica_uuid(replicasets, replica_uuid) + break + else + replicaset = select(2, next(replicasets)) + end if replicaset ~= nil and replicaset.master ~= nil and replicaset.master.conn.error == nil then @@ -141,7 +162,7 @@ function utils.get_space(space_name, vshard_router, timeout) local space = replicaset.master.conn.space[space_name] - return space + return space, nil, replicaset.master.conn.schema_version end function utils.get_space_format(space_name, vshard_router) @@ -158,6 +179,71 @@ function utils.get_space_format(space_name, vshard_router) return space_format end +function utils.fetch_latest_metadata_when_single_storage(space, space_name, netbox_schema_version, + vshard_router, opts, storage_result) + -- Checking the relevance of the schema version is necessary + -- to prevent the irrelevant metadata of the DML operation. + -- This approach is temporary and is related to [1], [2]. 
+ -- [1] https://github.com/tarantool/crud/issues/236 + -- [2] https://github.com/tarantool/crud/issues/361 + local latest_space, err + if storage_result.replica_schema_version ~= netbox_schema_version then + local ok, reload_schema_err = schema.reload_schema(vshard_router) + if ok then + latest_space, err = utils.get_space(space_name, vshard_router, + opts.timeout, storage_result.replica_uuid) + if err ~= nil then + local warn_msg = "Failed to fetch space for latest schema actualization, metadata may be outdated: %s" + log.warn(warn_msg, err) + end + if latest_space == nil then + log.warn("Failed to find space for latest schema actualization, metadata may be outdated") + end + else + log.warn("Failed to reload schema, metadata may be outdated: %s", reload_schema_err) + end + end + if err == nil and latest_space ~= nil then + space = latest_space + end + + return space +end + +function utils.fetch_latest_metadata_when_map_storages(space, space_name, vshard_router, opts, + storages_info, netbox_schema_version) + -- Checking the relevance of the schema version is necessary + -- to prevent the irrelevant metadata of the DML operation. + -- This approach is temporary and is related to [1], [2]. + -- [1] https://github.com/tarantool/crud/issues/236 + -- [2] https://github.com/tarantool/crud/issues/361 + local latest_space, err + for _, storage_info in pairs(storages_info) do + if storage_info.replica_schema_version ~= netbox_schema_version then + local ok, reload_schema_err = schema.reload_schema(vshard_router) + if ok then + latest_space, err = utils.get_space(space_name, vshard_router, opts.timeout) + if err ~= nil then + local warn_msg = "Failed to fetch space for latest schema actualization, " .. 
+ "metadata may be outdated: %s" + log.warn(warn_msg, err) + end + if latest_space == nil then + log.warn("Failed to find space for latest schema actualization, metadata may be outdated") + end + else + log.warn("Failed to reload schema, metadata may be outdated: %s", reload_schema_err) + end + if err == nil and latest_space ~= nil then + space = latest_space + end + break + end + end + + return space +end + local function append(lines, s, ...) table.insert(lines, string.format(s, ...)) end diff --git a/crud/delete.lua b/crud/delete.lua index 356fdddf..6796d358 100644 --- a/crud/delete.lua +++ b/crud/delete.lua @@ -22,6 +22,7 @@ local function delete_on_storage(space_name, key, field_names, opts) sharding_func_hash = '?number', skip_sharding_hash_check = '?boolean', noreturn = '?boolean', + fetch_latest_metadata = '?boolean', }) opts = opts or {} @@ -46,6 +47,7 @@ local function delete_on_storage(space_name, key, field_names, opts) add_space_schema_hash = false, field_names = field_names, noreturn = opts.noreturn, + fetch_latest_metadata = opts.fetch_latest_metadata, }) end @@ -63,9 +65,10 @@ local function call_delete_on_router(vshard_router, space_name, key, opts) fields = '?table', vshard_router = '?string|table', noreturn = '?boolean', + fetch_latest_metadata = '?boolean', }) - local space, err = utils.get_space(space_name, vshard_router, opts.timeout) + local space, err, netbox_schema_version = utils.get_space(space_name, vshard_router, opts.timeout) if err ~= nil then return nil, DeleteError:new("An error occurred during the operation: %s", err), const.NEED_SCHEMA_RELOAD end @@ -112,6 +115,7 @@ local function call_delete_on_router(vshard_router, space_name, key, opts) sharding_key_hash = sharding_key_hash, skip_sharding_hash_check = skip_sharding_hash_check, noreturn = opts.noreturn, + fetch_latest_metadata = opts.fetch_latest_metadata, } local call_opts = { @@ -145,6 +149,14 @@ local function call_delete_on_router(vshard_router, space_name, key, opts) local 
tuple = storage_result.res + if opts.fetch_latest_metadata == true then + -- This option is temporary and is related to [1], [2]. + -- [1] https://github.com/tarantool/crud/issues/236 + -- [2] https://github.com/tarantool/crud/issues/361 + space = utils.fetch_latest_metadata_when_single_storage(space, space_name, netbox_schema_version, + vshard_router, opts, storage_result) + end + return utils.format_result({tuple}, space, opts.fields) end @@ -184,6 +196,7 @@ function delete.call(space_name, key, opts) fields = '?table', vshard_router = '?string|table', noreturn = '?boolean', + fetch_latest_metadata = '?boolean', }) opts = opts or {} diff --git a/crud/get.lua b/crud/get.lua index 46e1d871..93e14b96 100644 --- a/crud/get.lua +++ b/crud/get.lua @@ -21,6 +21,7 @@ local function get_on_storage(space_name, key, field_names, opts) sharding_key_hash = '?number', sharding_func_hash = '?number', skip_sharding_hash_check = '?boolean', + fetch_latest_metadata = '?boolean', }) opts = opts or {} @@ -44,6 +45,7 @@ local function get_on_storage(space_name, key, field_names, opts) return schema.wrap_box_space_func_result(space, 'get', {key}, { add_space_schema_hash = false, field_names = field_names, + fetch_latest_metadata = opts.fetch_latest_metadata, }) end @@ -63,9 +65,10 @@ local function call_get_on_router(vshard_router, space_name, key, opts) balance = '?boolean', mode = '?string', vshard_router = '?string|table', + fetch_latest_metadata = '?boolean', }) - local space, err = utils.get_space(space_name, vshard_router, opts.timeout) + local space, err, netbox_schema_version = utils.get_space(space_name, vshard_router, opts.timeout) if err ~= nil then return nil, GetError:new("An error occurred during the operation: %s", err), const.NEED_SCHEMA_RELOAD end @@ -111,6 +114,7 @@ local function call_get_on_router(vshard_router, space_name, key, opts) sharding_func_hash = bucket_id_data.sharding_func_hash, sharding_key_hash = sharding_key_hash, skip_sharding_hash_check = 
skip_sharding_hash_check, + fetch_latest_metadata = opts.fetch_latest_metadata, } local call_opts = { @@ -147,6 +151,14 @@ local function call_get_on_router(vshard_router, space_name, key, opts) tuple = nil end + if opts.fetch_latest_metadata == true then + -- This option is temporary and is related to [1], [2]. + -- [1] https://github.com/tarantool/crud/issues/236 + -- [2] https://github.com/tarantool/crud/issues/361 + space = utils.fetch_latest_metadata_when_single_storage(space, space_name, netbox_schema_version, + vshard_router, opts, storage_result) + end + return utils.format_result({tuple}, space, opts.fields) end @@ -191,6 +203,7 @@ function get.call(space_name, key, opts) balance = '?boolean', mode = '?string', vshard_router = '?string|table', + fetch_latest_metadata = '?boolean', }) opts = opts or {} diff --git a/crud/insert.lua b/crud/insert.lua index c17eae12..4b8f14dc 100644 --- a/crud/insert.lua +++ b/crud/insert.lua @@ -22,6 +22,7 @@ local function insert_on_storage(space_name, tuple, opts) sharding_func_hash = '?number', skip_sharding_hash_check = '?boolean', noreturn = '?boolean', + fetch_latest_metadata = '?boolean', }) opts = opts or {} @@ -47,6 +48,7 @@ local function insert_on_storage(space_name, tuple, opts) add_space_schema_hash = opts.add_space_schema_hash, field_names = opts.fields, noreturn = opts.noreturn, + fetch_latest_metadata = opts.fetch_latest_metadata, }) end @@ -66,9 +68,10 @@ local function call_insert_on_router(vshard_router, space_name, original_tuple, vshard_router = '?string|table', skip_nullability_check_on_flatten = '?boolean', noreturn = '?boolean', + fetch_latest_metadata = '?boolean', }) - local space, err = utils.get_space(space_name, vshard_router, opts.timeout) + local space, err, netbox_schema_version = utils.get_space(space_name, vshard_router, opts.timeout) if err ~= nil then return nil, InsertError:new("An error occurred during the operation: %s", err), const.NEED_SCHEMA_RELOAD end @@ -90,6 +93,7 @@ local function 
call_insert_on_router(vshard_router, space_name, original_tuple, sharding_key_hash = sharding_data.sharding_key_hash, skip_sharding_hash_check = sharding_data.skip_sharding_hash_check, noreturn = opts.noreturn, + fetch_latest_metadata = opts.fetch_latest_metadata, } local call_opts = { @@ -129,6 +133,14 @@ local function call_insert_on_router(vshard_router, space_name, original_tuple, local tuple = storage_result.res + if opts.fetch_latest_metadata == true then + -- This option is temporary and is related to [1], [2]. + -- [1] https://github.com/tarantool/crud/issues/236 + -- [2] https://github.com/tarantool/crud/issues/361 + space = utils.fetch_latest_metadata_when_single_storage(space, space_name, netbox_schema_version, + vshard_router, opts, storage_result) + end + return utils.format_result({tuple}, space, opts.fields) end @@ -169,6 +181,7 @@ function insert.tuple(space_name, tuple, opts) fields = '?table', vshard_router = '?string|table', noreturn = '?boolean', + fetch_latest_metadata = '?boolean', }) opts = opts or {} @@ -208,6 +221,7 @@ function insert.object(space_name, obj, opts) vshard_router = '?string|table', skip_nullability_check_on_flatten = '?boolean', noreturn = '?boolean', + fetch_latest_metadata = '?boolean', }) opts = opts or {} diff --git a/crud/insert_many.lua b/crud/insert_many.lua index f0cbb921..14232a79 100644 --- a/crud/insert_many.lua +++ b/crud/insert_many.lua @@ -28,6 +28,7 @@ local function insert_many_on_storage(space_name, tuples, opts) sharding_func_hash = '?number', skip_sharding_hash_check = '?boolean', noreturn = '?boolean', + fetch_latest_metadata = '?boolean', }) opts = opts or {} @@ -48,6 +49,7 @@ local function insert_many_on_storage(space_name, tuples, opts) local inserted_tuples = {} local errs = {} + local replica_schema_version = nil box.begin() for i, tuple in ipairs(tuples) do @@ -58,7 +60,9 @@ local function insert_many_on_storage(space_name, tuples, opts) add_space_schema_hash = opts.add_space_schema_hash, 
field_names = opts.fields, noreturn = opts.noreturn, + fetch_latest_metadata = opts.fetch_latest_metadata, }) + replica_schema_version = insert_result.replica_schema_version if insert_result.err ~= nil then local err = { @@ -83,12 +87,12 @@ local function insert_many_on_storage(space_name, tuples, opts) batching_utils.rollback_on_error_msg, inserted_tuples) end - return nil, errs + return nil, errs, replica_schema_version end box.commit() - return inserted_tuples, errs + return inserted_tuples, errs, replica_schema_version end end @@ -103,17 +107,17 @@ local function insert_many_on_storage(space_name, tuples, opts) batching_utils.rollback_on_error_msg, inserted_tuples) end - return nil, errs + return nil, errs, replica_schema_version end box.commit() - return inserted_tuples, errs + return inserted_tuples, errs, replica_schema_version end box.commit() - return inserted_tuples + return inserted_tuples, nil, replica_schema_version end function insert_many.init() @@ -133,9 +137,10 @@ local function call_insert_many_on_router(vshard_router, space_name, original_tu vshard_router = '?string|table', skip_nullability_check_on_flatten = '?boolean', noreturn = '?boolean', + fetch_latest_metadata = '?boolean', }) - local space, err = utils.get_space(space_name, vshard_router, opts.timeout) + local space, err, netbox_schema_version = utils.get_space(space_name, vshard_router, opts.timeout) if err ~= nil then return nil, { InsertManyError:new("An error occurred during the operation: %s", err) @@ -153,6 +158,7 @@ local function call_insert_many_on_router(vshard_router, space_name, original_tu stop_on_error = opts.stop_on_error, rollback_on_error = opts.rollback_on_error, noreturn = opts.noreturn, + fetch_latest_metadata = opts.fetch_latest_metadata, } local iter, err = BatchInsertIterator:new({ @@ -167,7 +173,7 @@ local function call_insert_many_on_router(vshard_router, space_name, original_tu local postprocessor = BatchPostprocessor:new(vshard_router) - local rows, errs = 
call.map(vshard_router, INSERT_MANY_FUNC_NAME, nil, { + local rows, errs, storages_info = call.map(vshard_router, INSERT_MANY_FUNC_NAME, nil, { timeout = opts.timeout, mode = 'write', iter = iter, @@ -189,6 +195,14 @@ local function call_insert_many_on_router(vshard_router, space_name, original_tu return nil, errs end + if opts.fetch_latest_metadata == true then + -- This option is temporary and is related to [1], [2]. + -- [1] https://github.com/tarantool/crud/issues/236 + -- [2] https://github.com/tarantool/crud/issues/361 + space = utils.fetch_latest_metadata_when_map_storages(space, space_name, vshard_router, opts, + storages_info, netbox_schema_version) + end + local res, err = utils.format_result(rows, space, opts.fields) if err ~= nil then errs = errs or {} @@ -225,6 +239,7 @@ function insert_many.tuples(space_name, tuples, opts) rollback_on_error = '?boolean', vshard_router = '?string|table', noreturn = '?boolean', + fetch_latest_metadata = '?boolean', }) opts = opts or {} @@ -264,6 +279,7 @@ function insert_many.objects(space_name, objs, opts) vshard_router = '?string|table', skip_nullability_check_on_flatten = '?boolean', noreturn = '?boolean', + fetch_latest_metadata = '?boolean', }) opts = opts or {} diff --git a/crud/replace.lua b/crud/replace.lua index 62274b23..e375961a 100644 --- a/crud/replace.lua +++ b/crud/replace.lua @@ -22,6 +22,7 @@ local function replace_on_storage(space_name, tuple, opts) sharding_func_hash = '?number', skip_sharding_hash_check = '?boolean', noreturn = '?boolean', + fetch_latest_metadata = '?boolean', }) opts = opts or {} @@ -47,6 +48,7 @@ local function replace_on_storage(space_name, tuple, opts) add_space_schema_hash = opts.add_space_schema_hash, field_names = opts.fields, noreturn = opts.noreturn, + fetch_latest_metadata = opts.fetch_latest_metadata, }) end @@ -66,9 +68,10 @@ local function call_replace_on_router(vshard_router, space_name, original_tuple, vshard_router = '?string|table', skip_nullability_check_on_flatten 
= '?boolean', noreturn = '?boolean', + fetch_latest_metadata = '?boolean', }) - local space, err = utils.get_space(space_name, vshard_router, opts.timeout) + local space, err, netbox_schema_version = utils.get_space(space_name, vshard_router, opts.timeout) if err ~= nil then return nil, ReplaceError:new("An error occurred during the operation: %s", err), const.NEED_SCHEMA_RELOAD end @@ -90,6 +93,7 @@ local function call_replace_on_router(vshard_router, space_name, original_tuple, sharding_key_hash = sharding_data.sharding_key_hash, skip_sharding_hash_check = sharding_data.skip_sharding_hash_check, noreturn = opts.noreturn, + fetch_latest_metadata = opts.fetch_latest_metadata, } local call_opts = { @@ -128,6 +132,14 @@ local function call_replace_on_router(vshard_router, space_name, original_tuple, local tuple = storage_result.res + if opts.fetch_latest_metadata == true then + -- This option is temporary and is related to [1], [2]. + -- [1] https://github.com/tarantool/crud/issues/236 + -- [2] https://github.com/tarantool/crud/issues/361 + space = utils.fetch_latest_metadata_when_single_storage(space, space_name, netbox_schema_version, + vshard_router, opts, storage_result) + end + return utils.format_result({tuple}, space, opts.fields) end @@ -168,6 +180,7 @@ function replace.tuple(space_name, tuple, opts) fields = '?table', vshard_router = '?string|table', noreturn = '?boolean', + fetch_latest_metadata = '?boolean', }) opts = opts or {} @@ -207,6 +220,7 @@ function replace.object(space_name, obj, opts) vshard_router = '?string|table', skip_nullability_check_on_flatten = '?boolean', noreturn = '?boolean', + fetch_latest_metadata = '?boolean', }) opts = opts or {} diff --git a/crud/replace_many.lua b/crud/replace_many.lua index 2b65d564..eade9696 100644 --- a/crud/replace_many.lua +++ b/crud/replace_many.lua @@ -28,6 +28,7 @@ local function replace_many_on_storage(space_name, tuples, opts) sharding_func_hash = '?number', skip_sharding_hash_check = '?boolean', 
noreturn = '?boolean', + fetch_latest_metadata = '?boolean', }) opts = opts or {} @@ -48,6 +49,7 @@ local function replace_many_on_storage(space_name, tuples, opts) local inserted_tuples = {} local errs = {} + local replica_schema_version = nil box.begin() for i, tuple in ipairs(tuples) do @@ -58,8 +60,11 @@ local function replace_many_on_storage(space_name, tuples, opts) add_space_schema_hash = opts.add_space_schema_hash, field_names = opts.fields, noreturn = opts.noreturn, + fetch_latest_metadata = opts.fetch_latest_metadata, }) + replica_schema_version = insert_result.replica_schema_version + table.insert(errs, err) if insert_result.err ~= nil then @@ -85,12 +90,12 @@ local function replace_many_on_storage(space_name, tuples, opts) batching_utils.rollback_on_error_msg, inserted_tuples) end - return nil, errs + return nil, errs, replica_schema_version end box.commit() - return inserted_tuples, errs + return inserted_tuples, errs, replica_schema_version end end @@ -105,17 +110,17 @@ local function replace_many_on_storage(space_name, tuples, opts) batching_utils.rollback_on_error_msg, inserted_tuples) end - return nil, errs + return nil, errs, replica_schema_version end box.commit() - return inserted_tuples, errs + return inserted_tuples, errs, replica_schema_version end box.commit() - return inserted_tuples + return inserted_tuples, nil, replica_schema_version end function replace_many.init() @@ -135,9 +140,10 @@ local function call_replace_many_on_router(vshard_router, space_name, original_t vshard_router = '?string|table', skip_nullability_check_on_flatten = '?boolean', noreturn = '?boolean', + fetch_latest_metadata = '?boolean', }) - local space, err = utils.get_space(space_name, vshard_router, opts.timeout) + local space, err, netbox_schema_version = utils.get_space(space_name, vshard_router, opts.timeout) if err ~= nil then return nil, { ReplaceManyError:new("An error occurred during the operation: %s", err) @@ -155,6 +161,7 @@ local function 
call_replace_many_on_router(vshard_router, space_name, original_t stop_on_error = opts.stop_on_error, rollback_on_error = opts.rollback_on_error, noreturn = opts.noreturn, + fetch_latest_metadata = opts.fetch_latest_metadata, } local iter, err = BatchInsertIterator:new({ @@ -169,7 +176,7 @@ local function call_replace_many_on_router(vshard_router, space_name, original_t local postprocessor = BatchPostprocessor:new(vshard_router) - local rows, errs = call.map(vshard_router, REPLACE_MANY_FUNC_NAME, nil, { + local rows, errs, storages_info = call.map(vshard_router, REPLACE_MANY_FUNC_NAME, nil, { timeout = opts.timeout, mode = 'write', iter = iter, @@ -191,6 +198,14 @@ local function call_replace_many_on_router(vshard_router, space_name, original_t return nil, errs end + if opts.fetch_latest_metadata == true then + -- This option is temporary and is related to [1], [2]. + -- [1] https://github.com/tarantool/crud/issues/236 + -- [2] https://github.com/tarantool/crud/issues/361 + space = utils.fetch_latest_metadata_when_map_storages(space, space_name, vshard_router, opts, + storages_info, netbox_schema_version) + end + local res, err = utils.format_result(rows, space, opts.fields) if err ~= nil then errs = errs or {} @@ -227,6 +242,7 @@ function replace_many.tuples(space_name, tuples, opts) rollback_on_error = '?boolean', vshard_router = '?string|table', noreturn = '?boolean', + fetch_latest_metadata = '?boolean', }) opts = opts or {} @@ -266,6 +282,7 @@ function replace_many.objects(space_name, objs, opts) vshard_router = '?string|table', skip_nullability_check_on_flatten = '?boolean', noreturn = '?boolean', + fetch_latest_metadata = '?boolean', }) opts = opts or {} diff --git a/crud/select.lua b/crud/select.lua index fd9bab63..74bffdfa 100644 --- a/crud/select.lua +++ b/crud/select.lua @@ -1,5 +1,6 @@ local errors = require('errors') +local stash = require('crud.common.stash') local utils = require('crud.common.utils') local sharding = require('crud.common.sharding') 
local select_executor = require('crud.select.executor') @@ -11,20 +12,15 @@ local SelectError = errors.new_class('SelectError') local select_module +local select_module_compat_info = stash.get(stash.name.select_module_compat_info) local has_merger = (utils.tarantool_supports_external_merger() and package.search('tuple.merger')) or utils.tarantool_has_builtin_merger() if has_merger then select_module = require('crud.select.compat.select') + select_module_compat_info.has_merger = true else select_module = require('crud.select.compat.select_old') -end - -local function make_cursor(data) - local last_tuple = data[#data] - - return { - after_tuple = last_tuple, - } + select_module_compat_info.has_merger = false end function checkers.vshard_call_mode(p) @@ -43,16 +39,31 @@ local function select_on_storage(space_name, index_id, conditions, opts) sharding_func_hash = '?number', skip_sharding_hash_check = '?boolean', yield_every = '?number', + fetch_latest_metadata = '?boolean', }) + local cursor = {} + if opts.fetch_latest_metadata then + local replica_schema_version + if box.info.schema_version ~= nil then + replica_schema_version = box.info.schema_version + else + replica_schema_version = box.internal.schema_version() + end + cursor.storage_info = { + replica_uuid = box.info().uuid, + replica_schema_version = replica_schema_version, + } + end + local space = box.space[space_name] if space == nil then - return nil, SelectError:new("Space %q doesn't exist", space_name) + return cursor, SelectError:new("Space %q doesn't exist", space_name) end local index = space.index[index_id] if index == nil then - return nil, SelectError:new("Index with ID %s doesn't exist", index_id) + return cursor, SelectError:new("Index with ID %s doesn't exist", index_id) end local _, err = sharding.check_sharding_hash(space_name, @@ -69,7 +80,7 @@ local function select_on_storage(space_name, index_id, conditions, opts) scan_condition_num = opts.scan_condition_num, }) if err ~= nil then - return 
nil, SelectError:new("Failed to generate tuples filter: %s", err) + return cursor, SelectError:new("Failed to generate tuples filter: %s", err) end -- execute select @@ -81,14 +92,14 @@ local function select_on_storage(space_name, index_id, conditions, opts) yield_every = opts.yield_every, }) if err ~= nil then - return nil, SelectError:new("Failed to execute select: %s", err) + return cursor, SelectError:new("Failed to execute select: %s", err) end - local cursor if resp.tuples_fetched < opts.limit or opts.limit == 0 then - cursor = {is_end = true} + cursor.is_end = true else - cursor = make_cursor(resp.tuples) + local last_tuple = resp.tuples[#resp.tuples] + cursor.after_tuple = last_tuple end cursor.stats = { @@ -98,7 +109,18 @@ local function select_on_storage(space_name, index_id, conditions, opts) -- getting tuples with user defined fields (if `fields` option is specified) -- and fields that are needed for comparison on router (primary key + scan key) - return cursor, schema.filter_tuples_fields(resp.tuples, opts.field_names) + local filtered_tuples = schema.filter_tuples_fields(resp.tuples, opts.field_names) + + local result = {cursor, filtered_tuples} + + local select_module_compat_info = stash.get(stash.name.select_module_compat_info) + if not select_module_compat_info.has_merger then + if opts.fetch_latest_metadata then + result[3] = cursor.storage_info.replica_schema_version + end + end + + return unpack(result) end function select_module.init() diff --git a/crud/select/compat/select.lua b/crud/select/compat/select.lua index caca8b87..d3e3a5a2 100644 --- a/crud/select/compat/select.lua +++ b/crud/select/compat/select.lua @@ -1,6 +1,8 @@ local checks = require('checks') local errors = require('errors') +local log = require('log') +local stash = require('crud.common.stash') local const = require('crud.common.const') local utils = require('crud.common.utils') local sharding = require('crud.common.sharding') @@ -49,14 +51,13 @@ local function 
build_select_iterator(vshard_router, space_name, user_conditions, return nil, SelectError:new("Failed to parse conditions: %s", err) end - local space, err = utils.get_space(space_name, vshard_router) + local space, err, netbox_schema_version = utils.get_space(space_name, vshard_router) if err ~= nil then return nil, SelectError:new("An error occurred during the operation: %s", err), const.NEED_SCHEMA_RELOAD end if space == nil then return nil, SelectError:new("Space %q doesn't exist", space_name), const.NEED_SCHEMA_RELOAD end - local space_format = space:format() local sharding_key_data = {} local sharding_func_hash = nil @@ -171,6 +172,7 @@ local function build_select_iterator(vshard_router, space_name, user_conditions, sharding_key_hash = sharding_key_data.hash, skip_sharding_hash_check = skip_sharding_hash_check, yield_every = yield_every, + fetch_latest_metadata = opts.call_opts.fetch_latest_metadata, } local merger = Merger.new(vshard_router, replicasets_to_select, space, plan.index_id, @@ -179,18 +181,12 @@ local function build_select_iterator(vshard_router, space_name, user_conditions, {tarantool_iter = plan.tarantool_iter, field_names = plan.field_names, call_opts = opts.call_opts} ) - -- filter space format by plan.field_names (user defined fields + primary key + scan key) - -- to pass it user as metadata - local filtered_space_format, err = utils.get_fields_format(space_format, plan.field_names) - if err ~= nil then - return nil, err - end - return { tuples_limit = tuples_limit, merger = merger, plan = plan, - space_format = filtered_space_format, + space = space, + netbox_schema_version = netbox_schema_version, } end @@ -249,11 +245,18 @@ function select_module.pairs(space_name, user_conditions, opts) error(string.format("Failed to generate iterator: %s", err)) end + -- filter space format by plan.field_names (user defined fields + primary key + scan key) + -- to pass it user as metadata + local filtered_space_format, err = 
utils.get_fields_format(iter.space:format(), iter.plan.field_names) + if err ~= nil then + return nil, err + end + local gen, param, state = iter.merger:pairs() if opts.use_tomap == true then gen, param, state = gen:map(function(tuple) local result - result, err = utils.unflatten(tuple, iter.space_format) + result, err = utils.unflatten(tuple, filtered_space_format) if err ~= nil then error(string.format("Failed to unflatten next object: %s", err)) end @@ -277,6 +280,7 @@ local function select_module_call_xc(vshard_router, space_name, user_conditions, force_map_call = '?boolean', fields = '?table', fullscan = '?boolean', + fetch_latest_metadata = '?boolean', mode = '?vshard_call_mode', prefer_replica = '?boolean', @@ -307,6 +311,7 @@ local function select_module_call_xc(vshard_router, space_name, user_conditions, prefer_replica = opts.prefer_replica, balance = opts.balance, timeout = opts.timeout, + fetch_latest_metadata = opts.fetch_latest_metadata, }, } @@ -336,8 +341,46 @@ local function select_module_call_xc(vshard_router, space_name, user_conditions, utils.reverse_inplace(tuples) end + if opts.fetch_latest_metadata then + -- Checking the relevance of the schema version is necessary + -- to prevent the irrelevant metadata of the DML operation. + -- This approach is temporary and is related to [1], [2]. + -- [1] https://github.com/tarantool/crud/issues/236 + -- [2] https://github.com/tarantool/crud/issues/361 + local storages_info = stash.get(stash.name.storages_info_on_select) + for storage_uuid, storage_info in pairs(storages_info) do + if storage_info.replica_schema_version ~= iter.netbox_schema_version then + local warn_msg = + "Replica with UUID %s actual schema version and net_box schema version has mismatch: " .. + "actual replica schema version: %s; net_box conn schema version to this replica: %s. " .. 
+ "Trying to reload schema" + log.warn(warn_msg, storage_uuid, + storage_info.replica_schema_version, iter.netbox_schema_version) + local ok, reload_schema_err = schema.reload_schema(vshard_router) + if ok then + iter.space, err = utils.get_space(space_name, vshard_router, opts.timeout) + if err ~= nil then + local warn_msg = "Failed to fetch space for latest schema actualization, " .. + "metadata may be outdated: %s" + log.warn(warn_msg, err) + end + else + log.warn("Failed to reload schema, metadata may be outdated: %s", reload_schema_err) + end + break + end + end + end + + -- filter space format by plan.field_names (user defined fields + primary key + scan key) + -- to pass it user as metadata + local filtered_space_format, err = utils.get_fields_format(iter.space:format(), iter.plan.field_names) + if err ~= nil then + return nil, err + end + return { - metadata = table.copy(iter.space_format), + metadata = table.copy(filtered_space_format), rows = tuples, } end diff --git a/crud/select/compat/select_old.lua b/crud/select/compat/select_old.lua index 35a3d517..770dec37 100644 --- a/crud/select/compat/select_old.lua +++ b/crud/select/compat/select_old.lua @@ -1,8 +1,10 @@ local checks = require('checks') local errors = require('errors') local fun = require('fun') +local log = require('log') local call = require('crud.common.call') +local stash = require('crud.common.stash') local const = require('crud.common.const') local utils = require('crud.common.utils') local sharding = require('crud.common.sharding') @@ -53,7 +55,7 @@ local function select_iteration(space_name, plan, opts) space_name, plan.index_id, plan.conditions, storage_select_opts, } - local results, err = call.map(opts.vshard_router, common.SELECT_FUNC_NAME, storage_select_args, { + local results, err, storages_info = call.map(opts.vshard_router, common.SELECT_FUNC_NAME, storage_select_args, { replicasets = opts.replicasets, timeout = call_opts.timeout, mode = call_opts.mode or 'read', @@ -61,6 +63,11 
@@ local function select_iteration(space_name, plan, opts) balance = call_opts.balance, }) + local storages_info_on_select_registry = stash.get(stash.name.storages_info_on_select) + for storage_uuid, storage_info in pairs(storages_info) do + storages_info_on_select_registry[storage_uuid] = storage_info + end + if err ~= nil then return nil, err end @@ -116,14 +123,13 @@ local function build_select_iterator(vshard_router, space_name, user_conditions, return nil, SelectError:new("Failed to parse conditions: %s", err) end - local space, err = utils.get_space(space_name, vshard_router) + local space, err, netbox_schema_version = utils.get_space(space_name, vshard_router) if err ~= nil then return nil, SelectError:new("An error occurred during the operation: %s", err), const.NEED_SCHEMA_RELOAD end if space == nil then return nil, SelectError:new("Space %q doesn't exist", space_name), const.NEED_SCHEMA_RELOAD end - local space_format = space:format() local sharding_hash = {} local sharding_key_as_index_obj = nil @@ -195,23 +201,17 @@ local function build_select_iterator(vshard_router, space_name, user_conditions, -- to update fieldno in each part in cmp_key_parts because storage result contains -- fields in order specified by field_names local tuples_comparator = select_comparators.gen_tuples_comparator( - cmp_operator, cmp_key_parts, plan.field_names, space_format + cmp_operator, cmp_key_parts, plan.field_names, space:format() ) local function comparator(node1, node2) return not tuples_comparator(node1.obj, node2.obj) end - -- filter space format by plan.field_names (user defined fields + primary key + scan key) - -- to pass it user as metadata - local filtered_space_format, err = utils.get_fields_format(space_format, plan.field_names) - if err ~= nil then - return nil, err - end - local iter = Iterator.new({ space_name = space_name, - space_format = filtered_space_format, + space = space, + netbox_schema_version = netbox_schema_version, iteration_func = 
select_iteration, comparator = comparator, @@ -289,6 +289,13 @@ function select_module.pairs(space_name, user_conditions, opts) tuples_limit = math.abs(tuples_limit) end + -- filter space format by plan.field_names (user defined fields + primary key + scan key) + -- to pass it user as metadata + local filtered_space_format, err = utils.get_fields_format(iter.space:format(), iter.plan.field_names) + if err ~= nil then + return nil, err + end + local gen = function(_, iter) local tuple, err = iter:get() if err ~= nil then @@ -305,7 +312,7 @@ function select_module.pairs(space_name, user_conditions, opts) local result = tuple if opts.use_tomap == true then - result, err = utils.unflatten(tuple, iter.space_format) + result, err = utils.unflatten(tuple, filtered_space_format) if err ~= nil then error(string.format("Failed to unflatten next object: %s", err)) end @@ -345,6 +352,7 @@ local function select_module_call_xc(vshard_router, space_name, user_conditions, prefer_replica = opts.prefer_replica, balance = opts.balance, timeout = opts.timeout, + fetch_latest_metadata = opts.fetch_latest_metadata, }, } @@ -379,8 +387,46 @@ local function select_module_call_xc(vshard_router, space_name, user_conditions, utils.reverse_inplace(tuples) end + if opts.fetch_latest_metadata then + -- Checking the relevance of the schema version is necessary + -- to prevent the irrelevant metadata of the DML operation. + -- This approach is temporary and is related to [1], [2]. + -- [1] https://github.com/tarantool/crud/issues/236 + -- [2] https://github.com/tarantool/crud/issues/361 + local storages_info = stash.get(stash.name.storages_info_on_select) + for storage_uuid, storage_info in pairs(storages_info) do + if storage_info.replica_schema_version ~= iter.netbox_schema_version then + local warn_msg = + "Replica with UUID %s actual schema version and net_box schema version has mismatch: " .. + "actual replica schema version: %s; net_box conn schema version to this replica: %s. " .. 
+ "Trying to reload schema" + log.warn(warn_msg, storage_uuid, + storage_info.replica_schema_version, iter.netbox_schema_version) + local ok, reload_schema_err = schema.reload_schema(vshard_router) + if ok then + iter.space, err = utils.get_space(space_name, vshard_router, opts.timeout) + if err ~= nil then + local warn_msg = "Failed to fetch space for latest schema actualization, " .. + "metadata may be outdated: %s" + log.warn(warn_msg, err) + end + else + log.warn("Failed to reload schema, metadata may be outdated: %s", reload_schema_err) + end + break + end + end + end + + -- filter space format by plan.field_names (user defined fields + primary key + scan key) + -- to pass it user as metadata + local filtered_space_format, err = utils.get_fields_format(iter.space:format(), iter.plan.field_names) + if err ~= nil then + return nil, err + end + return { - metadata = table.copy(iter.space_format), + metadata = table.copy(filtered_space_format), rows = tuples, } end @@ -394,6 +440,7 @@ function select_module.call(space_name, user_conditions, opts) force_map_call = '?boolean', fields = '?table', fullscan = '?boolean', + fetch_latest_metadata = '?boolean', mode = '?vshard_call_mode', prefer_replica = '?boolean', diff --git a/crud/select/iterator.lua b/crud/select/iterator.lua index 89c0cd18..44539b8d 100644 --- a/crud/select/iterator.lua +++ b/crud/select/iterator.lua @@ -16,7 +16,8 @@ Iterator.__index = Iterator function Iterator.new(opts) dev_checks({ space_name = 'string', - space_format = 'table', + space = 'table', + netbox_schema_version = '?number', comparator = 'function', iteration_func = 'function', @@ -35,7 +36,8 @@ function Iterator.new(opts) local iter = { space_name = opts.space_name, - space_format = opts.space_format, + space = opts.space, + netbox_schema_version = opts.netbox_schema_version, iteration_func = opts.iteration_func, plan = opts.plan, diff --git a/crud/select/merger.lua b/crud/select/merger.lua index 4788d364..753be288 100644 --- 
a/crud/select/merger.lua +++ b/crud/select/merger.lua @@ -3,6 +3,7 @@ local errors = require('errors') local msgpack = require('msgpack') local ffi = require('ffi') local call = require('crud.common.call') +local stash = require('crud.common.stash') local sharding = require('crud.common.sharding') local sharding_metadata_module = require('crud.common.sharding.sharding_metadata') @@ -132,6 +133,13 @@ local function fetch_chunk(context, state) error(wrapped_err) end + if context.fetch_latest_metadata then + local storages_info_on_select_registry = stash.get(stash.name.storages_info_on_select) + storages_info_on_select_registry[cursor.storage_info.replica_uuid] = {} + storages_info_on_select_registry[cursor.storage_info.replica_uuid]. + replica_schema_version = cursor.storage_info.replica_schema_version + end + -- Extract stats info. -- Stats extracted with callback here and not passed -- outside to wrapper because fetch for pairs can be @@ -188,6 +196,7 @@ local function new(vshard_router, replicasets, space, index_id, func_name, func_ replicaset = replicaset, vshard_call_name = vshard_call_name, timeout = call_opts.timeout, + fetch_latest_metadata = call_opts.fetch_latest_metadata, space_name = space.name, vshard_router = vshard_router, } diff --git a/crud/update.lua b/crud/update.lua index ad9ac080..a52c89cc 100644 --- a/crud/update.lua +++ b/crud/update.lua @@ -22,6 +22,7 @@ local function update_on_storage(space_name, key, operations, field_names, opts) sharding_func_hash = '?number', skip_sharding_hash_check = '?boolean', noreturn = '?boolean', + fetch_latest_metadata = '?boolean', }) opts = opts or {} @@ -46,6 +47,7 @@ local function update_on_storage(space_name, key, operations, field_names, opts) add_space_schema_hash = false, field_names = field_names, noreturn = opts.noreturn, + fetch_latest_metadata = opts.fetch_latest_metadata, }) if err ~= nil then @@ -85,9 +87,10 @@ local function call_update_on_router(vshard_router, space_name, key, user_operat fields 
= '?table', vshard_router = '?string|table', noreturn = '?boolean', + fetch_latest_metadata = '?boolean', }) - local space, err = utils.get_space(space_name, vshard_router, opts.timeout) + local space, err, netbox_schema_version = utils.get_space(space_name, vshard_router, opts.timeout) if err ~= nil then return nil, UpdateError:new("An error occurred during the operation: %s", err), const.NEED_SCHEMA_RELOAD end @@ -144,6 +147,7 @@ local function call_update_on_router(vshard_router, space_name, key, user_operat sharding_key_hash = sharding_key_hash, skip_sharding_hash_check = skip_sharding_hash_check, noreturn = opts.noreturn, + fetch_latest_metadata = opts.fetch_latest_metadata, } local call_opts = { @@ -177,6 +181,14 @@ local function call_update_on_router(vshard_router, space_name, key, user_operat local tuple = storage_result.res + if opts.fetch_latest_metadata == true then + -- This option is temporary and is related to [1], [2]. + -- [1] https://github.com/tarantool/crud/issues/236 + -- [2] https://github.com/tarantool/crud/issues/361 + space = utils.fetch_latest_metadata_when_single_storage(space, space_name, netbox_schema_version, + vshard_router, opts, storage_result) + end + return utils.format_result({tuple}, space, opts.fields) end @@ -220,6 +232,7 @@ function update.call(space_name, key, user_operations, opts) fields = '?table', vshard_router = '?string|table', noreturn = '?boolean', + fetch_latest_metadata = '?boolean', }) opts = opts or {} diff --git a/crud/upsert.lua b/crud/upsert.lua index 90556ab0..8ff0b407 100644 --- a/crud/upsert.lua +++ b/crud/upsert.lua @@ -21,6 +21,7 @@ local function upsert_on_storage(space_name, tuple, operations, opts) sharding_key_hash = '?number', sharding_func_hash = '?number', skip_sharding_hash_check = '?boolean', + fetch_latest_metadata = '?boolean', }) opts = opts or {} @@ -44,6 +45,7 @@ local function upsert_on_storage(space_name, tuple, operations, opts) -- is flattening object on router return 
schema.wrap_box_space_func_result(space, 'upsert', {tuple, operations}, { add_space_schema_hash = opts.add_space_schema_hash, + fetch_latest_metadata = opts.fetch_latest_metadata, }) end @@ -63,9 +65,10 @@ local function call_upsert_on_router(vshard_router, space_name, original_tuple, vshard_router = '?string|table', skip_nullability_check_on_flatten = '?boolean', noreturn = '?boolean', + fetch_latest_metadata = '?boolean', }) - local space, err = utils.get_space(space_name, vshard_router, opts.timeout) + local space, err, netbox_schema_version = utils.get_space(space_name, vshard_router, opts.timeout) if err ~= nil then return nil, UpsertError:new("An error occurred during the operation: %s", err), const.NEED_SCHEMA_RELOAD end @@ -95,6 +98,7 @@ local function call_upsert_on_router(vshard_router, space_name, original_tuple, sharding_func_hash = sharding_data.sharding_func_hash, sharding_key_hash = sharding_data.sharding_key_hash, skip_sharding_hash_check = sharding_data.skip_sharding_hash_check, + fetch_latest_metadata = opts.fetch_latest_metadata, } local call_opts = { @@ -132,6 +136,14 @@ local function call_upsert_on_router(vshard_router, space_name, original_tuple, return nil end + if opts.fetch_latest_metadata == true then + -- This option is temporary and is related to [1], [2]. 
+ -- [1] https://github.com/tarantool/crud/issues/236 + -- [2] https://github.com/tarantool/crud/issues/361 + space = utils.fetch_latest_metadata_when_single_storage(space, space_name, netbox_schema_version, + vshard_router, opts, storage_result) + end + -- upsert returns only metadata, without rows return utils.format_result({}, space, opts.fields) end @@ -177,6 +189,7 @@ function upsert.tuple(space_name, tuple, user_operations, opts) fields = '?table', vshard_router = '?string|table', noreturn = '?boolean', + fetch_latest_metadata = '?boolean', }) opts = opts or {} @@ -219,6 +232,7 @@ function upsert.object(space_name, obj, user_operations, opts) fields = '?table', vshard_router = '?string|table', noreturn = '?boolean', + fetch_latest_metadata = '?boolean', }) opts = opts or {} diff --git a/crud/upsert_many.lua b/crud/upsert_many.lua index b48ec0d0..b4e61ce1 100644 --- a/crud/upsert_many.lua +++ b/crud/upsert_many.lua @@ -27,6 +27,7 @@ local function upsert_many_on_storage(space_name, tuples, operations, opts) sharding_func_hash = '?number', skip_sharding_hash_check = '?boolean', noreturn = '?boolean', + fetch_latest_metadata = '?boolean', }) opts = opts or {} @@ -47,6 +48,7 @@ local function upsert_many_on_storage(space_name, tuples, operations, opts) local processed_tuples = {} local errs = {} + local replica_schema_version = nil box.begin() for i, tuple in ipairs(tuples) do @@ -55,7 +57,9 @@ local function upsert_many_on_storage(space_name, tuples, operations, opts) -- is flattening object on router local insert_result = schema.wrap_box_space_func_result(space, 'upsert', {tuple, operations[i]}, { add_space_schema_hash = opts.add_space_schema_hash, + fetch_latest_metadata = opts.fetch_latest_metadata, }) + replica_schema_version = insert_result.replica_schema_version if insert_result.err ~= nil then local err = { @@ -80,12 +84,12 @@ local function upsert_many_on_storage(space_name, tuples, operations, opts) batching_utils.rollback_on_error_msg, 
processed_tuples) end - return nil, errs + return nil, errs, replica_schema_version end box.commit() - return nil, errs + return nil, errs, replica_schema_version end else table.insert(processed_tuples, tuple) @@ -100,17 +104,17 @@ local function upsert_many_on_storage(space_name, tuples, operations, opts) batching_utils.rollback_on_error_msg, processed_tuples) end - return nil, errs + return nil, errs, replica_schema_version end box.commit() - return nil, errs + return nil, errs, replica_schema_version end box.commit() - return nil + return nil, nil, replica_schema_version end function upsert_many.init() @@ -130,9 +134,10 @@ local function call_upsert_many_on_router(vshard_router, space_name, original_tu vshard_router = '?string|table', skip_nullability_check_on_flatten = '?boolean', noreturn = '?boolean', + fetch_latest_metadata = '?boolean', }) - local space, err = utils.get_space(space_name, vshard_router, opts.timeout) + local space, err, netbox_schema_version = utils.get_space(space_name, vshard_router, opts.timeout) if err ~= nil then return nil, { UpsertManyError:new("An error occurred during the operation: %s", err) @@ -166,6 +171,7 @@ local function call_upsert_many_on_router(vshard_router, space_name, original_tu add_space_schema_hash = opts.add_space_schema_hash, stop_on_error = opts.stop_on_error, rollback_on_error = opts.rollback_on_error, + fetch_latest_metadata = opts.fetch_latest_metadata, } local iter, err = BatchUpsertIterator:new({ @@ -181,7 +187,7 @@ local function call_upsert_many_on_router(vshard_router, space_name, original_tu local postprocessor = BatchPostprocessor:new(vshard_router) - local _, errs = call.map(vshard_router, UPSERT_MANY_FUNC_NAME, nil, { + local _, errs, storages_info = call.map(vshard_router, UPSERT_MANY_FUNC_NAME, nil, { timeout = opts.timeout, mode = 'write', iter = iter, @@ -207,6 +213,14 @@ local function call_upsert_many_on_router(vshard_router, space_name, original_tu return nil, errs end + if 
opts.fetch_latest_metadata == true then + -- This option is temporary and is related to [1], [2]. + -- [1] https://github.com/tarantool/crud/issues/236 + -- [2] https://github.com/tarantool/crud/issues/361 + space = utils.fetch_latest_metadata_when_map_storages(space, space_name, vshard_router, opts, + storages_info, netbox_schema_version) + end + local res, err = utils.format_result(nil, space, opts.fields) if err ~= nil then errs = errs or {} @@ -244,6 +258,7 @@ function upsert_many.tuples(space_name, tuples_operation_data, opts) rollback_on_error = '?boolean', vshard_router = '?string|table', noreturn = '?boolean', + fetch_latest_metadata = '?boolean', }) opts = opts or {} @@ -283,6 +298,7 @@ function upsert_many.objects(space_name, objs_operation_data, opts) rollback_on_error = '?boolean', vshard_router = '?string|table', noreturn = '?boolean', + fetch_latest_metadata = '?boolean', }) opts = opts or {} diff --git a/doc/dev/schema.md b/doc/dev/schema.md index 727c6465..7205861e 100644 --- a/doc/dev/schema.md +++ b/doc/dev/schema.md @@ -92,6 +92,9 @@ Retry with reload is triggered - ``bucket_id`` calculation has failed due to any reason; - if updating by field name is not supported natively and id resolve has failed; +- conditionally if the flag `fetch_latest_metadata` for DML operation that return metadata is used. + If there is a mismatch between the schema version on all involved storage + and the version schema in the net_box connection of the router, the schema reload will be triggered. - network operation had failed on storage, hash check is on and hashes mismatch. Let's talk a bit more about the last one. 
To enable hash check, the user diff --git a/test/entrypoint/srv_simple_operations.lua b/test/entrypoint/srv_simple_operations.lua index 579f22fc..a2a75af5 100755 --- a/test/entrypoint/srv_simple_operations.lua +++ b/test/entrypoint/srv_simple_operations.lua @@ -50,10 +50,37 @@ package.preload['customers-storage'] = function() if_not_exists = true, }) - rawset(_G, 'add_extra_field', function(name) - local new_format = box.space.developers:format() - table.insert(new_format, {name = name, type = 'any', is_nullable = true}) - box.space.developers:format(new_format) + rawset(_G, 'add_extra_field', function(space_name, field_name) + local space = box.space[space_name] + local new_format = space:format() + table.insert(new_format, {name = field_name, type = 'any', is_nullable = true}) + space:format(new_format) + end) + + rawset(_G, 'create_space_for_gh_326_cases', function() + local countries_space = box.schema.space.create('countries', { + format = { + {name = 'id', type = 'unsigned'}, + {name = 'bucket_id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + {name = 'population', type = 'unsigned'}, + }, + if_not_exists = true, + engine = os.getenv('ENGINE') or 'memtx', + }) + countries_space:create_index('id', { + parts = { {field = 'id'} }, + if_not_exists = true, + }) + countries_space:create_index('bucket_id', { + parts = { {field = 'bucket_id'} }, + unique = false, + if_not_exists = true, + }) + end) + + rawset(_G, 'drop_space_for_gh_326_cases', function() + box.space['countries']:drop() end) -- Space with huge amount of nullable fields diff --git a/test/integration/simple_operations_test.lua b/test/integration/simple_operations_test.lua index 13fd2a46..995780c9 100644 --- a/test/integration/simple_operations_test.lua +++ b/test/integration/simple_operations_test.lua @@ -411,7 +411,7 @@ pgroup.test_intermediate_nullable_fields_update = function(g) helpers.call_on_servers(g.cluster, {'s1-master', 's2-master'}, function(server) for i = 1, 12 do - 
server.net_box:call('add_extra_field', {'extra_' .. tostring(i)}) + server.net_box:call('add_extra_field', {'developers', 'extra_' .. tostring(i)}) end end) @@ -490,6 +490,213 @@ pgroup.test_intermediate_nullable_fields_update = function(g) }) end +local gh_236_metadata_field_name = { + crud_update = 'extra_gh_236_update', + crud_insert = 'extra_gh_236_insert', + crud_insert_object = 'extra_gh_236_insert_object', + crud_insert_many = 'extra_gh_236_insert_many', + crud_insert_object_many = 'extra_gh_236_insert_object_many', + crud_replace = 'extra_gh_236_replace', + crud_replace_object = 'extra_gh_236_replace_object', + crud_replace_many = 'extra_gh_236_replace_many', + crud_replace_object_many = 'extra_gh_236_replace_object_many', + crud_get = 'extra_gh_236_get', + crud_delete = 'extra_gh_236_delete', + crud_upsert = 'extra_gh_236_upsert', + crud_upsert_object = 'extra_gh_236_upsert_object', + crud_upsert_many = 'extra_gh_236_upsert_many', + crud_upsert_object_many = 'extra_gh_236_upsert_object_many', + crud_max = 'extra_gh_236_max', + crud_min = 'extra_gh_236_min', + crud_select = 'extra_gh_236_select', +} + +local gh_236_cases = { + crud_update = { + operation_name = 'crud.update', + input = {'countries', 3, + {{'=', gh_236_metadata_field_name.crud_update, 'testing'}}, + {fetch_latest_metadata = true}}, + need_pre_insert_data = true, + }, + crud_insert = { + operation_name = 'crud.insert', + input = {'countries', {3, box.NULL, 'vatican', 825}, + {fetch_latest_metadata = true}}, + need_pre_insert_data = false, + }, + crud_insert_object = { + operation_name = 'crud.insert_object', + input = {'countries', + {id=3, bucket_id = box.NULL, name = 'vatican', population = 825}, + {fetch_latest_metadata = true}}, + need_pre_insert_data = false, + }, + crud_insert_many = { + operation_name = 'crud.insert_many', + input = {'countries', {{3, box.NULL, 'vatican', 825}}, + {fetch_latest_metadata = true}}, + need_pre_insert_data = false, + }, + crud_insert_object_many = { + 
operation_name = 'crud.insert_object_many', + input = {'countries', + {{id=3, bucket_id = box.NULL, name = 'vatican', population = 825}}, + {fetch_latest_metadata = true}}, + need_pre_insert_data = false, + }, + crud_replace = { + operation_name = 'crud.replace', + input = {'countries', {3, box.NULL, 'vatican', 825}, + {fetch_latest_metadata = true}}, + need_pre_insert_data = true, + }, + crud_replace_object = { + operation_name = 'crud.replace_object', + input = {'countries', + {id=3, bucket_id = box.NULL, name = 'vatican', population = 825}, + {fetch_latest_metadata = true}}, + need_pre_insert_data = true, + }, + crud_replace_many = { + operation_name = 'crud.replace_many', + input = {'countries', {{3, box.NULL, 'vatican', 825}}, + {fetch_latest_metadata = true}}, + need_pre_insert_data = true, + }, + crud_replace_object_many = { + operation_name = 'crud.replace_object_many', + input = {'countries', + {{id=3, bucket_id = box.NULL, name = 'vatican', population = 825}}, + {fetch_latest_metadata = true}}, + need_pre_insert_data = true, + }, + crud_get = { + operation_name = 'crud.get', + input = {'countries', 3, {fetch_latest_metadata = true}}, + need_pre_insert_data = true, + }, + crud_delete = { + operation_name = 'crud.delete', + input = {'countries', 3, {fetch_latest_metadata = true}}, + need_pre_insert_data = true, + }, + crud_upsert = { + operation_name = 'crud.upsert', + input = {'countries', + {3, box.NULL, 'vatican', 825}, {{'+', 'population', 1}}, + {fetch_latest_metadata = true}}, + need_pre_insert_data = false, + }, + crud_upsert_object = { + operation_name = 'crud.upsert_object', + input = {'countries', + {id=3, bucket_id = box.NULL, name = 'vatican', population = 825}, + {{'+', 'population', 1}}, + {fetch_latest_metadata = true}}, + need_pre_insert_data = false, + }, + crud_upsert_many = { + operation_name = 'crud.upsert_many', + input = {'countries', + { + {{3, box.NULL, 'vatican', 825}, + {{'+', 'population', 1}}} + }, + {fetch_latest_metadata = 
true}}, + need_pre_insert_data = false, + }, + crud_upsert_object_many = { + operation_name = 'crud.upsert_object_many', + input = {'countries', + { + { + {id=3, bucket_id = box.NULL, name = 'vatican', population = 825}, + {{'+', 'population', 1}} + } + }, + {fetch_latest_metadata = true}}, + need_pre_insert_data = false, + }, + crud_max = { + operation_name = 'crud.max', + input = {'countries', 'id', {fetch_latest_metadata = true}}, + need_pre_insert_data = true, + }, + crud_min = { + operation_name = 'crud.min', + input = {'countries', 'id', {fetch_latest_metadata = true}}, + need_pre_insert_data = true, + }, + crud_select = { + operation_name = 'crud.select', + input = {'countries', {}, {fetch_latest_metadata = true}}, + need_pre_insert_data = true, + }, +} + +local gh_236_case_prepare = function(g) + helpers.call_on_servers(g.cluster, {'s1-master', 's2-master'}, + function(server) + server.net_box:call('create_space_for_gh_326_cases') + end) +end + +local gh_236_case_after = function(g) + helpers.call_on_servers(g.cluster, {'s1-master', 's2-master'}, + function(server) + server.net_box:call('drop_space_for_gh_326_cases') + end) +end + +for case_name, case in pairs(gh_236_cases) do + -- These tests check that the returned metadata is up to date when crud DML operations are executed. + -- The bug from the issue [1] was reproduced when working + -- with bucket_id 2804 (id = 3 in the countries space), which resides on a different + -- storage than bucket 477 (id = 1 in the countries space). + -- Related to the issue [2]. 
+ -- [1] https://github.com/tarantool/crud/issues/236 + -- [2] https://github.com/tarantool/crud/issues/361 + + local test_name = ('test_gh_236_dml_operation_return_actual_metadata_%s'):format(case_name) + + pgroup.before_test(test_name, gh_236_case_prepare) + pgroup[test_name] = function(g) + local result, err + + result, err = g.cluster.main_server.net_box:call( + 'crud.delete', {'countries', 3}) + t.assert_not_equals(result, nil) + t.assert_equals(err, nil) + + if case.need_pre_insert_data then + result, err = g.cluster.main_server.net_box:call( + 'crud.insert', {'countries', {3, box.NULL, 'vatican', 825}}) + t.assert_equals(err, nil) + t.assert_not_equals(result, nil) + end + + helpers.call_on_servers(g.cluster, {'s1-master', 's2-master'}, + function(server) + server.net_box:call('add_extra_field', + {'countries', gh_236_metadata_field_name[case_name]}) + end) + + result, err = g.cluster.main_server.net_box:call(case.operation_name, case.input) + t.assert_equals(err, nil) + local found_extra_field = false + for _, v in pairs(result.metadata) do + if v.name == gh_236_metadata_field_name[case_name] then + found_extra_field = true + end + end + t.assert_equals(found_extra_field, true, + string.format('cannot find the expected metadata for case: %s', + case.operation_name)) + end + pgroup.after_test(test_name, gh_236_case_after) +end + pgroup.test_object_with_nullable_fields = function(g) -- Insert local result, err = g.cluster.main_server.net_box:call(