Optimize postgresql ingest pipeline (#7269)
The postgresql ingest pipeline was not performing well. This PR applies the following rules to improve the situation.

- Anchor the regular expression at the beginning of the string.
- Merge the multiple statements into a single regular expression.
- Do not use a back reference for the user/host delimiter (a simplified before/after sketch follows this list).
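For intuition, here is a minimal Python sketch of the before/after shape of the change. The patterns are simplified stand-ins, not the actual grok expressions from this pipeline:

```python
import re

# Before: several separate, unanchored patterns tried in order. With
# search semantics, a pattern that fails is retried at every offset of
# the line before the engine gives up, and each pattern in the list
# pays that cost again.
OLD_PATTERNS = [
    re.compile(r"(?P<ts>[-0-9]+ [0-9:.]+) (?P<level>\w+): duration: (?P<ms>[0-9.]+) ms"),
    re.compile(r"(?P<ts>[-0-9]+ [0-9:.]+) (?P<level>\w+): (?P<msg>.*)"),
]

def parse_old(line):
    for pattern in OLD_PATTERNS:
        match = pattern.search(line)
        if match:
            return match.groupdict()
    return None

# After: a single pattern, anchored with ^ so a non-matching line fails
# immediately, and the variants merged into one explicit alternation
# (spelled out, rather than tied together with a back reference).
NEW_PATTERN = re.compile(
    r"^(?P<ts>[-0-9]+ [0-9:.]+) (?P<level>\w+): "
    r"(?:duration: (?P<ms>[0-9.]+) ms|(?P<msg>.*))"
)

def parse_new(line):
    match = NEW_PATTERN.search(line)
    return match.groupdict() if match else None
```

The grok change below follows the same idea: one anchored expression with an optional user@database group and a duration/message alternation.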

Fixes: #7201

## Note to reviewers:

Master branch takes 26 ms to ingest the 18 test documents; this PR takes 11 ms.
Note: I did not use dissect, since there are variations in the tokens.
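
As a rough way to poke at the pipeline yourself, you could feed it a sample document through the ingest `_simulate` API. This is only a sketch, not the harness that produced the numbers above: it assumes a local Elasticsearch, the elasticsearch-py client that the tests below already use, that the pipeline file is plain JSON with no template variables, and an invented sample log line.

```python
import json
import time

from elasticsearch import Elasticsearch

es = Elasticsearch()

# Assumption: the pipeline file parses as plain JSON.
with open("filebeat/module/postgresql/log/ingest/pipeline.json") as f:
    pipeline = json.load(f)

body = {
    "pipeline": pipeline,
    "docs": [{
        "_source": {
            # Invented sample line in the shape the pattern expects.
            "message": "2017-07-31 13:36:42.585 CEST [4974] "
                       "postgres@postgres LOG: duration: 37.118 ms "
                       "statement: SELECT * FROM users;"
        }
    }],
}

start = time.time()
res = es.transport.perform_request(
    "POST", "/_ingest/pipeline/_simulate", body=body)
print("simulate took {:.1f} ms".format((time.time() - start) * 1000))
print(json.dumps(res, indent=2))
```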
ph authored and ruflin committed Jun 12, 2018
1 parent 434c567 commit 4a41587
Showing 2 changed files with 17 additions and 13 deletions.
9 changes: 2 additions & 7 deletions filebeat/module/postgresql/log/ingest/pipeline.json
```diff
@@ -6,16 +6,11 @@
         "field": "message",
         "ignore_missing": true,
         "patterns": [
-          "%{LOCALDATETIME:postgresql.log.timestamp} %{WORD:postgresql.log.timezone} \\[%{NUMBER:postgresql.log.thread_id}\\] %{USERNAME:postgresql.log.user}@%{POSTGRESQL_DB_NAME:postgresql.log.database} %{WORD:postgresql.log.level}: duration: %{NUMBER:postgresql.log.duration} ms statement: %{MULTILINEQUERY:postgresql.log.query}",
-          "%{LOCALDATETIME:postgresql.log.timestamp} %{WORD:postgresql.log.timezone} \\[%{NUMBER:postgresql.log.thread_id}\\] \\[%{USERNAME:postgresql.log.user}\\]@\\[%{POSTGRESQL_DB_NAME:postgresql.log.database}\\] %{WORD:postgresql.log.level}: duration: %{NUMBER:postgresql.log.duration} ms statement: %{MULTILINEQUERY:postgresql.log.query}",
-          "%{LOCALDATETIME:postgresql.log.timestamp} %{WORD:postgresql.log.timezone} \\[%{NUMBER:postgresql.log.thread_id}\\] %{USERNAME:postgresql.log.user}@%{POSTGRESQL_DB_NAME:postgresql.log.database} %{WORD:postgresql.log.level}: ?%{GREEDYDATA:postgresql.log.message}",
-          "%{LOCALDATETIME:postgresql.log.timestamp} %{WORD:postgresql.log.timezone} \\[%{NUMBER:postgresql.log.thread_id}\\] \\[%{USERNAME:postgresql.log.user}\\]@\\[%{POSTGRESQL_DB_NAME:postgresql.log.database}\\] %{WORD:postgresql.log.level}: ?%{GREEDYDATA:postgresql.log.message}",
-          "%{LOCALDATETIME:postgresql.log.timestamp} %{WORD:postgresql.log.timezone} \\[%{NUMBER:postgresql.log.thread_id}\\] %{WORD:postgresql.log.level}: ?%{GREEDYDATA:postgresql.log.message}"
+          "^%{LOCALDATETIME:postgresql.log.timestamp} %{WORD:postgresql.log.timezone} \\[%{NUMBER:postgresql.log.thread_id}\\] ((\\[%{USERNAME:postgresql.log.user}\\]@\\[%{POSTGRESQL_DB_NAME:postgresql.log.database}\\]|%{USERNAME:postgresql.log.user}@%{POSTGRESQL_DB_NAME:postgresql.log.database}) )?%{WORD:postgresql.log.level}: (duration: %{NUMBER:postgresql.log.duration} ms statement: %{GREEDYDATA:postgresql.log.query}|%{GREEDYDATA:postgresql.log.message})"
         ],
         "pattern_definitions": {
           "LOCALDATETIME": "[-0-9]+ %{TIME}",
-          "GREEDYDATA": ".*",
-          "MULTILINEQUERY" : "(.|\n|\t)*?;$",
+          "GREEDYDATA": "(.|\n|\t)*",
           "POSTGRESQL_DB_NAME": "[a-zA-Z0-9_]+[a-zA-Z0-9_\\$]*"
         }
       }
```
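For reference, the single merged pattern still covers all the shapes the five removed patterns handled. Each of these sample lines (invented for illustration) takes a different branch of the alternation:

```
2017-07-31 13:36:42.585 CEST [4974] postgres@postgres LOG: duration: 37.118 ms statement: SELECT * FROM users;
2017-07-31 13:36:42.585 CEST [4974] [postgres]@[postgres] ERROR: relation "users" does not exist
2017-07-31 13:36:42.585 CEST [4974] LOG: autovacuum launcher started
```

The optional `( )?` group absorbs both the plain and the bracketed user@database forms, and the trailing alternation decides between a duration/statement pair and a plain message.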
21 changes: 15 additions & 6 deletions filebeat/tests/system/test_modules.py
```diff
@@ -114,7 +114,7 @@ def _test_expected_events(self, module, test_file, res, objects):
                 break

         assert found, "The following expected object was not found:\n {}\nSearched in: \n{}".format(
-            ev["_source"][module], objects)
+            pretty_json(ev["_source"][module]), pretty_json(objects))

     def run_on_file(self, module, fileset, test_file, cfgfile):
         print("Testing {}/{} on {}".format(module, fileset, test_file))
@@ -131,7 +131,8 @@ def run_on_file(self, module, fileset, test_file, cfgfile):
             "-c", cfgfile,
             "-modules={}".format(module),
             "-M", "{module}.*.enabled=false".format(module=module),
-            "-M", "{module}.{fileset}.enabled=true".format(module=module, fileset=fileset),
+            "-M", "{module}.{fileset}.enabled=true".format(
+                module=module, fileset=fileset),
             "-M", "{module}.{fileset}.var.paths=[{test_file}]".format(
                 module=module, fileset=fileset, test_file=test_file),
             "-M", "*.*.input.close_eof=true",
@@ -158,7 +159,8 @@ def run_on_file(self, module, fileset, test_file, cfgfile):
             assert obj["fileset"]["module"] == module, "expected fileset.module={} but got {}".format(
                 module, obj["fileset"]["module"])

-            assert "error" not in obj, "not error expected but got: {}".format(obj)
+            assert "error" not in obj, "not error expected but got: {}".format(
+                obj)

             if (module == "auditd" and fileset == "log") \
                     or (module == "osquery" and fileset == "result"):
@@ -251,13 +253,16 @@ def _run_ml_test(self, setup_flag, modules_flag):
         # Clean any previous state
         for df in self.es.transport.perform_request("GET", "/_xpack/ml/datafeeds/")["datafeeds"]:
             if df["datafeed_id"] == 'filebeat-nginx-access-response_code':
-                self.es.transport.perform_request("DELETE", "/_xpack/ml/datafeeds/" + df["datafeed_id"])
+                self.es.transport.perform_request(
+                    "DELETE", "/_xpack/ml/datafeeds/" + df["datafeed_id"])

         for df in self.es.transport.perform_request("GET", "/_xpack/ml/anomaly_detectors/")["jobs"]:
             if df["job_id"] == 'datafeed-filebeat-nginx-access-response_code':
-                self.es.transport.perform_request("DELETE", "/_xpack/ml/anomaly_detectors/" + df["job_id"])
+                self.es.transport.perform_request(
+                    "DELETE", "/_xpack/ml/anomaly_detectors/" + df["job_id"])

-        shutil.rmtree(os.path.join(self.working_dir, "modules.d"), ignore_errors=True)
+        shutil.rmtree(os.path.join(self.working_dir,
+                                   "modules.d"), ignore_errors=True)

         # generate a minimal configuration
         cfgfile = os.path.join(self.working_dir, "filebeat.yml")
@@ -324,3 +329,7 @@ def _run_ml_test(self, setup_flag, modules_flag):
                 max_timeout=30)

         beat.kill()
+
+
+def pretty_json(obj):
+    return json.dumps(obj, indent=2, separators=(',', ': '))
```
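A small aside on the new helper: spelling out `separators=(',', ': ')` avoids the trailing whitespace that Python 2's `json.dumps` emits after commas when `indent` is set (these system tests presumably still ran on Python 2.7 at the time). A quick illustration with an invented object:

```python
import json

obj = {"level": "LOG", "duration": 37.118}
print(json.dumps(obj, indent=2, separators=(',', ': ')))
# {
#   "level": "LOG",
#   "duration": 37.118
# }
```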
