Support SSL authentication with Kafka in routine load job (#1235)
morningman authored and imay committed Jun 7, 2019
1 parent cb91e15 commit ff0dd0d
Showing 105 changed files with 2,803 additions and 540 deletions.
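
This commit lets a routine load job carry Kafka SSL settings in its property map: a value of the form FILE:file_id:md5 names a certificate or key file managed by the new SmallFileMgr, which materializes it on the BE (evidently downloaded from the FE, whose http_port is now recorded from the heartbeat in heartbeat_server.cpp below) and hands librdkafka a local path. A minimal sketch of such a property map; the key names are standard librdkafka SSL options, and the file id and md5 values are invented for illustration:

#include <map>
#include <string>

// Hypothetical properties for a routine load job consuming over SSL. Keys are
// standard librdkafka config names; FILE:<file_id>:<md5> values are resolved
// to local paths by SmallFileMgr before reaching RdKafka::Conf::set().
std::map<std::string, std::string> kafka_props = {
    {"security.protocol", "ssl"},
    {"ssl.ca.location",   "FILE:10001:5f4dcc3b"},  // file_id and md5 are made up
};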
7 changes: 0 additions & 7 deletions be/src/agent/agent_server.cpp
@@ -70,13 +70,6 @@ AgentServer::AgentServer(ExecEnv* exec_env,
}
}

// create tmp dir
// boost::filesystem::path tmp_path(config::agent_tmp_dir);
// if (boost::filesystem::exists(tmp_path)) {
// boost::filesystem::remove_all(tmp_path);
// }
// boost::filesystem::create_directories(config::agent_tmp_dir);

// init task worker pool
_create_table_workers = new TaskWorkerPool(
TaskWorkerPool::TaskWorkerType::CREATE_TABLE,
4 changes: 4 additions & 0 deletions be/src/agent/heartbeat_server.cpp
@@ -143,6 +143,10 @@ Status HeartbeatServer::_heartbeat(
}
}

if (master_info.__isset.http_port) {
_master_info->__set_http_port(master_info.http_port);
}

if (need_report) {
LOG(INFO) << "Master FE is changed or restarted. report tablet and disk info immediately";
_olap_engine->report_notify(true);
17 changes: 6 additions & 11 deletions be/src/common/config.h
@@ -114,14 +114,10 @@ namespace config {
CONF_Int32(sleep_one_second, "1");
// sleep time for five seconds
CONF_Int32(sleep_five_seconds, "5");
// trans file tools dir
CONF_String(trans_file_tool_path, "${DORIS_HOME}/tools/trans_file_tool/trans_files.sh");
// agent tmp dir
CONF_String(agent_tmp_dir, "${DORIS_HOME}/tmp");

// log dir
CONF_String(sys_log_dir, "${DORIS_HOME}/log");
CONF_String(user_function_dir, "${DORIS_HOME}/lib/usr");
CONF_String(user_function_dir, "${DORIS_HOME}/lib/udf");
// INFO, WARNING, ERROR, FATAL
CONF_String(sys_log_level, "INFO");
// TIME-DAY, TIME-HOUR, SIZE-MB-nnn
@@ -208,7 +204,7 @@ namespace config {
CONF_Int32(file_descriptor_cache_clean_interval, "3600");
CONF_Int32(disk_stat_monitor_interval, "5");
CONF_Int32(unused_index_monitor_interval, "30");
CONF_String(storage_root_path, "${DORIS_HOME}/storage");
CONF_String(storage_root_path, "${DORIS_HOME}/data");
CONF_Int32(min_percentage_of_error_disk, "50");
CONF_Int32(default_num_rows_per_data_block, "1024");
CONF_Int32(default_num_rows_per_column_file_block, "1024");
@@ -256,12 +252,8 @@ namespace config {

// Port to start debug webserver on
CONF_Int32(webserver_port, "8040");
// Interface to start debug webserver on. If blank, webserver binds to 0.0.0.0
CONF_String(webserver_interface, "");
CONF_String(webserver_doc_root, "${DORIS_HOME}");
// Number of webserver workers
CONF_Int32(webserver_num_workers, "5");
// If true, webserver may serve static files from the webserver_doc_root
CONF_Bool(enable_webserver_doc_root, "true");
// Period to update rate counters and sampling counters in ms.
CONF_Int32(periodic_counter_update_period_ms, "500");

@@ -419,6 +411,9 @@
// same cache size configuration.
// TODO(cmy): use different config to set different client cache if necessary.
CONF_Int32(max_client_cache_size_per_host, "10");

// Dir to save files downloaded by SmallFileMgr
CONF_String(small_file_dir, "${DORIS_HOME}/lib/small_file/");
} // namespace config

} // namespace doris
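The new small_file_dir entry backs the SmallFileMgr introduced in this commit; routine load looks certificate files up through it by id and md5. A rough usage fragment, assuming the get_file(file_id, md5, &file_path) signature visible at its call site in data_consumer.cpp (the exec_env wiring comes from exec_env_init.cpp below; the concrete id and md5 are invented):

// Sketch only: resolve a file registered as FILE:10001:<md5> to a local path
// under ${DORIS_HOME}/lib/small_file/. get_file() is expected to verify the
// md5 of a cached copy, or download the file, before returning its path.
std::string file_path;
Status st = exec_env->small_file_mgr()->get_file(10001, "5f4dcc3b", &file_path);
if (!st.ok()) {
    // the PAUSE: prefix mirrors the error convention used in data_consumer.cpp
    return Status("PAUSE: failed to get file: " + st.get_error_msg());
}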
1 change: 1 addition & 0 deletions be/src/runtime/CMakeLists.txt
@@ -99,6 +99,7 @@ set(RUNTIME_FILES
routine_load/data_consumer_group.cpp
routine_load/data_consumer_pool.cpp
routine_load/routine_load_task_executor.cpp
small_file_mgr.cpp
)

if (WITH_MYSQL)
3 changes: 3 additions & 0 deletions be/src/runtime/exec_env.h
@@ -52,6 +52,7 @@ class TmpFileMgr;
class WebPageHandler;
class StreamLoadExecutor;
class RoutineLoadTaskExecutor;
class SmallFileMgr;

class BackendServiceClient;
class FrontendServiceClient;
@@ -112,6 +113,7 @@ class ExecEnv {
BufferPool* buffer_pool() { return _buffer_pool; }
TabletWriterMgr* tablet_writer_mgr() { return _tablet_writer_mgr; }
LoadStreamMgr* load_stream_mgr() { return _load_stream_mgr; }
SmallFileMgr* small_file_mgr() { return _small_file_mgr; }

const std::vector<StorePath>& store_paths() const { return _store_paths; }
void set_store_paths(const std::vector<StorePath>& paths) { _store_paths = paths; }
@@ -167,6 +169,7 @@

StreamLoadExecutor* _stream_load_executor = nullptr;
RoutineLoadTaskExecutor* _routine_load_task_executor = nullptr;
SmallFileMgr* _small_file_mgr = nullptr;
};

}
3 changes: 3 additions & 0 deletions be/src/runtime/exec_env_init.cpp
@@ -47,6 +47,7 @@
#include "runtime/routine_load/routine_load_task_executor.h"
#include "runtime/stream_load/load_stream_mgr.h"
#include "runtime/stream_load/stream_load_executor.h"
#include "runtime/small_file_mgr.h"
#include "util/pretty_printer.h"
#include "util/doris_metrics.h"
#include "util/brpc_stub_cache.h"
@@ -99,6 +100,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
_brpc_stub_cache = new BrpcStubCache();
_stream_load_executor = new StreamLoadExecutor(this);
_routine_load_task_executor = new RoutineLoadTaskExecutor(this);
_small_file_mgr = new SmallFileMgr(this, config::small_file_dir);

_backend_client_cache->init_metrics(DorisMetrics::metrics(), "backend");
_frontend_client_cache->init_metrics(DorisMetrics::metrics(), "frontend");
@@ -118,6 +120,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
exit(-1);
}
_broker_mgr->init();
_small_file_mgr->init();
_init_mem_tracker();
RETURN_IF_ERROR(_tablet_writer_mgr->start_bg_worker());
return Status::OK;
5 changes: 3 additions & 2 deletions be/src/runtime/pull_load_task_mgr.cpp
@@ -118,13 +118,13 @@ PullLoadTaskMgr::~PullLoadTaskMgr() {
Status PullLoadTaskMgr::init() {
auto st = load_task_ctxes();
if (!st.ok()) {
LOG(WARNING) << "Load task from directory failed. because " << st.get_error_msg();
_dir_exist = false;
}
return Status::OK;
}

Status PullLoadTaskMgr::load_task_ctxes() {
/*
// 1. scan all files
std::vector<std::string> files;
RETURN_IF_ERROR(FileUtils::scan_dir(_path, &files));
@@ -141,8 +141,9 @@ Status PullLoadTaskMgr::load_task_ctxes() {
<< ", status:" << status.get_error_msg();
}
}
*/

return Status::OK;
return Status("Not implemented");
}

Status PullLoadTaskMgr::load_task_ctx(const std::string& file_path) {
111 changes: 104 additions & 7 deletions be/src/runtime/routine_load/data_consumer.cpp
@@ -24,6 +24,7 @@

#include "common/status.h"
#include "service/backend_options.h"
#include "runtime/small_file_mgr.h"
#include "util/defer_op.h"
#include "util/stopwatch.hpp"
#include "util/uid_util.h"
@@ -52,9 +53,17 @@ Status KafkaDataConsumer::init(StreamLoadContext* ctx) {

std::string errstr;
auto set_conf = [&conf, &errstr](const std::string& conf_key, const std::string& conf_val) {
if (conf->set(conf_key, conf_val, errstr) != RdKafka::Conf::CONF_OK) {
RdKafka::Conf::ConfResult res = conf->set(conf_key, conf_val, errstr);
if (res == RdKafka::Conf::CONF_UNKNOWN) {
// ignore unknown config
return Status::OK;
} else if (errstr.find("not supported") != std::string::npos) {
// some Java-only properties may be passed here, and librdkafka will return 'xxx' not supported
// ignore it
return Status::OK;
} else if (res != RdKafka::Conf::CONF_OK) {
std::stringstream ss;
ss << "failed to set '" << conf_key << "'";
ss << "PAUSE: failed to set '" << conf_key << "', value: '" << conf_val << "', err: " << errstr;
LOG(WARNING) << ss.str();
return Status(ss.str());
}
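
For reference, a standalone sketch of how RdKafka::Conf::set() reports results; CONF_UNKNOWN for unrecognized keys is the behavior the lambda above relies on to silently skip Java-only properties (this example is illustrative, not part of the commit):

#include <cassert>
#include <string>
#include "librdkafka/rdkafkacpp.h"

int main() {
    std::string errstr;
    RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    // Unknown keys yield CONF_UNKNOWN rather than CONF_INVALID, so they can
    // be ignored instead of pausing the job.
    assert(conf->set("no.such.property", "x", errstr) == RdKafka::Conf::CONF_UNKNOWN);
    // A known key with a valid value returns CONF_OK.
    assert(conf->set("security.protocol", "ssl", errstr) == RdKafka::Conf::CONF_OK);
    delete conf;
    return 0;
}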
@@ -73,21 +82,40 @@ Status KafkaDataConsumer::init(StreamLoadContext* ctx) {
RETURN_IF_ERROR(set_conf("api.version.fallback.ms", "0"));

for (auto& item : ctx->kafka_info->properties) {
RETURN_IF_ERROR(set_conf(item.first, item.second));
if (boost::algorithm::starts_with(item.second, "FILE:")) {
// file property should have the format: FILE:file_id:md5
std::vector<std::string> parts;
boost::split(parts, item.second, boost::is_any_of(":"));
if (parts.size() != 3) {
return Status("PAUSE: Invalid file property of kafka: " + item.second);
}
int64_t file_id = std::stol(parts[1]);
std::string file_path;
Status st = ctx->exec_env()->small_file_mgr()->get_file(file_id, parts[2], &file_path);
if (!st.ok()) {
std::stringstream ss;
ss << "PAUSE: failed to get file for config: " << item.first << ", error: " << st.get_error_msg();
return Status(ss.str());
}
RETURN_IF_ERROR(set_conf(item.first, file_path));
} else {
RETURN_IF_ERROR(set_conf(item.first, item.second));
}
_custom_properties.emplace(item.first, item.second);
}

if (conf->set("event_cb", &_k_event_cb, errstr) != RdKafka::Conf::CONF_OK) {
std::stringstream ss;
ss << "failed to set 'event_cb'";
ss << "PAUSE: failed to set 'event_cb'";
LOG(WARNING) << ss.str();
return Status(ss.str());
}

// create consumer
_k_consumer = RdKafka::KafkaConsumer::create(conf, errstr);
if (!_k_consumer) {
LOG(WARNING) << "failed to create kafka consumer";
return Status("failed to create kafka consumer");
LOG(WARNING) << "PAUSE: failed to create kafka consumer: " << errstr;
return Status("PAUSE: failed to create kafka consumer: " + errstr);
}

VLOG(3) << "finished to init kafka consumer. " << ctx->brief();
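
The FILE:file_id:md5 convention parsed above can be exercised on its own; here is a small hypothetical helper mirroring the boost::split logic from the diff (not part of the commit):

#include <cstdint>
#include <string>
#include <vector>
#include <boost/algorithm/string.hpp>

// Hypothetical helper mirroring KafkaDataConsumer::init(): accepts
// "FILE:<file_id>:<md5>" and extracts the two fields.
bool parse_file_property(const std::string& value, int64_t* file_id, std::string* md5) {
    std::vector<std::string> parts;
    boost::split(parts, value, boost::is_any_of(":"));
    if (parts.size() != 3 || parts[0] != "FILE") {
        return false;  // same shape check as parts.size() != 3 in the diff
    }
    *file_id = std::stol(parts[1]);  // like the diff, throws if the id is not numeric
    *md5 = parts[2];
    return true;
}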
@@ -174,7 +202,7 @@ Status KafkaDataConsumer::group_consume(
case RdKafka::ERR__TIMED_OUT:
// leave the status as OK, because this may happen
// if there is no data in kafka.
LOG(WARNING) << "kafka consume timeout: " << _id;
LOG(INFO) << "kafka consume timeout: " << _id;
break;
default:
LOG(WARNING) << "kafka consume failed: " << _id
@@ -199,6 +227,66 @@ Status KafkaDataConsumer::group_consume(
return st;
}

Status KafkaDataConsumer::get_partition_meta(std::vector<int32_t>* partition_ids) {
// create topic conf
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
auto conf_deleter = [tconf] () { delete tconf; };
DeferOp delete_conf(std::bind<void>(conf_deleter));

// create topic
std::string errstr;
RdKafka::Topic *topic = RdKafka::Topic::create(_k_consumer, _topic, tconf, errstr);
if (topic == nullptr) {
std::stringstream ss;
ss << "failed to create topic: " << errstr;
LOG(WARNING) << ss.str();
return Status(ss.str());
}
auto topic_deleter = [topic] () { delete topic; };
DeferOp delete_topic(std::bind<void>(topic_deleter));

// get topic metadata
RdKafka::Metadata* metadata = nullptr;
RdKafka::ErrorCode err = _k_consumer->metadata(true/* for this topic */, topic, &metadata, 5000);
if (err != RdKafka::ERR_NO_ERROR) {
std::stringstream ss;
ss << "failed to get partition meta: " << RdKafka::err2str(err);
LOG(WARNING) << ss.str();
return Status(ss.str());
}
auto meta_deleter = [metadata] () { delete metadata; };
DeferOp delete_meta(std::bind<void>(meta_deleter));

// get partition ids
RdKafka::Metadata::TopicMetadataIterator it;
for (it = metadata->topics()->begin(); it != metadata->topics()->end(); ++it) {
if ((*it)->topic() != _topic) {
continue;
}

if ((*it)->err() != RdKafka::ERR_NO_ERROR) {
std::stringstream ss;
ss << "error: " << err2str((*it)->err());
if ((*it)->err() == RdKafka::ERR_LEADER_NOT_AVAILABLE) {
ss << ", try again";
}
LOG(WARNING) << ss.str();
return Status(ss.str());
}

RdKafka::TopicMetadata::PartitionMetadataIterator ip;
for (ip = (*it)->partitions()->begin(); ip != (*it)->partitions()->end(); ++ip) {
partition_ids->push_back((*ip)->id());
}
}

if (partition_ids->empty()) {
return Status("no partition in this topic");
}

return Status::OK;
}

Status KafkaDataConsumer::cancel(StreamLoadContext* ctx) {
std::unique_lock<std::mutex> l(_lock);
if (!_init) {
@@ -225,6 +313,15 @@ bool KafkaDataConsumer::match(StreamLoadContext* ctx) {
if (_brokers != ctx->kafka_info->brokers || _topic != ctx->kafka_info->topic) {
return false;
}
// check properties
if (_custom_properties.size() != ctx->kafka_info->properties.size()) {
return false;
}
for (auto& item : ctx->kafka_info->properties) {
if (_custom_properties.find(item.first) == _custom_properties.end()) {
return false;
}
}
return true;
}

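The new get_partition_meta() above lets a caller enumerate a topic's partition ids, presumably so routine load tasks can be spread across partitions when the job does not list them explicitly. The three DeferOp guards follow librdkafka's ownership rules: the application must delete the Conf, Topic, and Metadata objects it obtains. A hedged sketch of a caller; the consumer variable and the logging are assumptions, not code from the commit:

// Sketch: fetch partition ids from an initialized KafkaDataConsumer.
std::vector<int32_t> partition_ids;
Status st = consumer->get_partition_meta(&partition_ids);
if (st.ok()) {
    for (int32_t id : partition_ids) {
        LOG(INFO) << "topic partition: " << id;
    }
}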
5 changes: 5 additions & 0 deletions be/src/runtime/routine_load/data_consumer.h
@@ -19,6 +19,7 @@

#include <ctime>
#include <mutex>
#include <unordered_map>

#include "librdkafka/rdkafkacpp.h"

@@ -141,9 +142,13 @@ class KafkaDataConsumer : public DataConsumer {
// start the consumer and put msgs to queue
Status group_consume(BlockingQueue<RdKafka::Message*>* queue, int64_t max_running_time_ms);

// get the partitions ids of the topic
Status get_partition_meta(std::vector<int32_t>* partition_ids);

private:
std::string _brokers;
std::string _topic;
std::unordered_map<std::string, std::string> _custom_properties;

KafkaEventCb _k_event_cb;
RdKafka::KafkaConsumer* _k_consumer = nullptr;
4 changes: 2 additions & 2 deletions be/src/runtime/routine_load/data_consumer_pool.cpp
@@ -50,7 +50,7 @@ Status DataConsumerPool::get_consumer(
break;
default:
std::stringstream ss;
ss << "unknown routine load task type: " << ctx->load_type;
ss << "PAUSE: unknown routine load task type: " << ctx->load_type;
return Status(ss.str());
}

@@ -66,7 +66,7 @@ Status DataConsumerPool::get_consumer_grp(
StreamLoadContext* ctx,
std::shared_ptr<DataConsumerGroup>* ret) {
if (ctx->load_src_type != TLoadSourceType::KAFKA) {
return Status("Currently nly support consumer group for Kafka data source");
return Status("PAUSE: Currently only support consumer group for Kafka data source");
}
DCHECK(ctx->kafka_info);
