Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #564

Merged
merged 1 commit into from
Sep 4, 2024
Merged

fix #564

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions parser/registry/common.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
const { hashLabels, parseLabels } = require('../../common')
const { getPlg } = require('../../plugins/engine')
const Sql = require('@cloki/clickhouse-sql')
const { DATABASE_NAME } = require('../../lib/utils')
const clusterName = require('../../common').clusterName
module.exports.dist = clusterName ? '_dist' : ''

Expand Down Expand Up @@ -413,3 +414,48 @@ module.exports.patchCol = (query, name, patcher) => {
return col
})
}

module.exports.preJoinLabels = (token, query, dist) => {
const from = query.getParam(module.exports.sharedParamNames.from)
const to = query.getParam(module.exports.sharedParamNames.to)
const sqlFrom = new Sql.Raw()
sqlFrom.toString = () => {
let fromNs = 0
if (from.get()) {
fromNs = from.get()
}
return `toDate(fromUnixTimestamp(intDiv(${fromNs}, 1000000000)))`
}
const sqlTo = new Sql.Raw()
sqlTo.toString = () => {
let toNs = 0
if (to.get()) {
toNs = to.get()
}
return `toDate(fromUnixTimestamp(intDiv(${toNs}, 1000000000)))`
}
let withIdxSel = query.with().idx_sel
let inRightSide = new Sql.WithReference(withIdxSel)
if (!withIdxSel) {
withIdxSel = query.with().str_sel
inRightSide = new Sql.Select()
.select('fingerprint')
.from(new Sql.WithReference(withIdxSel))
}
dist = dist || ''
const timeSeriesReq = new Sql.Select()
.select('fingerprint', 'labels')
.from([`${DATABASE_NAME()}.time_series${dist}`, 'time_series'])
.where(new Sql.And(
new Sql.In('time_series.fingerprint', 'in', inRightSide),
Sql.Gte(new Sql.Raw('date'), sqlFrom),
Sql.Lte(new Sql.Raw('date'), sqlTo)
))
timeSeriesReq._toString = timeSeriesReq.toString
timeSeriesReq.toString = () => {
return `(${timeSeriesReq._toString()})`
}
query.join(new module.exports.Aliased(timeSeriesReq, 'time_series'), 'left any',
Sql.Eq('samples.fingerprint', new Sql.Raw('time_series.fingerprint')))
query.select([new Sql.Raw('JSONExtractKeysAndValues(time_series.labels, \'String\')'), 'labels'])
}
7 changes: 3 additions & 4 deletions parser/registry/smart_optimizations/optimization_v3_2.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const { getDuration, Aliased } = require('../common')
const { getDuration, preJoinLabels, dist } = require('../common')
const reg = require('./log_range_agg_reg_v3_2')
const Sql = require('@cloki/clickhouse-sql')
const { DATABASE_NAME, checkVersion } = require('../../../lib/utils')
Expand Down Expand Up @@ -51,15 +51,14 @@ module.exports.apply = (token, fromNS, toNS, stepNS) => {
.select(['samples.fingerprint', 'fingerprint'])
.from([`${DATABASE_NAME()}.metrics_15s${_dist}`, 'samples'])
.where(tsClause)
q.join(new Aliased(`${DATABASE_NAME()}.time_series`, 'time_series'), 'left any',
Sql.Eq('samples.fingerprint', new Sql.Raw('time_series.fingerprint')))
q.select([new Sql.Raw('any(JSONExtractKeysAndValues(time_series.labels, \'String\'))'), 'labels'])

q.ctx = {
step: stepNS / 1000000000,
inline: !!clusterName
}

preJoinLabels(token, q, dist)

for (const streamSelectorRule of token.Children('log_stream_selector_rule')) {
q = streamSelectorReg[streamSelectorRule.Child('operator').value](streamSelectorRule, q)
}
Expand Down
42 changes: 23 additions & 19 deletions parser/transpiler.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const lineFormat = require('./registry/line_format')
const parserRegistry = require('./registry/parser_registry')
const unwrap = require('./registry/unwrap')
const unwrapRegistry = require('./registry/unwrap_registry')
const { durationToMs, sharedParamNames, getStream, Aliased } = require('./registry/common')
const { durationToMs, sharedParamNames, getStream, preJoinLabels } = require('./registry/common')
const compiler = require('./bnf')
const {
parseMs,
Expand Down Expand Up @@ -75,11 +75,6 @@ module.exports.initQuery = (joinLabels, types) => {
.addParam(to)
.addParam(limit)
.addParam(matrix)
if (joinLabels) {
q.join(new Aliased(`${DATABASE_NAME()}.time_series`, 'time_series'), 'left any',
Sql.Eq('samples.fingerprint', new Sql.Raw('time_series.fingerprint')))
q.select([new Sql.Raw('JSONExtractKeysAndValues(time_series.labels, \'String\')'), 'labels'])
}
return q
}

Expand All @@ -88,7 +83,7 @@ module.exports.initQuery = (joinLabels, types) => {
* @param types {[number] || undefined}
* @returns {Select}
*/
module.exports.initQueryV3_2 = (joinLabels, types) => {
/*module.exports.initQueryV3_2 = (joinLabels, types) => {
types = types || [bothType, logType]
const from = new Sql.Parameter(sharedParamNames.from)
const to = new Sql.Parameter(sharedParamNames.to)
Expand All @@ -108,12 +103,13 @@ module.exports.initQueryV3_2 = (joinLabels, types) => {
.addParam(from)
.addParam(to)
if (joinLabels) {
//TODO: fix join
q.join(new Aliased(`${DATABASE_NAME()}.time_series${dist}`, 'time_series'), 'left any',
Sql.Eq('samples.fingerprint', new Sql.Raw('time_series.fingerprint')))
q.select([new Sql.Raw('JSONExtractKeysAndValues(time_series.labels, \'String\')'), 'labels'])
}
return q
}
}*/

/**
*
Expand Down Expand Up @@ -193,6 +189,8 @@ module.exports.transpile = (request) => {
end
}
}
joinLabels && doStreamSelectorOperatorRegistry(token, query)
joinLabels && preJoinLabels(token, query)
matrixOp = matrixOp || (token.Child('summary') && 'summary')
switch (matrixOp) {
case 'aggregation_operator':
Expand Down Expand Up @@ -223,10 +221,9 @@ module.exports.transpile = (request) => {
.orderBy(['labels', order], ['timestamp_ns', order])
setQueryParam(query, sharedParamNames.limit, limit)
if (!joinLabels) {
query.join(new Aliased(`${DATABASE_NAME()}.time_series${dist}`, 'time_series'), 'left any',
Sql.Eq('sel_a.fingerprint', new Sql.Raw('time_series.fingerprint')))
query.select([new Sql.Raw('JSONExtractKeysAndValues(time_series.labels, \'String\')'), 'labels'],
new Sql.Raw('sel_a.*'))
query.from([new Sql.WithReference(query.with().sel_a), 'samples'])
preJoinLabels(token, query, dist)
query.select(new Sql.Raw('samples.*'))
}
}
if (token.Child('agg_statement') && token.Child('compared_agg_statement_cmp')) {
Expand Down Expand Up @@ -381,7 +378,9 @@ module.exports.transpileTail = (request) => {
}
}

let query = module.exports.initQuery(true)
let query = module.exports.initQuery(false)
doStreamSelectorOperatorRegistry(expression.rootToken, query)
preJoinLabels(expression.rootToken, query, dist)
query.ctx = {
...(query.ctx || {}),
legacy: true
Expand All @@ -393,7 +392,6 @@ module.exports.transpileTail = (request) => {
query.order_expressions = []
query.orderBy(['timestamp_ns', 'asc'])
query.limit(undefined, undefined)
//logger.debug(query.toString())
return {
query: request.rawRequest ? query : query.toString(),
stream: getStream(query)
Expand Down Expand Up @@ -496,11 +494,7 @@ module.exports.transpileLogRangeAggregation = (token, query) => {
* @returns {Sql.Select}
*/
module.exports.transpileLogStreamSelector = (token, query) => {
const rules = token.Children('log_stream_selector_rule')
for (const rule of rules) {
const op = rule.Child('operator').value
query = streamSelectorOperatorRegistry[op](rule, query)
}
doStreamSelectorOperatorRegistry(token, query)
for (const pipeline of token.Children('log_pipeline')) {
if (pipeline.Child('line_filter_expression')) {
const op = pipeline.Child('line_filter_operator').value
Expand Down Expand Up @@ -619,3 +613,13 @@ const whereBuilder = (clause) => {
const _clause = clause.slice(1).map(c => Array.isArray(c) ? `(${whereBuilder(c)})` : c)
return _clause.join(` ${op} `)
}

const doStreamSelectorOperatorRegistry = (token, query) => {
if (!query.with().idx_sel && !query.with().str_sel) {
const rules = token.Children('log_stream_selector_rule')
for (const rule of rules) {
const op = rule.Child('operator').value
query = streamSelectorOperatorRegistry[op](rule, query)
}
}
}
Loading
Loading