Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support non-colorized log output, logging refactor #420

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 33 additions & 12 deletions cmd/syncv3/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@ package main
import (
"flag"
"fmt"
"log"
"github.com/getsentry/sentry-go"
sentryhttp "github.com/getsentry/sentry-go/http"
"github.com/pressly/goose/v3"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"net/http"
_ "net/http/pprof"
"os"
Expand All @@ -14,13 +20,6 @@ import (
"syscall"
"time"

"github.com/getsentry/sentry-go"
sentryhttp "github.com/getsentry/sentry-go/http"
"github.com/pressly/goose/v3"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/zerolog"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"

syncv3 "github.com/matrix-org/sliding-sync"
"github.com/matrix-org/sliding-sync/internal"
"github.com/matrix-org/sliding-sync/sync2"
Expand Down Expand Up @@ -52,6 +51,7 @@ const (
EnvOTLPPassword = "SYNCV3_OTLP_PASSWORD"
EnvSentryDsn = "SYNCV3_SENTRY_DSN"
EnvLogLevel = "SYNCV3_LOG_LEVEL"
EnvPlainOutput = "SYNCV3_PLAIN_OUTPUT"
EnvMaxConns = "SYNCV3_MAX_DB_CONN"
EnvIdleTimeoutSecs = "SYNCV3_DB_IDLE_TIMEOUT_SECS"
EnvHTTPTimeoutSecs = "SYNCV3_HTTP_TIMEOUT_SECS"
Expand All @@ -74,11 +74,12 @@ Environment var
%s Default: unset. The Sentry DSN to report events to e.g https://sliding-sync@sentry.example.com/123 - if unset does not send sentry events.
%s Default: info. The level of verbosity for messages logged. Available values are trace, debug, info, warn, error and fatal
%s Default: unset. Max database connections to use when communicating with postgres. Unset or 0 means no limit.
%s Default: unset. Disable colorized output (for cleaner text logging). If set to 1, will output plain text.
%s Default: 3600. The maximum amount of time a database connection may be idle, in seconds. 0 means no limit.
%s Default: 300. The timeout in seconds for normal HTTP requests.
%s Default: 1800. The timeout in seconds for initial sync requests.
`, EnvServer, EnvDB, EnvSecret, EnvBindAddr, EnvTLSCert, EnvTLSKey, EnvPPROF, EnvPrometheus, EnvOTLP, EnvOTLPUsername, EnvOTLPPassword,
EnvSentryDsn, EnvLogLevel, EnvMaxConns, EnvIdleTimeoutSecs, EnvHTTPTimeoutSecs, EnvHTTPInitialTimeoutSecs)
EnvSentryDsn, EnvLogLevel, EnvMaxConns, EnvPlainOutput, EnvIdleTimeoutSecs, EnvHTTPTimeoutSecs, EnvHTTPInitialTimeoutSecs)

func defaulting(in, dft string) string {
if in == "" {
Expand Down Expand Up @@ -113,6 +114,7 @@ func main() {
EnvSentryDsn: os.Getenv(EnvSentryDsn),
EnvLogLevel: os.Getenv(EnvLogLevel),
EnvMaxConns: defaulting(os.Getenv(EnvMaxConns), "0"),
EnvPlainOutput: defaulting(os.Getenv(EnvPlainOutput), "0"),
EnvIdleTimeoutSecs: defaulting(os.Getenv(EnvIdleTimeoutSecs), "3600"),
EnvHTTPTimeoutSecs: defaulting(os.Getenv(EnvHTTPTimeoutSecs), "300"),
EnvHTTPInitialTimeoutSecs: defaulting(os.Getenv(EnvHTTPInitialTimeoutSecs), "1800"),
Expand Down Expand Up @@ -194,6 +196,25 @@ func main() {
}
}

// Configure the global zerolog logger (log.Logger).
// Unless EnvPlainOutput is "1", log to stderr through zerolog's console
// writer with a short time-of-day timestamp.
if args[EnvPlainOutput] != "1" {
	log.Logger = log.Output(zerolog.ConsoleWriter{
		Out:        os.Stderr,
		TimeFormat: "15:04:05",
	})
} else {
	// Plain mode: log to stdout with custom formatters for the timestamp,
	// level and field names.
	output := zerolog.ConsoleWriter{
		Out:        os.Stdout,
		TimeFormat: time.RFC3339,
		// The formatters overridden below do not cover every part of a log
		// line; the console writer's default formatters (e.g. for error
		// fields) still emit ANSI colour escapes unless NoColor is set.
		NoColor: true,
	}
	// Print the timestamp value exactly as zerolog hands it over.
	output.FormatTimestamp = func(i interface{}) string {
		return fmt.Sprintf("%v", i)
	}
	// Upper-case the level name, e.g. "INFO".
	output.FormatLevel = func(i interface{}) string {
		return strings.ToUpper(fmt.Sprintf("%s", i))
	}
	// Render fields as key=value.
	output.FormatFieldName = func(i interface{}) string {
		return fmt.Sprintf("%s=", i)
	}
	log.Logger = zerolog.New(output).With().Timestamp().Logger()
}

maxConnsInt, err := strconv.Atoi(args[EnvMaxConns])
if err != nil {
panic("invalid value for " + EnvMaxConns + ": " + args[EnvMaxConns])
Expand Down Expand Up @@ -285,12 +306,12 @@ func executeMigrations() {

db, err := goose.OpenDBWithDriver("postgres", envArgs[EnvDB])
if err != nil {
log.Fatalf("goose: failed to open DB: %v\n", err)
log.Fatal().Err(err).Msgf("goose: failed to open DB: %v\n", err)
}

defer func() {
if err := db.Close(); err != nil {
log.Fatalf("goose: failed to close DB: %v\n", err)
log.Fatal().Err(err).Msgf("goose: failed to close DB: %v\n", err)
}
}()

Expand All @@ -301,7 +322,7 @@ func executeMigrations() {

goose.SetBaseFS(syncv3.EmbedMigrations)
if err := goose.Run(command, db, "state/migrations", arguments...); err != nil {
log.Fatalf("goose %v: %v", command, err)
log.Fatal().Err(err).Msgf("goose %v: %v", command, err)
}
}

Expand Down
10 changes: 2 additions & 8 deletions internal/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,9 @@ import (
"runtime"

"github.com/getsentry/sentry-go"

"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)

var logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.ConsoleWriter{
Out: os.Stderr,
TimeFormat: "15:04:05",
})

type HandlerError struct {
StatusCode int
Err error
Expand Down Expand Up @@ -103,7 +97,7 @@ func assert(msg string, expr bool) {
if os.Getenv("SYNCV3_DEBUG") == "1" {
panic(fmt.Sprintf("assert: %s", msg))
}
l := logger.Error()
l := log.Error()
_, file, line, ok := runtime.Caller(1)
if ok {
l = l.Str("assertion", fmt.Sprintf("%s:%d", file, line))
Expand Down
7 changes: 0 additions & 7 deletions pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,12 @@ package pubsub

import (
"fmt"
"os"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/rs/zerolog"
)

var logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.ConsoleWriter{
Out: os.Stderr,
TimeFormat: "15:04:05",
})

type Payload interface {
// The type of payload; used mostly for logging and prometheus metrics
Type() string
Expand Down
3 changes: 2 additions & 1 deletion pubsub/v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"

"github.com/matrix-org/sliding-sync/internal"
"github.com/rs/zerolog/log"
)

// The channel which has V2* payloads
Expand Down Expand Up @@ -197,7 +198,7 @@ func (v *V2Sub) onMessage(p Payload) {
case *V2StateRedaction:
v.receiver.OnStateRedaction(pl)
default:
logger.Warn().Str("type", p.Type()).Msg("V2Sub: unhandled payload type")
log.Warn().Str("type", p.Type()).Msg("V2Sub: unhandled payload type")
}
}

Expand Down
6 changes: 5 additions & 1 deletion pubsub/v3.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package pubsub

import (
"github.com/rs/zerolog/log"
)

// ChanV3 is the name of the channel which has V3* payloads.
const ChanV3 = "v3ch"

Expand Down Expand Up @@ -39,7 +43,7 @@ func (v *V3Sub) onMessage(p Payload) {
case *V3EnsurePolling:
v.receiver.EnsurePolling(pl)
default:
logger.Warn().Str("type", p.Type()).Msg("V3Sub: unhandled payload type")
log.Warn().Str("type", p.Type()).Msg("V3Sub: unhandled payload type")
}
}

Expand Down
13 changes: 4 additions & 9 deletions sqlutil/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,12 @@ import (
"context"
"fmt"
"github.com/matrix-org/sliding-sync/internal"
"github.com/rs/zerolog"
"os"
"runtime/debug"

"github.com/jmoiron/sqlx"
"github.com/rs/zerolog/log"
)

var logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.ConsoleWriter{
Out: os.Stderr,
TimeFormat: "15:04:05",
})

// WithTransaction runs a block of code, passing in an SQL transaction.
// If the code returns an error or panics then the transaction is rolled back.
// Otherwise the transaction is committed.
Expand All @@ -30,7 +24,7 @@ func WithTransaction(db *sqlx.DB, fn func(txn *sqlx.Tx) error) (err error) {
if err == nil && panicErr != nil {
// TODO: thread a context through to here?
ctx := context.Background()
logger.Error().Msg(string(debug.Stack()))
log.Error().Msg(string(debug.Stack()))
internal.GetSentryHubFromContextOrDefault(ctx).RecoverWithContext(ctx, panicErr)
err = fmt.Errorf("panic: %v", panicErr)
}
Expand Down Expand Up @@ -59,7 +53,8 @@ type Chunker interface {
// Inserting events using NamedExec involves 3n params (n=number of events), meaning it's easy to hit
// the limit in rooms like Matrix HQ. This function breaks up the events into chunks which can be
// batch inserted in multiple statements. Without this, you'll see errors like:
// "pq: got 95331 parameters but PostgreSQL only supports 65535 parameters"
//
// "pq: got 95331 parameters but PostgreSQL only supports 65535 parameters"
func Chunkify(numParamsPerStmt, maxParamsPerCall int, entries Chunker) []Chunker {
// common case, most things are small
if (entries.Len() * numParamsPerStmt) <= maxParamsPerCall {
Expand Down
15 changes: 8 additions & 7 deletions state/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
"github.com/matrix-org/sliding-sync/sqlutil"
"github.com/rs/zerolog/log"
"github.com/tidwall/gjson"
)

Expand Down Expand Up @@ -77,7 +78,7 @@ func (a *Accumulator) calculateNewSnapshot(old StrippedEvents, new Event) (Strip
// ruh roh. This should be impossible, but it can happen if the v2 response sends the same
// event in both state and timeline. We need to alert the operator and whine badly as it means
// we have lost an event by now.
logger.Warn().Str("new_event_id", new.ID).Str("old_event_id", e.ID).Str("room_id", new.RoomID).Str("type", new.Type).Str("state_key", new.StateKey).Msg(
log.Warn().Str("new_event_id", new.ID).Str("old_event_id", e.ID).Str("room_id", new.RoomID).Str("type", new.Type).Str("state_key", new.StateKey).Msg(
"Detected different event IDs with the same NID when rolling forward state. This has resulted in data loss in this room (1 event). " +
"This can happen when the v2 /sync response sends the same event in both state and timeline sections. " +
"The event in this log line has been dropped!",
Expand Down Expand Up @@ -227,7 +228,7 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (Initia
// We don't have a current snapshot for this room, yet none of the events are new.
// It is unclear how this should be handled.
const errMsg = "Accumulator.Initialise: room has no current snapshot but also no new inserted events, doing nothing. This is probably a bug."
logger.Error().Str("room_id", roomID).Msg(errMsg)
log.Error().Str("room_id", roomID).Msg(errMsg)
sentry.CaptureException(fmt.Errorf(errMsg))
}
// Note: we otherwise ignore cases where the state has only changed to a
Expand Down Expand Up @@ -398,7 +399,7 @@ func (a *Accumulator) Accumulate(txn *sqlx.Tx, userID, roomID string, timeline s
} else {
// Bail out and complain loudly.
const msg = "Accumulator: skipping processing of timeline, as no snapshot exists"
logger.Warn().
log.Warn().
Str("event_id", newEvents[0].ID).
Str("event_type", newEvents[0].Type).
Str("event_state_key", newEvents[0].StateKey).
Expand Down Expand Up @@ -484,7 +485,7 @@ func (a *Accumulator) Accumulate(txn *sqlx.Tx, userID, roomID string, timeline s
if roomVersion == "" {
// Defaults to "1" if the key does not exist.
roomVersion = "1"
logger.Warn().Str("room", roomID).Err(err).Msg(
log.Warn().Str("room", roomID).Err(err).Msg(
"Redact: no content.room_version in create event, defaulting to v1",
)
}
Expand Down Expand Up @@ -576,13 +577,13 @@ func parseAndDeduplicateTimelineEvents(roomID string, timeline sync2.TimelineRes
RoomID: roomID,
}
if err := e.ensureFieldsSetOnEvent(); err != nil {
logger.Warn().Str("event_id", e.ID).Str("room_id", roomID).Err(err).Msg(
log.Warn().Str("event_id", e.ID).Str("room_id", roomID).Err(err).Msg(
"Accumulator.filterToNewTimelineEvents: failed to parse event, ignoring",
)
continue
}
if _, ok := seenEvents[e.ID]; ok {
logger.Warn().Str("event_id", e.ID).Str("room_id", roomID).Msg(
log.Warn().Str("event_id", e.ID).Str("room_id", roomID).Msg(
"Accumulator.filterToNewTimelineEvents: seen the same event ID twice, ignoring",
)
continue
Expand Down Expand Up @@ -671,7 +672,7 @@ func ensureStateHasCreateEvent(events []Event) error {
})
sentry.CaptureMessage(errMsg)
})
logger.Warn().
log.Warn().
Str("room_id", events[0].RoomID).
Int("len_state", len(events)).
Msg(errMsg)
Expand Down
5 changes: 3 additions & 2 deletions state/event_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/sliding-sync/internal"
"github.com/matrix-org/sliding-sync/sqlutil"
"github.com/rs/zerolog/log"
)

const (
Expand Down Expand Up @@ -393,7 +394,7 @@ func (t *EventTable) Redact(txn *sqlx.Tx, roomVer string, redacteeEventIDToRedac
if err != nil {
// unknown room version... let's just default to "1"
rv = gomatrixserverlib.MustGetRoomVersion(gomatrixserverlib.RoomVersionV1)
logger.Warn().Str("version", roomVer).Err(err).Msg(
log.Warn().Str("version", roomVer).Err(err).Msg(
"Redact: GetRoomVersion: unknown room version, defaulting to v1",
)
}
Expand Down Expand Up @@ -567,7 +568,7 @@ func filterAndEnsureFieldsSet(events []Event) []Event {
for i := range events {
ev := &events[i]
if err := ev.ensureFieldsSetOnEvent(); err != nil {
logger.Warn().Str("event_id", ev.ID).Err(err).Msg(
log.Warn().Str("event_id", ev.ID).Err(err).Msg(
"filterAndEnsureFieldsSet: failed to parse event, ignoring",
)
continue
Expand Down
15 changes: 5 additions & 10 deletions state/migrations/20230822180807_bogus_snapshot_cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,12 @@ import (
"context"
"database/sql"
"fmt"

"github.com/lib/pq"
"github.com/pressly/goose/v3"
"github.com/rs/zerolog"
"os"
"github.com/rs/zerolog/log"
)

var logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.ConsoleWriter{
Out: os.Stderr,
TimeFormat: "15:04:05",
})

// init registers the bogus-snapshot cleanup migration with goose, passing
// upBogusSnapshotCleanup and downBogusSnapshotCleanup to AddMigrationContext.
func init() {
goose.AddMigrationContext(upBogusSnapshotCleanup, downBogusSnapshotCleanup)
}
Expand All @@ -29,7 +24,7 @@ func upBogusSnapshotCleanup(ctx context.Context, tx *sql.Tx) error {
if len(bogusRooms) == 0 {
return nil
}
logger.Info().Strs("room_ids", bogusRooms).
log.Info().Strs("room_ids", bogusRooms).
Msgf("Found %d bogus rooms to cleanup", len(bogusRooms))

tables := []string{"syncv3_snapshots", "syncv3_events", "syncv3_rooms"}
Expand All @@ -52,9 +47,9 @@ func deleteFromTable(ctx context.Context, tx *sql.Tx, table string, roomIDs []st
}
ra, err := result.RowsAffected()
if err != nil {
logger.Warn().Err(err).Msgf("Couldn't get number of rows deleted from %s", table)
log.Warn().Err(err).Msgf("Couldn't get number of rows deleted from %s", table)
} else {
logger.Info().Msgf("Deleted %d rows from %s", ra, table)
log.Info().Msgf("Deleted %d rows from %s", ra, table)
}
return nil
}
Expand Down
Loading