Skip to content

Commit

Permalink
Try using slog in the engine
Browse files Browse the repository at this point in the history
  • Loading branch information
lalexgap committed Aug 21, 2023
1 parent e1c98e7 commit 2ea98ec
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 30 deletions.
14 changes: 14 additions & 0 deletions internal/logging/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package logging

import (
"log"
"log/slog"
"os"
"path/filepath"
"strconv"
"sync"

"github.com/rs/zerolog"
"github.com/statechannels/go-nitro/protocols"
"github.com/statechannels/go-nitro/types"
)

Expand Down Expand Up @@ -55,3 +57,15 @@ func NewLogWriter(logDir, logFile string) *os.File {
func WithAddress(c zerolog.Context, address *types.Address) zerolog.Context {
return c.Str("address", address.String()[0:8])
}

func WithAddressAttribute(a types.Address) slog.Attr {
return slog.String("address", a.String()[0:8])
}

func WithChannelIdAttribute(c types.Destination) slog.Attr {
return slog.String("channel-id", c.String())
}

func WithObjectiveIdAttribute(o protocols.ObjectiveId) slog.Attr {
return slog.String("objective-id", string(o))
}
60 changes: 31 additions & 29 deletions node/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ import (
"context"
"errors"
"fmt"
"io"
"log/slog"
"math/big"
"sync"

"github.com/rs/zerolog"
"github.com/statechannels/go-nitro/channel"
"github.com/statechannels/go-nitro/channel/consensus_channel"
"github.com/statechannels/go-nitro/internal/logging"
Expand Down Expand Up @@ -73,9 +72,8 @@ type Engine struct {
store store.Store // A Store for persisting and restoring important data
policymaker PolicyMaker // A PolicyMaker decides whether to approve or reject objectives

logger zerolog.Logger

vm *payments.VoucherManager
logger *slog.Logger
vm *payments.VoucherManager

wg *sync.WaitGroup
cancel context.CancelFunc
Expand Down Expand Up @@ -127,7 +125,7 @@ type CompletedObjectiveEvent struct {
type Response struct{}

// NewEngine is the constructor for an Engine
func New(vm *payments.VoucherManager, msg messageservice.MessageService, chain chainservice.ChainService, store store.Store, logDestination io.Writer, policymaker PolicyMaker, eventHandler func(EngineEvent)) Engine {
func New(vm *payments.VoucherManager, msg messageservice.MessageService, chain chainservice.ChainService, store store.Store, policymaker PolicyMaker, eventHandler func(EngineEvent)) Engine {
e := Engine{}

e.store = store
Expand All @@ -145,14 +143,14 @@ func New(vm *payments.VoucherManager, msg messageservice.MessageService, chain c

e.eventHandler = eventHandler

logging.ConfigureZeroLogger()
e.logger = logging.WithAddress(zerolog.New(logDestination).With().Timestamp(), e.store.GetAddress()).Caller().Logger()
// We set a local logger here that will include the address of the node when logging
e.logger = slog.Default().With(logging.WithAddressAttribute(*e.store.GetAddress()))

e.policymaker = policymaker

e.vm = vm

e.logger.Print("Constructed Engine")
e.logger.Info("Constructed Engine")

e.wg = &sync.WaitGroup{}

Expand Down Expand Up @@ -206,7 +204,7 @@ func (e *Engine) run(ctx context.Context) {
if !res.IsEmpty() {

for _, obj := range res.CompletedObjectives {
e.logger.Printf("Objective %s is complete & returned to API", obj.Id())
e.logger.Info("Objective is complete & returned to API", logging.WithObjectiveIdAttribute(obj.Id()))
}
e.eventHandler(res)
}
Expand All @@ -225,7 +223,7 @@ func (e *Engine) handleProposal(proposal consensus_channel.Proposal) (EngineEven
return EngineEvent{}, err
}
if obj.GetStatus() == protocols.Completed {
e.logger.Printf("Ignoring proposal for complected objective %s", obj.Id())
e.logger.Info("Ignoring proposal for complected objective", slog.String("id", string(id)))
return EngineEvent{}, nil
}
return e.attemptProgress(obj)
Expand All @@ -249,7 +247,7 @@ func (e *Engine) handleMessage(message protocols.Message) (EngineEvent, error) {
}

if objective.GetStatus() == protocols.Unapproved {
e.logger.Printf("Policymaker is %+v", e.policymaker)
e.logger.Info("Policymaker for objective", "policy-maker", e.policymaker, logging.WithObjectiveIdAttribute(objective.Id()))
if e.policymaker.ShouldApprove(objective) {
objective = objective.Approve()

Expand Down Expand Up @@ -278,11 +276,12 @@ func (e *Engine) handleMessage(message protocols.Message) (EngineEvent, error) {
}

if objective.GetStatus() == protocols.Completed {
e.logger.Printf("Ignoring payload for complected objective %s", objective.Id())
e.logger.Info("Ignoring payload for complected objective", logging.WithObjectiveIdAttribute(objective.Id()))

continue
}
if objective.GetStatus() == protocols.Rejected {
e.logger.Printf("Ignoring payload for rejected objective %s", objective.Id())
e.logger.Info("Ignoring payload for rejected objective", logging.WithObjectiveIdAttribute(objective.Id()))
continue
}

Expand Down Expand Up @@ -313,7 +312,8 @@ func (e *Engine) handleMessage(message protocols.Message) (EngineEvent, error) {
return EngineEvent{}, err
}
if o.GetStatus() == protocols.Completed {
e.logger.Printf("Ignoring payload for complected objective %s", o.Id())
e.logger.Info("Ignoring proposal for completed objective", logging.WithObjectiveIdAttribute(id))

continue
}
objective, isProposalReceiver := o.(protocols.ProposalReceiver)
Expand Down Expand Up @@ -341,7 +341,8 @@ func (e *Engine) handleMessage(message protocols.Message) (EngineEvent, error) {
return EngineEvent{}, err
}
if objective.GetStatus() == protocols.Rejected {
e.logger.Printf("Ignoring payload for rejected objective %s", objective.Id())
e.logger.Info("Ignoring payload for rejected objective", logging.WithObjectiveIdAttribute(objective.Id()))

continue
}

Expand Down Expand Up @@ -395,10 +396,9 @@ func (e *Engine) handleMessage(message protocols.Message) (EngineEvent, error) {
// - generates an updated objective, and
// - attempts progress.
func (e *Engine) handleChainEvent(chainEvent chainservice.Event) (EngineEvent, error) {
e.logger.Printf("handling chain event: %v", chainEvent)
e.logger.Info("Handling chain event", "event", chainEvent, logging.WithAddressAttribute(*e.store.GetAddress()))

c, ok := e.store.GetChannelById(chainEvent.ChannelID())

if !ok {
// TODO: Right now the chain service returns chain events for ALL channels even those we aren't involved in
// for now we can ignore channels we aren't involved in
Expand Down Expand Up @@ -436,7 +436,7 @@ func (e *Engine) handleObjectiveRequest(or protocols.ObjectiveRequest) (EngineEv

objectiveId := or.Id(myAddress, chainId)
failedEngineEvent := EngineEvent{FailedObjectives: []protocols.ObjectiveId{objectiveId}}
e.logger.Printf("handling new objective request for %s", objectiveId)
e.logger.Info("handling new objective request", logging.WithObjectiveIdAttribute(objectiveId))
defer or.SignalObjectiveStarted()
switch request := or.(type) {

Expand Down Expand Up @@ -538,7 +538,8 @@ func (e *Engine) sendMessages(msgs []protocols.Message) {
e.logMessage(message, Outgoing)
err := e.msg.Send(message)
if err != nil {
e.logger.Err(err)
slog.Error(err.Error())
panic(err)
}
}
e.wg.Done()
Expand All @@ -551,7 +552,8 @@ func (e *Engine) executeSideEffects(sideEffects protocols.SideEffects) error {
go e.sendMessages(sideEffects.MessagesToSend)

for _, tx := range sideEffects.TransactionsToSubmit {
e.logger.Printf("Sending chain transaction for channel %s", tx.ChannelId())
slog.Info("Sending chain transaction", "channel", tx.ChannelId().String())

err := e.chain.SendTransaction(tx)
if err != nil {
return err
Expand Down Expand Up @@ -592,7 +594,7 @@ func (e *Engine) attemptProgress(objective protocols.Objective) (outgoing Engine
}
outgoing.Merge(notifEvents)

e.logger.Printf("Objective %s is %s", objective.Id(), waitingFor)
e.logger.Info("Objective cranked", logging.WithObjectiveIdAttribute(objective.Id()), "waiting-for", string(waitingFor))

// If our protocol is waiting for nothing then we know the objective is complete
// TODO: If attemptProgress is called on a completed objective CompletedObjectives would include that objective id
Expand Down Expand Up @@ -707,7 +709,8 @@ func (e *Engine) getOrCreateObjective(p protocols.ObjectivePayload) (protocols.O
if err != nil {
return nil, fmt.Errorf("error setting objective in store: %w", err)
}
e.logger.Printf("Created new objective from message %s", newObj.Id())
slog.Info("Created new objective from message", "id", id)

return newObj, nil

} else {
Expand All @@ -717,8 +720,7 @@ func (e *Engine) getOrCreateObjective(p protocols.ObjectivePayload) (protocols.O

// constructObjectiveFromMessage Constructs a new objective (of the appropriate concrete type) from the supplied payload.
func (e *Engine) constructObjectiveFromMessage(id protocols.ObjectiveId, p protocols.ObjectivePayload) (protocols.Objective, error) {
e.logger.Printf("Constructing objective %s from message", id)

slog.Info("Constructing objective from message", logging.WithObjectiveIdAttribute(id), logging.WithAddressAttribute(*e.store.GetAddress()))
switch {
case directfund.IsDirectFundObjective(id):

Expand Down Expand Up @@ -815,22 +817,22 @@ const (
// logMessage logs a message to the engine's logger
func (e *Engine) logMessage(msg protocols.Message, direction messageDirection) {
if direction == Incoming {
e.logger.Trace().EmbedObject(msg.Summarize()).Msg("Received message")
slog.Debug("Received message", "msg", msg.Summarize(), logging.WithAddressAttribute(*e.store.GetAddress()))
} else {
e.logger.Trace().EmbedObject(msg.Summarize()).Msg("Sending message")
slog.Debug("Sent message", "msg", msg.Summarize(), logging.WithAddressAttribute(*e.store.GetAddress()))
}
}

func (e *Engine) checkError(err error) {
if err != nil {
e.logger.Err(err).Msgf("%s, error in run loop", e.store.GetAddress())
slog.Error("error in run loop", "err", err)

for _, nonFatalError := range nonFatalErrors {
if errors.Is(err, nonFatalError) {
return
}
}

e.logger.Panic().Msg(err.Error())
panic(err)
}
}
8 changes: 7 additions & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package node // import "github.com/statechannels/go-nitro/node"
import (
"fmt"
"io"
"log/slog"
"math/big"
"runtime/debug"
"time"
Expand Down Expand Up @@ -58,7 +59,12 @@ func New(messageService messageservice.MessageService, chainservice chainservice
n.vm = payments.NewVoucherManager(*store.GetAddress(), store)
n.logger = logging.WithAddress(zerolog.New(logDestination).With().Timestamp(), n.Address).Caller().Logger()

n.engine = engine.New(n.vm, messageService, chainservice, store, logDestination, policymaker, n.handleEngineEvent)
// Set up the default logger for slog
// TODO: Eventually this should be done at the test/CLI level not within the node
h := slog.NewJSONHandler(logDestination, &slog.HandlerOptions{})
slog.SetDefault(slog.New(h))

n.engine = engine.New(n.vm, messageService, chainservice, store, policymaker, n.handleEngineEvent)
n.completedObjectives = &safesync.Map[chan struct{}]{}
n.completedObjectivesForRPC = make(chan protocols.ObjectiveId, 100)

Expand Down

0 comments on commit 2ea98ec

Please sign in to comment.