Skip to content

Commit

Permalink
add fe ut
Browse files Browse the repository at this point in the history
  • Loading branch information
LiBinfeng-01 committed Nov 14, 2024
1 parent 72cbd43 commit 2d40ee2
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ createRoutineLoad
(WITH (APPEND | DELETE | MERGE))?
(loadProperty (COMMA loadProperty)*)? propertyClause? FROM type=identifier
LEFT_PAREN customProperties=propertyItemList RIGHT_PAREN
commentSpec? #createRoutineLoadAlias
commentSpec?
;

unsupportedLoadStatement
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,9 @@ public CreateRoutineLoadStmt(LabelName labelName, List<ParseNode> loadPropertyLi
String> jobProperties, String typeName, RoutineLoadDesc routineLoadDesc, int desireTaskConcurrentNum,
long maxErrorNum, double maxFilterRatio, long maxBatchIntervalS, long maxBatchRows, long maxBatchSizeBytes,
long execMemLimit, int sendBatchParallelism, String timezone, String format, String jsonPaths,
String jsonRoot, byte enclose, byte escape, long workloadGroupId,
boolean loadToSingleTablet, boolean strictMode, boolean isPartialUpdate,
boolean stripOuterArray, boolean numAsString, boolean fuzzyParse) {
String jsonRoot, byte enclose, byte escape, long workloadGroupId, boolean loadToSingleTablet,
boolean strictMode, boolean isPartialUpdate, boolean stripOuterArray, boolean numAsString,
boolean fuzzyParse, AbstractDataSourceProperties dataSourceProperties) {
this.labelName = labelName;
this.loadPropertyList = loadPropertyList;
this.jobProperties = jobProperties;
Expand All @@ -265,6 +265,7 @@ public CreateRoutineLoadStmt(LabelName labelName, List<ParseNode> loadPropertyLi
this.stripOuterArray = stripOuterArray;
this.numAsString = numAsString;
this.fuzzyParse = fuzzyParse;
this.dataSourceProperties = dataSourceProperties;
}

public String getName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import org.apache.doris.nereids.DorisParser.ConstantContext;
import org.apache.doris.nereids.DorisParser.CreateMTMVContext;
import org.apache.doris.nereids.DorisParser.CreateProcedureContext;
import org.apache.doris.nereids.DorisParser.CreateRoutineLoadContext;
import org.apache.doris.nereids.DorisParser.CreateRowPolicyContext;
import org.apache.doris.nereids.DorisParser.CreateTableContext;
import org.apache.doris.nereids.DorisParser.CreateTableLikeContext;
Expand Down Expand Up @@ -138,6 +139,7 @@
import org.apache.doris.nereids.DorisParser.LateralViewContext;
import org.apache.doris.nereids.DorisParser.LessThanPartitionDefContext;
import org.apache.doris.nereids.DorisParser.LimitClauseContext;
import org.apache.doris.nereids.DorisParser.LoadPropertyContext;
import org.apache.doris.nereids.DorisParser.LogicalBinaryContext;
import org.apache.doris.nereids.DorisParser.LogicalNotContext;
import org.apache.doris.nereids.DorisParser.MapLiteralContext;
Expand Down Expand Up @@ -1357,7 +1359,7 @@ public LogicalSubQueryAlias<Plan> visitAliasQuery(AliasQueryContext ctx) {
/**
* process LoadProperty in routine load
*/
public LoadProperty visitLoadProperty(DorisParser.LoadPropertyContext ctx) {
public LoadProperty visitLoadProperty(LoadPropertyContext ctx) {
LoadProperty loadProperty = null;
if (ctx instanceof SeparatorContext) {
loadProperty = new LoadSeparator(((SeparatorContext) ctx).STRING_LITERAL().getText());
Expand Down Expand Up @@ -1391,7 +1393,7 @@ public LoadProperty visitLoadProperty(DorisParser.LoadPropertyContext ctx) {
}

@Override
public Command visitCreateRoutineLoadCommand(DorisParser.CreateRoutineLoadAliasContext ctx) {
public LogicalPlan visitCreateRoutineLoad(CreateRoutineLoadContext ctx) {
List<String> labelParts = visitMultipartIdentifier(ctx.label);
String labelName = null;
String labelDbName = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public class CreateRoutineLoadInfo {
/**
* constructor for create table
*/
public CreateRoutineLoadInfo(LabelName labelName, String tableName, Map<String, LoadProperty> loadPropertyList,
public CreateRoutineLoadInfo(LabelName labelName, String tableName, Map<String, LoadProperty> loadPropertyMap,
Map<String, String> jobProperties, String typeName,
Map<String, String> dataSourceProperties, LoadTask.MergeType mergeType,
String comment) {
Expand All @@ -198,7 +198,7 @@ public CreateRoutineLoadInfo(LabelName labelName, String tableName, Map<String,
this.isMultiTable = true;
}
this.tableName = tableName;
this.loadPropertyMap = loadPropertyList;
this.loadPropertyMap = loadPropertyMap;
this.jobProperties = jobProperties == null ? Maps.newHashMap() : jobProperties;
this.typeName = typeName.toUpperCase();
this.dataSourceProperties = RoutineLoadDataSourcePropertyFactory
Expand Down Expand Up @@ -437,11 +437,16 @@ private void checkDataSourceProperties() throws UserException {
this.dataSourceProperties.analyze();
}

/**
* make legacy create routine load statement after validate by nereids
* @return legacy create routine load statement
*/
public CreateRoutineLoadStmt translateToLegacyStmt() {
return new CreateRoutineLoadStmt(labelName, null, jobProperties, typeName, routineLoadDesc,
desiredConcurrentNum, maxErrorNum, maxFilterRatio, maxBatchIntervalS, maxBatchRows, maxBatchSizeBytes,
execMemLimit, sendBatchParallelism, timezone, format, jsonPaths, jsonRoot, enclose, escape, workloadGroupId,
loadToSingleTablet, strictMode, isPartialUpdate, stripOuterArray, numAsString, fuzzyParse
loadToSingleTablet, strictMode, isPartialUpdate, stripOuterArray, numAsString, fuzzyParse,
dataSourceProperties
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
import org.apache.doris.load.routineload.LoadDataSourceType;
import org.apache.doris.load.routineload.kafka.KafkaConfiguration;
import org.apache.doris.load.routineload.kafka.KafkaDataSourceProperties;
import org.apache.doris.nereids.trees.plans.commands.info.CreateRoutineLoadInfo;
import org.apache.doris.nereids.trees.plans.commands.load.LoadPartitionNames;
import org.apache.doris.nereids.trees.plans.commands.load.LoadProperty;
import org.apache.doris.nereids.trees.plans.commands.load.LoadSeparator;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;

Expand All @@ -44,6 +48,7 @@
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -208,6 +213,100 @@ public void analyze(Analyzer analyzer1) {
Assert.assertEquals("+08:00", createRoutineLoadStmt.getTimezone());
}

/**
 * Verifies that creating a routine load through the new Nereids path
 * (CreateRoutineLoadInfo.validate() followed by translateToLegacyStmt())
 * produces a CreateRoutineLoadStmt equivalent to the one built by the
 * legacy CreateRoutineLoadStmt.analyze() path, for a basic Kafka source.
 */
@Test
public void testAnalyzeForNereids(@Injectable Analyzer analyzer,
@Injectable SessionVariable sessionVariable) throws UserException {
// Shared fixture values used by both the Nereids and the legacy statement.
String jobName = "job1";
String dbName = "db1";
LabelName labelName = new LabelName(dbName, jobName);
String tableNameString = "table1";
String topicName = "topic1";
String serverAddress = "127.0.0.1:8080";
String kafkaPartitionString = "1,2,3";
String timeZone = "8:00";
List<String> partitionNameString = Lists.newArrayList();
partitionNameString.add("p1");
// Nereids-side load properties: one partition list and one column separator.
LoadPartitionNames loadPartitionNames = new LoadPartitionNames(false, partitionNameString);
LoadSeparator loadColumnSeparator = new LoadSeparator(",");

// duplicate load property
// NOTE(review): the map keys appear swapped relative to their values
// ("seperator" -> LoadPartitionNames, "partitionNames" -> LoadSeparator),
// and "seperator" is misspelled. If CreateRoutineLoadInfo only iterates the
// map values this is harmless, but confirm the keys carry no semantics.
Map<String, LoadProperty> loadPropertyMap = new HashMap<>();
loadPropertyMap.put("seperator", loadPartitionNames);
loadPropertyMap.put("partitionNames", loadColumnSeparator);

// Legacy-AST equivalents of the same two load properties.
PartitionNames partitionNames = new PartitionNames(false, partitionNameString);
Separator columnSeparator = new Separator(",");

// duplicate load property
List<ParseNode> loadPropertyList = new ArrayList<>();
loadPropertyList.add(columnSeparator);
loadPropertyList.add(partitionNames);

// Job-level properties: desired concurrency of 2 plus an explicit timezone.
Map<String, String> properties = Maps.newHashMap();
properties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2");
properties.put(LoadStmt.TIMEZONE, timeZone);
String typeName = LoadDataSourceType.KAFKA.name();
// Kafka data-source properties: topic, broker list, and partition ids.
Map<String, String> customProperties = Maps.newHashMap();

customProperties.put(KafkaConfiguration.KAFKA_TOPIC.getName(), topicName);
customProperties.put(KafkaConfiguration.KAFKA_BROKER_LIST.getName(), serverAddress);
customProperties.put(KafkaConfiguration.KAFKA_PARTITIONS.getName(), kafkaPartitionString);

// Statement built through the new Nereids command path.
CreateRoutineLoadInfo createRoutineLoadInfo = new CreateRoutineLoadInfo(labelName, tableNameString,
loadPropertyMap, properties,
typeName, customProperties,
LoadTask.MergeType.APPEND, "");

// Session variables consumed during validate(): send-batch parallelism and
// the session timezone (the latter resolves the "8:00" offset above).
new Expectations() {
{
ctx.getSessionVariable();
result = sessionVariable;
sessionVariable.getSendBatchParallelism();
result = 1;

sessionVariable.getTimeZone();
result = "Asia/Hong_Kong";
}
};
createRoutineLoadInfo.validate(analyzer.getContext());
CreateRoutineLoadStmt createRoutineLoadStmtNereids = createRoutineLoadInfo.translateToLegacyStmt();

// Statement built through the legacy parser/analyzer path, for comparison.
CreateRoutineLoadStmt createRoutineLoadStmtOrigin = new CreateRoutineLoadStmt(labelName, tableNameString,
loadPropertyList, properties,
typeName, customProperties,
LoadTask.MergeType.APPEND, "");
// Stub out StatementBase.analyze() so the legacy path skips catalog and
// privilege checks that are unavailable in this unit-test context.
new MockUp<StatementBase>() {
@Mock
public void analyze(Analyzer analyzer1) {
return;
}
};

// Record the same session-variable expectations for the legacy analyze().
new Expectations() {
{
ctx.getSessionVariable();
result = sessionVariable;
sessionVariable.getSendBatchParallelism();
result = 1;

sessionVariable.getTimeZone();
result = "Asia/Hong_Kong";
}
};

createRoutineLoadStmtOrigin.analyze(analyzer);

// Both paths must agree on the routine-load desc (column separator and
// partition names) and on the derived job/data-source properties.
Assert.assertNotNull(createRoutineLoadStmtNereids.getRoutineLoadDesc());
Assert.assertEquals(createRoutineLoadStmtOrigin.getRoutineLoadDesc().getColumnSeparator().getSeparator(), createRoutineLoadStmtNereids.getRoutineLoadDesc().getColumnSeparator().getSeparator());
Assert.assertEquals(createRoutineLoadStmtOrigin.getRoutineLoadDesc().getPartitionNames().getPartitionNames(), createRoutineLoadStmtNereids.getRoutineLoadDesc().getPartitionNames().getPartitionNames());
Assert.assertEquals(2, createRoutineLoadStmtNereids.getDesiredConcurrentNum());
Assert.assertEquals(0, createRoutineLoadStmtNereids.getMaxErrorNum());
// The Kafka data-source properties set above must survive translation.
KafkaDataSourceProperties kafkaDataSourceProperties = (KafkaDataSourceProperties) createRoutineLoadStmtNereids.getDataSourceProperties();
Assert.assertEquals(serverAddress, kafkaDataSourceProperties.getBrokerList());
Assert.assertEquals(topicName, kafkaDataSourceProperties.getTopic());
Assert.assertEquals("+08:00", createRoutineLoadStmtNereids.getTimezone());
}

@Test
public void testMultiTableAnalyze(@Injectable Analyzer analyzer,
@Injectable SessionVariable sessionVariable) throws UserException {
Expand Down

0 comments on commit 2d40ee2

Please sign in to comment.