From 2f13bb930c7894e6561c1ccb289ddd3e23a33617 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Thu, 26 Sep 2024 18:21:04 +0800 Subject: [PATCH 1/5] [feat](job)Implementing Job using antlr4 --- .../org/apache/doris/nereids/DorisParser.g4 | 25 +- .../apache/doris/analysis/CreateJobStmt.java | 1 + .../doris/cloud/rpc/MetaServiceClient.java | 2 +- .../job/extensions/insert/InsertJob.java | 4 +- .../apache/doris/job/task/AbstractTask.java | 1 - .../nereids/parser/LogicalPlanBuilder.java | 22 ++ .../doris/nereids/trees/plans/PlanType.java | 1 + .../plans/commands/CreateJobCommand.java | 73 +++++ .../plans/commands/info/CreateJobInfo.java | 263 ++++++++++++++++++ .../insert/InsertIntoTableCommand.java | 14 +- .../trees/plans/visitor/CommandVisitor.java | 5 + .../suites/job_p0/test_base_insert_job.groovy | 59 ++-- 12 files changed, 437 insertions(+), 33 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index 8eb6720048186b..c5f97ab771c6d6 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -50,6 +50,7 @@ statementBase | supportedCreateStatement #supportedCreateStatementAlias | supportedAlterStatement #supportedAlterStatementAlias | materializedViewStatement #materializedViewStatementAlias + | jobScheduleStatement #jobScheduleStatementAlias | constraintStatement #constraintStatementAlias | supportedDropStatement #supportedDropStatementAlias | unsupportedStatement #unsupported @@ -102,7 +103,17 @@ materializedViewStatement | CANCEL MATERIALIZED VIEW TASK taskId=INTEGER_VALUE ON mvName=multipartIdentifier #cancelMTMVTask | SHOW CREATE MATERIALIZED VIEW mvName=multipartIdentifier #showCreateMTMV ; - +jobScheduleStatement + : CREATE JOB label=multipartIdentifier ON SCHEDULE + ( + (EVERY timeInterval=INTEGER_VALUE timeUnit=identifier + (STARTS (startTime=STRING_LITERAL | CURRENT_TIMESTAMP))? + (ENDS endsTime=STRING_LITERAL)?) + | + (AT (atTime=STRING_LITERAL | CURRENT_TIMESTAMP))) + commentSpec? + DO supportedDmlStatement #createScheduledJob + ; constraintStatement : ALTER TABLE table=multipartIdentifier ADD CONSTRAINT constraintName=errorCapturingIdentifier @@ -413,16 +424,8 @@ unsupportedCleanStatement ; unsupportedJobStatement - : CREATE JOB label=multipartIdentifier ON SCHEDULE - ( - (EVERY timeInterval=INTEGER_VALUE timeUnit=identifier - (STARTS (startTime=STRING_LITERAL | CURRENT_TIMESTAMP))? - (ENDS endsTime=STRING_LITERAL)?) - | - (AT (atTime=STRING_LITERAL | CURRENT_TIMESTAMP))) - commentSpec? - DO statement #createJob - | PAUSE JOB wildWhere? #pauseJob + + : PAUSE JOB wildWhere? #pauseJob | DROP JOB (IF EXISTS)? wildWhere? #dropJob | RESUME JOB wildWhere? #resumeJob | CANCEL TASK wildWhere? 
#cancelJobTask diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java index 0fff1e097497ea..8babb665299a71 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java @@ -60,6 +60,7 @@ * quantity { DAY | HOUR | MINUTE | * WEEK | SECOND } */ +@Deprecated @Slf4j public class CreateJobStmt extends DdlStmt implements NotFallbackInParser { diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java index 323b880a3a7a79..c4d28fb3bc256c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java @@ -79,7 +79,7 @@ private long connectionAgeExpiredAt() { if (!isMetaServiceEndpointList && connectionAgeBase > 1) { long base = TimeUnit.MINUTES.toMillis(connectionAgeBase); long now = System.currentTimeMillis(); - long rand = random.nextLong(base); + long rand = random.nextLong() % base; return now + base + rand; } return Long.MAX_VALUE; diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java index 43f43ba86997cf..487591efc04745 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java @@ -299,7 +299,9 @@ public void cancelTaskById(long taskId) throws JobException { @Override public void cancelAllTasks() throws JobException { try { - checkAuth("CANCEL LOAD"); + if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) { + checkAuth("CANCEL LOAD"); + } super.cancelAllTasks(); this.failMsg = new FailMsg(FailMsg.CancelType.USER_CANCEL, "user cancel"); } catch (DdlException e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java index f78446aaf85cbf..a29878a97a3c1b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java @@ -141,7 +141,6 @@ public void cancel() throws JobException { executeCancelLogic(); } catch (Exception e) { log.warn("cancel task failed, job id is {}, task id is {}", jobId, taskId, e); - throw new JobException(e); } finally { closeOrReleaseResources(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index d5a48f8e1129c6..094488acec2d3c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -376,6 +376,7 @@ import org.apache.doris.nereids.trees.plans.commands.CancelMTMVTaskCommand; import org.apache.doris.nereids.trees.plans.commands.Command; import org.apache.doris.nereids.trees.plans.commands.Constraint; +import org.apache.doris.nereids.trees.plans.commands.CreateJobCommand; import org.apache.doris.nereids.trees.plans.commands.CreateMTMVCommand; import org.apache.doris.nereids.trees.plans.commands.CreatePolicyCommand; import 
org.apache.doris.nereids.trees.plans.commands.CreateProcedureCommand; @@ -414,6 +415,7 @@ import org.apache.doris.nereids.trees.plans.commands.info.BulkStorageDesc; import org.apache.doris.nereids.trees.plans.commands.info.CancelMTMVTaskInfo; import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition; +import org.apache.doris.nereids.trees.plans.commands.info.CreateJobInfo; import org.apache.doris.nereids.trees.plans.commands.info.CreateMTMVInfo; import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo; import org.apache.doris.nereids.trees.plans.commands.info.CreateTableLikeInfo; @@ -565,6 +567,26 @@ public LogicalPlan visitStatementDefault(StatementDefaultContext ctx) { return withExplain(plan, ctx.explain()); } + @Override + public LogicalPlan visitCreateScheduledJob(DorisParser.CreateScheduledJobContext ctx) { + Optional label = ctx.label == null ? Optional.empty() : Optional.of(ctx.label.getText()); + Optional atTime = ctx.atTime == null ? Optional.empty() : Optional.of(ctx.atTime.getText()); + Optional immediateStartOptional = ctx.CURRENT_TIMESTAMP() == null ? Optional.of(false) : + Optional.of(true); + Optional startTime = ctx.startTime == null ? Optional.empty() : Optional.of(ctx.startTime.getText()); + Optional endsTime = ctx.endsTime == null ? Optional.empty() : Optional.of(ctx.endsTime.getText()); + Optional interval = ctx.timeInterval == null ? Optional.empty() : + Optional.of(Long.valueOf(ctx.timeInterval.getText())); + Optional intervalUnit = ctx.timeUnit == null ? Optional.empty() : Optional.of(ctx.timeUnit.getText()); + String commentSpec = ctx.commentSpec() == null ? "''" : ctx.commentSpec().STRING_LITERAL().getText(); + String comment = + LogicalPlanBuilderAssistant.escapeBackSlash(commentSpec.substring(1, commentSpec.length() - 1)); + String executeSql = getOriginSql(ctx.supportedDmlStatement()); + CreateJobInfo createJobInfo = new CreateJobInfo(label, atTime, interval, intervalUnit, startTime, + endsTime, immediateStartOptional, comment, executeSql); + return new CreateJobCommand(createJobInfo); + } + @Override public LogicalPlan visitInsertTable(InsertTableContext ctx) { boolean isOverwrite = ctx.INTO() == null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java index 42cdc0b7d9d267..73ea61fcface45 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java @@ -143,6 +143,7 @@ public enum PlanType { SELECT_INTO_OUTFILE_COMMAND, UPDATE_COMMAND, CREATE_MTMV_COMMAND, + CREATE_JOB_COMMAND, ALTER_MTMV_COMMAND, ADD_CONSTRAINT_COMMAND, DROP_CONSTRAINT_COMMAND, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java new file mode 100644 index 00000000000000..fecd457ada56eb --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands; + +import org.apache.doris.analysis.StmtType; +import org.apache.doris.catalog.Env; +import org.apache.doris.job.base.AbstractJob; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.commands.info.CreateJobInfo; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.StmtExecutor; + +/** + * syntax: + * CREATE + * [DEFINER = user] + * JOB + * event_name + * ON SCHEDULE schedule + * [COMMENT 'string'] + * DO event_body; + * schedule: { + * [STREAMING] AT timestamp + * | EVERY interval + * [STARTS timestamp ] + * [ENDS timestamp ] + * } + * interval: + * quantity { DAY | HOUR | MINUTE | + * WEEK | SECOND } + */ +public class CreateJobCommand extends Command implements ForwardWithSync { + + private CreateJobInfo createJobInfo; + + public CreateJobCommand(CreateJobInfo jobInfo) { + super(PlanType.CREATE_JOB_COMMAND); + this.createJobInfo = jobInfo; + } + + @Override + public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { + AbstractJob job = createJobInfo.analyzeAndBuildJobInfo(ctx); + Env.getCurrentEnv().getJobManager().registerJob(job); + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitCreateJobCommand(this, context); + } + + @Override + public StmtType stmtType() { + return StmtType.CREATE; + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java new file mode 100644 index 00000000000000..6cef7ee89ec960 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java @@ -0,0 +1,263 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.trees.plans.commands.info; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.job.base.AbstractJob; +import org.apache.doris.job.base.JobExecuteType; +import org.apache.doris.job.base.JobExecutionConfiguration; +import org.apache.doris.job.base.TimerDefinition; +import org.apache.doris.job.common.IntervalUnit; +import org.apache.doris.job.common.JobStatus; +import org.apache.doris.job.extensions.insert.InsertJob; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.nereids.parser.NereidsParser; +import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; +import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.qe.ConnectContext; + +import com.google.common.base.Strings; + +import java.util.Optional; + +/** + * Build job info and analyze the SQL statement to create a job. + */ +public class CreateJobInfo { + + // exclude job name prefix, which is used by inner job + private static final String excludeJobNamePrefix = "inner_"; + + private final Optional labelNameOptional; + + private final Optional onceJobStartTimestampOptional; + + private final Optional intervalOptional; + + private final Optional intervalTimeUnitOptional; + + private final Optional startsTimeStampOptional; + + private final Optional endsTimeStampOptional; + + private final Optional immediateStartOptional; + + private final String comment; + + private final String executeSql; + + /** + * Constructor for CreateJobInfo. + * + * @param labelNameOptional Job name. + * @param onceJobStartTimestampOptional Start time for a one-time job. + * @param intervalOptional Interval for a recurring job. + * @param intervalTimeUnitOptional Interval time unit for a recurring job. + * @param startsTimeStampOptional Start time for a recurring job. + * @param endsTimeStampOptional End time for a recurring job. + * @param immediateStartOptional Immediate start for a job. + * @param comment Comment for the job. + * @param executeSql Original SQL statement. + */ + public CreateJobInfo(Optional labelNameOptional, Optional onceJobStartTimestampOptional, + Optional intervalOptional, Optional intervalTimeUnitOptional, + Optional startsTimeStampOptional, Optional endsTimeStampOptional, + Optional immediateStartOptional, String comment, String executeSql) { + this.labelNameOptional = labelNameOptional; + this.onceJobStartTimestampOptional = onceJobStartTimestampOptional; + this.intervalOptional = intervalOptional; + this.intervalTimeUnitOptional = intervalTimeUnitOptional; + this.startsTimeStampOptional = startsTimeStampOptional; + this.endsTimeStampOptional = endsTimeStampOptional; + this.immediateStartOptional = immediateStartOptional; + this.comment = comment; + this.executeSql = executeSql; + + } + + /** + * Analyzes the provided SQL statement and builds the job information. + * + * @param ctx Connect context. + * @return AbstractJob instance. + * @throws UserException If there is an error during SQL analysis or job creation. 
+ */ + public AbstractJob analyzeAndBuildJobInfo(ConnectContext ctx) throws UserException { + checkAuth(); + if (labelNameOptional.orElseThrow(() -> new AnalysisException("labelName is null")).isEmpty()) { + throw new AnalysisException("Job name can not be empty"); + } + + String jobName = labelNameOptional.get(); + checkJobName(jobName); + String dbName = ctx.getDatabase(); + + Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbName); + // check its insert stmt,currently only support insert stmt + JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration(); + JobExecuteType executeType = intervalOptional.isPresent() ? JobExecuteType.RECURRING : JobExecuteType.ONE_TIME; + jobExecutionConfiguration.setExecuteType(executeType); + TimerDefinition timerDefinition = new TimerDefinition(); + + if (executeType.equals(JobExecuteType.ONE_TIME)) { + buildOnceJob(timerDefinition, jobExecutionConfiguration); + } else { + buildRecurringJob(timerDefinition, jobExecutionConfiguration); + } + jobExecutionConfiguration.setTimerDefinition(timerDefinition); + return analyzeAndCreateJob(executeSql, dbName, jobExecutionConfiguration); + } + + /** + * Builds a TimerDefinition for a once-job. + * + * @param timerDefinition Timer definition to be built. + * @param jobExecutionConfiguration Job execution configuration. + * @throws AnalysisException If the job is not configured correctly. + */ + private void buildOnceJob(TimerDefinition timerDefinition, + JobExecutionConfiguration jobExecutionConfiguration) throws AnalysisException { + if (immediateStartOptional.isPresent() && Boolean.TRUE.equals(immediateStartOptional.get())) { + jobExecutionConfiguration.setImmediate(true); + timerDefinition.setStartTimeMs(System.currentTimeMillis()); + return; + } + + // Ensure start time is provided for once jobs. + String startTime = onceJobStartTimestampOptional.orElseThrow(() + -> new AnalysisException("Once time job must set start time")); + timerDefinition.setStartTimeMs(stripQuotesAndParseTimestamp(startTime)); + } + + /** + * Builds a TimerDefinition for a recurring job. + * + * @param timerDefinition Timer definition to be built. + * @param jobExecutionConfiguration Job execution configuration. + * @throws AnalysisException If the job is not configured correctly. + */ + private void buildRecurringJob(TimerDefinition timerDefinition, + JobExecutionConfiguration jobExecutionConfiguration) throws AnalysisException { + // Ensure interval is provided for recurring jobs. + long interval = intervalOptional.orElseThrow(() + -> new AnalysisException("Interval must be set for recurring job")); + timerDefinition.setInterval(interval); + + // Ensure interval time unit is provided for recurring jobs. + String intervalTimeUnit = intervalTimeUnitOptional.orElseThrow(() + -> new AnalysisException("Interval time unit must be set for recurring job")); + IntervalUnit intervalUnit = IntervalUnit.fromString(intervalTimeUnit.toUpperCase()); + if (intervalUnit == null) { + throw new AnalysisException("Invalid interval time unit: " + intervalTimeUnit); + } + + // Check if interval unit is second and disable if not in test mode. + if (intervalUnit.equals(IntervalUnit.SECOND) && !Config.enable_job_schedule_second_for_test) { + throw new AnalysisException("Interval time unit can not be second in production mode"); + } + + timerDefinition.setIntervalUnit(intervalUnit); + + // Set end time if provided. 
+ endsTimeStampOptional.ifPresent(s -> timerDefinition.setEndTimeMs(stripQuotesAndParseTimestamp(s))); + + // Set immediate start if configured. + if (immediateStartOptional.isPresent() && Boolean.TRUE.equals(immediateStartOptional.get())) { + jobExecutionConfiguration.setImmediate(true); + // Avoid immediate re-scheduling by setting start time slightly in the past. + timerDefinition.setStartTimeMs(System.currentTimeMillis() - 100); + return; + } + // Set start time if provided. + startsTimeStampOptional.ifPresent(s -> timerDefinition.setStartTimeMs(stripQuotesAndParseTimestamp(s))); + } + + protected static void checkAuth() throws AnalysisException { + if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN"); + } + } + + /** + * Analyzes the provided SQL statement and creates an appropriate job based on the parsed logical plan. + * Currently, only "InsertIntoTableCommand" is supported for job creation. + * + * @param sql the SQL statement to be analyzed + * @param currentDbName the current database name where the SQL statement will be executed + * @param jobExecutionConfiguration the configuration for job execution + * @return an instance of AbstractJob corresponding to the SQL statement + * @throws UserException if there is an error during SQL analysis or job creation + */ + private AbstractJob analyzeAndCreateJob(String sql, String currentDbName, + JobExecutionConfiguration jobExecutionConfiguration) throws UserException { + NereidsParser parser = new NereidsParser(); + LogicalPlan logicalPlan = parser.parseSingle(sql); + if (logicalPlan instanceof InsertIntoTableCommand) { + InsertIntoTableCommand insertIntoTableCommand = (InsertIntoTableCommand) logicalPlan; + try { + insertIntoTableCommand.initPlan(ConnectContext.get(), ConnectContext.get().getExecutor(), false); + return new InsertJob(labelNameOptional.get(), + JobStatus.RUNNING, + currentDbName, + comment, + ConnectContext.get().getCurrentUserIdentity(), + jobExecutionConfiguration, + System.currentTimeMillis(), + sql); + } catch (Exception e) { + throw new AnalysisException(e.getMessage()); + } + } else { + throw new AnalysisException("Not support this sql : " + sql + " Command class is " + + logicalPlan.getClass().getName() + "."); + } + } + + private void checkJobName(String jobName) throws AnalysisException { + if (Strings.isNullOrEmpty(jobName)) { + throw new AnalysisException("job name can not be null"); + } + if (jobName.startsWith(excludeJobNamePrefix)) { + throw new AnalysisException("job name can not start with " + excludeJobNamePrefix); + } + } + + /** + * Strips quotes from the input string and parses it to a timestamp. + * + * @param str The input string potentially enclosed in single or double quotes. + * @return The parsed timestamp as a long value, or -1L if the input is null or empty. 
+ */ + public static Long stripQuotesAndParseTimestamp(String str) { + if (str == null || str.isEmpty()) { + return -1L; + } + if (str.startsWith("'") && str.endsWith("'")) { + str = str.substring(1, str.length() - 1); + } else if (str.startsWith("\"") && str.endsWith("\"")) { + str = str.substring(1, str.length() - 1); + } + return TimeUtils.timeStringToLong(str.trim()); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java index 74f75d2d7d5dd9..68718de0f86a5b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java @@ -123,12 +123,20 @@ public void runWithUpdateInfo(ConnectContext ctx, StmtExecutor executor, runInternal(ctx, executor); } + public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor) throws Exception { + return initPlan(ctx, executor, true); + } + /** * This function is used to generate the plan for Nereids. * There are some load functions that only need to the plan, such as stream_load. * Therefore, this section will be presented separately. + * @param needBeginTransaction whether to start a transaction. + * For external uses such as creating a job, only basic analysis is needed without starting a transaction, + * in which case this can be set to false. */ - public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor) throws Exception { + public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor, + boolean needBeginTransaction) throws Exception { TableIf targetTableIf = InsertUtils.getTargetTable(logicalQuery, ctx); // check auth if (!Env.getCurrentEnv().getAccessManager() @@ -220,6 +228,10 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor // TODO: support other table types throw new AnalysisException("insert into command only support [olap, hive, iceberg, jdbc] table"); } + if (!needBeginTransaction) { + targetTableIf.readUnlock(); + return insertExecutor; + } if (!insertExecutor.isEmptyInsert()) { insertExecutor.beginTransaction(); insertExecutor.finalizeSink(planner.getFragments().get(0), sink, physicalSink); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java index 0763e8fcbfd704..f35e6f8a6400b3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java @@ -23,6 +23,7 @@ import org.apache.doris.nereids.trees.plans.commands.CallCommand; import org.apache.doris.nereids.trees.plans.commands.CancelMTMVTaskCommand; import org.apache.doris.nereids.trees.plans.commands.Command; +import org.apache.doris.nereids.trees.plans.commands.CreateJobCommand; import org.apache.doris.nereids.trees.plans.commands.CreateMTMVCommand; import org.apache.doris.nereids.trees.plans.commands.CreatePolicyCommand; import org.apache.doris.nereids.trees.plans.commands.CreateProcedureCommand; @@ -113,6 +114,10 @@ default R visitCreateMTMVCommand(CreateMTMVCommand createMTMVCommand, C context) return visitCommand(createMTMVCommand, context); } + default R 
visitCreateJobCommand(CreateJobCommand createJobCommand, C context) { + return visitCommand(createJobCommand, context); + } + default R visitAlterMTMVCommand(AlterMTMVCommand alterMTMVCommand, C context) { return visitCommand(alterMTMVCommand, context); } diff --git a/regression-test/suites/job_p0/test_base_insert_job.groovy b/regression-test/suites/job_p0/test_base_insert_job.groovy index be744427d88d24..19f4422d64fb01 100644 --- a/regression-test/suites/job_p0/test_base_insert_job.groovy +++ b/regression-test/suites/job_p0/test_base_insert_job.groovy @@ -26,6 +26,9 @@ suite("test_base_insert_job") { def tableName = "t_test_BASE_inSert_job" def jobName = "insert_recovery_test_base_insert_job" def jobMixedName = "Insert_recovery_Test_base_insert_job" + sql """ + SET enable_fallback_to_original_planner=false; + """ sql """drop table if exists `${tableName}` force""" sql """ DROP JOB IF EXISTS where jobname = '${jobName}' @@ -70,27 +73,47 @@ suite("test_base_insert_job") { ); """ sql """ - CREATE JOB ${jobName} ON SCHEDULE every 1 second comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); + insert into ${tableName} values + ('2023-03-18', 1, 1) + """ + sql """ + CREATE JOB ${jobName} ON SCHEDULE every 1 second comment 'test' DO INSERT INTO ${tableName} (`timestamp`, `type`, `user_id`) + WITH + tbl_timestamp AS ( + SELECT `timestamp` FROM ${tableName} WHERE user_id = 1 + ), + tbl_type AS ( + SELECT `type` FROM ${tableName} WHERE user_id = 1 + ), + tbl_user_id AS ( + SELECT `user_id` FROM ${tableName} WHERE user_id = 1 + ) + SELECT + tbl_timestamp.`timestamp`, + tbl_type.`type`, + tbl_user_id.`user_id` + FROM + tbl_timestamp, tbl_type, tbl_user_id; """ Awaitility.await().atMost(30, SECONDS).until( { def onceJob = sql """ select SucceedTaskCount from jobs("type"="insert") where Name like '%${jobName}%' and ExecuteType='RECURRING' """ println(onceJob) - onceJob .size() == 1 && '1' <= onceJob.get(0).get(0) - + onceJob.size() == 1 && '1' <= onceJob.get(0).get(0) + } - ) + ) sql """ PAUSE JOB where jobname = '${jobName}' """ def tblDatas = sql """select * from ${tableName}""" println tblDatas - assert 3 >= tblDatas.size() >= (2 as Boolean) //at least 2 records, some times 3 records + assert tblDatas.size() >= 2 //at least 2 records def pauseJobId = sql """select id from jobs("type"="insert") where Name='${jobName}'""" def taskStatus = sql """select status from tasks("type"="insert") where jobid= '${pauseJobId.get(0).get(0)}'""" println taskStatus for (int i = 0; i < taskStatus.size(); i++) { - assert taskStatus.get(i).get(0) != "FAILED"||taskStatus.get(i).get(0) != "STOPPED"||taskStatus.get(i).get(0) != "STOPPED" + assert taskStatus.get(i).get(0) != "FAILED" || taskStatus.get(i).get(0) != "STOPPED" || taskStatus.get(i).get(0) != "STOPPED" } sql """ CREATE JOB ${jobMixedName} ON SCHEDULE every 1 second DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); @@ -126,11 +149,11 @@ suite("test_base_insert_job") { CREATE JOB ${jobName} ON SCHEDULE at current_timestamp comment 'test for test&68686781jbjbhj//ncsa' DO insert into ${tableName} values ('2023-07-19', 2, 1001); """ - Awaitility.await("create-one-time-job-test").atMost(30,SECONDS).until( - { - def onceJob = sql """ select SucceedTaskCount from jobs("type"="insert") where Name like '%${jobName}%' and ExecuteType='ONE_TIME' """ - onceJob.size() == 1 && '1' == onceJob.get(0).get(0) - } + Awaitility.await("create-one-time-job-test").atMost(30, SECONDS).until( + { 
+ def onceJob = sql """ select SucceedTaskCount from jobs("type"="insert") where Name like '%${jobName}%' and ExecuteType='ONE_TIME' """ + onceJob.size() == 1 && '1' == onceJob.get(0).get(0) + } ) def onceJob = sql """ select SucceedTaskCount from jobs("type"="insert") where Name like '%${jobName}%' and ExecuteType='ONE_TIME' """ assert onceJob.size() == 1 @@ -141,7 +164,7 @@ suite("test_base_insert_job") { assert datas.size() == 1 assert datas.get(0).get(0) == "FINISHED" // check table data - def dataCount1 = sql """select count(1) from ${tableName}""" + def dataCount1 = sql """select count(1) from ${tableName} where user_id=1001""" assert dataCount1.get(0).get(0) == 1 // check job status def oncejob = sql """select status,comment from jobs("type"="insert") where Name='${jobName}' """ @@ -198,10 +221,10 @@ suite("test_base_insert_job") { println(tasks.size()) Awaitility.await("resume-job-test").atMost(60, SECONDS).until({ def afterResumeTasks = sql """ select status from tasks("type"="insert") where JobName= '${jobName}' """ - println "resume tasks :"+afterResumeTasks - afterResumeTasks.size() >tasks.size() + println "resume tasks :" + afterResumeTasks + afterResumeTasks.size() > tasks.size() }) - + // assert same job name try { sql """ @@ -216,7 +239,7 @@ suite("test_base_insert_job") { CREATE JOB ${jobName} ON SCHEDULE at current_timestamp comment 'test' DO update ${tableName} set type=2 where type=1; """ } catch (Exception e) { - assert e.getMessage().contains("Not support this sql") + assert e.getMessage().contains("Not support this sql :") } // assert start time greater than current time try { @@ -245,7 +268,7 @@ suite("test_base_insert_job") { // assert end time less than start time try { sql """ - CREATE JOB test_error_starts ON SCHEDULE every 1 second ends '2023-11-13 14:18:07' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); + CREATE JOB test_error_starts ON SCHEDULE every 1 second starts current_timestamp ends '2023-11-13 14:18:07' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); """ } catch (Exception e) { assert e.getMessage().contains("endTimeMs must be greater than the start time") @@ -256,7 +279,7 @@ suite("test_base_insert_job") { CREATE JOB test_error_starts ON SCHEDULE every 1 years ends '2023-11-13 14:18:07' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); """ } catch (Exception e) { - assert e.getMessage().contains("interval time unit can not be years") + assert e.getMessage().contains("Invalid interval time unit: years") } // test keyword as job name From be25a1f8b8ac57f23664fed845a675c19b1825bc Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Sun, 6 Oct 2024 17:44:33 +0800 Subject: [PATCH 2/5] fix test --- regression-test/data/job_p0/job_meta/job_query_test.out | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/regression-test/data/job_p0/job_meta/job_query_test.out b/regression-test/data/job_p0/job_meta/job_query_test.out index 1a2bfe0f9cd995..2bfbb890aed767 100644 --- a/regression-test/data/job_p0/job_meta/job_query_test.out +++ b/regression-test/data/job_p0/job_meta/job_query_test.out @@ -1,7 +1,7 @@ -- This file is automatically generated. 
You should know what you did if you want to edit this -- !select1 -- -JOB_ONETIME ONE_TIME AT 2052-03-18 00:00:00 insert into t_test_BASE_inSert_job (timestamp, type, user_id) values ('2023-03-18','1','12213'); +JOB_ONETIME ONE_TIME AT 2052-03-18 00:00:00 insert into t_test_BASE_inSert_job (timestamp, type, user_id) values ('2023-03-18','1','12213') -- !select2 -- -JOB_RECURRING RECURRING EVERY 1 HOUR STARTS 2052-03-18 00:00:00 insert into t_test_BASE_inSert_job (timestamp, type, user_id) values ('2023-03-18','1','12213'); +JOB_RECURRING RECURRING EVERY 1 HOUR STARTS 2052-03-18 00:00:00 insert into t_test_BASE_inSert_job (timestamp, type, user_id) values ('2023-03-18','1','12213') From b195bd77a2a2e2c11273582c21777657255c70a4 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Thu, 10 Oct 2024 11:03:57 +0800 Subject: [PATCH 3/5] When a task cancellation fails, the exception should be explicitly thrown --- .../src/main/java/org/apache/doris/job/task/AbstractTask.java | 1 + 1 file changed, 1 insertion(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java index a29878a97a3c1b..f78446aaf85cbf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java @@ -141,6 +141,7 @@ public void cancel() throws JobException { executeCancelLogic(); } catch (Exception e) { log.warn("cancel task failed, job id is {}, task id is {}", jobId, taskId, e); + throw new JobException(e); } finally { closeOrReleaseResources(); } From ef45c6b88ff5b29b8df8a2a926873482c793fa1a Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Thu, 10 Oct 2024 14:32:59 +0800 Subject: [PATCH 4/5] minor change --- .../org/apache/doris/nereids/DorisParser.g4 | 4 +- .../nereids/parser/LogicalPlanBuilder.java | 120 +++++++++--------- 2 files changed, 65 insertions(+), 59 deletions(-) diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index 28247fa6e71584..fc5314e14d6306 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -50,7 +50,7 @@ statementBase | supportedCreateStatement #supportedCreateStatementAlias | supportedAlterStatement #supportedAlterStatementAlias | materializedViewStatement #materializedViewStatementAlias - | jobScheduleStatement #jobScheduleStatementAlias + | supportedJobStatement #supportedJobStatementAlias | constraintStatement #constraintStatementAlias | supportedDropStatement #supportedDropStatementAlias | unsupportedStatement #unsupported @@ -103,7 +103,7 @@ materializedViewStatement | CANCEL MATERIALIZED VIEW TASK taskId=INTEGER_VALUE ON mvName=multipartIdentifier #cancelMTMVTask | SHOW CREATE MATERIALIZED VIEW mvName=multipartIdentifier #showCreateMTMV ; -jobScheduleStatement +supportedJobStatement : CREATE JOB label=multipartIdentifier ON SCHEDULE ( (EVERY timeInterval=INTEGER_VALUE timeUnit=identifier (STARTS (startTime=STRING_LITERAL | CURRENT_TIMESTAMP))? (ENDS endsTime=STRING_LITERAL)?) | (AT (atTime=STRING_LITERAL | CURRENT_TIMESTAMP))) commentSpec? DO supportedDmlStatement #createScheduledJob ; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index 094488acec2d3c..69d89b0be5d564 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -576,17 +576,23 @@ public LogicalPlan 
visitCreateScheduledJob(DorisParser.CreateScheduledJobContext Optional startTime = ctx.startTime == null ? Optional.empty() : Optional.of(ctx.startTime.getText()); Optional endsTime = ctx.endsTime == null ? Optional.empty() : Optional.of(ctx.endsTime.getText()); Optional interval = ctx.timeInterval == null ? Optional.empty() : - Optional.of(Long.valueOf(ctx.timeInterval.getText())); + Optional.of(Long.valueOf(ctx.timeInterval.getText())); Optional intervalUnit = ctx.timeUnit == null ? Optional.empty() : Optional.of(ctx.timeUnit.getText()); - String commentSpec = ctx.commentSpec() == null ? "''" : ctx.commentSpec().STRING_LITERAL().getText(); String comment = - LogicalPlanBuilderAssistant.escapeBackSlash(commentSpec.substring(1, commentSpec.length() - 1)); + visitCommentSpec(ctx.commentSpec()); String executeSql = getOriginSql(ctx.supportedDmlStatement()); CreateJobInfo createJobInfo = new CreateJobInfo(label, atTime, interval, intervalUnit, startTime, - endsTime, immediateStartOptional, comment, executeSql); + endsTime, immediateStartOptional, comment, executeSql); return new CreateJobCommand(createJobInfo); } + @Override + public String visitCommentSpec(DorisParser.CommentSpecContext ctx) { + String commentSpec = ctx == null ? "''" : ctx.STRING_LITERAL().getText(); + return + LogicalPlanBuilderAssistant.escapeBackSlash(commentSpec.substring(1, commentSpec.length() - 1)); + } + @Override public LogicalPlan visitInsertTable(InsertTableContext ctx) { boolean isOverwrite = ctx.INTO() == null; @@ -1147,7 +1153,7 @@ public LogicalPlan visitLoad(DorisParser.LoadContext ctx) { visitMultipartIdentifier(ddc.tableName)); List colNames = (ddc.columns == null ? ImmutableList.of() : visitIdentifierList(ddc.columns)); List columnsFromPath = (ddc.columnsFromPath == null ? ImmutableList.of() - : visitIdentifierList(ddc.columnsFromPath.identifierList())); + : visitIdentifierList(ddc.columnsFromPath.identifierList())); List partitions = ddc.partition == null ? ImmutableList.of() : visitIdentifierList(ddc.partition); // TODO: multi location List multiFilePaths = new ArrayList<>(); @@ -1166,16 +1172,16 @@ public LogicalPlan visitLoad(DorisParser.LoadContext ctx) { } LoadTask.MergeType mergeType = ddc.mergeType() == null ? LoadTask.MergeType.APPEND - : LoadTask.MergeType.valueOf(ddc.mergeType().getText()); + : LoadTask.MergeType.valueOf(ddc.mergeType().getText()); Optional fileFormat = ddc.format == null ? Optional.empty() : Optional.of(visitIdentifierOrText(ddc.format)); Optional separator = ddc.separator == null ? Optional.empty() : Optional.of(ddc.separator.getText() - .substring(1, ddc.separator.getText().length() - 1)); + .substring(1, ddc.separator.getText().length() - 1)); Optional comma = ddc.comma == null ? Optional.empty() : Optional.of(ddc.comma.getText() - .substring(1, ddc.comma.getText().length() - 1)); + .substring(1, ddc.comma.getText().length() - 1)); Map dataProperties = ddc.propertyClause() == null ? 
new HashMap<>() - : visitPropertyClause(ddc.propertyClause()); + : visitPropertyClause(ddc.propertyClause()); dataDescriptions.add(new BulkLoadDataDesc( tableName, partitions, @@ -1252,8 +1258,8 @@ public LogicalSubQueryAlias visitAliasQuery(AliasQueryContext ctx) { LogicalPlan queryPlan = plan(ctx.query()); Optional> columnNames = optionalVisit(ctx.columnAliases(), () -> ctx.columnAliases().identifier().stream() - .map(RuleContext::getText) - .collect(ImmutableList.toImmutableList()) + .map(RuleContext::getText) + .collect(ImmutableList.toImmutableList()) ); return new LogicalSubQueryAlias<>(ctx.identifier().getText(), columnNames, queryPlan); }); @@ -1362,7 +1368,7 @@ private static LogicalPlan logicalPlanCombiner(LogicalPlan left, LogicalPlan rig * construct avl union tree */ public static LogicalPlan reduceToLogicalPlanTree(int low, int high, - List logicalPlans, Qualifier qualifier) { + List logicalPlans, Qualifier qualifier) { switch (high - low) { case 0: return logicalPlans.get(low); @@ -1671,7 +1677,7 @@ public Expression visitComparison(ComparisonContext ctx) { return new NullSafeEqual(left, right); default: throw new ParseException("Unsupported comparison expression: " - + operator.getSymbol().getText(), ctx); + + operator.getSymbol().getText(), ctx); } }); } @@ -1740,7 +1746,7 @@ private Expression expressionCombiner(Expression left, Expression right, Logical } private Expression reduceToExpressionTree(int low, int high, - List expressions, LogicalBinaryContext ctx) { + List expressions, LogicalBinaryContext ctx) { switch (high - low) { case 0: return expressions.get(low); @@ -2110,11 +2116,11 @@ public Expression visitDoublePipes(DorisParser.DoublePipesContext ctx) { /** * Create a value based [[CaseWhen]] expression. This has the following SQL form: * {{{ - * CASE [expression] - * WHEN [value] THEN [expression] - * ... - * ELSE [expression] - * END + * CASE [expression] + * WHEN [value] THEN [expression] + * ... + * ELSE [expression] + * END * }}} */ @Override @@ -2132,11 +2138,11 @@ public Expression visitSimpleCase(DorisParser.SimpleCaseContext context) { /** * Create a condition based [[CaseWhen]] expression. This has the following SQL syntax: * {{{ - * CASE - * WHEN [predicate] THEN [expression] - * ... - * ELSE [expression] - * END + * CASE + * WHEN [predicate] THEN [expression] + * ... 
+ * ELSE [expression] + * END * }}} * * @param context the parse tree @@ -2287,8 +2293,8 @@ private WindowExpression withWindowSpec(WindowSpecContext ctx, Expression functi List orderKeyList = Lists.newArrayList(); if (ctx.sortClause() != null) { orderKeyList = visit(ctx.sortClause().sortItem(), OrderKey.class).stream() - .map(orderKey -> new OrderExpression(orderKey)) - .collect(Collectors.toList()); + .map(orderKey -> new OrderExpression(orderKey)) + .collect(Collectors.toList()); } if (ctx.windowFrame() != null) { @@ -2579,8 +2585,8 @@ public LogicalPlan visitRelationList(DorisParser.RelationListContext ctx) { @Override public List visitMultipartIdentifier(MultipartIdentifierContext ctx) { return ctx.parts.stream() - .map(RuleContext::getText) - .collect(ImmutableList.toImmutableList()); + .map(RuleContext::getText) + .collect(ImmutableList.toImmutableList()); } /** @@ -2597,8 +2603,8 @@ public List visitIdentifierList(IdentifierListContext ctx) { @Override public List visitIdentifierSeq(IdentifierSeqContext ctx) { return ctx.ident.stream() - .map(RuleContext::getText) - .collect(ImmutableList.toImmutableList()); + .map(RuleContext::getText) + .collect(ImmutableList.toImmutableList()); } @Override @@ -2779,10 +2785,10 @@ public PartitionTableInfo visitPartitionTable(DorisParser.PartitionTableContext }) .collect(ImmutableList.toImmutableList()); return new PartitionTableInfo( - isAutoPartition, - ctx.RANGE() != null ? "RANGE" : "LIST", - ctx.partitions != null ? visitPartitionsDef(ctx.partitions) : null, - partitionList); + isAutoPartition, + ctx.RANGE() != null ? "RANGE" : "LIST", + ctx.partitions != null ? visitPartitionsDef(ctx.partitions) : null, + partitionList); } @Override @@ -2796,8 +2802,8 @@ public ColumnDefinition visitColumnDef(ColumnDefContext ctx) { DataType colType = ctx.type instanceof PrimitiveDataTypeContext ? visitPrimitiveDataType(((PrimitiveDataTypeContext) ctx.type)) : ctx.type instanceof ComplexDataTypeContext - ? visitComplexDataType((ComplexDataTypeContext) ctx.type) - : visitAggStateDataType((AggStateDataTypeContext) ctx.type); + ? visitComplexDataType((ComplexDataTypeContext) ctx.type) + : visitAggStateDataType((AggStateDataTypeContext) ctx.type); colType = colType.conversion(); boolean isKey = ctx.KEY() != null; ColumnNullableType nullableType = ColumnNullableType.DEFAULT; @@ -2890,7 +2896,7 @@ public IndexDefinition visitIndexDef(IndexDefContext ctx) { String indexType = ctx.indexType != null ? ctx.indexType.getText().toUpperCase() : null; //comment should remove '\' and '(") at the beginning and end String comment = ctx.comment == null ? 
"" : LogicalPlanBuilderAssistant.escapeBackSlash( - ctx.comment.getText().substring(1, ctx.STRING_LITERAL().getText().length() - 1)); + ctx.comment.getText().substring(1, ctx.STRING_LITERAL().getText().length() - 1)); // change BITMAP index to INVERTED index if (Config.enable_create_bitmap_index_as_inverted_index && "BITMAP".equalsIgnoreCase(indexType)) { @@ -3329,7 +3335,7 @@ public Object visitCommentRelationHint(CommentRelationHintContext ctx) { } protected LogicalPlan withProjection(LogicalPlan input, SelectColumnClauseContext selectCtx, - Optional aggCtx, boolean isDistinct) { + Optional aggCtx, boolean isDistinct) { return ParserUtils.withOrigin(selectCtx, () -> { if (aggCtx.isPresent()) { if (isDistinct) { @@ -3374,8 +3380,8 @@ private LogicalPlan withRelations(LogicalPlan inputPlan, List r private LogicalPlan withFilter(LogicalPlan input, Optional whereCtx) { return input.optionalMap(whereCtx, () -> - new LogicalFilter<>(ExpressionUtils.extractConjunctionToSet( - getExpression(whereCtx.get().booleanExpression())), input)); + new LogicalFilter<>(ExpressionUtils.extractConjunctionToSet( + getExpression(whereCtx.get().booleanExpression())), input)); } private LogicalPlan withAggregate(LogicalPlan input, SelectColumnClauseContext selectCtx, @@ -3417,7 +3423,7 @@ private LogicalPlan withHaving(LogicalPlan input, Optional /** * match predicate type and generate different predicates. * - * @param ctx PredicateContext + * @param ctx PredicateContext * @param valueExpression valueExpression * @return Expression */ @@ -3433,15 +3439,15 @@ private Expression withPredicate(Expression valueExpression, PredicateContext ct break; case DorisParser.LIKE: outExpression = new Like( - valueExpression, - getExpression(ctx.pattern) + valueExpression, + getExpression(ctx.pattern) ); break; case DorisParser.RLIKE: case DorisParser.REGEXP: outExpression = new Regexp( - valueExpression, - getExpression(ctx.pattern) + valueExpression, + getExpression(ctx.pattern) ); break; case DorisParser.IN: @@ -3472,38 +3478,38 @@ private Expression withPredicate(Expression valueExpression, PredicateContext ct case DorisParser.MATCH: case DorisParser.MATCH_ANY: outExpression = new MatchAny( - valueExpression, - getExpression(ctx.pattern) + valueExpression, + getExpression(ctx.pattern) ); break; case DorisParser.MATCH_ALL: outExpression = new MatchAll( - valueExpression, - getExpression(ctx.pattern) + valueExpression, + getExpression(ctx.pattern) ); break; case DorisParser.MATCH_PHRASE: outExpression = new MatchPhrase( - valueExpression, - getExpression(ctx.pattern) + valueExpression, + getExpression(ctx.pattern) ); break; case DorisParser.MATCH_PHRASE_PREFIX: outExpression = new MatchPhrasePrefix( - valueExpression, - getExpression(ctx.pattern) + valueExpression, + getExpression(ctx.pattern) ); break; case DorisParser.MATCH_REGEXP: outExpression = new MatchRegexp( - valueExpression, - getExpression(ctx.pattern) + valueExpression, + getExpression(ctx.pattern) ); break; case DorisParser.MATCH_PHRASE_EDGE: outExpression = new MatchPhraseEdge( - valueExpression, - getExpression(ctx.pattern) + valueExpression, + getExpression(ctx.pattern) ); break; default: From 0e68131a8571899be5665d175e95ff77b2d68e32 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Thu, 10 Oct 2024 14:37:19 +0800 Subject: [PATCH 5/5] minor change --- .../nereids/parser/LogicalPlanBuilder.java | 106 +++++++++--------- 1 file changed, 53 insertions(+), 53 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index 69d89b0be5d564..bfbe0e4f1976e8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -1153,7 +1153,7 @@ public LogicalPlan visitLoad(DorisParser.LoadContext ctx) { visitMultipartIdentifier(ddc.tableName)); List colNames = (ddc.columns == null ? ImmutableList.of() : visitIdentifierList(ddc.columns)); List columnsFromPath = (ddc.columnsFromPath == null ? ImmutableList.of() - : visitIdentifierList(ddc.columnsFromPath.identifierList())); + : visitIdentifierList(ddc.columnsFromPath.identifierList())); List partitions = ddc.partition == null ? ImmutableList.of() : visitIdentifierList(ddc.partition); // TODO: multi location List multiFilePaths = new ArrayList<>(); @@ -1172,16 +1172,16 @@ public LogicalPlan visitLoad(DorisParser.LoadContext ctx) { } LoadTask.MergeType mergeType = ddc.mergeType() == null ? LoadTask.MergeType.APPEND - : LoadTask.MergeType.valueOf(ddc.mergeType().getText()); + : LoadTask.MergeType.valueOf(ddc.mergeType().getText()); Optional fileFormat = ddc.format == null ? Optional.empty() : Optional.of(visitIdentifierOrText(ddc.format)); Optional separator = ddc.separator == null ? Optional.empty() : Optional.of(ddc.separator.getText() - .substring(1, ddc.separator.getText().length() - 1)); + .substring(1, ddc.separator.getText().length() - 1)); Optional comma = ddc.comma == null ? Optional.empty() : Optional.of(ddc.comma.getText() - .substring(1, ddc.comma.getText().length() - 1)); + .substring(1, ddc.comma.getText().length() - 1)); Map dataProperties = ddc.propertyClause() == null ? new HashMap<>() - : visitPropertyClause(ddc.propertyClause()); + : visitPropertyClause(ddc.propertyClause()); dataDescriptions.add(new BulkLoadDataDesc( tableName, partitions, @@ -1258,8 +1258,8 @@ public LogicalSubQueryAlias visitAliasQuery(AliasQueryContext ctx) { LogicalPlan queryPlan = plan(ctx.query()); Optional> columnNames = optionalVisit(ctx.columnAliases(), () -> ctx.columnAliases().identifier().stream() - .map(RuleContext::getText) - .collect(ImmutableList.toImmutableList()) + .map(RuleContext::getText) + .collect(ImmutableList.toImmutableList()) ); return new LogicalSubQueryAlias<>(ctx.identifier().getText(), columnNames, queryPlan); }); @@ -1368,7 +1368,7 @@ private static LogicalPlan logicalPlanCombiner(LogicalPlan left, LogicalPlan rig * construct avl union tree */ public static LogicalPlan reduceToLogicalPlanTree(int low, int high, - List logicalPlans, Qualifier qualifier) { + List logicalPlans, Qualifier qualifier) { switch (high - low) { case 0: return logicalPlans.get(low); @@ -1677,7 +1677,7 @@ public Expression visitComparison(ComparisonContext ctx) { return new NullSafeEqual(left, right); default: throw new ParseException("Unsupported comparison expression: " - + operator.getSymbol().getText(), ctx); + + operator.getSymbol().getText(), ctx); } }); } @@ -1746,7 +1746,7 @@ private Expression expressionCombiner(Expression left, Expression right, Logical } private Expression reduceToExpressionTree(int low, int high, - List expressions, LogicalBinaryContext ctx) { + List expressions, LogicalBinaryContext ctx) { switch (high - low) { case 0: return expressions.get(low); @@ -2116,11 +2116,11 @@ public Expression visitDoublePipes(DorisParser.DoublePipesContext ctx) { /** * Create a value based [[CaseWhen]] expression. 
This has the following SQL form: * {{{ - * CASE [expression] - * WHEN [value] THEN [expression] - * ... - * ELSE [expression] - * END + * CASE [expression] + * WHEN [value] THEN [expression] + * ... + * ELSE [expression] + * END * }}} */ @Override @@ -2138,11 +2138,11 @@ public Expression visitSimpleCase(DorisParser.SimpleCaseContext context) { /** * Create a condition based [[CaseWhen]] expression. This has the following SQL syntax: * {{{ - * CASE - * WHEN [predicate] THEN [expression] - * ... - * ELSE [expression] - * END + * CASE + * WHEN [predicate] THEN [expression] + * ... + * ELSE [expression] + * END * }}} * * @param context the parse tree @@ -2293,8 +2293,8 @@ private WindowExpression withWindowSpec(WindowSpecContext ctx, Expression functi List orderKeyList = Lists.newArrayList(); if (ctx.sortClause() != null) { orderKeyList = visit(ctx.sortClause().sortItem(), OrderKey.class).stream() - .map(orderKey -> new OrderExpression(orderKey)) - .collect(Collectors.toList()); + .map(orderKey -> new OrderExpression(orderKey)) + .collect(Collectors.toList()); } if (ctx.windowFrame() != null) { @@ -2585,8 +2585,8 @@ public LogicalPlan visitRelationList(DorisParser.RelationListContext ctx) { @Override public List visitMultipartIdentifier(MultipartIdentifierContext ctx) { return ctx.parts.stream() - .map(RuleContext::getText) - .collect(ImmutableList.toImmutableList()); + .map(RuleContext::getText) + .collect(ImmutableList.toImmutableList()); } /** @@ -2603,8 +2603,8 @@ public List visitIdentifierList(IdentifierListContext ctx) { @Override public List visitIdentifierSeq(IdentifierSeqContext ctx) { return ctx.ident.stream() - .map(RuleContext::getText) - .collect(ImmutableList.toImmutableList()); + .map(RuleContext::getText) + .collect(ImmutableList.toImmutableList()); } @Override @@ -2785,10 +2785,10 @@ public PartitionTableInfo visitPartitionTable(DorisParser.PartitionTableContext }) .collect(ImmutableList.toImmutableList()); return new PartitionTableInfo( - isAutoPartition, - ctx.RANGE() != null ? "RANGE" : "LIST", - ctx.partitions != null ? visitPartitionsDef(ctx.partitions) : null, - partitionList); + isAutoPartition, + ctx.RANGE() != null ? "RANGE" : "LIST", + ctx.partitions != null ? visitPartitionsDef(ctx.partitions) : null, + partitionList); } @Override @@ -2802,8 +2802,8 @@ public ColumnDefinition visitColumnDef(ColumnDefContext ctx) { DataType colType = ctx.type instanceof PrimitiveDataTypeContext ? visitPrimitiveDataType(((PrimitiveDataTypeContext) ctx.type)) : ctx.type instanceof ComplexDataTypeContext - ? visitComplexDataType((ComplexDataTypeContext) ctx.type) - : visitAggStateDataType((AggStateDataTypeContext) ctx.type); + ? visitComplexDataType((ComplexDataTypeContext) ctx.type) + : visitAggStateDataType((AggStateDataTypeContext) ctx.type); colType = colType.conversion(); boolean isKey = ctx.KEY() != null; ColumnNullableType nullableType = ColumnNullableType.DEFAULT; @@ -2896,7 +2896,7 @@ public IndexDefinition visitIndexDef(IndexDefContext ctx) { String indexType = ctx.indexType != null ? ctx.indexType.getText().toUpperCase() : null; //comment should remove '\' and '(") at the beginning and end String comment = ctx.comment == null ? 
"" : LogicalPlanBuilderAssistant.escapeBackSlash( - ctx.comment.getText().substring(1, ctx.STRING_LITERAL().getText().length() - 1)); + ctx.comment.getText().substring(1, ctx.STRING_LITERAL().getText().length() - 1)); // change BITMAP index to INVERTED index if (Config.enable_create_bitmap_index_as_inverted_index && "BITMAP".equalsIgnoreCase(indexType)) { @@ -3335,7 +3335,7 @@ public Object visitCommentRelationHint(CommentRelationHintContext ctx) { } protected LogicalPlan withProjection(LogicalPlan input, SelectColumnClauseContext selectCtx, - Optional aggCtx, boolean isDistinct) { + Optional aggCtx, boolean isDistinct) { return ParserUtils.withOrigin(selectCtx, () -> { if (aggCtx.isPresent()) { if (isDistinct) { @@ -3380,8 +3380,8 @@ private LogicalPlan withRelations(LogicalPlan inputPlan, List r private LogicalPlan withFilter(LogicalPlan input, Optional whereCtx) { return input.optionalMap(whereCtx, () -> - new LogicalFilter<>(ExpressionUtils.extractConjunctionToSet( - getExpression(whereCtx.get().booleanExpression())), input)); + new LogicalFilter<>(ExpressionUtils.extractConjunctionToSet( + getExpression(whereCtx.get().booleanExpression())), input)); } private LogicalPlan withAggregate(LogicalPlan input, SelectColumnClauseContext selectCtx, @@ -3423,7 +3423,7 @@ private LogicalPlan withHaving(LogicalPlan input, Optional /** * match predicate type and generate different predicates. * - * @param ctx PredicateContext + * @param ctx PredicateContext * @param valueExpression valueExpression * @return Expression */ @@ -3439,15 +3439,15 @@ private Expression withPredicate(Expression valueExpression, PredicateContext ct break; case DorisParser.LIKE: outExpression = new Like( - valueExpression, - getExpression(ctx.pattern) + valueExpression, + getExpression(ctx.pattern) ); break; case DorisParser.RLIKE: case DorisParser.REGEXP: outExpression = new Regexp( - valueExpression, - getExpression(ctx.pattern) + valueExpression, + getExpression(ctx.pattern) ); break; case DorisParser.IN: @@ -3478,38 +3478,38 @@ private Expression withPredicate(Expression valueExpression, PredicateContext ct case DorisParser.MATCH: case DorisParser.MATCH_ANY: outExpression = new MatchAny( - valueExpression, - getExpression(ctx.pattern) + valueExpression, + getExpression(ctx.pattern) ); break; case DorisParser.MATCH_ALL: outExpression = new MatchAll( - valueExpression, - getExpression(ctx.pattern) + valueExpression, + getExpression(ctx.pattern) ); break; case DorisParser.MATCH_PHRASE: outExpression = new MatchPhrase( - valueExpression, - getExpression(ctx.pattern) + valueExpression, + getExpression(ctx.pattern) ); break; case DorisParser.MATCH_PHRASE_PREFIX: outExpression = new MatchPhrasePrefix( - valueExpression, - getExpression(ctx.pattern) + valueExpression, + getExpression(ctx.pattern) ); break; case DorisParser.MATCH_REGEXP: outExpression = new MatchRegexp( - valueExpression, - getExpression(ctx.pattern) + valueExpression, + getExpression(ctx.pattern) ); break; case DorisParser.MATCH_PHRASE_EDGE: outExpression = new MatchPhraseEdge( - valueExpression, - getExpression(ctx.pattern) + valueExpression, + getExpression(ctx.pattern) ); break; default: