Skip to content

Commit

Permalink
feat: add debug logs for partition pause and resume
Browse files Browse the repository at this point in the history
  • Loading branch information
BEagle1984 committed Jul 27, 2023
1 parent 7b82467 commit bcb0940
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 77 deletions.
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<Project>
<PropertyGroup Label="Package information">
<BaseVersionSuffix>-beta.2</BaseVersionSuffix>
<BaseVersionSuffix>-beta.3</BaseVersionSuffix>
<BaseVersion>4.4.0$(BaseVersionSuffix)</BaseVersion>
<DatabasePackagesRevision>1</DatabasePackagesRevision>
<DatabasePackagesVersionSuffix>$(BaseVersionSuffix)</DatabasePackagesVersionSuffix>
Expand Down
2 changes: 1 addition & 1 deletion docs/releases.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ uid: releases

# Releases

## [4.4.0-beta.2](https://github.com/BEagle1984/silverback/releases/tag/v4.4.0)
## [4.4.0-beta.3](https://github.com/BEagle1984/silverback/releases/tag/v4.4.0)

### What's new

Expand Down
16 changes: 16 additions & 0 deletions src/Silverback.Integration.Kafka/Diagnostics/KafkaLogEvents.cs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,22 @@ public static class KafkaLogEvents
GetEventId(43, nameof(KafkaLogHandlerError)),
"Error in Kafka log handler.");

/// <summary>
/// Gets the <see cref="LogEvent" /> representing the log that is written when a partition is paused.
/// </summary>
public static LogEvent PartitionPaused { get; } = new(
LogLevel.Debug,
GetEventId(44, nameof(PartitionPaused)),
"Partition {topic}[{partition}] paused at offset {offset}.");

/// <summary>
/// Gets the <see cref="LogEvent" /> representing the log that is written when a partition is resumed.
/// </summary>
public static LogEvent PartitionResumed { get; } = new(
LogLevel.Debug,
GetEventId(45, nameof(PartitionResumed)),
"Partition {topic}[{partition}] resumed.");

/// <summary>
/// Gets the <see cref="LogEvent" /> representing the log that is written when an exception is thrown
/// disconnecting the underlying Confluent.Kafka consumer.
Expand Down
138 changes: 63 additions & 75 deletions src/Silverback.Integration.Kafka/Diagnostics/KafkaLoggerExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,42 +12,31 @@ internal static class KafkaLoggerExtensions
{
private static readonly Action<ILogger, string, int, long, string, string, Exception?>
ConsumingMessage =
SilverbackLoggerMessage.Define<string, int, long, string, string>(
IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.ConsumingMessage));
SilverbackLoggerMessage.Define<string, int, long, string, string>(IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.ConsumingMessage));

private static readonly Action<ILogger, string, int, long, string, string, Exception?>
EndOfPartition =
SilverbackLoggerMessage.Define<string, int, long, string, string>(
IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.EndOfPartition));
SilverbackLoggerMessage.Define<string, int, long, string, string>(IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.EndOfPartition));

private static readonly Action<ILogger, string, string, Exception?>
KafkaExceptionAutoRecovery =
SilverbackLoggerMessage.Define<string, string>(
IntegrationLoggerExtensions.EnrichConsumerLogEvent(
KafkaLogEvents.KafkaExceptionAutoRecovery));
SilverbackLoggerMessage.Define<string, string>(IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.KafkaExceptionAutoRecovery));

private static readonly Action<ILogger, string, string, Exception?>
KafkaExceptionNoAutoRecovery =
SilverbackLoggerMessage.Define<string, string>(
IntegrationLoggerExtensions.EnrichConsumerLogEvent(
KafkaLogEvents.KafkaExceptionNoAutoRecovery));
SilverbackLoggerMessage.Define<string, string>(IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.KafkaExceptionNoAutoRecovery));

private static readonly Action<ILogger, string, string, Exception?>
ConsumingCanceled =
SilverbackLoggerMessage.Define<string, string>(
IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.ConsumingCanceled));
SilverbackLoggerMessage.Define<string, string>(IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.ConsumingCanceled));

private static readonly Action<ILogger, string, string, Exception?>
CreatingConfluentProducer =
SilverbackLoggerMessage.Define<string, string>(
IntegrationLoggerExtensions.EnrichProducerLogEvent(
KafkaLogEvents.CreatingConfluentProducer));
SilverbackLoggerMessage.Define<string, string>(IntegrationLoggerExtensions.EnrichProducerLogEvent(KafkaLogEvents.CreatingConfluentProducer));

private static readonly Action<ILogger, string, string, Exception?>
ProduceNotAcknowledged =
SilverbackLoggerMessage.Define<string, string>(
IntegrationLoggerExtensions.EnrichProducerLogEvent(
KafkaLogEvents.ProduceNotAcknowledged));
SilverbackLoggerMessage.Define<string, string>(IntegrationLoggerExtensions.EnrichProducerLogEvent(KafkaLogEvents.ProduceNotAcknowledged));

private static readonly Action<ILogger, string, int, string, Exception?>
PartitionAssigned =
Expand Down Expand Up @@ -86,37 +75,27 @@ internal static class KafkaLoggerExtensions

private static readonly Action<ILogger, string, int, string, string, Exception?>
ConfluentConsumerFatalError =
SilverbackLoggerMessage.Define<string, int, string, string>(
IntegrationLoggerExtensions.EnrichConsumerLogEvent(
KafkaLogEvents.ConfluentConsumerFatalError));
SilverbackLoggerMessage.Define<string, int, string, string>(IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.ConfluentConsumerFatalError));

private static readonly Action<ILogger, string, string, Exception?>
KafkaConsumerErrorHandlerError =
SilverbackLoggerMessage.Define<string, string>(
IntegrationLoggerExtensions.EnrichConsumerLogEvent(
KafkaLogEvents.KafkaErrorHandlerError));
SilverbackLoggerMessage.Define<string, string>(IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.KafkaErrorHandlerError));

private static readonly Action<ILogger, string, string, Exception?>
KafkaConsumerLogHandlerError =
SilverbackLoggerMessage.Define<string, string>(
IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.KafkaLogHandlerError));
SilverbackLoggerMessage.Define<string, string>(IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.KafkaLogHandlerError));

private static readonly Action<ILogger, string, string, Exception?>
KafkaProducerLogHandlerError =
SilverbackLoggerMessage.Define<string, string>(
IntegrationLoggerExtensions.EnrichProducerLogEvent(KafkaLogEvents.KafkaLogHandlerError));
SilverbackLoggerMessage.Define<string, string>(IntegrationLoggerExtensions.EnrichProducerLogEvent(KafkaLogEvents.KafkaLogHandlerError));

private static readonly Action<ILogger, string, string, string, Exception?>
ConsumerStatisticsReceived =
SilverbackLoggerMessage.Define<string, string, string>(
IntegrationLoggerExtensions.EnrichConsumerLogEvent(
KafkaLogEvents.ConsumerStatisticsReceived));
SilverbackLoggerMessage.Define<string, string, string>(IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.ConsumerStatisticsReceived));

private static readonly Action<ILogger, string, string, string, Exception?>
ProducerStatisticsReceived =
SilverbackLoggerMessage.Define<string, string, string>(
IntegrationLoggerExtensions.EnrichProducerLogEvent(
KafkaLogEvents.ProducerStatisticsReceived));
SilverbackLoggerMessage.Define<string, string, string>(IntegrationLoggerExtensions.EnrichProducerLogEvent(KafkaLogEvents.ProducerStatisticsReceived));

private static readonly Action<ILogger, Exception?>
StatisticsDeserializationError =
Expand All @@ -131,87 +110,73 @@ internal static class KafkaLoggerExtensions

private static readonly Action<ILogger, string, int, string, string, Exception?>
ConfluentConsumerError =
SilverbackLoggerMessage.Define<string, int, string, string>(
SilverbackLoggerMessage.Define<string, int, string, string>(IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.ConfluentConsumerError));

private static readonly Action<ILogger, string, int, long, string, Exception?>
PartitionPaused =
SilverbackLoggerMessage.Define<string, int, long, string>(
IntegrationLoggerExtensions.EnrichConsumerLogEvent(
KafkaLogEvents.ConfluentConsumerError));
KafkaLogEvents.PartitionPaused,
false));

private static readonly Action<ILogger, string, int, string, Exception?>
PartitionResumed =
SilverbackLoggerMessage.Define<string, int, string>(
IntegrationLoggerExtensions.EnrichConsumerLogEvent(
KafkaLogEvents.PartitionResumed,
false));

private static readonly Action<ILogger, string, string, Exception?>
ConfluentConsumerDisconnectError =
SilverbackLoggerMessage.Define<string, string>(
IntegrationLoggerExtensions.EnrichConsumerLogEvent(
KafkaLogEvents.ConfluentConsumerDisconnectError));
SilverbackLoggerMessage.Define<string, string>(IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.ConfluentConsumerDisconnectError));

private static readonly Action<ILogger, string, string, string, string, Exception?>
PollTimeoutAutoRecovery =
SilverbackLoggerMessage.Define<string, string, string, string>(
IntegrationLoggerExtensions.EnrichConsumerLogEvent(
KafkaLogEvents.PollTimeoutAutoRecovery));
SilverbackLoggerMessage.Define<string, string, string, string>(IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.PollTimeoutAutoRecovery));

private static readonly Action<ILogger, string, string, string, string, Exception?>
PollTimeoutNoAutoRecovery =
SilverbackLoggerMessage.Define<string, string, string, string>(
IntegrationLoggerExtensions.EnrichConsumerLogEvent(
KafkaLogEvents.PollTimeoutNoAutoRecovery));
SilverbackLoggerMessage.Define<string, string, string, string>(IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.PollTimeoutNoAutoRecovery));

private static readonly Action<ILogger, string, string, string, string, Exception?>
ConfluentProducerLogCritical =
SilverbackLoggerMessage.Define<string, string, string, string>(
IntegrationLoggerExtensions.EnrichProducerLogEvent(
KafkaLogEvents.ConfluentProducerLogCritical));
SilverbackLoggerMessage.Define<string, string, string, string>(IntegrationLoggerExtensions.EnrichProducerLogEvent(KafkaLogEvents.ConfluentProducerLogCritical));

private static readonly Action<ILogger, string, string, string, string, Exception?>
ConfluentProducerLogError =
SilverbackLoggerMessage.Define<string, string, string, string>(
IntegrationLoggerExtensions.EnrichProducerLogEvent(
KafkaLogEvents.ConfluentProducerLogError));
SilverbackLoggerMessage.Define<string, string, string, string>(IntegrationLoggerExtensions.EnrichProducerLogEvent(KafkaLogEvents.ConfluentProducerLogError));

private static readonly Action<ILogger, string, string, string, string, Exception?>
ConfluentProducerLogWarning =
SilverbackLoggerMessage.Define<string, string, string, string>(
IntegrationLoggerExtensions.EnrichProducerLogEvent(
KafkaLogEvents.ConfluentProducerLogWarning));
SilverbackLoggerMessage.Define<string, string, string, string>(IntegrationLoggerExtensions.EnrichProducerLogEvent(KafkaLogEvents.ConfluentProducerLogWarning));

private static readonly Action<ILogger, string, string, string, string, Exception?>
ConfluentProducerLogInformation =
SilverbackLoggerMessage.Define<string, string, string, string>(
IntegrationLoggerExtensions.EnrichProducerLogEvent(
KafkaLogEvents.ConfluentProducerLogInformation));
SilverbackLoggerMessage.Define<string, string, string, string>(IntegrationLoggerExtensions.EnrichProducerLogEvent(KafkaLogEvents.ConfluentProducerLogInformation));

private static readonly Action<ILogger, string, string, string, string, Exception?>
ConfluentProducerLogDebug =
SilverbackLoggerMessage.Define<string, string, string, string>(
IntegrationLoggerExtensions.EnrichProducerLogEvent(
KafkaLogEvents.ConfluentProducerLogDebug));
SilverbackLoggerMessage.Define<string, string, string, string>(IntegrationLoggerExtensions.EnrichProducerLogEvent(KafkaLogEvents.ConfluentProducerLogDebug));

private static readonly Action<ILogger, string, string, string, string, Exception?>
ConfluentConsumerLogCritical =
SilverbackLoggerMessage.Define<string, string, string, string>(
IntegrationLoggerExtensions.EnrichConsumerLogEvent(
KafkaLogEvents.ConfluentConsumerLogCritical));
SilverbackLoggerMessage.Define<string, string, string, string>(IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.ConfluentConsumerLogCritical));

private static readonly Action<ILogger, string, string, string, string, Exception?>
ConfluentConsumerLogError =
SilverbackLoggerMessage.Define<string, string, string, string>(
IntegrationLoggerExtensions.EnrichConsumerLogEvent(
KafkaLogEvents.ConfluentConsumerLogError));
SilverbackLoggerMessage.Define<string, string, string, string>(IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.ConfluentConsumerLogError));

private static readonly Action<ILogger, string, string, string, string, Exception?>
ConfluentConsumerLogWarning =
SilverbackLoggerMessage.Define<string, string, string, string>(
IntegrationLoggerExtensions.EnrichConsumerLogEvent(
KafkaLogEvents.ConfluentConsumerLogWarning));
SilverbackLoggerMessage.Define<string, string, string, string>(IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.ConfluentConsumerLogWarning));

private static readonly Action<ILogger, string, string, string, string, Exception?>
ConfluentConsumerLogInformation =
SilverbackLoggerMessage.Define<string, string, string, string>(
IntegrationLoggerExtensions.EnrichConsumerLogEvent(
KafkaLogEvents.ConfluentConsumerLogInformation));
SilverbackLoggerMessage.Define<string, string, string, string>(IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.ConfluentConsumerLogInformation));

private static readonly Action<ILogger, string, string, string, string, Exception?>
ConfluentConsumerLogDebug =
SilverbackLoggerMessage.Define<string, string, string, string>(
IntegrationLoggerExtensions.EnrichConsumerLogEvent(
KafkaLogEvents.ConfluentConsumerLogDebug));
SilverbackLoggerMessage.Define<string, string, string, string>(IntegrationLoggerExtensions.EnrichConsumerLogEvent(KafkaLogEvents.ConfluentConsumerLogDebug));

public static void LogConsuming(
this ISilverbackLogger logger,
Expand Down Expand Up @@ -456,6 +421,29 @@ public static void LogConfluentConsumerError(
null);
}

public static void LogPartitionPaused(
this ISilverbackLogger logger,
TopicPartitionOffset topicPartitionOffset,
KafkaConsumer consumer) =>
PartitionPaused(
logger.InnerLogger,
topicPartitionOffset.Topic,
topicPartitionOffset.Partition,
topicPartitionOffset.Offset,
consumer.Id,
null);

public static void LogPartitionResumed(
this ISilverbackLogger logger,
TopicPartition topicPartition,
KafkaConsumer consumer) =>
PartitionResumed(
logger.InnerLogger,
topicPartition.Topic,
topicPartition.Partition,
consumer.Id,
null);

public static void LogConfluentConsumerDisconnectError(
this ISilverbackLogger logger,
KafkaConsumer consumer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ protected override Task RollbackCoreAsync(IReadOnlyCollection<KafkaOffset> broke
if (IsConsuming)
{
_confluentConsumer?.Pause(latestTopicPartitionOffsets.Select(topicPartitionOffset => topicPartitionOffset.TopicPartition));
latestTopicPartitionOffsets.ForEach(topicPartitionOffset => _logger.LogPartitionPaused(topicPartitionOffset, this));
}

var channelsManagerStoppingTasks = new List<Task?>(latestTopicPartitionOffsets.Count);
Expand Down Expand Up @@ -336,6 +337,7 @@ await Task.WhenAll(channelsManagerStoppingTasks.Where(task => task != null))
_channelsManager?.StartReading(topicPartition);

_confluentConsumer?.Resume(new[] { topicPartition });
_logger.LogPartitionResumed(topicPartition, this);
}
}
}
Expand Down

0 comments on commit bcb0940

Please sign in to comment.