diff --git a/eph/leasedReceiver.go b/eph/leasedReceiver.go
index bafa286a..d9d91c9f 100644
--- a/eph/leasedReceiver.go
+++ b/eph/leasedReceiver.go
@@ -27,6 +27,7 @@ import (
 	"errors"
 	"fmt"
 	"math/rand"
+	"sync/atomic"
 	"time"
 
 	"github.com/devigned/tab"
@@ -38,15 +39,18 @@ type (
 	leasedReceiver struct {
 		handle    *eventhub.ListenerHandle
 		processor *EventProcessorHost
-		lease     LeaseMarker
+		lease     *atomic.Value // LeaseMarker
 		done      func()
 	}
 )
 
 func newLeasedReceiver(processor *EventProcessorHost, lease LeaseMarker) *leasedReceiver {
+	leaseValue := atomic.Value{}
+	leaseValue.Store(lease)
+
 	return &leasedReceiver{
 		processor: processor,
-		lease:     lease,
+		lease:     &leaseValue,
 	}
 }
 
@@ -54,8 +58,10 @@ func (lr *leasedReceiver) Run(ctx context.Context) error {
 	span, ctx := lr.startConsumerSpanFromContext(ctx, "eph.leasedReceiver.Run")
 	defer span.End()
 
-	partitionID := lr.lease.GetPartitionID()
-	epoch := lr.lease.GetEpoch()
+	lease := lr.getLease()
+
+	partitionID := lease.GetPartitionID()
+	epoch := lease.GetEpoch()
 	lr.dlog(ctx, "running...")
 
 	renewLeaseCtx, cancelRenewLease := context.WithCancel(context.Background())
@@ -99,7 +105,9 @@
 		defer cancel()
 		span, ctx := lr.startConsumerSpanFromContext(ctx, "eph.leasedReceiver.listenForClose")
 		defer span.End()
-		err := lr.processor.scheduler.stopReceiver(ctx, lr.lease)
+
+		lease := lr.getLease()
+		err := lr.processor.scheduler.stopReceiver(ctx, lease)
 		if err != nil {
 			tab.For(ctx).Error(err)
 		}
@@ -120,7 +128,8 @@
 			err := lr.tryRenew(ctx)
 			if err != nil {
 				tab.For(ctx).Error(err)
-				_ = lr.processor.scheduler.stopReceiver(ctx, lr.lease)
+				lease := lr.getLease()
+				_ = lr.processor.scheduler.stopReceiver(ctx, lease)
 			}
 		}
 	}
@@ -130,7 +139,8 @@
 	span, ctx := lr.startConsumerSpanFromContext(ctx, "eph.leasedReceiver.tryRenew")
 	defer span.End()
 
-	lease, ok, err := lr.processor.leaser.RenewLease(ctx, lr.lease.GetPartitionID())
+	oldLease := lr.getLease()
+	lease, ok, err := lr.processor.leaser.RenewLease(ctx, oldLease.GetPartitionID())
 	if err != nil {
 		tab.For(ctx).Error(err)
 		return err
@@ -141,23 +151,33 @@
 		return err
 	}
 	lr.dlog(ctx, "lease renewed")
-	lr.lease = lease
+
+	lr.lease.Store(lease)
 	return nil
 }
 
 func (lr *leasedReceiver) dlog(ctx context.Context, msg string) {
 	name := lr.processor.name
-	partitionID := lr.lease.GetPartitionID()
-	epoch := lr.lease.GetEpoch()
+	lease := lr.getLease()
+
+	partitionID := lease.GetPartitionID()
+	epoch := lease.GetEpoch()
 	tab.For(ctx).Debug(fmt.Sprintf("eph %q, partition %q, epoch %d: "+msg, name, partitionID, epoch))
 }
 
 func (lr *leasedReceiver) startConsumerSpanFromContext(ctx context.Context, operationName string) (tab.Spanner, context.Context) {
 	span, ctx := startConsumerSpanFromContext(ctx, operationName)
+
+	lease := lr.getLease()
+
 	span.AddAttributes(
 		tab.StringAttribute("eph.id", lr.processor.name),
-		tab.StringAttribute(partitionIDTag, lr.lease.GetPartitionID()),
-		tab.Int64Attribute(epochTag, lr.lease.GetEpoch()),
+		tab.StringAttribute(partitionIDTag, lease.GetPartitionID()),
+		tab.Int64Attribute(epochTag, lease.GetEpoch()),
 	)
 	return span, ctx
 }
+
+func (lr *leasedReceiver) getLease() LeaseMarker {
+	return lr.lease.Load().(LeaseMarker)
+}
diff --git a/eph/scheduler.go b/eph/scheduler.go
index 695b9341..1a17fac4 100644
--- a/eph/scheduler.go
+++ b/eph/scheduler.go
@@ -190,7 +190,8 @@ func (s *scheduler) Stop(ctx context.Context) error {
 		if err := lr.Close(ctx); err != nil {
 			lastErr = err
 		}
-		_, _ = s.processor.leaser.ReleaseLease(ctx, lr.lease.GetPartitionID())
+
+		_, _ = s.processor.leaser.ReleaseLease(ctx, lr.getLease().GetPartitionID())
 	}
 
 	return lastErr
diff --git a/storage/storage.go b/storage/storage.go
index 93ccad20..91ca946e 100644
--- a/storage/storage.go
+++ b/storage/storage.go
@@ -630,6 +630,12 @@ func (sl *LeaserCheckpointer) createOrGetLease(ctx context.Context, partitionID
 	})
 
 	if err != nil {
+		if storageErr := azblobvendor.StorageError(nil); errors.As(err, &storageErr) &&
+			(storageErr.Response().StatusCode == http.StatusConflict || // blob exists
+				storageErr.Response().StatusCode == http.StatusPreconditionFailed) { // blob exists AND an Azure storage lease is active
+			return sl.getLease(ctx, partitionID)
+		}
+
 		return nil, err
 	}
 
diff --git a/storage/storage_test.go b/storage/storage_test.go
index e6023f09..9e70e4e6 100644
--- a/storage/storage_test.go
+++ b/storage/storage_test.go
@@ -105,6 +105,10 @@ func (ts *testSuite) TestLeaserLeaseEnsure() {
 		lease, err := leaser.EnsureLease(ctx, partitionID)
 		ts.NoError(err)
 		ts.Equal(partitionID, lease.GetPartitionID())
+
+		lease, err = leaser.EnsureLease(ctx, partitionID)
+		ts.NoError(err)
+		ts.Equal(partitionID, lease.GetPartitionID())
 	}
 }
 