Optimize postgresql ingest pipeline (#7269) (#7344)
The PostgreSQL ingest pipeline was not performing well.
This PR applies the following rules to improve the situation:

- Anchor the regular expression at the beginning of the string.
- Merge the multiple grok patterns into a single regular expression.
- Do not use a back reference for the user/database delimiter.

Fixes: #7201

On master, ingesting 18 documents takes 26 ms; with this PR it takes 11 ms.
Note that I did not use dissect, since there are variations in the tokens.
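
To illustrate why the anchoring rule matters, here is a rough sketch (not part of this change) using Python's re module as a stand-in for the Joni/Oniguruma engine behind the Elasticsearch grok processor; the pattern and input are made up for the demonstration. Without the ^ anchor, a failed match is retried at every offset of the line, and each attempt can backtrack; with the anchor, every offset except the first fails immediately.

import re
import timeit

# A line that never matches: plenty of username-like characters,
# but no "@" delimiter, so every attempt backtracks before failing.
line = "abcdefghij" * 30

# Simplified stand-ins for the user@database part of the grok pattern.
unanchored = re.compile(r"[a-zA-Z0-9._-]+@\w+")
anchored = re.compile(r"^[a-zA-Z0-9._-]+@\w+")

# search() retries the unanchored pattern at every offset; the anchored
# pattern can only succeed at offset 0, so failure is detected at once.
print("unanchored:", timeit.timeit(lambda: unanchored.search(line), number=1000))
print("anchored:  ", timeit.timeit(lambda: anchored.search(line), number=1000))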

(cherry picked from commit 4a41587)
ph authored and ruflin committed Jun 18, 2018
1 parent 56307cd commit 385cbb7
Showing 3 changed files with 18 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
@@ -36,6 +36,7 @@ https://github.com/elastic/beats/compare/v6.3.0...6.3[Check the HEAD diff]
 *Filebeat*
 
 - Comply with PostgreSQL database name format {pull}7198[7198]
+- Optimize PostgreSQL ingest pipeline to use anchored regexp and merge multiple regexp into a single expression. {pull}7269[7269]
 
 *Heartbeat*

9 changes: 2 additions & 7 deletions filebeat/module/postgresql/log/ingest/pipeline.json
@@ -6,16 +6,11 @@
         "field": "message",
         "ignore_missing": true,
         "patterns": [
-          "%{LOCALDATETIME:postgresql.log.timestamp} %{WORD:postgresql.log.timezone} \\[%{NUMBER:postgresql.log.thread_id}\\] %{USERNAME:postgresql.log.user}@%{POSTGRESQL_DB_NAME:postgresql.log.database} %{WORD:postgresql.log.level}: duration: %{NUMBER:postgresql.log.duration} ms statement: %{MULTILINEQUERY:postgresql.log.query}",
-          "%{LOCALDATETIME:postgresql.log.timestamp} %{WORD:postgresql.log.timezone} \\[%{NUMBER:postgresql.log.thread_id}\\] \\[%{USERNAME:postgresql.log.user}\\]@\\[%{POSTGRESQL_DB_NAME:postgresql.log.database}\\] %{WORD:postgresql.log.level}: duration: %{NUMBER:postgresql.log.duration} ms statement: %{MULTILINEQUERY:postgresql.log.query}",
-          "%{LOCALDATETIME:postgresql.log.timestamp} %{WORD:postgresql.log.timezone} \\[%{NUMBER:postgresql.log.thread_id}\\] %{USERNAME:postgresql.log.user}@%{POSTGRESQL_DB_NAME:postgresql.log.database} %{WORD:postgresql.log.level}: ?%{GREEDYDATA:postgresql.log.message}",
-          "%{LOCALDATETIME:postgresql.log.timestamp} %{WORD:postgresql.log.timezone} \\[%{NUMBER:postgresql.log.thread_id}\\] \\[%{USERNAME:postgresql.log.user}\\]@\\[%{POSTGRESQL_DB_NAME:postgresql.log.database}\\] %{WORD:postgresql.log.level}: ?%{GREEDYDATA:postgresql.log.message}",
-          "%{LOCALDATETIME:postgresql.log.timestamp} %{WORD:postgresql.log.timezone} \\[%{NUMBER:postgresql.log.thread_id}\\] %{WORD:postgresql.log.level}: ?%{GREEDYDATA:postgresql.log.message}"
+          "^%{LOCALDATETIME:postgresql.log.timestamp} %{WORD:postgresql.log.timezone} \\[%{NUMBER:postgresql.log.thread_id}\\] ((\\[%{USERNAME:postgresql.log.user}\\]@\\[%{POSTGRESQL_DB_NAME:postgresql.log.database}\\]|%{USERNAME:postgresql.log.user}@%{POSTGRESQL_DB_NAME:postgresql.log.database}) )?%{WORD:postgresql.log.level}: (duration: %{NUMBER:postgresql.log.duration} ms statement: %{GREEDYDATA:postgresql.log.query}|%{GREEDYDATA:postgresql.log.message})"
         ],
         "pattern_definitions": {
           "LOCALDATETIME": "[-0-9]+ %{TIME}",
-          "GREEDYDATA": ".*",
-          "MULTILINEQUERY" : "(.|\n|\t)*?;$",
+          "GREEDYDATA": "(.|\n|\t)*",
           "POSTGRESQL_DB_NAME": "[a-zA-Z0-9_]+[a-zA-Z0-9_\\$]*"
         }
       }
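
One way to sanity-check the merged expression against the different log variants (a sketch, not part of this change) is to replay sample lines through the Elasticsearch Simulate Pipeline API. The pipeline id, host, and sample lines below are assumptions for illustration; Filebeat registers the pipeline under a versioned name, so check GET _ingest/pipeline for the actual id.

import json
import urllib.request

# Illustrative samples covering the variants the single pattern handles:
# user@database, [user]@[database], and no user/database at all.
samples = [
    "2017-04-03 22:32:14.322 CEST [8855] postgres@postgres LOG: "
    "duration: 37.118 ms statement: SELECT * FROM users;",
    "2017-04-03 22:32:14.322 CEST [8855] [postgres]@[postgres] LOG: connection received",
    "2017-04-03 22:32:14.322 CEST [8855] LOG: autovacuum launcher started",
]

# The pipeline id below is an assumption (hypothetical version shown).
url = ("http://localhost:9200/_ingest/pipeline/"
       "filebeat-6.3.0-postgresql-log-pipeline/_simulate")
body = json.dumps({"docs": [{"_source": {"message": m}} for m in samples]})
req = urllib.request.Request(url, data=body.encode("utf-8"),
                             headers={"Content-Type": "application/json"})
print(json.loads(urllib.request.urlopen(req).read()))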
21 changes: 15 additions & 6 deletions filebeat/tests/system/test_modules.py
@@ -91,7 +91,7 @@ def _test_expected_events(self, module, test_file, res, objects):
                     break
 
             assert found, "The following expected object was not found:\n {}\nSearched in: \n{}".format(
-                ev["_source"][module], objects)
+                pretty_json(ev["_source"][module]), pretty_json(objects))
 
     def run_on_file(self, module, fileset, test_file, cfgfile):
         print("Testing {}/{} on {}".format(module, fileset, test_file))
@@ -108,7 +108,8 @@ def run_on_file(self, module, fileset, test_file, cfgfile):
             "-c", cfgfile,
             "-modules={}".format(module),
             "-M", "{module}.*.enabled=false".format(module=module),
-            "-M", "{module}.{fileset}.enabled=true".format(module=module, fileset=fileset),
+            "-M", "{module}.{fileset}.enabled=true".format(
+                module=module, fileset=fileset),
             "-M", "{module}.{fileset}.var.paths=[{test_file}]".format(
                 module=module, fileset=fileset, test_file=test_file),
             "-M", "*.*.input.close_eof=true",
@@ -138,7 +139,8 @@ def run_on_file(self, module, fileset, test_file, cfgfile):
             assert obj["fileset"]["module"] == module, "expected fileset.module={} but got {}".format(
                 module, obj["fileset"]["module"])
 
-            assert "error" not in obj, "not error expected but got: {}".format(obj)
+            assert "error" not in obj, "not error expected but got: {}".format(
+                obj)
 
             if (module == "auditd" and fileset == "log") \
                     or (module == "osquery" and fileset == "result"):
@@ -229,13 +231,16 @@ def _run_ml_test(self, setup_flag, modules_flag):
         # Clean any previous state
         for df in self.es.transport.perform_request("GET", "/_xpack/ml/datafeeds/")["datafeeds"]:
             if df["datafeed_id"] == 'filebeat-nginx-access-response_code':
-                self.es.transport.perform_request("DELETE", "/_xpack/ml/datafeeds/" + df["datafeed_id"])
+                self.es.transport.perform_request(
+                    "DELETE", "/_xpack/ml/datafeeds/" + df["datafeed_id"])
 
         for df in self.es.transport.perform_request("GET", "/_xpack/ml/anomaly_detectors/")["jobs"]:
             if df["job_id"] == 'datafeed-filebeat-nginx-access-response_code':
-                self.es.transport.perform_request("DELETE", "/_xpack/ml/anomaly_detectors/" + df["job_id"])
+                self.es.transport.perform_request(
+                    "DELETE", "/_xpack/ml/anomaly_detectors/" + df["job_id"])
 
-        shutil.rmtree(os.path.join(self.working_dir, "modules.d"), ignore_errors=True)
+        shutil.rmtree(os.path.join(self.working_dir,
+                                   "modules.d"), ignore_errors=True)
 
         # generate a minimal configuration
         cfgfile = os.path.join(self.working_dir, "filebeat.yml")
@@ -302,3 +307,7 @@ def _run_ml_test(self, setup_flag, modules_flag):
             max_timeout=30)
 
         beat.kill()
+
+
+def pretty_json(obj):
+    return json.dumps(obj, indent=2, separators=(',', ': '))
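
For reference, the new pretty_json helper renders events as indented JSON, so a failing assertion now prints a readable structure instead of a one-line repr, e.g.:

>>> print(pretty_json({"postgresql": {"log": {"level": "LOG"}}}))
{
  "postgresql": {
    "log": {
      "level": "LOG"
    }
  }
}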
