Skip to content

Commit

Permalink
feat(one-time backup): support user specified backup path (#814)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangyifan27 authored Apr 16, 2021
1 parent 4ee361b commit 03a441a
Show file tree
Hide file tree
Showing 16 changed files with 255 additions and 30 deletions.
2 changes: 2 additions & 0 deletions include/dsn/dist/block_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,8 @@ class block_filesystem
const remove_path_callback &cb,
dsn::task_tracker *tracker = nullptr) = 0;

// Reports whether a root path has been set for this block filesystem.
// The base implementation always returns false; it is virtual so that
// concrete filesystems can override it.
virtual bool is_root_path_set() const { return false; }

virtual ~block_filesystem() {}
};

Expand Down
8 changes: 6 additions & 2 deletions src/block_service/hdfs/hdfs_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,15 @@ hdfs_service::~hdfs_service()

error_code hdfs_service::initialize(const std::vector<std::string> &args)
{
if (args.size() != 2) {
if (args.size() < 1) {
return ERR_INVALID_PARAMETERS;
}
// Name_node and root_path should be set in args of block_service configuration.
// If no path was configured, just use "/" as default root path.
_hdfs_name_node = args[0];
_hdfs_path = args[1];
_hdfs_path = args.size() >= 2 ? args[1] : "/";
ddebug_f("hdfs backup root path is initialized to {}.", _hdfs_path);

return create_fs();
}

Expand Down
2 changes: 2 additions & 0 deletions src/block_service/hdfs/hdfs_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class hdfs_service : public block_filesystem

static std::string get_hdfs_entry_name(const std::string &hdfs_path);

// Returns true when _hdfs_path is anything other than "/".
// NOTE(review): "/" looks like the fallback root used when no path is
// configured, so an explicitly configured "/" is also reported as
// "not set" -- confirm this is intended.
bool is_root_path_set() const override { return _hdfs_path != "/"; }

private:
hdfsFS _fs;
std::string _hdfs_name_node;
Expand Down
16 changes: 11 additions & 5 deletions src/common/backup.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ struct backup_request
2:policy_info policy;
3:string app_name;
4:i64 backup_id;
// user specified backup_path.
5:optional string backup_path;
}

struct backup_response
Expand Down Expand Up @@ -157,8 +159,10 @@ struct configuration_query_restore_response

struct start_backup_app_request
{
1:string backup_provider_type;
2:i32 app_id;
1:string backup_provider_type;
2:i32 app_id;
// user specified backup_path.
3:optional string backup_path;
}

struct start_backup_app_response
Expand All @@ -177,9 +181,11 @@ struct backup_item
1:i64 backup_id;
2:string app_name;
3:string backup_provider_type;
4:i64 start_time_ms;
5:i64 end_time_ms;
6:bool is_backup_failed;
// user specified backup_path.
4:string backup_path;
5:i64 start_time_ms;
6:i64 end_time_ms;
7:bool is_backup_failed;
}

struct query_backup_status_request
Expand Down
31 changes: 24 additions & 7 deletions src/meta/backup_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

#include <dsn/dist/fmt_logging.h>
#include <dsn/utility/filesystem.h>

#include "common/backup_utils.h"
#include "common/replication_common.h"
Expand All @@ -25,7 +26,7 @@ namespace dsn {
namespace replication {

backup_engine::backup_engine(backup_service *service)
: _backup_service(service), _block_service(nullptr), _is_backup_failed(false)
: _backup_service(service), _block_service(nullptr), _backup_path(""), _is_backup_failed(false)
{
}

Expand Down Expand Up @@ -71,6 +72,16 @@ error_code backup_engine::set_block_service(const std::string &provider)
return ERR_OK;
}

// Records a user-specified backup path on this engine.
//
// Returns ERR_INVALID_PARAMETERS, leaving _backup_path untouched, when a block
// service is attached and it reports that its root path is already set.
// Otherwise the path is logged, stored in _backup_path, and ERR_OK is returned.
error_code backup_engine::set_backup_path(const std::string &path)
{
    const bool root_path_configured =
        _block_service != nullptr && _block_service->is_root_path_set();
    if (root_path_configured) {
        return ERR_INVALID_PARAMETERS;
    }

    ddebug_f("backup path is set to {}.", path);
    _backup_path = path;
    return ERR_OK;
}

error_code backup_engine::write_backup_file(const std::string &file_name,
const dsn::blob &write_buffer)
{
Expand Down Expand Up @@ -121,10 +132,10 @@ error_code backup_engine::backup_app_meta()
app_info_buffer = dsn::json::json_forwarder<app_info>::encode(tmp);
}

std::string file_name = cold_backup::get_app_metadata_file(_backup_service->backup_root(),
_cur_backup.app_name,
_cur_backup.app_id,
_cur_backup.backup_id);
std::string backup_root =
dsn::utils::filesystem::path_combine(_backup_path, _backup_service->backup_root());
std::string file_name = cold_backup::get_app_metadata_file(
backup_root, _cur_backup.app_name, _cur_backup.app_id, _cur_backup.backup_id);
return write_backup_file(file_name, app_info_buffer);
}

Expand Down Expand Up @@ -165,6 +176,10 @@ void backup_engine::backup_app_partition(const gpid &pid)
req->policy = backup_policy_info;
req->backup_id = _cur_backup.backup_id;
req->app_name = _cur_backup.app_name;
if (!_backup_path.empty()) {
req->__isset.backup_path = true;
req->backup_path = _backup_path;
}

ddebug_f("backup_id({}): send backup request to partition {}, target_addr = {}",
_cur_backup.backup_id,
Expand Down Expand Up @@ -250,8 +265,9 @@ void backup_engine::on_backup_reply(error_code err,

void backup_engine::write_backup_info()
{
std::string file_name =
cold_backup::get_backup_info_file(_backup_service->backup_root(), _cur_backup.backup_id);
std::string backup_root =
dsn::utils::filesystem::path_combine(_backup_path, _backup_service->backup_root());
std::string file_name = cold_backup::get_backup_info_file(backup_root, _cur_backup.backup_id);
blob buf = dsn::json::json_forwarder<app_backup_info>::encode(_cur_backup);
error_code err = write_backup_file(file_name, buf);
if (err == ERR_FS_INTERNAL) {
Expand Down Expand Up @@ -324,6 +340,7 @@ backup_item backup_engine::get_backup_item() const
backup_item item;
item.backup_id = _cur_backup.backup_id;
item.app_name = _cur_backup.app_name;
item.backup_path = _backup_path;
item.backup_provider_type = _provider_type;
item.start_time_ms = _cur_backup.start_time_ms;
item.end_time_ms = _cur_backup.end_time_ms;
Expand Down
4 changes: 4 additions & 0 deletions src/meta/backup_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class backup_engine

error_code init_backup(int32_t app_id);
error_code set_block_service(const std::string &provider);
error_code set_backup_path(const std::string &path);

error_code start();

Expand All @@ -66,6 +67,8 @@ class backup_engine

private:
friend class backup_engine_test;
friend class backup_service_test;

FRIEND_TEST(backup_engine_test, test_on_backup_reply);
FRIEND_TEST(backup_engine_test, test_backup_completed);
FRIEND_TEST(backup_engine_test, test_write_backup_info_failed);
Expand All @@ -87,6 +90,7 @@ class backup_engine

backup_service *_backup_service;
dist::block_service::block_filesystem *_block_service;
std::string _backup_path;
std::string _provider_type;
dsn::task_tracker _tracker;

Expand Down
11 changes: 11 additions & 0 deletions src/meta/meta_backup_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1624,6 +1624,17 @@ void backup_service::start_backup_app(start_backup_app_rpc rpc)
return;
}

if (request.__isset.backup_path) {
err = engine->set_backup_path(request.backup_path);
if (err != ERR_OK) {
response.err = err;
response.hint_message = "Backup failed: the default backup path has already configured "
"in `hdfs_service`, please modify the configuration if you "
"want to use a specific backup path.";
return;
}
}

{
zauto_lock l(_lock);
for (const auto &backup : _backup_states) {
Expand Down
1 change: 1 addition & 0 deletions src/meta/meta_backup_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ class backup_service
std::string get_backup_path(const std::string &policy_name, int64_t backup_id);

private:
friend class backup_service_test;
friend class meta_service_test_app;

FRIEND_TEST(backup_service_test, test_init_backup);
Expand Down
4 changes: 4 additions & 0 deletions src/meta/test/config-test.ini
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ logfile = zoolog.log
type = local_service
args = ./block_service

[block_service.local_service_empty_root]
type = local_service
args =

[block_service.fds_service]
type = fds_service
args =
Expand Down
50 changes: 49 additions & 1 deletion src/meta/test/meta_backup_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

#include <dsn/utility/fail_point.h>
#include <dsn/utility/filesystem.h>
#include <gtest/gtest.h>

#include "common/backup_utils.h"
Expand Down Expand Up @@ -49,11 +50,16 @@ class backup_service_test : public meta_test_base
create_app(_app_name);
}

start_backup_app_response start_backup(int32_t app_id, const std::string &provider)
start_backup_app_response
start_backup(int32_t app_id, const std::string &provider, const std::string &backup_path = "")
{
auto request = dsn::make_unique<start_backup_app_request>();
request->app_id = app_id;
request->backup_provider_type = provider;
if (!backup_path.empty()) {
request->__isset.backup_path = true;
request->backup_path = backup_path;
}

start_backup_app_rpc rpc(std::move(request), RPC_CM_START_BACKUP_APP);
_backup_service->start_backup_app(rpc);
Expand All @@ -74,6 +80,41 @@ class backup_service_test : public meta_test_base
return rpc.response();
}

// Checks that the app metadata file for (app_id, backup_id) was written.
//
// The expected location is derived by combining `user_specified_path` with the
// backup service's backup root and passing the result to
// cold_backup::get_app_metadata_file(). Returns true only if the file's size
// can be queried and is greater than zero.
bool write_metadata_succeed(int32_t app_id,
                            int64_t backup_id,
                            const std::string &user_specified_path)
{
    const std::string root = dsn::utils::filesystem::path_combine(
        user_specified_path, _backup_service->backup_root());
    auto app_state = _ms->_state->get_app(app_id);
    const std::string meta_path =
        cold_backup::get_app_metadata_file(root, app_state->app_name, app_id, backup_id);

    int64_t size_in_bytes = 0;
    const bool size_known = dsn::utils::filesystem::file_size(meta_path, size_in_bytes);
    return size_known && size_in_bytes > 0;
}

// Starts a one-time backup of `test_app_id` on the "local_service_empty_root"
// provider, optionally with a user-specified path, and verifies that:
//   - the request succeeds and a backup id is returned,
//   - exactly one backup engine is registered,
//   - the engine's _backup_path matches `user_specified_path`
//     (empty when none was given),
//   - the app metadata file was written under the resulting location.
void test_specific_backup_path(int32_t test_app_id, const std::string &user_specified_path = "")
{
    auto response = start_backup(test_app_id, "local_service_empty_root", user_specified_path);
    ASSERT_EQ(ERR_OK, response.err);
    ASSERT_TRUE(response.__isset.backup_id);
    ASSERT_EQ(1, _backup_service->_backup_states.size());

    auto engine = _backup_service->_backup_states[0];
    if (!user_specified_path.empty()) {
        ASSERT_EQ(user_specified_path, engine->_backup_path);
    } else {
        ASSERT_TRUE(engine->_backup_path.empty());
    }

    ASSERT_TRUE(write_metadata_succeed(test_app_id, response.backup_id, user_specified_path));
}

protected:
const std::string _policy_root;
const std::string _backup_root;
Expand Down Expand Up @@ -121,6 +162,13 @@ TEST_F(backup_service_test, test_write_backup_metadata_failed)
fail::teardown();
}

// Runs test_specific_backup_path for app id 1 without passing a backup path,
// so the helper's default (empty) user-specified path is used.
TEST_F(backup_service_test, test_backup_app_with_no_specific_path) { test_specific_backup_path(1); }

// Runs test_specific_backup_path for app id 1 with the user-specified
// backup path "test/backup".
TEST_F(backup_service_test, test_backup_app_with_user_specified_path)
{
test_specific_backup_path(1, "test/backup");
}

TEST_F(backup_service_test, test_query_backup_status)
{
// query a backup that does not exist
Expand Down
2 changes: 1 addition & 1 deletion src/replica/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba

/////////////////////////////////////////////////////////////////
// cold backup
void generate_backup_checkpoint(cold_backup_context_ptr backup_context);
virtual void generate_backup_checkpoint(cold_backup_context_ptr backup_context);
void trigger_async_checkpoint_for_backup(cold_backup_context_ptr backup_context);
void wait_async_checkpoint_for_backup(cold_backup_context_ptr backup_context);
void local_create_backup_checkpoint(cold_backup_context_ptr backup_context);
Expand Down
22 changes: 21 additions & 1 deletion src/replica/replica_backup.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <boost/lexical_cast.hpp>

#include <dsn/utility/filesystem.h>
Expand Down Expand Up @@ -57,7 +74,10 @@ void replica::on_cold_backup(const backup_request &request, /*out*/ backup_respo
dassert(r.second, "");
backup_context = r.first->second;
backup_context->block_service = block_service;
backup_context->backup_root = _options->cold_backup_root;
backup_context->backup_root = request.__isset.backup_path
? dsn::utils::filesystem::path_combine(
request.backup_path, _options->cold_backup_root)
: _options->cold_backup_root;
}

dcheck_eq_replica(backup_context->request.policy.policy_name, policy_name);
Expand Down
22 changes: 20 additions & 2 deletions src/replica/test/clear.sh
Original file line number Diff line number Diff line change
@@ -1,2 +1,20 @@
#!/ bin / sh
rm - rf core.* data/
#!/bin/sh
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

rm -rf core.* data/ log.* replica.* tag* test* test_cluster/
6 changes: 5 additions & 1 deletion src/replica/test/config-test.ini
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type = replica
run = true
count = 1
ports = 54321
pools = THREAD_POOL_DEFAULT,THREAD_POOL_REPLICATION_LONG,THREAD_POOL_REPLICATION,THREAD_POOL_SLOG,THREAD_POOL_PLOG
pools = THREAD_POOL_DEFAULT,THREAD_POOL_REPLICATION_LONG,THREAD_POOL_REPLICATION,THREAD_POOL_SLOG,THREAD_POOL_PLOG,THREAD_POOL_BLOCK_SERVICE

[core]
;tool = simulator
Expand Down Expand Up @@ -65,3 +65,7 @@ allow_inline = false
rpc_call_channel = RPC_CHANNEL_TCP
rpc_message_header_format = dsn
rpc_timeout_milliseconds = 5000

[block_service.local_service]
type = local_service
args =
Loading

0 comments on commit 03a441a

Please sign in to comment.