From c3f75108aa9c5b43f0aaa527737d41f97a7898e7 Mon Sep 17 00:00:00 2001 From: Mahadevan Date: Fri, 21 Jun 2024 10:45:08 +0530 Subject: [PATCH] Feature: full log entries and included in model. --- collector/collect.go | 2 +- collector/log.go | 95 +++++++++++++++++++++----------------------- model.go | 30 +++++++++++++- 3 files changed, 76 insertions(+), 51 deletions(-) diff --git a/collector/collect.go b/collector/collect.go index b758a03..4a590b3 100644 --- a/collector/collect.go +++ b/collector/collect.go @@ -322,7 +322,7 @@ type collector struct { curlogfile string csvlog bool logSpan uint - currLog logEntry + currLog pgmetrics.LogEntry rxPrefix *regexp.Regexp } diff --git a/collector/log.go b/collector/log.go index 05f45d3..e8e060f 100644 --- a/collector/log.go +++ b/collector/log.go @@ -231,16 +231,17 @@ func (c *collector) readLogLinesCSV(filename string) error { if len(record) >= 26 { qid, _ = strconv.ParseInt(record[25], 10, 64) } - c.currLog = logEntry{ - t: t, - user: record[1], - db: record[2], - qid: qid, - level: record[11], - line: record[13], + c.currLog = pgmetrics.LogEntry{ + At: t.Unix(), + AtFull: t.In(time.UTC).Format(time.RFC3339Nano), + UserName: record[1], + DBName: record[2], + QueryID: qid, + Level: record[11], + Line: record[13], } if d := record[14]; len(d) > 0 { - c.currLog.extra = []logEntryExtra{{level: "DETAIL", line: d}} + c.currLog.Extra = []pgmetrics.LogEntryExtra{{Level: "DETAIL", Line: d}} } c.processLogEntry() } @@ -248,30 +249,15 @@ func (c *collector) readLogLinesCSV(filename string) error { var severities = []string{"DEBUG", "LOG", "INFO", "NOTICE", "WARNING", "ERROR", "FATAL", "PANIC"} -type logEntry struct { - t time.Time - user string - db string - qid int64 - level string - line string - extra []logEntryExtra -} - -func (l *logEntry) get(level string) string { - for _, e := range l.extra { - if e.level == level { - return e.line +func getExtra(l *pgmetrics.LogEntry, level string) string { + for _, e := range l.Extra { + if e.Level == level { + return e.Line } } return "" } -type logEntryExtra struct { - level string - line string -} - func (c *collector) processLogLine(first bool, t time.Time, user, db string, qid int64, level, line string) { @@ -290,45 +276,53 @@ func (c *collector) processLogLine(first bool, t time.Time, user, db string, c.processLogEntry() } // start new entry - c.currLog = logEntry{ - t: t, - user: user, - db: db, - qid: qid, - level: level, - line: line, - extra: nil, + c.currLog = pgmetrics.LogEntry{ + At: t.Unix(), + AtFull: t.In(time.UTC).Format(time.RFC3339Nano), + UserName: user, + DBName: db, + QueryID: qid, + Level: level, + Line: line, } } else { // add to extra - c.currLog.extra = append(c.currLog.extra, logEntryExtra{level: level, line: line}) + c.currLog.Extra = append(c.currLog.Extra, pgmetrics.LogEntryExtra{ + Level: level, + Line: line, + }) } } func (c *collector) processLogEntry() { //log.Printf("debug: got log entry %+v", c.currLog) - if sm := rxAEStart.FindStringSubmatch(c.currLog.line); sm != nil { + if sm := rxAEStart.FindStringSubmatch(c.currLog.Line); sm != nil { c.processAE(sm) - } else if sm := rxAVStart.FindStringSubmatch(c.currLog.line); sm != nil { + } else if sm := rxAVStart.FindStringSubmatch(c.currLog.Line); sm != nil { c.processAV(sm) - } else if c.currLog.line == "deadlock detected" { + } else if c.currLog.Line == "deadlock detected" { c.processDeadlock() } + + // add it to raw log lines + e2 := c.currLog + e2.Extra = append([]pgmetrics.LogEntryExtra{}, c.currLog.Extra...) + c.result.LogEntries = append(c.result.LogEntries, e2) } func (c *collector) processAE(sm []string) { e := c.currLog p := pgmetrics.Plan{ - Database: e.db, - UserName: e.user, + Database: e.DBName, + UserName: e.UserName, Format: "text", - At: e.t.Unix(), - QueryID: e.qid, + At: e.At, + QueryID: e.QueryID, } switch { case len(sm[1]) > 0: p.Format = "json" - if parts := strings.SplitN(e.line, "\n", 2); len(parts) == 2 { // has to be 2 + if parts := strings.SplitN(e.Line, "\n", 2); len(parts) == 2 { // has to be 2 var obj map[string]interface{} if err := json.Unmarshal([]byte(parts[1]), &obj); err == nil { // extract the query and remove it out @@ -350,7 +344,7 @@ func (c *collector) processAE(sm []string) { case len(sm[4]) > 0: p.Format = "text" var sp *string = nil - for _, l := range strings.Split(e.line, "\n") { + for _, l := range strings.Split(e.Line, "\n") { if sm := rxAESwitch1.FindStringSubmatch(l); sm != nil { p.Query = sm[1] sp = &p.Query @@ -372,13 +366,13 @@ func (c *collector) processAV(sm []string) { if len(sm) != 4 { return } - sm2 := rxAVElapsed.FindStringSubmatch(e.line) + sm2 := rxAVElapsed.FindStringSubmatch(e.Line) if len(sm2) != 2 { return } elapsed, _ := strconv.ParseFloat(sm2[1], 64) c.result.AutoVacuums = append(c.result.AutoVacuums, pgmetrics.AutoVacuum{ - At: e.t.Unix(), + At: e.At, Table: sm[3], Elapsed: elapsed, }) @@ -386,8 +380,11 @@ func (c *collector) processAV(sm []string) { func (c *collector) processDeadlock() { e := c.currLog - text := strings.ReplaceAll(e.get("DETAIL"), "\t", "") + "\n" - c.result.Deadlocks = append(c.result.Deadlocks, pgmetrics.Deadlock{At: e.t.Unix(), Detail: text}) + text := strings.ReplaceAll(getExtra(&e, "DETAIL"), "\t", "") + "\n" + c.result.Deadlocks = append(c.result.Deadlocks, pgmetrics.Deadlock{ + At: e.At, + Detail: text, + }) } //------------------------------------------------------------------------------ diff --git a/model.go b/model.go index ee32787..f049809 100644 --- a/model.go +++ b/model.go @@ -19,6 +19,7 @@ package pgmetrics // ModelSchemaVersion is the schema version of the "Model" data structure // defined below. It is in the "semver" notation. Version history: // +// 1.17 - Raw log entries // 1.16 - Postgres 16 support // 1.15 - Pgpool ReplicationDelaySeconds // 1.14 - PgBouncer 1.19, Pgpool support @@ -37,7 +38,7 @@ package pgmetrics // 1.2 - more table and index attributes // 1.1 - added NotificationQueueUsage and Statements // 1.0 - initial release -const ModelSchemaVersion = "1.16" +const ModelSchemaVersion = "1.17" // Model contains the entire information collected by a single run of // pgmetrics. It can be converted to and from json without loss of @@ -180,6 +181,11 @@ type Model struct { // metrics from Pgpool Pgpool *Pgpool `json:"pgpool,omitempty"` + + // following fields are present only in schema 1.17 and later + + // raw log entries during specified time span + LogEntries []LogEntry `json:"log_entries,omitempty"` } // DatabaseByOID iterates over the databases in the model and returns the reference @@ -1046,3 +1052,25 @@ type PgpoolQueryCache struct { FreeCacheEntriesSize int64 `json:"free_cache_entries_size"` FragmentCacheEntriesSize int64 `json:"fragment_cache_entries_size"` } + +// LogEntry contains one single log entry from the log file. What fields are +// filled in depends on the log_line_prefix setting. Timestamp will always be +// present. +// Added in schema 1.17. +type LogEntry struct { + At int64 `json:"at"` + AtFull string `json:"atfull"` // time, in RFC3339 format, tz will be UTC + UserName string `json:"user,omitempty"` + DBName string `json:"db_name,omitempty"` + QueryID int64 `json:"queryid,omitempty"` + Level string `json:"level,omitempty"` + Line string `json:"line,omitempty"` + Extra []LogEntryExtra `json:"extra,omitempty"` +} + +// LogEntryExtra contains lines that appear after the first line in a +// multi-line log entry. +type LogEntryExtra struct { + Level string `json:"level,omitempty"` + Line string `json:"line,omitempty"` +}