diff --git a/Directory.Build.props b/Directory.Build.props index 9381b3344..4b9a19a9e 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -1,6 +1,6 @@ - -beta.2 + -beta.3 4.4.0$(BaseVersionSuffix) 1 $(BaseVersionSuffix) diff --git a/docs/releases.md b/docs/releases.md index 46253edc6..31bd4e154 100644 --- a/docs/releases.md +++ b/docs/releases.md @@ -4,7 +4,7 @@ uid: releases # Releases -## [4.4.0-beta.2](https://github.com/BEagle1984/silverback/releases/tag/v4.4.0) +## [4.4.0-beta.3](https://github.com/BEagle1984/silverback/releases/tag/v4.4.0) ### What's new diff --git a/src/Silverback.Integration.Kafka/Diagnostics/KafkaLogEvents.cs b/src/Silverback.Integration.Kafka/Diagnostics/KafkaLogEvents.cs index 3617b046c..3595a1d59 100644 --- a/src/Silverback.Integration.Kafka/Diagnostics/KafkaLogEvents.cs +++ b/src/Silverback.Integration.Kafka/Diagnostics/KafkaLogEvents.cs @@ -227,6 +227,22 @@ public static class KafkaLogEvents GetEventId(43, nameof(KafkaLogHandlerError)), "Error in Kafka log handler."); + /// + /// Gets the representing the log that is written when a partition is paused. + /// + public static LogEvent PartitionPaused { get; } = new( + LogLevel.Debug, + GetEventId(44, nameof(PartitionPaused)), + "Partition {topic}[{partition}] paused at offset {offset}."); + + /// + /// Gets the representing the log that is written when a partition is resumed. + /// + public static LogEvent PartitionResumed { get; } = new( + LogLevel.Debug, + GetEventId(45, nameof(PartitionResumed)), + "Partition {topic}[{partition}] resumed."); + /// /// Gets the representing the log that is written when an exception is thrown /// disconnecting the underlying Confluent.Kafka consumer. diff --git a/src/Silverback.Integration.Kafka/Diagnostics/KafkaLoggerExtensions.cs b/src/Silverback.Integration.Kafka/Diagnostics/KafkaLoggerExtensions.cs index 9b64001be..c1050a682 100644 --- a/src/Silverback.Integration.Kafka/Diagnostics/KafkaLoggerExtensions.cs +++ b/src/Silverback.Integration.Kafka/Diagnostics/KafkaLoggerExtensions.cs @@ -12,42 +12,31 @@ internal static class KafkaLoggerExtensions { private static readonly Action ConsumingMessage = - SilverbackLoggerMessage.Define( - IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.ConsumingMessage)); + SilverbackLoggerMessage.Define(IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.ConsumingMessage)); private static readonly Action EndOfPartition = - SilverbackLoggerMessage.Define( - IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.EndOfPartition)); + SilverbackLoggerMessage.Define(IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.EndOfPartition)); private static readonly Action KafkaExceptionAutoRecovery = - SilverbackLoggerMessage.Define( - IntegrationLoggerExtensions.EnrichConsumerLogEvent( - KafkaLogEvents.KafkaExceptionAutoRecovery)); + SilverbackLoggerMessage.Define(IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.KafkaExceptionAutoRecovery)); private static readonly Action KafkaExceptionNoAutoRecovery = - SilverbackLoggerMessage.Define( - IntegrationLoggerExtensions.EnrichConsumerLogEvent( - KafkaLogEvents.KafkaExceptionNoAutoRecovery)); + SilverbackLoggerMessage.Define(IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.KafkaExceptionNoAutoRecovery)); private static readonly Action ConsumingCanceled = - SilverbackLoggerMessage.Define( - IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.ConsumingCanceled)); + SilverbackLoggerMessage.Define(IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.ConsumingCanceled)); private static readonly Action CreatingConfluentProducer = - SilverbackLoggerMessage.Define( - IntegrationLoggerExtensions.EnrichProducerLogEvent( - KafkaLogEvents.CreatingConfluentProducer)); + SilverbackLoggerMessage.Define(IntegrationLoggerExtensions.EnrichProducerLogEvent(KafkaLogEvents.CreatingConfluentProducer)); private static readonly Action ProduceNotAcknowledged = - SilverbackLoggerMessage.Define( - IntegrationLoggerExtensions.EnrichProducerLogEvent( - KafkaLogEvents.ProduceNotAcknowledged)); + SilverbackLoggerMessage.Define(IntegrationLoggerExtensions.EnrichProducerLogEvent(KafkaLogEvents.ProduceNotAcknowledged)); private static readonly Action PartitionAssigned = @@ -86,37 +75,27 @@ internal static class KafkaLoggerExtensions private static readonly Action ConfluentConsumerFatalError = - SilverbackLoggerMessage.Define( - IntegrationLoggerExtensions.EnrichConsumerLogEvent( - KafkaLogEvents.ConfluentConsumerFatalError)); + SilverbackLoggerMessage.Define(IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.ConfluentConsumerFatalError)); private static readonly Action KafkaConsumerErrorHandlerError = - SilverbackLoggerMessage.Define( - IntegrationLoggerExtensions.EnrichConsumerLogEvent( - KafkaLogEvents.KafkaErrorHandlerError)); + SilverbackLoggerMessage.Define(IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.KafkaErrorHandlerError)); private static readonly Action KafkaConsumerLogHandlerError = - SilverbackLoggerMessage.Define( - IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.KafkaLogHandlerError)); + SilverbackLoggerMessage.Define(IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.KafkaLogHandlerError)); private static readonly Action KafkaProducerLogHandlerError = - SilverbackLoggerMessage.Define( - IntegrationLoggerExtensions.EnrichProducerLogEvent(KafkaLogEvents.KafkaLogHandlerError)); + SilverbackLoggerMessage.Define(IntegrationLoggerExtensions.EnrichProducerLogEvent(KafkaLogEvents.KafkaLogHandlerError)); private static readonly Action ConsumerStatisticsReceived = - SilverbackLoggerMessage.Define( - IntegrationLoggerExtensions.EnrichConsumerLogEvent( - KafkaLogEvents.ConsumerStatisticsReceived)); + SilverbackLoggerMessage.Define(IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.ConsumerStatisticsReceived)); private static readonly Action ProducerStatisticsReceived = - SilverbackLoggerMessage.Define( - IntegrationLoggerExtensions.EnrichProducerLogEvent( - KafkaLogEvents.ProducerStatisticsReceived)); + SilverbackLoggerMessage.Define(IntegrationLoggerExtensions.EnrichProducerLogEvent(KafkaLogEvents.ProducerStatisticsReceived)); private static readonly Action StatisticsDeserializationError = @@ -131,87 +110,73 @@ internal static class KafkaLoggerExtensions private static readonly Action ConfluentConsumerError = - SilverbackLoggerMessage.Define( + SilverbackLoggerMessage.Define(IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.ConfluentConsumerError)); + + private static readonly Action + PartitionPaused = + SilverbackLoggerMessage.Define( IntegrationLoggerExtensions.EnrichConsumerLogEvent( - KafkaLogEvents.ConfluentConsumerError)); + KafkaLogEvents.PartitionPaused, + false)); + + private static readonly Action + PartitionResumed = + SilverbackLoggerMessage.Define( + IntegrationLoggerExtensions.EnrichConsumerLogEvent( + KafkaLogEvents.PartitionResumed, + false)); private static readonly Action ConfluentConsumerDisconnectError = - SilverbackLoggerMessage.Define( - IntegrationLoggerExtensions.EnrichConsumerLogEvent( - KafkaLogEvents.ConfluentConsumerDisconnectError)); + SilverbackLoggerMessage.Define(IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.ConfluentConsumerDisconnectError)); private static readonly Action PollTimeoutAutoRecovery = - SilverbackLoggerMessage.Define( - IntegrationLoggerExtensions.EnrichConsumerLogEvent( - KafkaLogEvents.PollTimeoutAutoRecovery)); + SilverbackLoggerMessage.Define(IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.PollTimeoutAutoRecovery)); private static readonly Action PollTimeoutNoAutoRecovery = - SilverbackLoggerMessage.Define( - IntegrationLoggerExtensions.EnrichConsumerLogEvent( - KafkaLogEvents.PollTimeoutNoAutoRecovery)); + SilverbackLoggerMessage.Define(IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.PollTimeoutNoAutoRecovery)); private static readonly Action ConfluentProducerLogCritical = - SilverbackLoggerMessage.Define( - IntegrationLoggerExtensions.EnrichProducerLogEvent( - KafkaLogEvents.ConfluentProducerLogCritical)); + SilverbackLoggerMessage.Define(IntegrationLoggerExtensions.EnrichProducerLogEvent(KafkaLogEvents.ConfluentProducerLogCritical)); private static readonly Action ConfluentProducerLogError = - SilverbackLoggerMessage.Define( - IntegrationLoggerExtensions.EnrichProducerLogEvent( - KafkaLogEvents.ConfluentProducerLogError)); + SilverbackLoggerMessage.Define(IntegrationLoggerExtensions.EnrichProducerLogEvent(KafkaLogEvents.ConfluentProducerLogError)); private static readonly Action ConfluentProducerLogWarning = - SilverbackLoggerMessage.Define( - IntegrationLoggerExtensions.EnrichProducerLogEvent( - KafkaLogEvents.ConfluentProducerLogWarning)); + SilverbackLoggerMessage.Define(IntegrationLoggerExtensions.EnrichProducerLogEvent(KafkaLogEvents.ConfluentProducerLogWarning)); private static readonly Action ConfluentProducerLogInformation = - SilverbackLoggerMessage.Define( - IntegrationLoggerExtensions.EnrichProducerLogEvent( - KafkaLogEvents.ConfluentProducerLogInformation)); + SilverbackLoggerMessage.Define(IntegrationLoggerExtensions.EnrichProducerLogEvent(KafkaLogEvents.ConfluentProducerLogInformation)); private static readonly Action ConfluentProducerLogDebug = - SilverbackLoggerMessage.Define( - IntegrationLoggerExtensions.EnrichProducerLogEvent( - KafkaLogEvents.ConfluentProducerLogDebug)); + SilverbackLoggerMessage.Define(IntegrationLoggerExtensions.EnrichProducerLogEvent(KafkaLogEvents.ConfluentProducerLogDebug)); private static readonly Action ConfluentConsumerLogCritical = - SilverbackLoggerMessage.Define( - IntegrationLoggerExtensions.EnrichConsumerLogEvent( - KafkaLogEvents.ConfluentConsumerLogCritical)); + SilverbackLoggerMessage.Define(IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.ConfluentConsumerLogCritical)); private static readonly Action ConfluentConsumerLogError = - SilverbackLoggerMessage.Define( - IntegrationLoggerExtensions.EnrichConsumerLogEvent( - KafkaLogEvents.ConfluentConsumerLogError)); + SilverbackLoggerMessage.Define(IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.ConfluentConsumerLogError)); private static readonly Action ConfluentConsumerLogWarning = - SilverbackLoggerMessage.Define( - IntegrationLoggerExtensions.EnrichConsumerLogEvent( - KafkaLogEvents.ConfluentConsumerLogWarning)); + SilverbackLoggerMessage.Define(IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.ConfluentConsumerLogWarning)); private static readonly Action ConfluentConsumerLogInformation = - SilverbackLoggerMessage.Define( - IntegrationLoggerExtensions.EnrichConsumerLogEvent( - KafkaLogEvents.ConfluentConsumerLogInformation)); + SilverbackLoggerMessage.Define(IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.ConfluentConsumerLogInformation)); private static readonly Action ConfluentConsumerLogDebug = - SilverbackLoggerMessage.Define( - IntegrationLoggerExtensions.EnrichConsumerLogEvent( - KafkaLogEvents.ConfluentConsumerLogDebug)); + SilverbackLoggerMessage.Define(IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.ConfluentConsumerLogDebug)); public static void LogConsuming( this ISilverbackLogger logger, @@ -456,6 +421,29 @@ public static void LogConfluentConsumerError( null); } + public static void LogPartitionPaused( + this ISilverbackLogger logger, + TopicPartitionOffset topicPartitionOffset, + KafkaConsumer consumer) => + PartitionPaused( + logger.InnerLogger, + topicPartitionOffset.Topic, + topicPartitionOffset.Partition, + topicPartitionOffset.Offset, + consumer.Id, + null); + + public static void LogPartitionResumed( + this ISilverbackLogger logger, + TopicPartition topicPartition, + KafkaConsumer consumer) => + PartitionResumed( + logger.InnerLogger, + topicPartition.Topic, + topicPartition.Partition, + consumer.Id, + null); + public static void LogConfluentConsumerDisconnectError( this ISilverbackLogger logger, KafkaConsumer consumer, diff --git a/src/Silverback.Integration.Kafka/Messaging/Broker/KafkaConsumer.cs b/src/Silverback.Integration.Kafka/Messaging/Broker/KafkaConsumer.cs index 3a9d1a262..3d6919f0e 100644 --- a/src/Silverback.Integration.Kafka/Messaging/Broker/KafkaConsumer.cs +++ b/src/Silverback.Integration.Kafka/Messaging/Broker/KafkaConsumer.cs @@ -292,6 +292,7 @@ protected override Task RollbackCoreAsync(IReadOnlyCollection broke if (IsConsuming) { _confluentConsumer?.Pause(latestTopicPartitionOffsets.Select(topicPartitionOffset => topicPartitionOffset.TopicPartition)); + latestTopicPartitionOffsets.ForEach(topicPartitionOffset => _logger.LogPartitionPaused(topicPartitionOffset, this)); } var channelsManagerStoppingTasks = new List(latestTopicPartitionOffsets.Count); @@ -336,6 +337,7 @@ await Task.WhenAll(channelsManagerStoppingTasks.Where(task => task != null)) _channelsManager?.StartReading(topicPartition); _confluentConsumer?.Resume(new[] { topicPartition }); + _logger.LogPartitionResumed(topicPartition, this); } } }