Skip to content

Commit

Permalink
Feature: collect full log entries and include them in the model.
Browse files Browse the repository at this point in the history
  • Loading branch information
mdevan committed Jun 21, 2024
1 parent 276585d commit c3f7510
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 51 deletions.
2 changes: 1 addition & 1 deletion collector/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ type collector struct {
curlogfile string
csvlog bool
logSpan uint
currLog logEntry
currLog pgmetrics.LogEntry
rxPrefix *regexp.Regexp
}

Expand Down
95 changes: 46 additions & 49 deletions collector/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,47 +231,33 @@ func (c *collector) readLogLinesCSV(filename string) error {
if len(record) >= 26 {
qid, _ = strconv.ParseInt(record[25], 10, 64)
}
c.currLog = logEntry{
t: t,
user: record[1],
db: record[2],
qid: qid,
level: record[11],
line: record[13],
c.currLog = pgmetrics.LogEntry{
At: t.Unix(),
AtFull: t.In(time.UTC).Format(time.RFC3339Nano),
UserName: record[1],
DBName: record[2],
QueryID: qid,
Level: record[11],
Line: record[13],
}
if d := record[14]; len(d) > 0 {
c.currLog.extra = []logEntryExtra{{level: "DETAIL", line: d}}
c.currLog.Extra = []pgmetrics.LogEntryExtra{{Level: "DETAIL", Line: d}}
}
c.processLogEntry()
}
}

// severities lists the message severity levels that PostgreSQL can emit at
// the start of a log entry (used to recognize log lines by their level tag).
var severities = []string{"DEBUG", "LOG", "INFO", "NOTICE", "WARNING", "ERROR", "FATAL", "PANIC"}

type logEntry struct {
t time.Time
user string
db string
qid int64
level string
line string
extra []logEntryExtra
}

func (l *logEntry) get(level string) string {
for _, e := range l.extra {
if e.level == level {
return e.line
// getExtra scans the extra (continuation) lines of the log entry l and
// returns the text of the first one whose level tag matches level (for
// example "DETAIL"). It returns the empty string when no such line exists.
func getExtra(l *pgmetrics.LogEntry, level string) string {
	for i := range l.Extra {
		if l.Extra[i].Level == level {
			return l.Extra[i].Line
		}
	}
	return ""
}

type logEntryExtra struct {
level string
line string
}

func (c *collector) processLogLine(first bool, t time.Time, user, db string,
qid int64, level, line string) {

Expand All @@ -290,45 +276,53 @@ func (c *collector) processLogLine(first bool, t time.Time, user, db string,
c.processLogEntry()
}
// start new entry
c.currLog = logEntry{
t: t,
user: user,
db: db,
qid: qid,
level: level,
line: line,
extra: nil,
c.currLog = pgmetrics.LogEntry{
At: t.Unix(),
AtFull: t.In(time.UTC).Format(time.RFC3339Nano),
UserName: user,
DBName: db,
QueryID: qid,
Level: level,
Line: line,
}
} else {
// add to extra
c.currLog.extra = append(c.currLog.extra, logEntryExtra{level: level, line: line})
c.currLog.Extra = append(c.currLog.Extra, pgmetrics.LogEntryExtra{
Level: level,
Line: line,
})
}
}

func (c *collector) processLogEntry() {
//log.Printf("debug: got log entry %+v", c.currLog)
if sm := rxAEStart.FindStringSubmatch(c.currLog.line); sm != nil {
if sm := rxAEStart.FindStringSubmatch(c.currLog.Line); sm != nil {
c.processAE(sm)
} else if sm := rxAVStart.FindStringSubmatch(c.currLog.line); sm != nil {
} else if sm := rxAVStart.FindStringSubmatch(c.currLog.Line); sm != nil {
c.processAV(sm)
} else if c.currLog.line == "deadlock detected" {
} else if c.currLog.Line == "deadlock detected" {
c.processDeadlock()
}

// add it to raw log lines
e2 := c.currLog
e2.Extra = append([]pgmetrics.LogEntryExtra{}, c.currLog.Extra...)
c.result.LogEntries = append(c.result.LogEntries, e2)
}

func (c *collector) processAE(sm []string) {
e := c.currLog
p := pgmetrics.Plan{
Database: e.db,
UserName: e.user,
Database: e.DBName,
UserName: e.UserName,
Format: "text",
At: e.t.Unix(),
QueryID: e.qid,
At: e.At,
QueryID: e.QueryID,
}
switch {
case len(sm[1]) > 0:
p.Format = "json"
if parts := strings.SplitN(e.line, "\n", 2); len(parts) == 2 { // has to be 2
if parts := strings.SplitN(e.Line, "\n", 2); len(parts) == 2 { // has to be 2
var obj map[string]interface{}
if err := json.Unmarshal([]byte(parts[1]), &obj); err == nil {
// extract the query and remove it out
Expand All @@ -350,7 +344,7 @@ func (c *collector) processAE(sm []string) {
case len(sm[4]) > 0:
p.Format = "text"
var sp *string = nil
for _, l := range strings.Split(e.line, "\n") {
for _, l := range strings.Split(e.Line, "\n") {
if sm := rxAESwitch1.FindStringSubmatch(l); sm != nil {
p.Query = sm[1]
sp = &p.Query
Expand All @@ -372,22 +366,25 @@ func (c *collector) processAV(sm []string) {
if len(sm) != 4 {
return
}
sm2 := rxAVElapsed.FindStringSubmatch(e.line)
sm2 := rxAVElapsed.FindStringSubmatch(e.Line)
if len(sm2) != 2 {
return
}
elapsed, _ := strconv.ParseFloat(sm2[1], 64)
c.result.AutoVacuums = append(c.result.AutoVacuums, pgmetrics.AutoVacuum{
At: e.t.Unix(),
At: e.At,
Table: sm[3],
Elapsed: elapsed,
})
}

func (c *collector) processDeadlock() {
e := c.currLog
text := strings.ReplaceAll(e.get("DETAIL"), "\t", "") + "\n"
c.result.Deadlocks = append(c.result.Deadlocks, pgmetrics.Deadlock{At: e.t.Unix(), Detail: text})
text := strings.ReplaceAll(getExtra(&e, "DETAIL"), "\t", "") + "\n"
c.result.Deadlocks = append(c.result.Deadlocks, pgmetrics.Deadlock{
At: e.At,
Detail: text,
})
}

//------------------------------------------------------------------------------
Expand Down
30 changes: 29 additions & 1 deletion model.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package pgmetrics
// ModelSchemaVersion is the schema version of the "Model" data structure
// defined below. It is in the "semver" notation. Version history:
//
// 1.17 - Raw log entries
// 1.16 - Postgres 16 support
// 1.15 - Pgpool ReplicationDelaySeconds
// 1.14 - PgBouncer 1.19, Pgpool support
Expand All @@ -37,7 +38,7 @@ package pgmetrics
// 1.2 - more table and index attributes
// 1.1 - added NotificationQueueUsage and Statements
// 1.0 - initial release
const ModelSchemaVersion = "1.16"
const ModelSchemaVersion = "1.17"

// Model contains the entire information collected by a single run of
// pgmetrics. It can be converted to and from json without loss of
Expand Down Expand Up @@ -180,6 +181,11 @@ type Model struct {

// metrics from Pgpool
Pgpool *Pgpool `json:"pgpool,omitempty"`

// following fields are present only in schema 1.17 and later

// raw log entries during specified time span
LogEntries []LogEntry `json:"log_entries,omitempty"`
}

// DatabaseByOID iterates over the databases in the model and returns the reference
Expand Down Expand Up @@ -1046,3 +1052,25 @@ type PgpoolQueryCache struct {
FreeCacheEntriesSize int64 `json:"free_cache_entries_size"`
FragmentCacheEntriesSize int64 `json:"fragment_cache_entries_size"`
}

// LogEntry contains one single log entry from the log file. Which fields
// are filled in depends on the log_line_prefix setting; the timestamp
// fields (At, AtFull) will always be present.
// Added in schema 1.17.
type LogEntry struct {
	At       int64           `json:"at"`                // timestamp, as seconds since Unix epoch
	AtFull   string          `json:"atfull"`            // time, in RFC3339 format, tz will be UTC
	UserName string          `json:"user,omitempty"`    // user name, if present in the log line
	DBName   string          `json:"db_name,omitempty"` // database name, if present in the log line
	QueryID  int64           `json:"queryid,omitempty"` // query ID, if present in the log line
	Level    string          `json:"level,omitempty"`   // severity level, e.g. "LOG", "ERROR"
	Line     string          `json:"line,omitempty"`    // text of the first line of the entry
	Extra    []LogEntryExtra `json:"extra,omitempty"`   // continuation lines (e.g. DETAIL), if any
}

// LogEntryExtra contains a line that appears after the first line in a
// multi-line log entry (e.g. a DETAIL line).
// Added in schema 1.17.
type LogEntryExtra struct {
	Level string `json:"level,omitempty"` // level tag of the extra line, e.g. "DETAIL"
	Line  string `json:"line,omitempty"`  // text of the extra line
}

0 comments on commit c3f7510

Please sign in to comment.