From a479871fd0e793990aab7e0b0ca86ae59277e5ce Mon Sep 17 00:00:00 2001 From: AnaNek Date: Fri, 22 Jul 2022 14:24:01 +0300 Subject: [PATCH] sharding func: fix specifying vshard sharding funcs Starting from 0.11.0 user can specify sharding func to calculate bucket_id with sharding func definition as a part of DDL schema or insert manually to the space `_ddl_sharding_func`. Right now ddl fails with setting schema with vshard sharding function. But even if this bug is fixed, there is also a bug on CRUD side. Inserting manually to the space `_ddl_sharding_func` showed that CRUD search vshard sharding func in `_G` but this approach doesn't work with vshard case. This patch allows to specify `vshard` sharding func inserting manually to the space `_ddl_sharding_func`. Closes #314 --- CHANGELOG.md | 2 + crud/common/sharding/sharding_func.lua | 26 ++++- deps.sh | 2 +- test/entrypoint/srv_ddl.lua | 15 ++- test/integration/ddl_sharding_func_test.lua | 120 +++++++++++++++++++- test/integration/ddl_sharding_key_test.lua | 9 ++ 6 files changed, 168 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5f4e6914..e8d13e88 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Fixed +- Fix specifying `vshard` sharding funcs (#314). + ## [0.12.1] - 21-07-22 ### Fixed diff --git a/crud/common/sharding/sharding_func.lua b/crud/common/sharding/sharding_func.lua index 6a503b45..82a6cf4c 100644 --- a/crud/common/sharding/sharding_func.lua +++ b/crud/common/sharding/sharding_func.lua @@ -9,6 +9,10 @@ local ShardingFuncError = errors.new_class('ShardingFuncError', {capture_stack local sharding_func_module = {} +local sharding_module_names = { + ['vshard'] = true, +} + local function is_callable(object) if type(object) == 'function' then return true @@ -43,13 +47,33 @@ end local function get_function_from_G(func_name) local chunks = string.split(func_name, '.') local sharding_func = _G + local sharding_module = false + local ok + + if sharding_module_names[chunks[1]] then + ok, sharding_func = pcall(require, chunks[1]) + if not ok then + return nil + end + + sharding_module = true + table.remove(chunks, 1) + end -- check is the each chunk an identifier for _, chunk in pairs(chunks) do if not utils.check_name_isident(chunk) or sharding_func == nil then return nil end - sharding_func = rawget(sharding_func, chunk) + + -- `vshard` store sharding functions in metatable, + -- this metatable is common for all `vshard` routers. + -- That's why for `vshard` case we can't use rawget. + if sharding_module then + sharding_func = sharding_func[chunk] + else + sharding_func = rawget(sharding_func, chunk) + end end return sharding_func diff --git a/deps.sh b/deps.sh index d5fd5833..6f07a917 100755 --- a/deps.sh +++ b/deps.sh @@ -27,7 +27,7 @@ rm "${LUACOV_COVERALLS_ROCKSPEC_FILE}" rmdir "${TMPDIR}" tarantoolctl rocks install cartridge 2.7.4 -tarantoolctl rocks install ddl 1.6.0 +tarantoolctl rocks install ddl 1.6.2 tarantoolctl rocks install migrations 0.4.2 tarantoolctl rocks make diff --git a/test/entrypoint/srv_ddl.lua b/test/entrypoint/srv_ddl.lua index ab8fc5ce..6e7d9170 100755 --- a/test/entrypoint/srv_ddl.lua +++ b/test/entrypoint/srv_ddl.lua @@ -161,6 +161,14 @@ package.preload['customers-storage'] = function() local customers_G_func_schema = table.deepcopy(customers_id_key_schema) customers_G_func_schema.sharding_func = 'some_module.sharding_func' + local customers_empty_sharding_func_schema = table.deepcopy(customers_id_key_schema) + + local customers_vshard_mpcrc32_schema = table.deepcopy(customers_id_key_schema) + customers_vshard_mpcrc32_schema.sharding_func = 'vshard.router.bucket_id_mpcrc32' + + local customers_vshard_strcrc32_schema = table.deepcopy(customers_id_key_schema) + customers_vshard_strcrc32_schema.sharding_func = 'vshard.router.bucket_id_strcrc32' + local schema = { spaces = { customers = customers_id_schema, @@ -173,6 +181,9 @@ package.preload['customers-storage'] = function() customers_name_age_key_three_fields_index = customers_name_age_key_three_fields_index_schema, customers_G_func = customers_G_func_schema, customers_body_func = customers_body_func_schema, + customers_empty_sharding_func = customers_empty_sharding_func_schema, + customers_vshard_mpcrc32 = customers_vshard_mpcrc32_schema, + customers_vshard_strcrc32 = customers_vshard_strcrc32_schema, } } @@ -188,7 +199,9 @@ package.preload['customers-storage'] = function() box.space['_ddl_sharding_key']:update(space_name, {{'=', fieldno_sharding_key, sharding_key_def}}) end) rawset(_G, 'set_sharding_func', function(space_name, fieldno_sharding_func, sharding_func_def) - box.space['_ddl_sharding_func']:update(space_name, {{'=', fieldno_sharding_func, sharding_func_def}}) + local record = {space_name, box.NULL, box.NULL} + record[fieldno_sharding_func] = sharding_func_def + box.space['_ddl_sharding_func']:replace(record) end) end, } diff --git a/test/integration/ddl_sharding_func_test.lua b/test/integration/ddl_sharding_func_test.lua index a3899077..b43e7fa6 100644 --- a/test/integration/ddl_sharding_func_test.lua +++ b/test/integration/ddl_sharding_func_test.lua @@ -21,6 +21,11 @@ local cache_group = t.group('ddl_sharding_func_cache', { {engine = 'vinyl'}, }) +local vshard_group = t.group('ddl_vshard_sharding_func', { + {engine = 'memtx'}, + {engine = 'vinyl'}, +}) + pgroup.before_all(function(g) g.cluster = helpers.Cluster:new({ datadir = fio.tempdir(), @@ -77,6 +82,35 @@ cache_group.before_each(function(g) helpers.truncate_space_on_cluster(g.cluster, 'customers_body_func') end) +vshard_group.before_all(function(g) + g.cluster = helpers.Cluster:new({ + datadir = fio.tempdir(), + server_command = helpers.entrypoint('srv_ddl'), + use_vshard = true, + replicasets = helpers.get_test_replicasets(), + env = { + ['ENGINE'] = g.params.engine, + }, + }) + g.cluster:start() + local result, err = g.cluster.main_server.net_box:eval([[ + local ddl = require('ddl') + + local ok, err = ddl.get_schema() + return ok, err + ]]) + t.assert_equals(type(result), 'table') + t.assert_equals(err, nil) +end) + +vshard_group.after_all(function(g) helpers.stop_cluster(g.cluster) end) + +vshard_group.before_each(function(g) + helpers.truncate_space_on_cluster(g.cluster, 'customers_vshard_mpcrc32') + helpers.truncate_space_on_cluster(g.cluster, 'customers_vshard_strcrc32') + helpers.truncate_space_on_cluster(g.cluster, 'customers_empty_sharding_func') +end) + pgroup.test_insert_object = function(g) local result, err = g.cluster.main_server.net_box:call( 'crud.insert_object', {g.params.space_name, {id = 158, name = 'Augustus', age = 48}}) @@ -703,7 +737,7 @@ cache_group.test_update_cache_with_incorrect_func = function(g) -- records for all spaces exist local cache_size = helpers.get_sharding_func_cache_size(g.cluster) - t.assert_equals(cache_size, 2) + t.assert_equals(cache_size, 4) -- no error just warning local space_name = 'customers_G_func' @@ -719,7 +753,7 @@ cache_group.test_update_cache_with_incorrect_func = function(g) -- cache['customers_G_func'] == nil (space with incorrect func) -- other records for correct spaces exist in cache cache_size = helpers.get_sharding_func_cache_size(g.cluster) - t.assert_equals(cache_size, 1) + t.assert_equals(cache_size, 3) -- get data from cache for space with incorrect sharding func local space_name = 'customers_G_func' @@ -736,7 +770,7 @@ cache_group.test_update_cache_with_incorrect_func = function(g) -- cache['customers_G_func'] == nil (space with incorrect func) -- other records for correct spaces exist in cache cache_size = helpers.get_sharding_func_cache_size(g.cluster) - t.assert_equals(cache_size, 1) + t.assert_equals(cache_size, 3) end @@ -938,3 +972,83 @@ pgroup.test_gh_278_count_with_explicit_bucket_id_and_ddl = function(g) t.assert_is_not(obj, nil) t.assert_equals(obj, 1) end + +local vshard_cases = { + mpcrc32_not_depends_on_ddl = { + set_sharding_func_to_ddl_space = true, + space_name = 'customers_empty_sharding_func', + sharding_func_name = 'vshard.router.bucket_id_mpcrc32', + bucket_id = 1614, + srv_with_data = 's1-master', + srv_without_data = 's2-master', + }, + strcrc32_not_depends_on_ddl = { + set_sharding_func_to_ddl_space = true, + space_name = 'customers_empty_sharding_func', + sharding_func_name = 'vshard.router.bucket_id_strcrc32', + bucket_id = 477, + srv_with_data = 's2-master', + srv_without_data = 's1-master', + }, + mpcrc32_depends_on_ddl = { + space_name = 'customers_vshard_mpcrc32', + sharding_func_name = 'vshard.router.bucket_id_mpcrc32', + bucket_id = 1614, + srv_with_data = 's1-master', + srv_without_data = 's2-master', + }, + strcrc32_depends_on_ddl = { + space_name = 'customers_vshard_strcrc32', + sharding_func_name = 'vshard.router.bucket_id_strcrc32', + bucket_id = 477, + srv_with_data = 's2-master', + srv_without_data = 's1-master', + } +} + +for name, case in pairs(vshard_cases) do + local test_name = ('test_vshard_%s'):format(name) + + vshard_group[test_name] = function(g) + local space_name = case.space_name + + if case.set_sharding_func_to_ddl_space then + local fieldno_sharding_func_name = 2 + + helpers.call_on_servers(g.cluster, {'s1-master', 's2-master'}, function(server) + server.net_box:call('set_sharding_func', + {space_name, fieldno_sharding_func_name, case.sharding_func_name}) + end) + + local record_exist, err = helpers.update_sharding_func_cache(g.cluster, space_name) + t.assert_equals(err, nil) + t.assert_equals(record_exist, true) + end + + -- Insert a tuple. + local result, err = g.cluster.main_server.net_box:call( + 'crud.insert', {space_name, {1, box.NULL, 'Ivan', 25}}) + t.assert_equals(err, nil) + t.assert_equals(#result.rows, 1) + t.assert_equals(result.rows[1], {1, case.bucket_id, 'Ivan', 25}) + + -- There is a tuple on server that we inserted with crud.insert(). + local conn_srv_with_data = g.cluster:server(case.srv_with_data).net_box + local result = conn_srv_with_data.space[space_name]:get({1}) + t.assert_equals(result, {1, case.bucket_id, 'Ivan', 25}) + + -- There is no tuple on server that we inserted with crud.insert(). + local conn_srv_without_data = g.cluster:server(case.srv_without_data).net_box + local result = conn_srv_without_data.space[space_name]:get({1}) + t.assert_equals(result, nil) + + local conditions = {{'==', 'id', 1}} + local result, err = g.cluster.main_server.net_box:call('crud.select', { + space_name, conditions, + }) + + t.assert_equals(err, nil) + t.assert_equals(#result.rows, 1) + t.assert_equals(result.rows[1], {1, case.bucket_id, 'Ivan', 25}) + end +end diff --git a/test/integration/ddl_sharding_key_test.lua b/test/integration/ddl_sharding_key_test.lua index d9697531..ab8daaba 100644 --- a/test/integration/ddl_sharding_key_test.lua +++ b/test/integration/ddl_sharding_key_test.lua @@ -898,6 +898,7 @@ pgroup.test_update_cache_with_incorrect_key = function(g) customers = {parts = {{fieldno = 1}}}, customers_G_func = {parts = {{fieldno = 1}}}, customers_body_func = {parts = {{fieldno = 1}}}, + customers_empty_sharding_func = {parts = {{fieldno = 1}}}, customers_age_key = {parts = {{fieldno = 4}}}, customers_name_age_key_different_indexes = {parts = {{fieldno = 3}, {fieldno = 4}}}, customers_name_age_key_three_fields_index = {parts = {{fieldno = 3}, {fieldno = 4}}}, @@ -905,6 +906,8 @@ pgroup.test_update_cache_with_incorrect_key = function(g) customers_name_key_non_uniq_index = {parts = {{fieldno = 3}}}, customers_name_key_uniq_index = {parts = {{fieldno = 3}}}, customers_secondary_idx_name_key = {parts = {{fieldno = 3}}}, + customers_vshard_mpcrc32 = {parts = {{fieldno = 1}}}, + customers_vshard_strcrc32 = {parts = {{fieldno = 1}}} }) -- no error just warning @@ -925,12 +928,15 @@ pgroup.test_update_cache_with_incorrect_key = function(g) customers = {parts = {{fieldno = 1}}}, customers_G_func = {parts = {{fieldno = 1}}}, customers_body_func = {parts = {{fieldno = 1}}}, + customers_empty_sharding_func = {parts = {{fieldno = 1}}}, customers_age_key = {parts = {{fieldno = 4}}}, customers_name_age_key_different_indexes = {parts = {{fieldno = 3}, {fieldno = 4}}}, customers_name_age_key_three_fields_index = {parts = {{fieldno = 3}, {fieldno = 4}}}, customers_name_key_non_uniq_index = {parts = {{fieldno = 3}}}, customers_name_key_uniq_index = {parts = {{fieldno = 3}}}, customers_secondary_idx_name_key = {parts = {{fieldno = 3}}}, + customers_vshard_mpcrc32 = {parts = {{fieldno = 1}}}, + customers_vshard_strcrc32 = {parts = {{fieldno = 1}}} }) -- get data from cache for space with incorrect sharding key @@ -951,12 +957,15 @@ pgroup.test_update_cache_with_incorrect_key = function(g) customers = {parts = {{fieldno = 1}}}, customers_G_func = {parts = {{fieldno = 1}}}, customers_body_func = {parts = {{fieldno = 1}}}, + customers_empty_sharding_func = {parts = {{fieldno = 1}}}, customers_age_key = {parts = {{fieldno = 4}}}, customers_name_age_key_different_indexes = {parts = {{fieldno = 3}, {fieldno = 4}}}, customers_name_age_key_three_fields_index = {parts = {{fieldno = 3}, {fieldno = 4}}}, customers_name_key_non_uniq_index = {parts = {{fieldno = 3}}}, customers_name_key_uniq_index = {parts = {{fieldno = 3}}}, customers_secondary_idx_name_key = {parts = {{fieldno = 3}}}, + customers_vshard_mpcrc32 = {parts = {{fieldno = 1}}}, + customers_vshard_strcrc32 = {parts = {{fieldno = 1}}} }) end