Skip to content

Commit

Permalink
Add map of dest id to src id
Browse files Browse the repository at this point in the history
  • Loading branch information
EmmyMiao87 committed Jun 10, 2019
1 parent 68657bd commit e22ee9f
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 25 deletions.
38 changes: 25 additions & 13 deletions be/src/exec/broker_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ Status BrokerScanner::init_expr_ctxes() {
return Status(ss.str());
}

bool has_transform_slot_ids = _params.__isset.transform_slot_ids;
bool has_slot_id_map = _params.__isset.dest_sid_to_src_sid_without_trans;
for (auto slot_desc : _dest_tuple_desc->slots()) {
if (!slot_desc->is_materialized()) {
continue;
Expand All @@ -135,13 +135,26 @@ Status BrokerScanner::init_expr_ctxes() {
RETURN_IF_ERROR(ctx->prepare(_state, *_row_desc.get(), _mem_tracker.get()));
RETURN_IF_ERROR(ctx->open(_state));
_dest_expr_ctx.emplace_back(ctx);
if (has_transform_slot_ids) {
auto it = _params.transform_slot_ids.find(slot_desc->id());
if (it == std::end(_params.transform_slot_ids)) {
_has_expr_columns.emplace_back(false);
if (has_slot_id_map) {
auto it = _params.dest_sid_to_src_sid_without_trans.find(slot_desc->id());
if (it == std::end(_params.dest_sid_to_src_sid_without_trans)) {
_src_slot_index.emplace_back(-1);
}
else {
_has_expr_columns.emplace_back(true);
int index = 0;
while (index < _src_slot_descs.size()) {
auto _src_slot = _src_slot_descs[index];
if (_src_slot->id() == it->second) {
_src_slot_index.emplace_back(index);
break;
}
index++;
}
if (index == _src_slot_descs.size()) {
std::stringstream ss;
ss << "No src slot " << it->second << " in src slot descs";
return Status(ss.str());
}
}
}
}
Expand All @@ -162,8 +175,8 @@ Status BrokerScanner::open() {
if (_params.__isset.strict_mode) {
_strict_mode = _params.strict_mode;
}
if (_strict_mode && !_params.__isset.transform_slot_ids) {
return Status("Expr column list must be set in strict mode");
if (_strict_mode && !_params.__isset.dest_sid_to_src_sid_without_trans) {
return Status("Slot map of dest to src must be set in strict mode");
}

return Status::OK;
Expand Down Expand Up @@ -602,11 +615,12 @@ bool BrokerScanner::fill_dest_tuple(const Slice& line, Tuple* dest_tuple, MemPoo
continue;
}

ExprContext* ctx = _dest_expr_ctx[ctx_idx];
int dest_index = ctx_idx++;
ExprContext* ctx = _dest_expr_ctx[dest_index];
void* value = ctx->get_value(_src_tuple_row);
if (value == nullptr) {
if (_strict_mode && !_src_tuple->is_null(slot_desc->null_indicator_offset())
&& !_has_expr_columns[ctx_idx]) {
if (_strict_mode && (_src_slot_index[dest_index] != -1)
&& !_src_tuple->is_null(_src_slot_descs[_src_slot_index[ctx_idx]]->null_indicator_offset())) {
std::stringstream error_msg;
error_msg << "column(" << slot_desc->col_name() << ") value is incorrect "
<< "while strict mode is " << std::boolalpha << _strict_mode;
Expand All @@ -625,13 +639,11 @@ bool BrokerScanner::fill_dest_tuple(const Slice& line, Tuple* dest_tuple, MemPoo
return false;
}
dest_tuple->set_null(slot_desc->null_indicator_offset());
ctx_idx++;
continue;
}
dest_tuple->set_not_null(slot_desc->null_indicator_offset());
void* slot = dest_tuple->get_slot(slot_desc->tuple_offset());
RawValue::write(value, slot, slot_desc->type(), mem_pool);
ctx_idx++;
continue;
}
return true;
Expand Down
4 changes: 3 additions & 1 deletion be/src/exec/broker_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,9 @@ class BrokerScanner {
// Dest tuple descriptor and dest expr context
const TupleDescriptor* _dest_tuple_desc;
std::vector<ExprContext*> _dest_expr_ctx;
std::vector<bool> _has_expr_columns;
// the map values of dest slot id to src slot index
// if there is not key of dest slot id in dest_sid_to_src_sid_without_trans, it will be set to -1
std::vector<int> _src_slot_index;

// used to hold current StreamLoadPipe
std::shared_ptr<StreamLoadPipe> _stream_load_pipe;
Expand Down
7 changes: 3 additions & 4 deletions fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ private void initParams(ParamCreateContext context) throws AnalysisException, Us
private void finalizeParams(ParamCreateContext context) throws UserException, AnalysisException {
Map<String, SlotDescriptor> slotDescByName = context.slotDescByName;
Map<String, Expr> exprMap = context.exprMap;
Set<Integer> transformSlotIds = Sets.newHashSet();
Map<Integer, Integer> destSidToSrcSidWithoutTrans = Maps.newHashMap();
// Analyze expr map
if (exprMap != null) {
for (Map.Entry<String, Expr> entry : exprMap.entrySet()) {
Expand Down Expand Up @@ -418,6 +418,7 @@ private void finalizeParams(ParamCreateContext context) throws UserException, An
if (expr == null) {
SlotDescriptor srcSlotDesc = slotDescByName.get(destSlotDesc.getColumn().getName());
if (srcSlotDesc != null) {
destSidToSrcSidWithoutTrans.put(destSlotDesc.getId().asInt(), srcSlotDesc.getId().asInt());
// If dest is allow null, we set source to nullable
if (destSlotDesc.getColumn().isAllowNull()) {
srcSlotDesc.setIsNullable(true);
Expand All @@ -436,8 +437,6 @@ private void finalizeParams(ParamCreateContext context) throws UserException, An
}
}
}
} else {
transformSlotIds.add(destSlotDesc.getId().asInt());
}

if (isNegative && destSlotDesc.getColumn().getAggregationType() == AggregateType.SUM) {
Expand All @@ -447,7 +446,7 @@ private void finalizeParams(ParamCreateContext context) throws UserException, An
expr = castToSlot(destSlotDesc, expr);
context.params.putToExpr_of_dest_slot(destSlotDesc.getId().asInt(), expr.treeToThrift());
}
context.params.setTransform_slot_ids(transformSlotIds);
context.params.setDest_sid_to_src_sid_without_trans(destSidToSrcSidWithoutTrans);
context.params.setDest_tuple_id(desc.getId().asInt());
context.params.setStrict_mode(strictMode);
// Need re compute memory layout after set some slot descriptor to nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ public void finalize(Analyzer analyzer) throws UserException, UserException {

private void finalizeParams() throws UserException {
boolean negative = streamLoadTask.getNegative();
Set<Integer> transformSlotIds = Sets.newHashSet();
Map<Integer, Integer> destSidToSrcSidWithoutTrans = Maps.newHashMap();
for (SlotDescriptor dstSlotDesc : desc.getSlots()) {
if (!dstSlotDesc.isMaterialized()) {
continue;
Expand All @@ -254,6 +254,7 @@ private void finalizeParams() throws UserException {
if (expr == null) {
SlotDescriptor srcSlotDesc = slotDescByName.get(dstSlotDesc.getColumn().getName());
if (srcSlotDesc != null) {
destSidToSrcSidWithoutTrans.put(srcSlotDesc.getId().asInt(), dstSlotDesc.getId().asInt());
// If dest is allow null, we set source to nullable
if (dstSlotDesc.getColumn().isAllowNull()) {
srcSlotDesc.setIsNullable(true);
Expand All @@ -271,8 +272,6 @@ private void finalizeParams() throws UserException {
}
}
}
} else {
transformSlotIds.add(dstSlotDesc.getId().asInt());
}
// check hll_hash
if (dstSlotDesc.getType().getPrimitiveType() == PrimitiveType.HLL) {
Expand All @@ -294,7 +293,7 @@ private void finalizeParams() throws UserException {
expr = castToSlot(dstSlotDesc, expr);
brokerScanRange.params.putToExpr_of_dest_slot(dstSlotDesc.getId().asInt(), expr.treeToThrift());
}
brokerScanRange.params.setTransform_slot_ids(transformSlotIds);
brokerScanRange.params.setDest_sid_to_src_sid_without_trans(destSidToSrcSidWithoutTrans);
brokerScanRange.params.setDest_tuple_id(desc.getId().asInt());
// LOG.info("brokerScanRange is {}", brokerScanRange);

Expand Down
6 changes: 3 additions & 3 deletions gensrc/thrift/PlanNodes.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,9 @@ struct TBrokerScanRangeParams {
// If partition_ids is set, data that doesn't in this partition will be filtered.
8: optional list<i64> partition_ids

// transform slot isd is a TSlotId list
// the member of list will be transform to slot type by expr
9: optional set<Types.TSlotId> transform_slot_ids
// This is the mapping of dest slot id and src slot id in load expr
// It excludes the slot id which has the transform expr
9: optional map<Types.TSlotId, Types.TSlotId> dest_sid_to_src_sid_without_trans
// strictMode is a boolean
// if strict mode is true, the incorrect data (the result of cast is null) will not be loaded
10: optional bool strict_mode
Expand Down

0 comments on commit e22ee9f

Please sign in to comment.