Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Optimizing pipeline performance #4390

Merged
merged 32 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
4634c9c
chore: improve pipeline performance
paomian Jul 11, 2024
eb46306
chore: use arc to improve time type
paomian Jul 12, 2024
dd12e87
chore: improve pipeline coerce
paomian Jul 15, 2024
8b6e71a
chore: add vec refactor
paomian Jul 16, 2024
64dc236
chore: add vec pp
paomian Jul 17, 2024
4fa40ca
chore: improve pipeline
paomian Jul 18, 2024
1ebe6a0
inprocess
paomian Jul 23, 2024
bfd3c77
chore: set log ingester use new pipeline
paomian Jul 24, 2024
d6c26ff
chore: fix some error by pr comment
paomian Jul 25, 2024
018c51a
chore: fix typo
paomian Jul 25, 2024
d5dceb4
chore: use enum_dispatch to simplify code
paomian Jul 25, 2024
19bc574
chore: some minor fix
paomian Jul 26, 2024
9c6fa0e
chore: format code
paomian Jul 26, 2024
4d63157
chore: update by pr comment
paomian Jul 31, 2024
d0e62c2
chore: fix typo
paomian Jul 31, 2024
fd6c219
chore: make clippy happy
paomian Aug 1, 2024
24fc6b8
chore: fix by pr comment
paomian Aug 1, 2024
f79b9fb
chore: remove epoch and date process add new timestamp process
paomian Aug 1, 2024
7f3e5f0
chore: add more test for pipeline
paomian Aug 1, 2024
ea9c758
chore: restore epoch and date processor
paomian Aug 1, 2024
03f3bee
chore: compatibility issue
paomian Aug 1, 2024
71adf6f
chore: fix by pr comment
paomian Aug 2, 2024
51aceb1
chore: move the evaluation out of the loop
paomian Aug 2, 2024
5670708
chore: fix by pr comment
paomian Aug 2, 2024
e4d8b9a
chore: fix dissect output key filter
paomian Aug 2, 2024
e3962ac
chore: fix transform output greptime value has order error
paomian Aug 5, 2024
54e620b
chore: keep pipeline transform output order
paomian Aug 9, 2024
06cff33
chore: revert tests
shuiyisong Aug 9, 2024
38bc345
chore: simplify pipeline prepare implementation
paomian Aug 12, 2024
bc5832b
chore: add test for timestamp pipeline processor
paomian Aug 12, 2024
37072b0
chore: make clippy happy
paomian Aug 12, 2024
c34c92f
chore: replace is_some check to match
paomian Aug 13, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions src/pipeline/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ license.workspace = true
workspace = true

[dependencies]
ahash = "0.8"
api.workspace = true
arrow.workspace = true
async-trait.workspace = true
Expand All @@ -35,6 +36,7 @@ datafusion-expr.workspace = true
datafusion-functions.workspace = true
datafusion-physical-expr.workspace = true
datatypes.workspace = true
enum_dispatch = "0.3"
futures.workspace = true
greptime-proto.workspace = true
itertools.workspace = true
Expand All @@ -57,7 +59,13 @@ yaml-rust = "0.4"

[dev-dependencies]
catalog = { workspace = true, features = ["testing"] }
criterion = { version = "0.4", features = ["html_reports"] }
rayon = "1.0"
ron = "0.7"
serde = { version = "1.0", features = ["derive"] }
session = { workspace = true, features = ["testing"] }

[[bench]]
name = "processor"
harness = false
path = "benches/processor.rs"
1,000 changes: 1,000 additions & 0 deletions src/pipeline/benches/data.log

Large diffs are not rendered by default.

263 changes: 263 additions & 0 deletions src/pipeline/benches/processor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use criterion::{black_box, criterion_group, criterion_main, Criterion};
use pipeline::{parse, Array, Content, GreptimeTransformer, Pipeline, Value as PipelineValue};
use serde_json::{Deserializer, Value};

fn processor_map(
pipeline: &Pipeline<GreptimeTransformer>,
input_values: Vec<Value>,
) -> impl IntoIterator<Item = greptime_proto::v1::Rows> {
let pipeline_data = input_values
.into_iter()
.map(|v| PipelineValue::try_from(v).unwrap())
zhongzc marked this conversation as resolved.
Show resolved Hide resolved
.collect::<Vec<_>>();

pipeline.exec(PipelineValue::Array(Array {
values: pipeline_data,
}))
}

/// Benchmarks the mutable, reused-intermediate-state pipeline execution path.
///
/// Unlike `processor_map`, this variant allocates one scratch buffer via
/// `init_intermediate_state` and reuses it for every record: prepare into the
/// buffer, execute, push the produced row, then reset the buffer — avoiding a
/// per-record allocation.
///
/// NOTE(review): the `?` operator works here because the opaque
/// `impl IntoIterator<Item = Vec<Row>>` return type is inferred to the hidden
/// type `Result<Vec<Row>, String>` (pinned by the final `Ok::<_, String>`),
/// which both implements `IntoIterator` and supports `?`.
fn processor_mut(
    pipeline: &Pipeline<GreptimeTransformer>,
    input_values: Vec<Value>,
) -> impl IntoIterator<Item = Vec<greptime_proto::v1::Row>> {
    // Single scratch buffer shared across the whole batch.
    let mut payload = pipeline.init_intermediate_state();
    // One output entry per input record.
    let mut result = Vec::with_capacity(input_values.len());

    for v in input_values {
        pipeline.prepare(v, &mut payload)?;
        let r = pipeline.exec_mut(&mut payload)?;
        result.push(r);
        // Clear the buffer for the next record instead of reallocating.
        pipeline.reset_intermediate_state(&mut payload);
    }

    Ok::<Vec<greptime_proto::v1::Row>, String>(result)
}

/// Builds the benchmark pipeline from an inline YAML definition modeling an
/// Akamai DataStream2 log: URL-decoding, gsub/epoch timestamp handling, regex
/// field extraction (breadcrumbs, CMCD), followed by typed transforms.
///
/// # Panics
/// Panics if the embedded YAML fails to parse — a bug in the fixture itself.
fn prepare_pipeline() -> Pipeline<GreptimeTransformer> {
let pipeline_yaml = r#"
---
description: Pipeline for Akamai DataStream2 Log

processors:
- urlencoding:
fields:
- breadcrumbs
- UA
- referer
- queryStr
method: decode
ignore_missing: true
- gsub:
field: reqTimeSec
pattern: "\\."
replacement: ""
- epoch:
field: reqTimeSec
resolution: millisecond
ignore_missing: true
- regex:
field: breadcrumbs
patterns:
- "(?<parent>\\[[^\\[]*c=c[^\\]]*\\])"
- "(?<edge>\\[[^\\[]*c=g[^\\]]*\\])"
- "(?<origin>\\[[^\\[]*c=o[^\\]]*\\])"
- "(?<peer>\\[[^\\[]*c=p[^\\]]*\\])"
- "(?<cloud_wrapper>\\[[^\\[]*c=w[^\\]]*\\])"
ignore_missing: true
- regex:
fields:
- breadcrumbs_parent
- breadcrumbs_edge
- breadcrumbs_origin
- breadcrumbs_peer
- breadcrumbs_cloud_wrapper
ignore_missing: true
patterns:
- "a=(?<ip>[^,\\]]+)"
- "b=(?<request_id>[^,\\]]+)"
- "k=(?<request_end_time>[^,\\]]+)"
- "l=(?<turn_around_time>[^,\\]]+)"
- "m=(?<dns_lookup_time>[^,\\]]+)"
- "n=(?<geo>[^,\\]]+)"
- "o=(?<asn>[^,\\]]+)"
- regex:
field: queryStr, cmcd
patterns:
- "(?i)CMCD=//(?<version>[\\d\\.]+)@V/(?<data>.+$)"
ignore_missing: true
- cmcd:
field: cmcd_data, cmcd
ignore_missing: true

transform:
- fields:
- breadcrumbs
- referer
- queryStr, query_str
- customField, custom_field
- reqId, req_id
- city
- state
- country
- securityRules, security_rules
- ewUsageInfo, ew_usage_info
- ewExecutionInfo, ew_execution_info
- errorCode, error_code
- xForwardedFor, x_forwarded_for
- range
- accLang, acc_lang
- reqMethod, req_method
- reqHost, req_host
- proto
- cliIP, cli_ip
- rspContentType, rsp_content_type
- tlsVersion, tls_version
type: string
- fields:
- version
- cacheStatus, cache_status
- lastByte, last_byte
type: uint8
- fields:
- streamId, stream_id
- billingRegion, billing_region
- transferTimeMSec, transfer_time_msec
- turnAroundTimeMSec, turn_around_time_msec
- reqEndTimeMSec, req_end_time_msec
- maxAgeSec, max_age_sec
- reqPort, req_port
- statusCode, status_code
- cp
- dnsLookupTimeMSec, dns_lookup_time_msec
- tlsOverheadTimeMSec, tls_overhead_time_msec
type: uint32
on_failure: ignore
- fields:
- bytes
- rspContentLen, rsp_content_len
- objSize, obj_size
- uncompressedSize, uncompressed_size
- overheadBytes, overhead_bytes
- totalBytes, total_bytes
type: uint64
on_failure: ignore
- fields:
- UA, user_agent
- cookie
- reqPath, req_path
type: string
# index: fulltext
- field: reqTimeSec, req_time_sec
# epoch time is special, the resolution MUST BE specified
type: epoch, ms
index: time

# the following is from cmcd
- fields:
- cmcd_version
- cmcd_cid, cmcd_content_id
- cmcd_nor, cmcd_next_object_requests
- cmcd_nrr, cmcd_next_range_request
- cmcd_ot, cmcd_object_type
- cmcd_sf, cmcd_streaming_format
- cmcd_sid, cmcd_session_id
- cmcd_st, cmcd_stream_type
- cmcd_v
type: string
- fields:
- cmcd_br, cmcd_encoded_bitrate
- cmcd_bl, cmcd_buffer_length
- cmcd_d, cmcd_object_duration
- cmcd_dl, cmcd_deadline
- cmcd_mtp, cmcd_measured_throughput
- cmcd_rtp, cmcd_requested_max_throughput
- cmcd_tb, cmcd_top_bitrate
type: uint64
- fields:
- cmcd_pr, cmcd_playback_rate
type: float64
- fields:
- cmcd_bs, cmcd_buffer_starvation
- cmcd_su, cmcd_startup
type: boolean

# the following is from breadcrumbs
- fields:
- breadcrumbs_parent_ip
- breadcrumbs_parent_request_id
- breadcrumbs_parent_geo
- breadcrumbs_edge_ip
- breadcrumbs_edge_request_id
- breadcrumbs_edge_geo
- breadcrumbs_origin_ip
- breadcrumbs_origin_request_id
- breadcrumbs_origin_geo
- breadcrumbs_peer_ip
- breadcrumbs_peer_request_id
- breadcrumbs_peer_geo
- breadcrumbs_cloud_wrapper_ip
- breadcrumbs_cloud_wrapper_request_id
- breadcrumbs_cloud_wrapper_geo
type: string
- fields:
- breadcrumbs_parent_request_end_time
- breadcrumbs_parent_turn_around_time
- breadcrumbs_parent_dns_lookup_time
- breadcrumbs_parent_asn
- breadcrumbs_edge_request_end_time
- breadcrumbs_edge_turn_around_time
- breadcrumbs_edge_dns_lookup_time
- breadcrumbs_edge_asn
- breadcrumbs_origin_request_end_time
- breadcrumbs_origin_turn_around_time
- breadcrumbs_origin_dns_lookup_time
- breadcrumbs_origin_asn
- breadcrumbs_peer_request_end_time
- breadcrumbs_peer_turn_around_time
- breadcrumbs_peer_dns_lookup_time
- breadcrumbs_peer_asn
- breadcrumbs_cloud_wrapper_request_end_time
- breadcrumbs_cloud_wrapper_turn_around_time
- breadcrumbs_cloud_wrapper_dns_lookup_time
- breadcrumbs_cloud_wrapper_asn
type: uint32
"#;

// Fixture YAML is compiled into the binary; failure to parse is a test bug.
parse(&Content::Yaml(pipeline_yaml.into())).unwrap()
}

/// Criterion entry point: parses the bundled log fixture once, builds the
/// pipeline once, then benchmarks both execution strategies over identical
/// (cloned) input batches.
fn criterion_benchmark(c: &mut Criterion) {
    let raw_log = include_str!("./data.log");
    // The fixture is a stream of whitespace-separated JSON documents.
    let records: Vec<serde_json::Value> = Deserializer::from_str(raw_log)
        .into_iter::<serde_json::Value>()
        .collect::<Result<_, _>>()
        .unwrap();
    let pipeline = prepare_pipeline();

    let mut group = c.benchmark_group("pipeline");
    group.sample_size(50);
    group.bench_function("processor map", |b| {
        b.iter(|| processor_map(black_box(&pipeline), black_box(records.clone())))
    });
    group.bench_function("processor mut", |b| {
        b.iter(|| processor_mut(black_box(&pipeline), black_box(records.clone())))
    });
    group.finish();
}

// Testing the pipeline's performance in converting Json to Rows
criterion_group!(benches, criterion_benchmark);
paomian marked this conversation as resolved.
Show resolved Hide resolved
criterion_main!(benches);
Loading