Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BUG: System.ObjectDisposedException is thrown by DbOutboxWriter #128

Closed
mhmd-azeez opened this issue Apr 4, 2021 · 5 comments
Closed

BUG: System.ObjectDisposedException is thrown by DbOutboxWriter #128

mhmd-azeez opened this issue Apr 4, 2021 · 5 comments
Labels
bug Something isn't working

Comments

@mhmd-azeez
Copy link
Contributor

mhmd-azeez commented Apr 4, 2021

First of all, thank you very much for this library. It fits my use cases nicely. However, I came across a bug when I tried to use DbOutboxWriter, I'd appreciate it if you can take a look at it. The first publish works fine, the subsequent publishes all throw a System.ObjectDisposedException exception.

This is how I have configured the services:

services.AddSilverback()
                .UseDbContext<SampleDbContext>()
                .AddDbDistributedLockManager()
                .WithConnectionToMessageBroker(options =>
                    options.AddKafka()
                           .AddOutbox<DbOutboxWriter, DbOutboxReader>()
                           .AddOutboxWorker())
                .AddEndpointsConfigurator<EndpointsConfigurator>()
                .AddSingletonSubscriber<EchoMessageSubscriber>();

services.AddDbContext<SampleDbContext>(config =>
                config.UseNpgsql("***"));

services.AddControllers();
public class EndpointsConfigurator : IEndpointsConfigurator
{
    public void Configure(IEndpointsConfigurationBuilder builder)
    {
        builder
            .AddKafkaEndpoints(
                endpoints => endpoints

                    // Configure the properties needed by all consumers/producers
                    .Configure(
                        config =>
                        {
                            // The bootstrap server address is needed to connect
                            config.BootstrapServers =
                                "***"; // I am using Confluent Cloud
                            config.SaslUsername = "***";
                            config.SaslPassword = "***";
                            config.SaslMechanism = Confluent.Kafka.SaslMechanism.Plain;
                            config.SecurityProtocol = Confluent.Kafka.SecurityProtocol.SaslSsl;
                        })
                    // Produce the SampleMessage to the samples-basic topic
                    .AddOutbound<EchoMessage>(
                        endpoint => endpoint
                            .ProduceTo("dotnet-test-topic")
                            .ProduceToOutbox())
                    .AddInbound(
                        endpoints => endpoints.
                            ConsumeFrom("dotnet-test-topic")
                            .Configure(config =>
                            {
                                config.GroupId = "kafka-test-consumer";
                                config.AutoOffsetReset = Confluent.Kafka.AutoOffsetReset.Earliest;
                            })
                            .DeserializeJson(s => s.UseFixedType<EchoMessage>())
                            ));
    }
}

public class EchoMessageSubscriber
    {
        private readonly ILogger<EchoMessageSubscriber> _logger;

        public EchoMessageSubscriber(ILogger<EchoMessageSubscriber> logger)
        {
            _logger = logger;
        }

        public void OnMessageReceived(EchoMessage message) =>
            _logger.LogInformation($"Received {message.Text}");
    }

My DBContext:

public class SampleDbContext : DbContext
{
    public SampleDbContext(DbContextOptions options)
        : base(options)
    {
        this.Database.EnsureCreated();
    }

    public DbSet<OutboxMessage> Outbox { get; set; } = null!;

    public DbSet<InboundLogEntry> InboundMessages { get; set; } = null!;

    public DbSet<StoredOffset> StoredOffsets { get; set; } = null!;

    public DbSet<Lock> Locks { get; set; } = null!;

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        modelBuilder.Entity<InboundLogEntry>()
            .HasKey(t => new { t.MessageId, t.ConsumerGroupName });
    }
}

This is how I am publishing messages:

    public class EchoMessage
    {
        public string Text { get; set; }
    }

    [Route("api/[controller]")]
    [ApiController]
    public class CommandsController : ControllerBase
    {
        private readonly ILogger<CommandsController> _logger;
        private readonly IPublisher _publisher;

        public CommandsController(
            ILogger<CommandsController> logger,
            IPublisher publisher)
        {
            _logger = logger;
            _publisher = publisher;
        }

        [HttpGet]
        public async Task PublishEcho(string text)
        {
            await _publisher.PublishAsync(new EchoMessage
            {
                Text = text
            });

            _logger.LogInformation($"Produced: Echo => {text}");
        }
    }

This is my csproj file:

<Project Sdk="Microsoft.NET.Sdk.Web">

  <PropertyGroup>
    <TargetFramework>net5.0</TargetFramework>
  </PropertyGroup>

  <ItemGroup>
    <PackageReference Include="Confluent.Kafka" Version="1.6.3" />
    <PackageReference Include="Microsoft.EntityFrameworkCore" Version="5.0.2" />
    <PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="5.0.2">
      <PrivateAssets>all</PrivateAssets>
      <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
    </PackageReference>
    <PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="5.0.2" />
    <PackageReference Include="Swashbuckle.AspNetCore" Version="5.6.3" />
  </ItemGroup>

  <ItemGroup>
    <ProjectReference Include="..\..\silverback\src\Silverback.Core.EFCore30\Silverback.Core.EFCore30.csproj" />
    <ProjectReference Include="..\..\silverback\src\Silverback.Integration.Kafka\Silverback.Integration.Kafka.csproj" />
  </ItemGroup>

</Project>

The first publish works fine, but the consequent publishes throw this exception:

System.ObjectDisposedException: Cannot access a disposed context instance. A common cause of this error is disposing a context instance that was resolved from dependency injection and then later trying to use the same context instance elsewhere in your application. This may occur if you are calling 'Dispose' on the context instance, or wrapping it in a using statement. If you are using dependency injection, you should let the dependency injection container take care of disposing context instances.
Object name: 'SampleDbContext'.
   at Microsoft.EntityFrameworkCore.DbContext.CheckDisposed()
   at Microsoft.EntityFrameworkCore.DbContext.get_DbContextDependencies()
   at Microsoft.EntityFrameworkCore.DbContext.Microsoft.EntityFrameworkCore.Internal.IDbContextDependencies.get_StateManager()
   at Microsoft.EntityFrameworkCore.Internal.InternalDbSet`1.EntryWithoutDetectChanges(TEntity entity)
   at Microsoft.EntityFrameworkCore.Internal.InternalDbSet`1.Add(TEntity entity)
   at Silverback.Database.EfCoreDbSet`1.Add(TEntity entity) in D:\explore\silverback\src\Silverback.Core.EFCore30\Database\EfCoreDbSet`1.cs:line 36
   at Silverback.Messaging.Outbound.TransactionalOutbox.Repositories.DbOutboxWriter.WriteAsync(Object message, Byte[] messageBytes, IReadOnlyCollection`1 headers, String endpointName, String actualEndpointName) in D:\explore\silverback\src\Silverback.Integration\Messaging\Outbound\TransactionalOutbox\Repositories\DbOutboxWriter.cs:line 39
   at Silverback.Messaging.Outbound.Routing.OutboundQueueProducer.ProduceCoreAsync(Object message, Byte[] messageBytes, IReadOnlyCollection`1 headers, String actualEndpointName) in D:\explore\silverback\src\Silverback.Integration\Messaging\Outbound\Routing\OutboundQueueProducer.cs:line 112
   at Silverback.Messaging.Outbound.Routing.OutboundQueueProducer.ProduceCoreAsync(Object message, Stream messageStream, IReadOnlyCollection`1 headers, String actualEndpointName) in D:\explore\silverback\src\Silverback.Integration\Messaging\Outbound\Routing\OutboundQueueProducer.cs:line 98
   at Silverback.Messaging.Broker.Producer.<>c__DisplayClass35_1.<<ProduceAsync>b__0>d.MoveNext() in D:\explore\silverback\src\Silverback.Integration\Messaging\Broker\Producer.cs:line 356
--- End of stack trace from previous location ---
   at Silverback.Messaging.Headers.CustomHeadersMapperProducerBehavior.HandleAsync(ProducerPipelineContext context, ProducerBehaviorHandler next) in D:\explore\silverback\src\Silverback.Integration\Messaging\Headers\CustomHeadersMapperProducerBehavior.cs:line 39
   at Silverback.Messaging.Outbound.Routing.KafkaPartitionResolverProducerBehavior.HandleAsync(ProducerPipelineContext context, ProducerBehaviorHandler next) in D:\explore\silverback\src\Silverback.Integration.Kafka\Messaging\Outbound\Routing\KafkaPartitionResolverProducerBehavior.cs:line 39
   at Silverback.Messaging.Outbound.Routing.EndpointNameResolverProducerBehavior.HandleAsync(ProducerPipelineContext context, ProducerBehaviorHandler next) in D:\explore\silverback\src\Silverback.Integration\Messaging\Outbound\Routing\EndpointNameResolverProducerBehavior.cs:line 33
   at Silverback.Messaging.Sequences.SequencerProducerBehavior.HandleAsync(ProducerPipelineContext context, ProducerBehaviorHandler next) in D:\explore\silverback\src\Silverback.Integration\Messaging\Sequences\SequencerProducerBehavior.cs:line 61
   at Silverback.Messaging.Encryption.EncryptorProducerBehavior.HandleAsync(ProducerPipelineContext context, ProducerBehaviorHandler next) in D:\explore\silverback\src\Silverback.Integration\Messaging\Encryption\EncryptorProducerBehavior.cs:line 44
   at Silverback.Messaging.Serialization.SerializerProducerBehavior.HandleAsync(ProducerPipelineContext context, ProducerBehaviorHandler next) in D:\explore\silverback\src\Silverback.Integration\Messaging\Serialization\SerializerProducerBehavior.cs:line 41
   at Silverback.Messaging.BinaryFiles.BinaryFileHandlerProducerBehavior.HandleAsync(ProducerPipelineContext context, ProducerBehaviorHandler next) in D:\explore\silverback\src\Silverback.Integration\Messaging\BinaryFiles\BinaryFileHandlerProducerBehavior.cs:line 39
   at Silverback.Messaging.Outbound.KafkaMessageKeyInitializerProducerBehavior.HandleAsync(ProducerPipelineContext context, ProducerBehaviorHandler next) in D:\explore\silverback\src\Silverback.Integration.Kafka\Messaging\Outbound\KafkaMessageKeyInitializerProducerBehavior.cs:line 32
   at Silverback.Messaging.Broker.Behaviors.MessageIdInitializerProducerBehavior.HandleAsync(ProducerPipelineContext context, ProducerBehaviorHandler next) in D:\explore\silverback\src\Silverback.Integration\Messaging\Broker\Behaviors\MessageIdInitializerProducerBehavior.cs:line 26
   at Silverback.Messaging.Headers.HeadersWriterProducerBehavior.HandleAsync(ProducerPipelineContext context, ProducerBehaviorHandler next) in D:\explore\silverback\src\Silverback.Integration\Messaging\Headers\HeadersWriterProducerBehavior.cs:line 28
   at Silverback.Messaging.Diagnostics.ActivityProducerBehavior.HandleAsync(ProducerPipelineContext context, ProducerBehaviorHandler next) in D:\explore\silverback\src\Silverback.Integration\Messaging\Diagnostics\ActivityProducerBehavior.cs:line 42
   at Silverback.Messaging.Broker.Producer.ProduceAsync(IOutboundEnvelope envelope) in D:\explore\silverback\src\Silverback.Integration\Messaging\Broker\Producer.cs:line 352
   at Silverback.Messaging.Outbound.Routing.ProduceBehavior.HandleAsync(Object message, MessageHandler next) in D:\explore\silverback\src\Silverback.Integration\Messaging\Outbound\Routing\ProduceBehavior.cs:line 43
   at Silverback.Util.EnumerableForEachExtensions.ForEachAsync[T](IEnumerable`1 source, Func`2 action) in D:\explore\silverback\src\Silverback.Core\Util\EnumerableForEachExtensions.cs:line 33
   at Silverback.Messaging.Outbound.Routing.OutboundRouterBehavior.WrapAndRepublishRoutedMessageAsync(Object message) in D:\explore\silverback\src\Silverback.Integration\Messaging\Outbound\Routing\OutboundRouterBehavior.cs:line 80
   at Silverback.Messaging.Outbound.Routing.OutboundRouterBehavior.HandleAsync(Object message, MessageHandler next) in D:\explore\silverback\src\Silverback.Integration\Messaging\Outbound\Routing\OutboundRouterBehavior.cs:line 59
   at Silverback.Messaging.Outbound.Routing.ProduceBehavior.HandleAsync(Object message, MessageHandler next) in D:\explore\silverback\src\Silverback.Integration\Messaging\Outbound\Routing\ProduceBehavior.cs:line 47
   at KafkaTests.Controllers.CommandsController.PublishEcho(String text) in D:\explore\KafkaTests\KafkaTests\Controllers\CommandsController.cs:line 33
   at lambda_method208(Closure , Object )
   at Microsoft.AspNetCore.Mvc.Infrastructure.ActionMethodExecutor.AwaitableResultExecutor.Execute(IActionResultTypeMapper mapper, ObjectMethodExecutor executor, Object controller, Object[] arguments)
   at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.<InvokeActionMethodAsync>g__Awaited|12_0(ControllerActionInvoker invoker, ValueTask`1 actionResultValueTask)
   at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.<InvokeNextActionFilterAsync>g__Awaited|10_0(ControllerActionInvoker invoker, Task lastTask, State next, Scope scope, Object state, Boolean isCompleted)
   at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.Rethrow(ActionExecutedContextSealed context)
   at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.Next(State& next, Scope& scope, Object& state, Boolean& isCompleted)
   at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.InvokeInnerFilterAsync()
--- End of stack trace from previous location ---
   at Microsoft.AspNetCore.Mvc.Infrastructure.ResourceInvoker.<InvokeFilterPipelineAsync>g__Awaited|19_0(ResourceInvoker invoker, Task lastTask, State next, Scope scope, Object state, Boolean isCompleted)
   at Microsoft.AspNetCore.Mvc.Infrastructure.ResourceInvoker.<InvokeAsync>g__Awaited|17_0(ResourceInvoker invoker, Task task, IDisposable scope)
   at Microsoft.AspNetCore.Routing.EndpointMiddleware.<Invoke>g__AwaitRequestTask|6_0(Endpoint endpoint, Task requestTask, ILogger logger)
   at Microsoft.AspNetCore.Authorization.AuthorizationMiddleware.Invoke(HttpContext context)
   at Swashbuckle.AspNetCore.SwaggerUI.SwaggerUIMiddleware.Invoke(HttpContext httpContext)
   at Swashbuckle.AspNetCore.Swagger.SwaggerMiddleware.Invoke(HttpContext httpContext, ISwaggerProvider swaggerProvider)
   at Microsoft.AspNetCore.Diagnostics.DeveloperExceptionPageMiddleware.Invoke(HttpContext context)

Here is a small repro which should demonstrate the bug if you fill in the connection strings for Kafka and Postgres:
repro-128.zip

@mhmd-azeez
Copy link
Contributor Author

I think DbOutboxWriter's lifetime should be tied to IProducer's lifetime 🤔

@BEagle1984
Copy link
Owner

Thank you for the bug report. I've been able to reproduce the issue and it is indeed a problem with the lifecycle of the DbOutboxWrite.
I'll publish an hot fix asap.

@BEagle1984 BEagle1984 added the bug Something isn't working label Apr 4, 2021
@mhmd-azeez
Copy link
Contributor Author

Thank you very much for the quick response, your library is a life saver 🙏 After implementing it into my current systems, I'll make sure to write a blog post or more about it :)

@BEagle1984
Copy link
Owner

@mhmd-azeez I published v3.0.1 with a fix for this issue. Please give it a try and let me know if the problem is solved for real. 👍

@mhmd-azeez
Copy link
Contributor Author

@BEagle1984 I can confirm that the bug is now fixed. Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

2 participants