Skip to content

Commit

Permalink
Fix issue with message payload being disposed while resending (#282)
Browse files Browse the repository at this point in the history
  • Loading branch information
RobertIndie authored Nov 21, 2024
1 parent e89bad8 commit 93bd8f6
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 7 deletions.
1 change: 0 additions & 1 deletion src/Pulsar.Client/Common/Commands.fs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ let private serializePayloadCommand (command : BaseCommand) (metadata: MessageMe
try
return! temp.CopyToAsync(output)
finally
payload.Dispose()
temp.Dispose()
binaryWriter.Dispose()
} :> Task
Expand Down
3 changes: 2 additions & 1 deletion src/Pulsar.Client/Common/DTO.fs
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,8 @@ type internal PendingMessage<'T> =
CreatedAt: TimeStamp
SequenceId: SequenceId
HighestSequenceId: SequenceId
Payload: SendTask
SendTask: SendTask
Payload: MemoryStream
Callback : PendingCallback<'T>
}

Expand Down
14 changes: 9 additions & 5 deletions src/Pulsar.Client/Internal/ProducerImpl.fs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c
let failPendingMessages (ex: exn) =
while pendingMessages.Count > 0 do
let msg = pendingMessages.Dequeue()
msg.Payload.Dispose()
failPendingMessage msg ex
while blockedRequests.Count > 0 do
let struct(_, channel, _) = blockedRequests.Dequeue()
Expand Down Expand Up @@ -191,7 +192,7 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c
pendingMessages.Enqueue(pendingMessage)
match connectionHandler.ConnectionState with
| Ready clientCnx ->
clientCnx.SendAndForget pendingMessage.Payload
clientCnx.SendAndForget pendingMessage.SendTask
| _ ->
Log.Logger.LogWarning("{0} not connected, skipping send", prefix)

Expand All @@ -208,12 +209,13 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c
|> post this.Mb
i <- i - 1
pendingMessages.Dequeue() |> ignore
pendingMessage.Payload.Dispose()

let resendMessages (clientCnx: ClientCnx) =
if pendingMessages.Count > 0 then
Log.Logger.LogInformation("{0} resending {1} pending messages", prefix, pendingMessages.Count)
for pendingMessage in pendingMessages do
clientCnx.SendAndForget pendingMessage.Payload
clientCnx.SendAndForget pendingMessage.SendTask
else
Log.Logger.LogDebug("{0} No pending messages to resend", prefix)
producerCreatedTsc.TrySetResult() |> ignore
Expand All @@ -222,7 +224,7 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c
backgroundTask {
use stream = MemoryStreamManager.GetStream()
use reader = new BinaryReader(stream)
let struct(send, _) = msg.Payload
let struct(send, _) = msg.SendTask
let writer = PipeWriter.Create(stream, StreamPipeWriterOptions(leaveOpen = true))
do! send writer // materialize stream
do! writer.CompleteAsync()
Expand Down Expand Up @@ -324,7 +326,8 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c
let pendingMessage = {
SequenceId = lowestSequenceId
HighestSequenceId = highestSequenceId
Payload = sendTask
SendTask = sendTask
Payload = encryptedBatchPayload
Callback = BatchCallbacks batchCallbacks
CreatedAt = %Stopwatch.GetTimestamp()
}
Expand Down Expand Up @@ -468,7 +471,8 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c
let pendingMessage = {
SequenceId = sequenceId
HighestSequenceId = %(-1L)
Payload = payload
SendTask = payload
Payload = encryptedPayload
Callback = SingleCallback (chunkDetails, message, channel)
CreatedAt = %Stopwatch.GetTimestamp()
}
Expand Down
12 changes: 12 additions & 0 deletions tests/UnitTests/Common/CommandTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -257,4 +257,16 @@ module CommandsTests =
command.Seek.ConsumerId |> Expect.equal "" %consumerId
command.Seek.RequestId |> Expect.equal "" %requestId
}

test "newSend shouldn't dispose the payload" {
let producerId: ProducerId = % 5UL
let sequenceId: SequenceId = % 6L
let numMessages = 1
let metadata = MessageMetadata(ProducerName = "TestMe")
let payload = [| 1uy; 17uy; |]
let streamPayload = new MemoryStream(payload)

serializeDeserializePayloadCommand (newSend producerId sequenceId None numMessages metadata streamPayload) |> ignore
Expect.isTrue "Stream should not be disposed" streamPayload.CanRead
}
]

0 comments on commit 93bd8f6

Please sign in to comment.