- have minimal numbers of clients, e.g. 1-10 incoming connections
- a client usually represents one app+vhost.
- a client usually logs in blocking mode, so it's critical to keep receiving logs even when there are problems
- a normal log record is about 200 bytes including syslog headers, but could be as large as 50KB for error dumps
- massive amounts of incoming garbage can arrive and need to be dropped fast
- Input rate of logs from a single labelSet/keySet could be up to 100K logs per second, for several hours
- In some cases, such logs are almost entirely thrown away, making disk buffering unsuitable
- Most thrown-away logs cannot be easily dropped before some processing
- A typical server has 50 or more cores, but each core is limited in speed, e.g. 2GHz
- Sensitive information in logs needs to be redacted
Functionality:
- Scripting is slow due to the interface between host language and script language. CGO makes it even slower.
- Without scripting, existing implementations are too limited to be useful
Performance:
- High overhead of each step, for example msgpack serialization in fluent-bit
- Limited to a single core (fluent-bit), or the number of cores utilized depends on the steps (vector)
Buffering:
- Disk buffering doesn't cope with unbounded backpressure, e.g. it ends up keeping entire files in memory or holding open fds
- Lack of hybrid buffering to reduce unnecessary disk IO time for most cases
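A hybrid buffer that stays in memory for the common case and spills to disk only on overflow could look like the following sketch. The `HybridBuffer` type is hypothetical, and the `disk` slice merely stands in for on-disk persistence:

```go
package main

import "fmt"

// Chunk is a finalized, compressed batch of logs (hypothetical type).
type Chunk struct{ ID string }

// HybridBuffer keeps chunks in memory up to memLimit and spills the
// overflow to disk, so the common case never pays disk IO time.
type HybridBuffer struct {
	mem      []Chunk
	memLimit int
	disk     []Chunk // stands in for on-disk persistence in this sketch
}

func (b *HybridBuffer) Push(c Chunk) {
	if len(b.mem) < b.memLimit {
		b.mem = append(b.mem, c)
		return
	}
	b.disk = append(b.disk, c) // would be a file write in a real impl
}

// Pop drains memory first, then disk, in FIFO order.
func (b *HybridBuffer) Pop() (Chunk, bool) {
	if len(b.mem) > 0 {
		c := b.mem[0]
		b.mem = b.mem[1:]
		return c, true
	}
	if len(b.disk) > 0 {
		c := b.disk[0]
		b.disk = b.disk[1:]
		return c, true
	}
	return Chunk{}, false
}

func main() {
	b := &HybridBuffer{memLimit: 2}
	for i := 0; i < 3; i++ {
		b.Push(Chunk{ID: fmt.Sprintf("c%d", i)})
	}
	fmt.Println(len(b.mem), len(b.disk)) // 2 1
}
```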
Input constructs just enough fields for orchestration, which then distributes logs to different pipelines based on key fields. Such pipelines serve both parallelization and fair processing (more the latter), ensuring for example that error logs still get through while the system is busy handling tons of debug logs.
LogInput: TCPListener, LogParser, LogTransform(s)
------------------------------------------------------------------------------
listen => (conn 1) => parse syslog => extraction transforms
=> (conn 2) [ join multiline ] => parse syslog => extraction transforms
=> (conn 3) [ join multiline ] => parse syslog => extraction transforms
Orchestrator => pipeline(s)
---------------------------------------------------
=> distribute by keySet => (ex: vhost-foo, info)
=> (ex: vhost-foo, warn)
=> (ex: vhost-bar, info)
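The distribution step above might be sketched like this. The `keySet`, `orchestrator`, and `distribute` names are illustrative, not the real API:

```go
package main

import "fmt"

// keySet identifies a pipeline, e.g. vhost + log level
// (illustrative fields, not the real key-field set).
type keySet struct{ vhost, level string }

// orchestrator owns one channel per keySet so that a flood of debug
// logs for one vhost cannot starve error logs for another.
type orchestrator struct {
	pipelines map[keySet]chan string
}

func newOrchestrator() *orchestrator {
	return &orchestrator{pipelines: make(map[keySet]chan string)}
}

// distribute looks up (or lazily creates) the pipeline for a keySet.
// This is the one place where a per-record hashmap lookup is accepted.
func (o *orchestrator) distribute(k keySet, record string) {
	ch, ok := o.pipelines[k]
	if !ok {
		ch = make(chan string, 1024)
		o.pipelines[k] = ch
	}
	select {
	case ch <- record:
	default: // pipeline is busy; a real impl would buffer or drop here
	}
}

func main() {
	o := newOrchestrator()
	o.distribute(keySet{"vhost-foo", "info"}, "GET /")
	o.distribute(keySet{"vhost-foo", "warn"}, "slow query")
	fmt.Println(len(o.pipelines)) // 2
}
```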
Inside pipelines:
LogProcessingWorker (per pipeline) ChunkBufferer ChunkConsumer
---------------------------------------------------- -------------------- ---------------
=> (keySet 1) transform => serialize => compress/pack => buffer / persistence => forward & retry
=> (keySet 2) ...
After transformations, logs are immediately serialized and compressed by the output, then buffered by HybridBufferer before being fed into the input channel of FluentdForwardClient. The fluentd forward messages to be sent by the output client are already finalized before the buffering stage or before saving to disk (when needed), which also means the output messages are pre-compressed.

If multiple outputs become necessary in the future, it would be up to LogProcessingWorker to duplicate the results of transformation for different output routes.
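A minimal sketch of finalizing a chunk once before buffering, assuming gzip framing and newline-joined serialization. The stdlib `compress/gzip` is used here for self-containment; real code would import `github.com/klauspost/compress/gzip`, which is API-compatible:

```go
package main

import (
	"bytes"
	"compress/gzip" // real code: github.com/klauspost/compress/gzip (same API)
	"fmt"
	"strings"
)

// finalizeChunk serializes and compresses a batch of records ONCE,
// before buffering, so the same pre-compressed bytes can be persisted
// to disk and handed to the forward client without re-encoding.
func finalizeChunk(records []string) ([]byte, error) {
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	if _, err := w.Write([]byte(strings.Join(records, "\n"))); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func main() {
	chunk, err := finalizeChunk([]string{"log A", "log B"})
	if err != nil {
		panic(err)
	}
	fmt.Println(len(chunk) > 0) // true
}
```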
Parsing: costly timestamp parsing is moved to transform, since dropped logs don't need timestamps.
Transform: field inlining and syslog unescaping are moved to rewriters in serialization to reduce allocation and copying, since no subsequent transform depends on such transform results. Caveat: moving unescaping this late breaks escaped characters around sanitization.
Buffer: log chunks are serialized and compressed to final output form before buffering or persistence to save space and CPU. The format is decided by the output type and multiple output types would require multiple chunks for the same logs.
Output: Fluentd tag creation is moved to orchestration because tags are normally identical for logs under the same keySet.
- Every stage of processing should have a pure function call version and a worker based version, for performance profiling.
- DO NOT use regular expressions. If needed, check ragel instead
- DO NOT use built-in compression - too slow. Always use klauspost/compress
- Per-record operations are hottest paths and require special handling:
- hashmap lookups must be reduced to a minimum, e.g. no getting/setting fields in a hashmap, except in ByKeySetOrchestrator
- atomics, locks and metric updates are avoided whenever possible,
- ... including pooling of simple objects and small buffers, which uses locks internally.
- Buffers are reused whenever possible
- Complex objects are pooled whenever possible
- Strings are created directly from []byte by util.StringFromBytes whenever possible (not copied by string())
- Due to resource pooling, the following objects need to be used carefully:
- LogRecord's field values are temporary strings pointing to a backing buffer which is only valid until a record is released. The string values must be copied if stored or used for e.g. permanent hashmap keys.
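The aliasing hazard described above can be demonstrated with a zero-copy conversion. `stringFromBytes` here is a hypothetical stand-in for the `util.StringFromBytes` mentioned in these notes (requires Go 1.20+ for `unsafe.String`):

```go
package main

import (
	"fmt"
	"unsafe"
)

// stringFromBytes mimics util.StringFromBytes from these notes: it
// aliases the byte slice instead of copying, so the string is only
// valid while the backing buffer is untouched (e.g. until the record
// is released back to its pool).
func stringFromBytes(b []byte) string {
	return unsafe.String(unsafe.SliceData(b), len(b))
}

func main() {
	buf := []byte("vhost-foo")
	field := stringFromBytes(buf) // zero-copy view into buf
	stored := string(buf)         // explicit copy, safe to keep
	buf[0] = 'X'                  // simulate buffer reuse after release
	// The aliased view observes the mutation; the copy does not.
	fmt.Println(field, stored) // Xhost-foo vhost-foo
}
```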
- Pass records to channels one by one: Go channels are not that efficient
- One worker per stage: overhead due to inefficiency in channels and scheduling (possibly CPU cache).
- Per-connection orchestration and pipelines (simple & lock-free): a client may close a connection while its existing pipelines still need to keep sending chunks, and a new connection could be opened while the old pipelines are still alive.
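The channel-efficiency point above suggests batching: passing slices of records amortizes the per-send overhead of Go channels. A rough sketch, with illustrative sizes:

```go
package main

import "fmt"

// sendBatched pushes n records through a channel in slices of
// batchSize, amortizing per-send channel overhead; it returns how
// many channel sends were needed and how many records arrived.
func sendBatched(n, batchSize int) (sends, records int) {
	ch := make(chan []int, 16)
	go func() {
		batch := make([]int, 0, batchSize)
		for i := 0; i < n; i++ {
			batch = append(batch, i)
			if len(batch) == batchSize {
				ch <- batch
				batch = make([]int, 0, batchSize)
			}
		}
		if len(batch) > 0 { // flush the partial tail batch
			ch <- batch
		}
		close(ch)
	}()
	for batch := range ch {
		sends++
		records += len(batch)
	}
	return sends, records
}

func main() {
	sends, records := sendBatched(200, 64)
	fmt.Println(sends, records) // 4 200
}
```

With 200 records and a batch size of 64, only 4 channel sends are needed instead of 200.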