Skip to content

Commit

Permalink
fix: remove user specified compaction if the corresponding appenv was…
Browse files Browse the repository at this point in the history
… removed (#814)
  • Loading branch information
levy5307 authored Sep 16, 2021
1 parent c2e6ea2 commit d5976e2
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 71 deletions.
27 changes: 13 additions & 14 deletions src/server/compaction_filter_rule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@

namespace pegasus {
namespace server {
bool string_pattern_match(const std::string &value,
bool string_pattern_match(dsn::string_view value,
string_match_type type,
const std::string &filter_pattern)
dsn::string_view filter_pattern)
{
if (filter_pattern.empty())
return false;
Expand All @@ -38,7 +38,7 @@ bool string_pattern_match(const std::string &value,

switch (type) {
case string_match_type::SMT_MATCH_ANYWHERE:
return dsn::string_view(value).find(filter_pattern) != dsn::string_view::npos;
return value.find(filter_pattern) != dsn::string_view::npos;
case string_match_type::SMT_MATCH_PREFIX:
return memcmp(value.data(), filter_pattern.data(), filter_pattern.length()) == 0;
case string_match_type::SMT_MATCH_POSTFIX:
Expand All @@ -53,30 +53,29 @@ bool string_pattern_match(const std::string &value,

hashkey_pattern_rule::hashkey_pattern_rule(uint32_t data_version) {}

bool hashkey_pattern_rule::match(const std::string &hash_key,
const std::string &sort_key,
const rocksdb::Slice &existing_value) const
bool hashkey_pattern_rule::match(dsn::string_view hash_key,
dsn::string_view sort_key,
dsn::string_view existing_value) const
{
return string_pattern_match(hash_key, match_type, pattern);
}

sortkey_pattern_rule::sortkey_pattern_rule(uint32_t data_version) {}

bool sortkey_pattern_rule::match(const std::string &hash_key,
const std::string &sort_key,
const rocksdb::Slice &existing_value) const
bool sortkey_pattern_rule::match(dsn::string_view hash_key,
dsn::string_view sort_key,
dsn::string_view existing_value) const
{
return string_pattern_match(sort_key, match_type, pattern);
}

ttl_range_rule::ttl_range_rule(uint32_t data_version) : data_version(data_version) {}

bool ttl_range_rule::match(const std::string &hash_key,
const std::string &sort_key,
const rocksdb::Slice &existing_value) const
bool ttl_range_rule::match(dsn::string_view hash_key,
dsn::string_view sort_key,
dsn::string_view existing_value) const
{
uint32_t expire_ts =
pegasus_extract_expire_ts(data_version, utils::to_string_view(existing_value));
uint32_t expire_ts = pegasus_extract_expire_ts(data_version, existing_value);
// if start_ttl and stop_ttl = 0, it means we want to delete keys which have no ttl
if (0 == expire_ts && 0 == start_ttl && 0 == stop_ttl) {
return true;
Expand Down
25 changes: 12 additions & 13 deletions src/server/compaction_filter_rule.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

#pragma once

#include <rocksdb/slice.h>
#include <dsn/utility/enum_helper.h>
#include <dsn/cpp/json_helper.h>
#include <gtest/gtest.h>
Expand Down Expand Up @@ -69,9 +68,9 @@ class compaction_filter_rule

// TODO(zhaoliwei): we can use `value_filed` to replace existing_value in the later,
// after the refactor of value schema
virtual bool match(const std::string &hash_key,
const std::string &sort_key,
const rocksdb::Slice &existing_value) const = 0;
virtual bool match(dsn::string_view hash_key,
dsn::string_view sort_key,
dsn::string_view existing_value) const = 0;
};

enum string_match_type
Expand All @@ -94,9 +93,9 @@ class hashkey_pattern_rule : public compaction_filter_rule
public:
hashkey_pattern_rule(uint32_t data_version = VERSION_MAX);

bool match(const std::string &hash_key,
const std::string &sort_key,
const rocksdb::Slice &existing_value) const;
bool match(dsn::string_view hash_key,
dsn::string_view sort_key,
dsn::string_view existing_value) const;
DEFINE_JSON_SERIALIZATION(pattern, match_type)

private:
Expand All @@ -116,9 +115,9 @@ class sortkey_pattern_rule : public compaction_filter_rule
public:
sortkey_pattern_rule(uint32_t data_version = VERSION_MAX);

bool match(const std::string &hash_key,
const std::string &sort_key,
const rocksdb::Slice &existing_value) const;
bool match(dsn::string_view hash_key,
dsn::string_view sort_key,
dsn::string_view existing_value) const;
DEFINE_JSON_SERIALIZATION(pattern, match_type)

private:
Expand All @@ -136,9 +135,9 @@ class ttl_range_rule : public compaction_filter_rule
public:
explicit ttl_range_rule(uint32_t data_version);

bool match(const std::string &hash_key,
const std::string &sort_key,
const rocksdb::Slice &existing_value) const;
bool match(dsn::string_view hash_key,
dsn::string_view sort_key,
dsn::string_view existing_value) const;
DEFINE_JSON_SERIALIZATION(start_ttl, stop_ttl)

private:
Expand Down
22 changes: 11 additions & 11 deletions src/server/compaction_operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ namespace pegasus {
namespace server {
compaction_operation::~compaction_operation() = default;

bool compaction_operation::all_rules_match(const std::string &hash_key,
const std::string &sort_key,
const rocksdb::Slice &existing_value) const
bool compaction_operation::all_rules_match(dsn::string_view hash_key,
dsn::string_view sort_key,
dsn::string_view existing_value) const
{
if (rules.empty()) {
return false;
Expand All @@ -50,9 +50,9 @@ delete_key::delete_key(filter_rules &&rules, uint32_t data_version)

delete_key::delete_key(uint32_t data_version) : compaction_operation(data_version) {}

bool delete_key::filter(const std::string &hash_key,
const std::string &sort_key,
const rocksdb::Slice &existing_value,
bool delete_key::filter(dsn::string_view hash_key,
dsn::string_view sort_key,
dsn::string_view existing_value,
std::string *new_value,
bool *value_changed) const
{
Expand All @@ -69,9 +69,9 @@ update_ttl::update_ttl(filter_rules &&rules, uint32_t data_version)

update_ttl::update_ttl(uint32_t data_version) : compaction_operation(data_version) {}

bool update_ttl::filter(const std::string &hash_key,
const std::string &sort_key,
const rocksdb::Slice &existing_value,
bool update_ttl::filter(dsn::string_view hash_key,
dsn::string_view sort_key,
dsn::string_view existing_value,
std::string *new_value,
bool *value_changed) const
{
Expand All @@ -85,7 +85,7 @@ bool update_ttl::filter(const std::string &hash_key,
new_ts = utils::epoch_now() + value;
break;
case update_ttl_op_type::UTOT_FROM_CURRENT: {
auto ttl = pegasus_extract_expire_ts(data_version, utils::to_string_view(existing_value));
auto ttl = pegasus_extract_expire_ts(data_version, existing_value);
if (ttl == 0) {
return false;
}
Expand All @@ -101,7 +101,7 @@ bool update_ttl::filter(const std::string &hash_key,
return false;
}

*new_value = existing_value.ToString();
*new_value = std::string(existing_value.data(), existing_value.size());
pegasus_update_expire_ts(data_version, *new_value, new_ts);
*value_changed = true;
return false;
Expand Down
24 changes: 12 additions & 12 deletions src/server/compaction_operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,18 @@ class compaction_operation
explicit compaction_operation(uint32_t data_version) : data_version(data_version) {}
virtual ~compaction_operation() = 0;

bool all_rules_match(const std::string &hash_key,
const std::string &sort_key,
const rocksdb::Slice &existing_value) const;
bool all_rules_match(dsn::string_view hash_key,
dsn::string_view sort_key,
dsn::string_view existing_value) const;
void set_rules(filter_rules &&rules);
/**
* @return false indicates that this key-value should be removed
* If you want to modify the existing_value, you can pass it back through new_value and
* value_changed needs to be set to true in this case.
*/
virtual bool filter(const std::string &hash_key,
const std::string &sort_key,
const rocksdb::Slice &existing_value,
virtual bool filter(dsn::string_view hash_key,
dsn::string_view sort_key,
dsn::string_view existing_value,
std::string *new_value,
bool *value_changed) const = 0;

Expand All @@ -95,9 +95,9 @@ class delete_key : public compaction_operation
delete_key(filter_rules &&rules, uint32_t data_version);
explicit delete_key(uint32_t data_version);

bool filter(const std::string &hash_key,
const std::string &sort_key,
const rocksdb::Slice &existing_value,
bool filter(dsn::string_view hash_key,
dsn::string_view sort_key,
dsn::string_view existing_value,
std::string *new_value,
bool *value_changed) const;

Expand Down Expand Up @@ -143,9 +143,9 @@ class update_ttl : public compaction_operation
update_ttl(filter_rules &&rules, uint32_t data_version);
explicit update_ttl(uint32_t data_version);

bool filter(const std::string &hash_key,
const std::string &sort_key,
const rocksdb::Slice &existing_value,
bool filter(dsn::string_view hash_key,
dsn::string_view sort_key,
dsn::string_view existing_value,
std::string *new_value,
bool *value_changed) const;
DEFINE_JSON_SERIALIZATION(type, value)
Expand Down
28 changes: 19 additions & 9 deletions src/server/key_ttl_compaction_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,26 +68,31 @@ class KeyWithTTLCompactionFilter : public rocksdb::CompactionFilter
return false;
}

if (!_user_specified_operations.empty() &&
user_specified_operation_filter(key, existing_value, new_value, value_changed)) {
return true;
}

uint32_t expire_ts =
pegasus_extract_expire_ts(_pegasus_data_version, utils::to_string_view(existing_value));
if (_default_ttl != 0 && expire_ts == 0) {
// should update ttl
expire_ts = utils::epoch_now() + _default_ttl;
*new_value = existing_value.ToString();
pegasus_update_expire_ts(
_pegasus_data_version, *new_value, utils::epoch_now() + _default_ttl);
pegasus_update_expire_ts(_pegasus_data_version, *new_value, expire_ts);
*value_changed = true;
return false;
}

if (!_user_specified_operations.empty()) {
dsn::string_view value_view = utils::to_string_view(existing_value);
if (*value_changed) {
value_view = *new_value;
}
if (user_specified_operation_filter(key, value_view, new_value, value_changed)) {
return true;
}
}

return check_if_ts_expired(utils::epoch_now(), expire_ts) || check_if_stale_split_data(key);
}

bool user_specified_operation_filter(const rocksdb::Slice &key,
const rocksdb::Slice &existing_value,
dsn::string_view existing_value,
std::string *new_value,
bool *value_changed) const
{
Expand Down Expand Up @@ -178,6 +183,11 @@ class KeyWithTTLCompactionFilterFactory : public rocksdb::CompactionFilterFactor
_user_specified_operations.swap(operations);
}
}
void clear_user_specified_ops()
{
dsn::utils::auto_write_lock l(_lock);
_user_specified_operations.clear();
}

private:
std::atomic<uint32_t> _pegasus_data_version;
Expand Down
8 changes: 8 additions & 0 deletions src/server/pegasus_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2611,9 +2611,17 @@ void pegasus_server_impl::update_user_specified_compaction(
const std::map<std::string, std::string> &envs)
{
auto iter = envs.find(USER_SPECIFIED_COMPACTION);
if (dsn_unlikely(iter == envs.end() && _user_specified_compaction != "")) {
ddebug_replica("clear user specified compaction coz it was deleted");
_key_ttl_compaction_filter_factory->clear_user_specified_ops();
_user_specified_compaction = "";
return;
}
if (dsn_unlikely(iter != envs.end() && iter->second != _user_specified_compaction)) {
ddebug_replica("update user specified compaction coz it was changed");
_key_ttl_compaction_filter_factory->extract_user_specified_ops(iter->second);
_user_specified_compaction = iter->second;
return;
}
}

Expand Down
8 changes: 3 additions & 5 deletions src/server/test/compaction_filter_rule_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,11 @@ TEST(hashkey_pattern_rule_test, match)
{"hashkey", "hashkey", SMT_INVALID, false},
};

rocksdb::Slice slice;
hashkey_pattern_rule rule;
for (const auto &test : tests) {
rule.match_type = test.match_type;
rule.pattern = test.pattern;
ASSERT_EQ(rule.match(test.hashkey, "", slice), test.match);
ASSERT_EQ(rule.match(test.hashkey, "", ""), test.match);
}
}

Expand Down Expand Up @@ -88,12 +87,11 @@ TEST(sortkey_pattern_rule_test, match)
{"sortkey", "sortkey", SMT_INVALID, false},
};

rocksdb::Slice slice;
sortkey_pattern_rule rule;
for (const auto &test : tests) {
rule.match_type = test.match_type;
rule.pattern = test.pattern;
ASSERT_EQ(rule.match("", test.sortkey, slice), test.match);
ASSERT_EQ(rule.match("", test.sortkey, ""), test.match);
}
}

Expand Down Expand Up @@ -128,7 +126,7 @@ TEST(ttl_range_rule_test, match)
rule.stop_ttl = test.stop_ttl;
rocksdb::SliceParts svalue =
gen.generate_value(data_version, "", test.expire_ttl + now_ts, 0);
ASSERT_EQ(rule.match("", "", svalue.parts[0]), test.match);
ASSERT_EQ(rule.match("", "", svalue.parts[0].ToString()), test.match);
}
}

Expand Down
14 changes: 7 additions & 7 deletions src/server/test/compaction_operation_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,14 @@ TEST(compaction_filter_operation_test, all_rules_match)

rocksdb::SliceParts svalue =
gen.generate_value(data_version, "", test.expire_ttl + now_ts, 0);
ASSERT_EQ(delete_operation.all_rules_match(test.hashkey, test.sortkey, svalue.parts[0]),
ASSERT_EQ(delete_operation.all_rules_match(
test.hashkey, test.sortkey, svalue.parts[0].ToString()),
test.all_match);
}

// all_rules_match will return false if there is no rule in this operation
update_ttl no_rule_operation({}, data_version);
ASSERT_EQ(no_rule_operation.all_rules_match("hash", "sort", rocksdb::Slice()), false);
ASSERT_EQ(no_rule_operation.all_rules_match("hash", "sort", ""), false);
}

TEST(delete_key_test, filter)
Expand All @@ -140,8 +141,7 @@ TEST(delete_key_test, filter)
auto hash_rule = static_cast<hashkey_pattern_rule *>(delete_operation.rules.begin()->get());
hash_rule->pattern = test.hashkey_pattern;
hash_rule->match_type = test.hashkey_match_type;
ASSERT_EQ(test.filter,
delete_operation.filter(test.hashkey, "", rocksdb::Slice(), nullptr, nullptr));
ASSERT_EQ(test.filter, delete_operation.filter(test.hashkey, "", "", nullptr, nullptr));
}
}

Expand Down Expand Up @@ -197,9 +197,9 @@ TEST(update_ttl_test, filter)
bool value_changed = false;
rocksdb::SliceParts svalue = gen.generate_value(data_version, "", test.expire_ts, 0);
uint32_t before_ts = utils::epoch_now();
ASSERT_EQ(
false,
update_operation.filter(test.hashkey, "", svalue.parts[0], &new_value, &value_changed));
ASSERT_EQ(false,
update_operation.filter(
test.hashkey, "", svalue.parts[0].ToString(), &new_value, &value_changed));
ASSERT_EQ(test.value_changed, value_changed);
if (value_changed) {
uint32_t new_ts = pegasus_extract_expire_ts(data_version, new_value);
Expand Down

0 comments on commit d5976e2

Please sign in to comment.