Skip to content

Commit

Permalink
feat: add timeout to Ask/RemoteAsk and Client Ask (#380)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey authored Jun 29, 2024
1 parent efcd10b commit 28d2fa9
Show file tree
Hide file tree
Showing 18 changed files with 277 additions and 242 deletions.
47 changes: 27 additions & 20 deletions actors/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ type ActorSystem interface {
Deregister(ctx context.Context, actor Actor) error
// handleRemoteAsk handles a synchronous message to another actor and expect a response.
// This block until a response is received or timed out.
handleRemoteAsk(ctx context.Context, to PID, message proto.Message) (response proto.Message, err error)
handleRemoteAsk(ctx context.Context, to PID, message proto.Message, timeout time.Duration) (response proto.Message, err error)
// handleRemoteTell handles an asynchronous message to an actor
handleRemoteTell(ctx context.Context, to PID, message proto.Message) error
}
Expand All @@ -153,7 +153,7 @@ type actorSystem struct {
expireActorAfter time.Duration
// Specifies how long the sender of a receiveContext should wait to receive a reply
// when using SendReply. The default value is 5s
replyTimeout time.Duration
askTimeout time.Duration
// Specifies the shutdown timeout. The default value is 30s
shutdownTimeout time.Duration
// Specifies the maximum of retries to attempt when the actor
Expand Down Expand Up @@ -194,7 +194,8 @@ type actorSystem struct {
// specifies the stash capacity
stashCapacity uint64

housekeeperStopSig chan types.Unit
stopGC chan types.Unit
gcInterval time.Duration

// specifies the events stream
eventsStream *eventstream.EventsStream
Expand All @@ -209,7 +210,7 @@ type actorSystem struct {
metricEnabled atomic.Bool

registry types.Registry
reflection reflection
reflection *reflection

peersStateLoopInterval time.Duration
peersCacheMu *sync.RWMutex
Expand Down Expand Up @@ -237,14 +238,15 @@ func NewActorSystem(name string, opts ...Option) (ActorSystem, error) {
name: name,
logger: log.DefaultLogger,
expireActorAfter: DefaultPassivationTimeout,
replyTimeout: DefaultReplyTimeout,
askTimeout: DefaultAskTimeout,
actorInitMaxRetries: DefaultInitMaxRetries,
supervisorStrategy: DefaultSupervisoryStrategy,
telemetry: telemetry.New(),
locker: sync.Mutex{},
shutdownTimeout: DefaultShutdownTimeout,
mailboxSize: DefaultMailboxSize,
housekeeperStopSig: make(chan types.Unit, 1),
stopGC: make(chan types.Unit, 1),
gcInterval: DefaultGCInterval,
eventsStream: eventstream.New(),
partitionHasher: hash.DefaultHasher(),
actorInitTimeout: DefaultInitTimeout,
Expand Down Expand Up @@ -432,7 +434,7 @@ func (x *actorSystem) Spawn(ctx context.Context, name string, actor Actor) (PID,
opts := []pidOption{
withInitMaxRetries(x.actorInitMaxRetries),
withPassivationAfter(x.expireActorAfter),
withReplyTimeout(x.replyTimeout),
withAskTimeout(x.askTimeout),
withCustomLogger(x.logger),
withActorSystem(x),
withSupervisorStrategy(x.supervisorStrategy),
Expand Down Expand Up @@ -506,7 +508,7 @@ func (x *actorSystem) SpawnFromFunc(ctx context.Context, receiveFunc ReceiveFunc
pidOpts := []pidOption{
withInitMaxRetries(x.actorInitMaxRetries),
withPassivationAfter(x.expireActorAfter),
withReplyTimeout(x.replyTimeout),
withAskTimeout(x.askTimeout),
withCustomLogger(x.logger),
withActorSystem(x),
withSupervisorStrategy(x.supervisorStrategy),
Expand Down Expand Up @@ -780,7 +782,7 @@ func (x *actorSystem) Start(ctx context.Context) error {

x.scheduler.Start(spanCtx)

go x.housekeeper()
go x.gc()

x.logger.Infof("%s started..:)", x.name)
return nil
Expand All @@ -799,7 +801,7 @@ func (x *actorSystem) Stop(ctx context.Context) error {
return e
}

x.housekeeperStopSig <- types.Unit{}
x.stopGC <- types.Unit{}
x.logger.Infof("%s is shutting down..:)", x.name)

x.started.Store(false)
Expand Down Expand Up @@ -935,7 +937,12 @@ func (x *actorSystem) RemoteAsk(ctx context.Context, stream *connect.BidiStream[
return ErrAddressNotFound(actorPath.String())
}

reply, err := x.handleRemoteAsk(ctx, pid, message)
timeout := x.askTimeout
if request.GetTimeout() != nil {
timeout = request.GetTimeout().AsDuration()
}

reply, err := x.handleRemoteAsk(ctx, pid, message, timeout)
if err != nil {
logger.Error(ErrRemoteSendFailure(err).Error())
return ErrRemoteSendFailure(err)
Expand Down Expand Up @@ -1150,10 +1157,10 @@ func (x *actorSystem) GetKinds(_ context.Context, request *connect.Request[inter

// handleRemoteAsk handles a synchronous message to another actor and expect a response.
// This block until a response is received or timed out.
func (x *actorSystem) handleRemoteAsk(ctx context.Context, to PID, message proto.Message) (response proto.Message, err error) {
func (x *actorSystem) handleRemoteAsk(ctx context.Context, to PID, message proto.Message, timeout time.Duration) (response proto.Message, err error) {
spanCtx, span := x.tracer.Start(ctx, "HandleRemoteAsk")
defer span.End()
return Ask(spanCtx, to, message, x.replyTimeout)
return Ask(spanCtx, to, message, timeout)
}

// handleRemoteTell handles an asynchronous message to an actor
Expand Down Expand Up @@ -1317,19 +1324,19 @@ func (x *actorSystem) reset() {
x.cluster = nil
}

// housekeeper time to time removes dead actors from the system
// gc time to time removes dead actors from the system
// that helps free non-utilized resources
func (x *actorSystem) housekeeper() {
x.logger.Info("Housekeeping has started...")
ticker := time.NewTicker(30 * time.Millisecond)
func (x *actorSystem) gc() {
x.logger.Info("garbage collector has started...")
ticker := time.NewTicker(x.gcInterval)
tickerStopSig := make(chan types.Unit, 1)
go func() {
for {
select {
case <-ticker.C:
for _, actor := range x.Actors() {
if !actor.IsRunning() {
x.logger.Infof("Removing actor=%s from system", actor.ActorPath().Name())
x.logger.Infof("removing actor=%s from system", actor.ActorPath().Name())
x.actors.delete(actor.ActorPath())
if x.InCluster() {
if err := x.cluster.RemoveActor(context.Background(), actor.ActorPath().Name()); err != nil {
Expand All @@ -1339,7 +1346,7 @@ func (x *actorSystem) housekeeper() {
}
}
}
case <-x.housekeeperStopSig:
case <-x.stopGC:
tickerStopSig <- types.Unit{}
return
}
Expand All @@ -1348,7 +1355,7 @@ func (x *actorSystem) housekeeper() {

<-tickerStopSig
ticker.Stop()
x.logger.Info("Housekeeping has stopped...")
x.logger.Info("garbage collector has stopped...")
}

// registerMetrics register the PID metrics with OTel instrumentation.
Expand Down
6 changes: 3 additions & 3 deletions actors/actor_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func TestActorSystem(t *testing.T) {
require.NotNil(t, remoteAddr)
require.True(t, proto.Equal(remoteAddr, addr))

reply, err := RemoteAsk(ctx, addr, new(testpb.TestReply))
reply, err := RemoteAsk(ctx, addr, new(testpb.TestReply), DefaultAskTimeout)
require.NoError(t, err)
require.NotNil(t, reply)

Expand Down Expand Up @@ -595,7 +595,7 @@ func TestActorSystem(t *testing.T) {
Port: int32(remotingPort),
Name: actorName,
Id: "",
}, new(testpb.TestReply))
}, new(testpb.TestReply), DefaultAskTimeout)
require.Error(t, err)
require.Nil(t, reply)

Expand Down Expand Up @@ -1467,7 +1467,7 @@ func TestActorSystem(t *testing.T) {
require.NotNil(t, addr)

// send the message to exchanger actor one using remote messaging
reply, err := RemoteAsk(ctx, addr, new(testpb.TestReply))
reply, err := RemoteAsk(ctx, addr, new(testpb.TestReply), DefaultAskTimeout)

require.NoError(t, err)
require.NotNil(t, reply)
Expand Down
4 changes: 3 additions & 1 deletion actors/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"connectrpc.com/otelconnect"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/durationpb"

"github.com/tochemey/goakt/v2/goaktpb"
"github.com/tochemey/goakt/v2/internal/http"
Expand Down Expand Up @@ -201,7 +202,7 @@ func RemoteTell(ctx context.Context, to *goaktpb.Address, message proto.Message)
}

// RemoteAsk sends a synchronous message to another actor remotely and expect a response.
func RemoteAsk(ctx context.Context, to *goaktpb.Address, message proto.Message) (response *anypb.Any, err error) {
func RemoteAsk(ctx context.Context, to *goaktpb.Address, message proto.Message, timeout time.Duration) (response *anypb.Any, err error) {
marshaled, err := anypb.New(message)
if err != nil {
return nil, ErrInvalidRemoteMessage(err)
Expand All @@ -224,6 +225,7 @@ func RemoteAsk(ctx context.Context, to *goaktpb.Address, message proto.Message)
Receiver: to,
Message: marshaled,
},
Timeout: durationpb.New(timeout),
}
stream := remotingService.RemoteAsk(ctx)
errc := make(chan error, 1)
Expand Down
12 changes: 6 additions & 6 deletions actors/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1023,7 +1023,7 @@ func TestRemoteAsk(t *testing.T) {
// create a message to send to the test actor
message := new(testpb.TestReply)
// send the message to the actor
reply, err := RemoteAsk(ctx, addr, message)
reply, err := RemoteAsk(ctx, addr, message, time.Minute)
// perform some assertions
require.NoError(t, err)
require.NotNil(t, reply)
Expand Down Expand Up @@ -1079,7 +1079,7 @@ func TestRemoteAsk(t *testing.T) {
require.NoError(t, err)

// send the message to the actor
reply, err := RemoteAsk(ctx, addr, nil)
reply, err := RemoteAsk(ctx, addr, nil, time.Minute)
// perform some assertions
require.Error(t, err)
require.Nil(t, reply)
Expand Down Expand Up @@ -1133,7 +1133,7 @@ func TestRemoteAsk(t *testing.T) {
// create a message to send to the test actor
message := new(testpb.TestReply)
// send the message to the actor
reply, err := RemoteAsk(ctx, addr, message)
reply, err := RemoteAsk(ctx, addr, message, time.Minute)
// perform some assertions
require.Error(t, err)
require.Nil(t, reply)
Expand Down Expand Up @@ -1187,7 +1187,7 @@ func TestRemoteAsk(t *testing.T) {
// create a message to send to the test actor
message := new(testpb.TestReply)
// send the message to the actor
reply, err := RemoteAsk(ctx, addr, message)
reply, err := RemoteAsk(ctx, addr, message, time.Minute)
// perform some assertions
require.Error(t, err)
require.EqualError(t, err, "failed_precondition: remoting is not enabled")
Expand Down Expand Up @@ -1409,7 +1409,7 @@ func TestRemoteAsk(t *testing.T) {
// create a message to send to the test actor
message := new(testpb.TestReply)
// send the message to the actor
reply, err := RemoteAsk(ctx, addr, message)
reply, err := RemoteAsk(ctx, addr, message, time.Minute)
// perform some assertions
require.Error(t, err)
require.Contains(t, err.Error(), "not found")
Expand Down Expand Up @@ -1751,7 +1751,7 @@ func TestAPIRemoteSpawn(t *testing.T) {
require.NotNil(t, addr)

// send the message to exchanger actor one using remote messaging
reply, err := RemoteAsk(ctx, addr, new(testpb.TestReply))
reply, err := RemoteAsk(ctx, addr, new(testpb.TestReply), time.Minute)

require.NoError(t, err)
require.NotNil(t, reply)
Expand Down
26 changes: 13 additions & 13 deletions actors/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ func TestReceiveContext(t *testing.T) {
newSupervisor(),
withInitMaxRetries(1),
withCustomLogger(log.DiscardLogger),
withReplyTimeout(replyTimeout))
withAskTimeout(replyTimeout))

require.NoError(t, err)
assert.NotNil(t, parent)
Expand Down Expand Up @@ -726,7 +726,7 @@ func TestReceiveContext(t *testing.T) {
newSupervisor(),
withInitMaxRetries(1),
withCustomLogger(log.DiscardLogger),
withReplyTimeout(replyTimeout))
withAskTimeout(replyTimeout))

require.NoError(t, err)
assert.NotNil(t, parent)
Expand Down Expand Up @@ -758,7 +758,7 @@ func TestReceiveContext(t *testing.T) {
newSupervisor(),
withInitMaxRetries(1),
withCustomLogger(log.DiscardLogger),
withReplyTimeout(replyTimeout))
withAskTimeout(replyTimeout))

require.NoError(t, err)
assert.NotNil(t, parent)
Expand Down Expand Up @@ -799,7 +799,7 @@ func TestReceiveContext(t *testing.T) {
newSupervisor(),
withInitMaxRetries(1),
withCustomLogger(log.DiscardLogger),
withReplyTimeout(replyTimeout))
withAskTimeout(replyTimeout))

require.NoError(t, err)
assert.NotNil(t, parent)
Expand Down Expand Up @@ -840,7 +840,7 @@ func TestReceiveContext(t *testing.T) {
newSupervisor(),
withInitMaxRetries(1),
withCustomLogger(log.DiscardLogger),
withReplyTimeout(replyTimeout))
withAskTimeout(replyTimeout))

require.NoError(t, err)
assert.NotNil(t, parent)
Expand Down Expand Up @@ -881,7 +881,7 @@ func TestReceiveContext(t *testing.T) {
newSupervisor(),
withInitMaxRetries(1),
withCustomLogger(log.DiscardLogger),
withReplyTimeout(replyTimeout))
withAskTimeout(replyTimeout))

require.NoError(t, err)
assert.NotNil(t, parent)
Expand Down Expand Up @@ -924,7 +924,7 @@ func TestReceiveContext(t *testing.T) {
newSupervisor(),
withInitMaxRetries(1),
withCustomLogger(log.DiscardLogger),
withReplyTimeout(replyTimeout))
withAskTimeout(replyTimeout))

require.NoError(t, err)
assert.NotNil(t, parent)
Expand Down Expand Up @@ -958,7 +958,7 @@ func TestReceiveContext(t *testing.T) {
newSupervisor(),
withInitMaxRetries(1),
withCustomLogger(log.DiscardLogger),
withReplyTimeout(replyTimeout))
withAskTimeout(replyTimeout))

require.NoError(t, err)
assert.NotNil(t, parent)
Expand Down Expand Up @@ -998,7 +998,7 @@ func TestReceiveContext(t *testing.T) {
newSupervisor(),
withInitMaxRetries(1),
withCustomLogger(log.DiscardLogger),
withReplyTimeout(replyTimeout))
withAskTimeout(replyTimeout))

require.NoError(t, err)
assert.NotNil(t, parent)
Expand All @@ -1019,7 +1019,7 @@ func TestReceiveContext(t *testing.T) {
newSupervisor(),
withInitMaxRetries(1),
withCustomLogger(log.DiscardLogger),
withReplyTimeout(replyTimeout))
withAskTimeout(replyTimeout))

require.NoError(t, err)

Expand All @@ -1042,7 +1042,7 @@ func TestReceiveContext(t *testing.T) {
newSupervisor(),
withInitMaxRetries(1),
withCustomLogger(log.DiscardLogger),
withReplyTimeout(replyTimeout))
withAskTimeout(replyTimeout))

require.NoError(t, err)
assert.NotNil(t, parent)
Expand Down Expand Up @@ -1930,7 +1930,7 @@ func TestReceiveContext(t *testing.T) {

opts := []pidOption{
withInitMaxRetries(1),
withReplyTimeout(askTimeout),
withAskTimeout(askTimeout),
withPassivationDisabled(),
withCustomLogger(log.DefaultLogger),
}
Expand Down Expand Up @@ -1995,7 +1995,7 @@ func TestReceiveContext(t *testing.T) {

opts := []pidOption{
withInitMaxRetries(1),
withReplyTimeout(askTimeout),
withAskTimeout(askTimeout),
withPassivationDisabled(),
withCustomLogger(log.DiscardLogger),
}
Expand Down
Loading

0 comments on commit 28d2fa9

Please sign in to comment.