Skip to content

Commit

Permalink
sharding func: fix specifying vshard sharding funcs
Browse files Browse the repository at this point in the history
Starting from 0.11.0 user can specify sharding func to
calculate bucket_id with sharding func definition as a part of
DDL schema or insert manually to the space `_ddl_sharding_func`.

Right now ddl fails with setting schema with vshard sharding function.
But even if this bug is fixed, there is also a bug on CRUD side.
Inserting manually to the space `_ddl_sharding_func` showed
that CRUD search vshard sharding func in `_G` but this
approach doesn't work with vshard case.

This patch allows to specify `vshard` sharding func
inserting manually to the space `_ddl_sharding_func`.

Closes #314
  • Loading branch information
AnaNek committed Aug 23, 2022
1 parent ddfc251 commit 90f0c71
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 6 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

### Fixed

- Fix specifying `vshard` sharding funcs (#314).

## [0.12.1] - 21-07-22

### Fixed
Expand Down
23 changes: 22 additions & 1 deletion crud/common/sharding/sharding_func.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ local ShardingFuncError = errors.new_class('ShardingFuncError', {capture_stack

local sharding_func_module = {}

local sharding_module_names = {
['vshard'] = true,
}

local function is_callable(object)
if type(object) == 'function' then
return true
Expand Down Expand Up @@ -43,13 +47,30 @@ end
local function get_function_from_G(func_name)
local chunks = string.split(func_name, '.')
local sharding_func = _G
local sharding_module = false
local ok

if sharding_module_names[chunks[1]] then
ok, sharding_func = pcall(require, chunks[1])
if not ok then
return nil
end

sharding_module = true
table.remove(chunks, 1)
end

-- check is the each chunk an identifier
for _, chunk in pairs(chunks) do
if not utils.check_name_isident(chunk) or sharding_func == nil then
return nil
end
sharding_func = rawget(sharding_func, chunk)

if sharding_module then
sharding_func = sharding_func[chunk]
else
sharding_func = rawget(sharding_func, chunk)
end
end

return sharding_func
Expand Down
2 changes: 1 addition & 1 deletion deps.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ rm "${LUACOV_COVERALLS_ROCKSPEC_FILE}"
rmdir "${TMPDIR}"

tarantoolctl rocks install cartridge 2.7.4
tarantoolctl rocks install ddl 1.6.0
tarantoolctl rocks install ddl 1.6.2
tarantoolctl rocks install migrations 0.4.2

tarantoolctl rocks make
15 changes: 14 additions & 1 deletion test/entrypoint/srv_ddl.lua
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,14 @@ package.preload['customers-storage'] = function()
local customers_G_func_schema = table.deepcopy(customers_id_key_schema)
customers_G_func_schema.sharding_func = 'some_module.sharding_func'

local customers_empty_sharding_func_schema = table.deepcopy(customers_id_key_schema)

local customers_vshard_mpcrc32_schema = table.deepcopy(customers_id_key_schema)
customers_vshard_mpcrc32_schema.sharding_func = 'vshard.router.bucket_id_mpcrc32'

local customers_vshard_strcrc32_schema = table.deepcopy(customers_id_key_schema)
customers_vshard_strcrc32_schema.sharding_func = 'vshard.router.bucket_id_strcrc32'

local schema = {
spaces = {
customers = customers_id_schema,
Expand All @@ -173,6 +181,9 @@ package.preload['customers-storage'] = function()
customers_name_age_key_three_fields_index = customers_name_age_key_three_fields_index_schema,
customers_G_func = customers_G_func_schema,
customers_body_func = customers_body_func_schema,
customers_empty_sharding_func = customers_empty_sharding_func_schema,
customers_vshard_mpcrc32 = customers_vshard_mpcrc32_schema,
customers_vshard_strcrc32 = customers_vshard_strcrc32_schema,
}
}

Expand All @@ -188,7 +199,9 @@ package.preload['customers-storage'] = function()
box.space['_ddl_sharding_key']:update(space_name, {{'=', fieldno_sharding_key, sharding_key_def}})
end)
rawset(_G, 'set_sharding_func', function(space_name, fieldno_sharding_func, sharding_func_def)
box.space['_ddl_sharding_func']:update(space_name, {{'=', fieldno_sharding_func, sharding_func_def}})
local record = {space_name, box.NULL, box.NULL}
record[fieldno_sharding_func] = sharding_func_def
box.space['_ddl_sharding_func']:replace(record)
end)
end,
}
Expand Down
126 changes: 123 additions & 3 deletions test/integration/ddl_sharding_func_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ local cache_group = t.group('ddl_sharding_func_cache', {
{engine = 'vinyl'},
})

local vshard_group = t.group('ddl_vshard_sharding_func', {
{engine = 'memtx'},
{engine = 'vinyl'},
})

pgroup.before_all(function(g)
g.cluster = helpers.Cluster:new({
datadir = fio.tempdir(),
Expand Down Expand Up @@ -77,6 +82,35 @@ cache_group.before_each(function(g)
helpers.truncate_space_on_cluster(g.cluster, 'customers_body_func')
end)

vshard_group.before_all(function(g)
g.cluster = helpers.Cluster:new({
datadir = fio.tempdir(),
server_command = helpers.entrypoint('srv_ddl'),
use_vshard = true,
replicasets = helpers.get_test_replicasets(),
env = {
['ENGINE'] = g.params.engine,
},
})
g.cluster:start()
local result, err = g.cluster.main_server.net_box:eval([[
local ddl = require('ddl')
local ok, err = ddl.get_schema()
return ok, err
]])
t.assert_equals(type(result), 'table')
t.assert_equals(err, nil)
end)

vshard_group.after_all(function(g) helpers.stop_cluster(g.cluster) end)

vshard_group.before_each(function(g)
helpers.truncate_space_on_cluster(g.cluster, 'customers_vshard_mpcrc32')
helpers.truncate_space_on_cluster(g.cluster, 'customers_vshard_strcrc32')
helpers.truncate_space_on_cluster(g.cluster, 'customers_empty_sharding_func')
end)

pgroup.test_insert_object = function(g)
local result, err = g.cluster.main_server.net_box:call(
'crud.insert_object', {g.params.space_name, {id = 158, name = 'Augustus', age = 48}})
Expand Down Expand Up @@ -703,7 +737,7 @@ cache_group.test_update_cache_with_incorrect_func = function(g)

-- records for all spaces exist
local cache_size = helpers.get_sharding_func_cache_size(g.cluster)
t.assert_equals(cache_size, 2)
t.assert_equals(cache_size, 4)

-- no error just warning
local space_name = 'customers_G_func'
Expand All @@ -719,7 +753,7 @@ cache_group.test_update_cache_with_incorrect_func = function(g)
-- cache['customers_G_func'] == nil (space with incorrect func)
-- other records for correct spaces exist in cache
cache_size = helpers.get_sharding_func_cache_size(g.cluster)
t.assert_equals(cache_size, 1)
t.assert_equals(cache_size, 3)

-- get data from cache for space with incorrect sharding func
local space_name = 'customers_G_func'
Expand All @@ -736,7 +770,7 @@ cache_group.test_update_cache_with_incorrect_func = function(g)
-- cache['customers_G_func'] == nil (space with incorrect func)
-- other records for correct spaces exist in cache
cache_size = helpers.get_sharding_func_cache_size(g.cluster)
t.assert_equals(cache_size, 1)
t.assert_equals(cache_size, 3)
end


Expand Down Expand Up @@ -938,3 +972,89 @@ pgroup.test_gh_278_count_with_explicit_bucket_id_and_ddl = function(g)
t.assert_is_not(obj, nil)
t.assert_equals(obj, 1)
end

local vshard_cases = {
mpcrc32_not_depends_on_ddl = {
set_sharding_func_to_ddl_space = true,
space_name = 'customers_empty_sharding_func',
sharding_func_name = 'vshard.router.bucket_id_mpcrc32',
bucket_id = 1614,
srv_with_data = 's1-master',
srv_without_data = 's2-master',
},
strcrc32_not_depends_on_ddl = {
set_sharding_func_to_ddl_space = true,
space_name = 'customers_empty_sharding_func',
sharding_func_name = 'vshard.router.bucket_id_strcrc32',
bucket_id = 477,
srv_with_data = 's2-master',
srv_without_data = 's1-master',
},
mpcrc32_depends_on_ddl = {
space_name = 'customers_vshard_mpcrc32',
sharding_func_name = 'vshard.router.bucket_id_mpcrc32',
bucket_id = 1614,
srv_with_data = 's1-master',
srv_without_data = 's2-master',
},
strcrc32_depends_on_ddl = {
space_name = 'customers_vshard_strcrc32',
sharding_func_name = 'vshard.router.bucket_id_strcrc32',
bucket_id = 477,
srv_with_data = 's2-master',
srv_without_data = 's1-master',
}
}

for name, case in pairs(vshard_cases) do
local test_name = ('test_vshard_%s'):format(name)

vshard_group[test_name] = function(g)
local space_name = case.space_name

if case.set_sharding_func_to_ddl_space then
local fieldno_sharding_func_name = 2

helpers.call_on_servers(g.cluster, {'s1-master', 's2-master'}, function(server)
server.net_box:call('set_sharding_func',
{space_name, fieldno_sharding_func_name, case.sharding_func_name})
end)

local record_exist, err = helpers.update_sharding_func_cache(g.cluster, space_name)
t.assert_equals(err, nil)
t.assert_equals(record_exist, true)
end

-- Insert a tuple.
local result, err = g.cluster.main_server.net_box:call(
'crud.insert', {space_name, {1, box.NULL, 'Ivan', 25}})
t.assert_equals(err, nil)
t.assert_equals(result.metadata, {
{is_nullable = false, name = 'id', type = 'unsigned'},
{is_nullable = false, name = 'bucket_id', type = 'unsigned'},
{is_nullable = false, name = 'name', type = 'string'},
{is_nullable = false, name = 'age', type = 'number'},
})
t.assert_equals(#result.rows, 1)
t.assert_equals(result.rows[1], {1, case.bucket_id, 'Ivan', 25})

-- There is a tuple on server that we inserted before using crud.insert().
local conn_srv_with_data = g.cluster:server(case.srv_with_data).net_box
local result = conn_srv_with_data.space[space_name]:get({1})
t.assert_equals(result, {1, case.bucket_id, 'Ivan', 25})

-- There is no tuple on server that we inserted before using crud.insert().
local conn_srv_without_data = g.cluster:server(case.srv_without_data).net_box
local result = conn_srv_without_data.space[space_name]:get({1})
t.assert_equals(result, nil)

local conditions = {{'==', 'id', 1}}
local result, err = g.cluster.main_server.net_box:call('crud.select', {
space_name, conditions,
})

t.assert_equals(err, nil)
t.assert_equals(#result.rows, 1)
t.assert_equals(result.rows[1], {1, case.bucket_id, 'Ivan', 25})
end
end
9 changes: 9 additions & 0 deletions test/integration/ddl_sharding_key_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -898,13 +898,16 @@ pgroup.test_update_cache_with_incorrect_key = function(g)
customers = {parts = {{fieldno = 1}}},
customers_G_func = {parts = {{fieldno = 1}}},
customers_body_func = {parts = {{fieldno = 1}}},
customers_empty_sharding_func = {parts = {{fieldno = 1}}},
customers_age_key = {parts = {{fieldno = 4}}},
customers_name_age_key_different_indexes = {parts = {{fieldno = 3}, {fieldno = 4}}},
customers_name_age_key_three_fields_index = {parts = {{fieldno = 3}, {fieldno = 4}}},
customers_name_key = {parts = {{fieldno = 3}}},
customers_name_key_non_uniq_index = {parts = {{fieldno = 3}}},
customers_name_key_uniq_index = {parts = {{fieldno = 3}}},
customers_secondary_idx_name_key = {parts = {{fieldno = 3}}},
customers_vshard_mpcrc32 = {parts = {{fieldno = 1}}},
customers_vshard_strcrc32 = {parts = {{fieldno = 1}}}
})

-- no error just warning
Expand All @@ -925,12 +928,15 @@ pgroup.test_update_cache_with_incorrect_key = function(g)
customers = {parts = {{fieldno = 1}}},
customers_G_func = {parts = {{fieldno = 1}}},
customers_body_func = {parts = {{fieldno = 1}}},
customers_empty_sharding_func = {parts = {{fieldno = 1}}},
customers_age_key = {parts = {{fieldno = 4}}},
customers_name_age_key_different_indexes = {parts = {{fieldno = 3}, {fieldno = 4}}},
customers_name_age_key_three_fields_index = {parts = {{fieldno = 3}, {fieldno = 4}}},
customers_name_key_non_uniq_index = {parts = {{fieldno = 3}}},
customers_name_key_uniq_index = {parts = {{fieldno = 3}}},
customers_secondary_idx_name_key = {parts = {{fieldno = 3}}},
customers_vshard_mpcrc32 = {parts = {{fieldno = 1}}},
customers_vshard_strcrc32 = {parts = {{fieldno = 1}}}
})

-- get data from cache for space with incorrect sharding key
Expand All @@ -951,12 +957,15 @@ pgroup.test_update_cache_with_incorrect_key = function(g)
customers = {parts = {{fieldno = 1}}},
customers_G_func = {parts = {{fieldno = 1}}},
customers_body_func = {parts = {{fieldno = 1}}},
customers_empty_sharding_func = {parts = {{fieldno = 1}}},
customers_age_key = {parts = {{fieldno = 4}}},
customers_name_age_key_different_indexes = {parts = {{fieldno = 3}, {fieldno = 4}}},
customers_name_age_key_three_fields_index = {parts = {{fieldno = 3}, {fieldno = 4}}},
customers_name_key_non_uniq_index = {parts = {{fieldno = 3}}},
customers_name_key_uniq_index = {parts = {{fieldno = 3}}},
customers_secondary_idx_name_key = {parts = {{fieldno = 3}}},
customers_vshard_mpcrc32 = {parts = {{fieldno = 1}}},
customers_vshard_strcrc32 = {parts = {{fieldno = 1}}}
})
end

Expand Down

0 comments on commit 90f0c71

Please sign in to comment.