Skip to content

Commit

Permalink
Minor refactors
Browse files Browse the repository at this point in the history
  • Loading branch information
yashmayya committed Sep 12, 2024
1 parent ae1172d commit e1c6cab
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.QueryProcessingException;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.spi.auth.AuthorizationResult;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListener;
Expand All @@ -67,6 +69,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
protected final BrokerRequestIdGenerator _requestIdGenerator;
protected final long _brokerTimeoutMs;
protected final QueryLogger _queryLogger;
@Nullable
protected final String _enableQueryNullHandling;

public BaseBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache) {
Expand All @@ -82,6 +86,7 @@ public BaseBrokerRequestHandler(PinotConfiguration config, String brokerId, Brok
_requestIdGenerator = new BrokerRequestIdGenerator(brokerId);
_brokerTimeoutMs = config.getProperty(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, Broker.DEFAULT_BROKER_TIMEOUT_MS);
_queryLogger = new QueryLogger(config);
_enableQueryNullHandling = config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_ENABLE_NULL_HANDLING);
}

@Override
Expand Down Expand Up @@ -116,8 +121,7 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOption
if (StringUtils.isNotBlank(failureMessage)) {
failureMessage = "Reason: " + failureMessage;
}
throw new WebApplicationException("Permission denied." + failureMessage,
Response.Status.FORBIDDEN);
throw new WebApplicationException("Permission denied." + failureMessage, Response.Status.FORBIDDEN);
}

JsonNode sql = request.get(Broker.Request.SQL);
Expand All @@ -129,6 +133,24 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOption

String query = sql.textValue();
requestContext.setQuery(query);

// Parse the query if needed
if (sqlNodeAndOptions == null) {
try {
sqlNodeAndOptions = RequestUtils.parseQuery(query, request);
} catch (Exception e) {
// Do not log or emit metric here because it is pure user error
requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE);
return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR, e));
}
}

// Add null handling option from broker config only if there is no override in the query
if (_enableQueryNullHandling != null) {
sqlNodeAndOptions.getOptions()
.putIfAbsent(Broker.Request.QueryOptionKey.ENABLE_NULL_HANDLING, _enableQueryNullHandling);
}

BrokerResponse brokerResponse =
handleRequest(requestId, query, sqlNodeAndOptions, request, requesterIdentity, requestContext, httpHeaders,
accessControl);
Expand All @@ -139,9 +161,9 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOption
return brokerResponse;
}

protected abstract BrokerResponse handleRequest(long requestId, String query,
@Nullable SqlNodeAndOptions sqlNodeAndOptions, JsonNode request, @Nullable RequesterIdentity requesterIdentity,
RequestContext requestContext, @Nullable HttpHeaders httpHeaders, AccessControl accessControl)
protected abstract BrokerResponse handleRequest(long requestId, String query, SqlNodeAndOptions sqlNodeAndOptions,
JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext,
@Nullable HttpHeaders httpHeaders, AccessControl accessControl)
throws Exception;

protected static void augmentStatistics(RequestContext statistics, BrokerResponse response) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ public boolean cancelQuery(long requestId, int timeoutMs, Executor executor, Htt
}

@Override
protected BrokerResponse handleRequest(long requestId, String query, @Nullable SqlNodeAndOptions sqlNodeAndOptions,
protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndOptions sqlNodeAndOptions,
JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext,
@Nullable HttpHeaders httpHeaders, AccessControl accessControl)
throws Exception {
Expand All @@ -287,26 +287,6 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
Tracing.ThreadAccountantOps.setupRunner(String.valueOf(requestId));

try {
// Parse the query if needed
if (sqlNodeAndOptions == null) {
try {
sqlNodeAndOptions = RequestUtils.parseQuery(query, request);
} catch (Exception e) {
// Do not log or emit metric here because it is pure user error
requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE);
return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR, e));
}
}

Map<String, String> queryOptions = sqlNodeAndOptions.getOptions();

// Add null handling option from broker config only if there is no override in the query
if (!queryOptions.containsKey(QueryOptionKey.ENABLE_NULL_HANDLING)
&& _config.containsKey(Broker.CONFIG_OF_BROKER_QUERY_ENABLE_NULL_HANDLING)) {
queryOptions.put(QueryOptionKey.ENABLE_NULL_HANDLING,
_config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_ENABLE_NULL_HANDLING));
}

// Compile the request into PinotQuery
long compilationStartTimeNs = System.nanoTime();
PinotQuery pinotQuery;
Expand All @@ -317,7 +297,7 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_COMPILATION_EXCEPTIONS, 1);
requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE);
// Check if the query is a v2 supported query
String database = DatabaseUtils.extractDatabaseFromQueryRequest(queryOptions, httpHeaders);
String database = DatabaseUtils.extractDatabaseFromQueryRequest(sqlNodeAndOptions.getOptions(), httpHeaders);
if (ParserUtils.canCompileWithMultiStageEngine(query, database, _tableCache)) {
return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR, new Exception(
"It seems that the query is only supported by the multi-stage query engine, please retry the query using "
Expand Down Expand Up @@ -996,9 +976,25 @@ private void handleSubquery(Expression expression, long requestId, JsonNode json
Literal subqueryLiteral = operands.get(1).getLiteral();
Preconditions.checkState(subqueryLiteral != null, "Second argument of IN_SUBQUERY must be a literal (subquery)");
String subquery = subqueryLiteral.getStringValue();

SqlNodeAndOptions sqlNodeAndOptions;
try {
sqlNodeAndOptions = RequestUtils.parseQuery(subquery, jsonRequest);
} catch (Exception e) {
// Do not log or emit metric here because it is pure user error
requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE);
throw new RuntimeException("Failed to parse subquery: " + subquery, e);
}

// Add null handling option from broker config only if there is no override in the query
if (_enableQueryNullHandling != null) {
sqlNodeAndOptions.getOptions()
.putIfAbsent(Broker.Request.QueryOptionKey.ENABLE_NULL_HANDLING, _enableQueryNullHandling);
}

BrokerResponse response =
handleRequest(requestId, subquery, null, jsonRequest, requesterIdentity, requestContext, httpHeaders,
accessControl);
handleRequest(requestId, subquery, sqlNodeAndOptions, jsonRequest, requesterIdentity, requestContext,
httpHeaders, accessControl);
if (response.getExceptionsSize() != 0) {
throw new RuntimeException("Caught exception while executing subquery: " + subquery);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.apache.pinot.common.utils.DatabaseUtils;
import org.apache.pinot.common.utils.ExceptionUtils;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.core.auth.Actions;
import org.apache.pinot.core.auth.TargetType;
import org.apache.pinot.query.QueryEnvironment;
Expand Down Expand Up @@ -110,22 +109,11 @@ public void shutDown() {
}

@Override
protected BrokerResponse handleRequest(long requestId, String query, @Nullable SqlNodeAndOptions sqlNodeAndOptions,
protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndOptions sqlNodeAndOptions,
JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext,
HttpHeaders httpHeaders, AccessControl accessControl) {
LOGGER.debug("SQL query for request {}: {}", requestId, query);

// Parse the query if needed
if (sqlNodeAndOptions == null) {
try {
sqlNodeAndOptions = RequestUtils.parseQuery(query, request);
} catch (Exception e) {
// Do not log or emit metric here because it is pure user error
requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE);
return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR, e));
}
}

// Compile the request
Map<String, String> queryOptions = sqlNodeAndOptions.getOptions();

Expand Down Expand Up @@ -157,8 +145,7 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
if (StringUtils.isNotBlank(failureMessage)) {
failureMessage = "Reason: " + failureMessage;
}
throw new WebApplicationException("Permission denied. " + failureMessage,
Response.Status.FORBIDDEN);
throw new WebApplicationException("Permission denied. " + failureMessage, Response.Status.FORBIDDEN);
}
return constructMultistageExplainPlan(query, plan);
case SELECT:
Expand Down Expand Up @@ -210,8 +197,7 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
if (StringUtils.isNotBlank(failureMessage)) {
failureMessage = "Reason: " + failureMessage;
}
throw new WebApplicationException("Permission denied." + failureMessage,
Response.Status.FORBIDDEN);
throw new WebApplicationException("Permission denied." + failureMessage, Response.Status.FORBIDDEN);
}

// Validate QPS quota
Expand Down Expand Up @@ -299,10 +285,8 @@ private void fillOldBrokerResponseStats(BrokerResponseNativeV2 brokerResponse,
}
} catch (Exception e) {
LOGGER.warn("Error encountered while collecting multi-stage stats", e);
brokerResponse.setStageStats(JsonNodeFactory.instance.objectNode().put(
"error",
"Error encountered while collecting multi-stage stats - " + e)
);
brokerResponse.setStageStats(JsonNodeFactory.instance.objectNode()
.put("error", "Error encountered while collecting multi-stage stats - " + e));
}
}

Expand Down

0 comments on commit e1c6cab

Please sign in to comment.