schema: fix stale schema for metadata generation
Fixes the use of a stale schema to generate metadata during
`crud.update`, `crud.insert`, `crud.insert_*`, `crud.replace`,
`crud.replace_*`, `crud.upsert`, `crud.upsert_*`, `crud.delete`,
`crud.max`, `crud.min`, `crud.select` and `crud.get` calls.

If the new `fetch_latest_metadata` option is used, the metadata is guaranteed
to be up to date. Before the space format is received, the schema version on
all involved storages is compared with the schema version in the net_box
connection of the router. On a mismatch, a schema reload is triggered.
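
The mismatch check described above can be sketched in plain Lua. This is only an illustration, not the code from this commit: the helper names `collect_storage_info` and `needs_schema_reload` are hypothetical, and the table shape (a storage key mapped to `replica_schema_version`) is an assumption modelled on what the postprocessors in this change collect.

```lua
-- Hypothetical helpers for illustration only; not part of the crud module.

-- Record the schema version reported by one storage (assumed result shape:
-- value[1] is a table carrying replica_schema_version).
local function collect_storage_info(storages_info, key, value)
    if value ~= nil and type(value[1]) == 'table' then
        storages_info[key] = {
            replica_schema_version = value[1].replica_schema_version,
        }
    end
    return storages_info
end

-- Return true if any storage reports a schema version that differs from
-- the one cached in the router's net_box connection.
local function needs_schema_reload(storages_info, netbox_schema_version)
    for _, info in pairs(storages_info) do
        if info.replica_schema_version ~= netbox_schema_version then
            return true
        end
    end
    return false
end
```

With such helpers, the router would reload the schema before formatting the result only when `needs_schema_reload` reports a mismatch, so the extra work is limited to requests that actually observe a stale version.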

Closes #236
GRISHNOV committed Jun 1, 2023
1 parent 781ef16 commit 1a895f8
Showing 25 changed files with 753 additions and 88 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -14,6 +14,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
`upsert`, `upsert_object`, `upsert_many`, `upsert_object_many`,
`update`, `delete`. (#267).

### Fixed
* CRUD DML operations were returning stale schema for metadata generation (#236).

## [1.1.1] - 24-03-23

### Changed
40 changes: 40 additions & 0 deletions README.md
@@ -236,6 +236,10 @@ where:
the sole responsibility of the developer
* `noreturn` (`?boolean`) - suppress successfully processed tuple
(first return value is `nil`). `false` by default
* `fetch_latest_metadata` (`?boolean`) - guarantees the
up-to-date metadata (space format) in first return value, otherwise
it may not take into account the latest migration of the data format.
Performance overhead is up to 15%. `false` by default
Returns metadata and array contains one inserted row, error.
@@ -303,6 +307,10 @@ where:
the sole responsibility of the developer
* `noreturn` (`?boolean`) - suppress successfully processed tuples
(first return value is `nil`). `false` by default
* `fetch_latest_metadata` (`?boolean`) - guarantees the
up-to-date metadata (space format) in first return value, otherwise
it may not take into account the latest migration of the data format.
Performance overhead is up to 15%. `false` by default
Returns metadata and array with inserted rows, array of errors.
Each error object can contain field `operation_data`.
@@ -441,6 +449,10 @@ where:
* `vshard_router` (`?string|table`) - Cartridge vshard group name or
vshard router instance. Set this parameter if your space is not
a part of the default vshard cluster
* `fetch_latest_metadata` (`?boolean`) - guarantees the
up-to-date metadata (space format) in first return value, otherwise
it may not take into account the latest migration of the data format.
Performance overhead is up to 15%. `false` by default
Returns metadata and array contains one row, error.
@@ -480,6 +492,10 @@ where:
a part of the default vshard cluster
* `noreturn` (`?boolean`) - suppress successfully processed tuple
(first return value is `nil`). `false` by default
* `fetch_latest_metadata` (`?boolean`) - guarantees the
up-to-date metadata (space format) in first return value, otherwise
it may not take into account the latest migration of the data format.
Performance overhead is up to 15%. `false` by default
Returns metadata and array contains one updated row, error.
@@ -518,6 +534,10 @@ where:
a part of the default vshard cluster
* `noreturn` (`?boolean`) - suppress successfully processed tuple
(first return value is `nil`). `false` by default
* `fetch_latest_metadata` (`?boolean`) - guarantees the
up-to-date metadata (space format) in first return value, otherwise
it may not take into account the latest migration of the data format.
Performance overhead is up to 15%. `false` by default
Returns metadata and array contains one deleted row (empty for vinyl), error.
@@ -567,6 +587,10 @@ where:
the sole responsibility of the developer
* `noreturn` (`?boolean`) - suppress successfully processed tuple
(first return value is `nil`). `false` by default
* `fetch_latest_metadata` (`?boolean`) - guarantees the
up-to-date metadata (space format) in first return value, otherwise
it may not take into account the latest migration of the data format.
Performance overhead is up to 15%. `false` by default
Returns inserted or replaced rows and metadata or nil with error.
@@ -634,6 +658,10 @@ where:
the sole responsibility of the developer
* `noreturn` (`?boolean`) - suppress successfully processed tuples
(first return value is `nil`). `false` by default
* `fetch_latest_metadata` (`?boolean`) - guarantees the
up-to-date metadata (space format) in first return value, otherwise
it may not take into account the latest migration of the data format.
Performance overhead is up to 15%. `false` by default
Returns metadata and array with inserted/replaced rows, array of errors.
Each error object can contain field `operation_data`.
@@ -772,6 +800,10 @@ where:
a part of the default vshard cluster
* `noreturn` (`?boolean`) - suppress successfully processed tuple
(first return value is `nil`). `false` by default
* `fetch_latest_metadata` (`?boolean`) - guarantees the
up-to-date metadata (space format) in first return value, otherwise
it may not take into account the latest migration of the data format.
Performance overhead is up to 15%. `false` by default
Returns metadata and empty array of rows or nil, error.
@@ -835,6 +867,10 @@ where:
a part of the default vshard cluster
* `noreturn` (`?boolean`) - suppress successfully processed tuples
(first return value is `nil`). `false` by default
* `fetch_latest_metadata` (`?boolean`) - guarantees the
up-to-date metadata (space format) in first return value, otherwise
it may not take into account the latest migration of the data format.
Performance overhead is up to 15%. `false` by default
Returns metadata and array of errors.
Each error object can contain field `operation_data`.
@@ -977,6 +1013,10 @@ where:
a part of the default vshard cluster
* `yield_every` (`?number`) - number of tuples processed on storage to yield after,
`yield_every` should be > 0, default value is 1000
* `fetch_latest_metadata` (`?boolean`) - guarantees the
up-to-date metadata (space format) in first return value, otherwise
it may not take into account the latest migration of the data format.
Performance overhead is up to 15%. `false` by default
Returns metadata and array of rows, error.
20 changes: 15 additions & 5 deletions crud/borders.lua
@@ -16,8 +16,8 @@ local borders = {}
local STAT_FUNC_NAME = '_crud.get_border_on_storage'


local function get_border_on_storage(border_name, space_name, index_id, field_names)
dev_checks('string', 'string', 'number', '?table')
local function get_border_on_storage(border_name, space_name, index_id, field_names, fetch_latest_metadata)
dev_checks('string', 'string', 'number', '?table', '?boolean')

assert(border_name == 'min' or border_name == 'max')

@@ -38,6 +38,7 @@ local function get_border_on_storage(border_name, space_name, index_id, field_na
return schema.wrap_func_result(space, get_index_border, {index}, {
add_space_schema_hash = true,
field_names = field_names,
fetch_latest_metadata = fetch_latest_metadata,
})
end

@@ -71,9 +72,10 @@ local function call_get_border_on_router(vshard_router, border_name, space_name,
timeout = '?number',
fields = '?table',
vshard_router = '?string|table',
fetch_latest_metadata = '?boolean',
})

local space, err = utils.get_space(space_name, vshard_router, opts.timeout)
local space, err, netbox_schema_version = utils.get_space(space_name, vshard_router, opts.timeout)
if err ~= nil then
return nil, BorderError:new("An error occurred during the operation: %s", err), const.NEED_SCHEMA_RELOAD
end
@@ -108,9 +110,9 @@ local function call_get_border_on_router(vshard_router, border_name, space_name,
replicasets = replicasets,
timeout = opts.timeout,
}
local results, err = call.map(vshard_router,
local results, err, storages_info = call.map(vshard_router,
STAT_FUNC_NAME,
{border_name, space_name, index.id, field_names},
{border_name, space_name, index.id, field_names, opts.fetch_latest_metadata},
call_opts
)

@@ -154,6 +156,14 @@ local function call_get_border_on_router(vshard_router, border_name, space_name,
end
end

if opts.fetch_latest_metadata == true then
-- This option is temporary and is related to [1], [2].
-- [1] https://github.com/tarantool/crud/issues/236
-- [2] https://github.com/tarantool/crud/issues/361
space = utils.fetch_latest_metadata_when_map_storages(space, space_name, vshard_router, opts,
storages_info, netbox_schema_version)
end

local result = utils.format_result({res_tuple}, space, field_names)

if opts.fields ~= nil then
10 changes: 9 additions & 1 deletion crud/common/map_call_cases/base_postprocessor.lua
@@ -13,6 +13,7 @@ function BasePostprocessor:new(vshard_router)
early_exit = false,
errs = nil,
vshard_router = vshard_router,
storage_info = {},
}

setmetatable(postprocessor, self)
@@ -52,6 +53,12 @@ function BasePostprocessor:collect(result_info, err_info)
wrapper_args = '?table',
})

if result_info.value ~= nil and type(result_info.value[1]) == 'table' then
self.storage_info[result_info.key] = {}
local replica_schema_version = result_info.value[1].replica_schema_version
self.storage_info[result_info.key].replica_schema_version = replica_schema_version
end

local err = err_info.err
if err == nil and result_info.value[1] == nil then
err = result_info.value[2]
@@ -78,8 +85,9 @@ end
--
-- @return[1] table results
-- @return[2] table errs
-- @return[3] table storage_info
function BasePostprocessor:get()
return self.results, self.errs
return self.results, self.errs, self.storage_info
end

return BasePostprocessor
5 changes: 5 additions & 0 deletions crud/common/map_call_cases/batch_postprocessor.lua
@@ -39,6 +39,11 @@ function BatchPostprocessor:collect(result_info, err_info)
wrapper_args = '?table',
})

if result_info.value ~= nil then
self.storage_info[result_info.key] = {}
self.storage_info[result_info.key].replica_schema_version = result_info.value[3]
end

local errs = {err_info.err}
if err_info.err == nil then
errs = result_info.value[2]
21 changes: 18 additions & 3 deletions crud/common/schema.lua
@@ -51,7 +51,7 @@ end
local reload_in_progress = {}
local reload_schema_cond = {}

local function reload_schema(vshard_router)
function schema.reload_schema(vshard_router)
local replicasets = vshard_router:routeall()
local vshard_router_name = vshard_router.name

@@ -96,14 +96,16 @@ function schema.wrap_func_reload(vshard_router, func, ...)
break
end

local ok, reload_schema_err = reload_schema(vshard_router)
local ok, reload_schema_err = schema.reload_schema(vshard_router)
if not ok then
log.warn("Failed to reload schema: %s", reload_schema_err)
break
end

i = i + 1
if i > const.RELOAD_RETRIES_NUM then
local warn_msg = "Number of attempts to reload schema has been ended: %s"
log.warn(warn_msg, const.RELOAD_RETRIES_NUM)
break
end
end
@@ -221,6 +223,17 @@ function schema.wrap_func_result(space, func, args, opts)
end
end

if opts.fetch_latest_metadata == true then
local replica_schema_version
if box.info.schema_version ~= nil then
replica_schema_version = box.info.schema_version
else
replica_schema_version = box.internal.schema_version()
end
result.replica_uuid = box.info().uuid
result.replica_schema_version = replica_schema_version
end

return result
end

@@ -235,7 +248,9 @@ function schema.wrap_box_space_func_result(space, box_space_func_name, box_space
return space[box_space_func_name](space, unpack(box_space_func_args))
end

return schema.wrap_func_result(space, func, {space, box_space_func_name, box_space_func_args}, opts)
local res, err = schema.wrap_func_result(space, func, {space, box_space_func_name, box_space_func_args}, opts)

return res, err
end

-- schema.result_needs_reload checks that schema reload can
8 changes: 8 additions & 0 deletions crud/common/stash.lua
@@ -20,12 +20,20 @@ local stash = {}
-- @tfield string stats_metrics_registry
-- Stash for metrics rocks statistics registry.
--
-- @tfield string storages_info_on_select
-- Stash for storages info during select working registry.
--
-- @tfield string select_module_compat_info
-- Stash for select compatibility version registry.
--
stash.name = {
cfg = '__crud_cfg',
stats_internal = '__crud_stats_internal',
stats_local_registry = '__crud_stats_local_registry',
stats_metrics_registry = '__crud_stats_metrics_registry',
ddl_triggers = '__crud_ddl_spaces_triggers',
storages_info_on_select = '__storages_info_on_select',
select_module_compat_info = '__select_module_compat_info',
}

--- Setup Tarantool Cartridge reload.