

panic during rapid send of many large-sized payloads: "panic: send on closed channel" #239

Closed
jinfengnarvar opened this issue May 5, 2020 · 9 comments · Fixed by #248

@jinfengnarvar

Expected behavior

Sending a large number of large payloads via Producer.Send() should not panic or crash.

Actual behavior

We'd appreciate some insight into whether this rings any bells. We have been using the pure Go client for a few weeks with good results, until we recently onboarded a new customer who sends large order files, each containing many large records.

A typical record is about 150 KB to 300 KB after protobuf encoding, and we have millions of these records. Although Send is currently called from a single goroutine, once we start calling it rapidly our logs suddenly show this:

.......
.......
{"timestamp":"2020-05-05T03:06:46Z","severity":"WARNING","message":"Failed to write on connection"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Error reading from connection"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Connecting to broker"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"TCP connection established"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Connection is ready"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Resending 1 pending batches"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"WARNING","message":"Failed to write on connection"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Error reading from connection"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Connecting to broker"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"TCP connection established"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Connection is ready"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Resending 1 pending batches"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"WARNING","message":"Failed to write on connection"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Error reading from connection"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Connecting to broker"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"TCP connection established"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Connection is ready"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Resending 1 pending batches"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"WARNING","message":"Failed to write on connection"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Error reading from connection"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Connecting to broker"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"TCP connection established"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Connection is ready"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Resending 1 pending batches"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"WARNING","message":"Failed to write on connection"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Error reading from connection"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Connecting to broker"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"TCP connection established"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Connection is ready"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Resending 1 pending batches"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"WARNING","message":"Failed to write on connection"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Error reading from connection"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Connecting to broker"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"TCP connection established"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Connection is ready"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Resending 1 pending batches"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"WARNING","message":"Failed to write on connection"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Error reading from connection"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Connecting to broker"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"TCP connection established"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Connection is ready"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Resending 1 pending batches"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"WARNING","message":"Failed to write on connection"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Error reading from connection"}
{"timestamp":"2020-05-05T03:06:46Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Connecting to broker"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"TCP connection established"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Connection is ready"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Resending 1 pending batches"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"WARNING","message":"Failed to write on connection"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Error reading from connection"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Connecting to broker"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"TCP connection established"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Connection is ready"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Resending 1 pending batches"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"WARNING","message":"Failed to write on connection"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Error reading from connection"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Connecting to broker"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"TCP connection established"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Connection is ready"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Resending 1 pending batches"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"WARNING","message":"Failed to write on connection"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Error reading from connection"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Connecting to broker"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"TCP connection established"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Connection is ready"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Resending 1 pending batches"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"WARNING","message":"Failed to write on connection"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Error reading from connection"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Connecting to broker"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"TCP connection established"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Connection is ready"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Resending 1 pending batches"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"WARNING","message":"Failed to write on connection"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Error reading from connection"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Connecting to broker"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"TCP connection established"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Connection is ready"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Resending 1 pending batches"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"WARNING","message":"Failed to write on connection"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Error reading from connection"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Connecting to broker"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"TCP connection established"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Connection is ready"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Resending 1 pending batches"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"WARNING","message":"Failed to write on connection"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Error reading from connection"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Connecting to broker"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"TCP connection established"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Connection is ready"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Resending 1 pending batches"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"WARNING","message":"Failed to write on connection"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Error reading from connection"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:47Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Connecting to broker"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"TCP connection established"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Connection is ready"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Resending 1 pending batches"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"WARNING","message":"Failed to write on connection"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Error reading from connection"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Connecting to broker"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"TCP connection established"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Connection is ready"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Resending 1 pending batches"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"WARNING","message":"Failed to write on connection"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Error reading from connection"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Connecting to broker"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"TCP connection established"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Connection is ready"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Resending 1 pending batches"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"WARNING","message":"Failed to write on connection"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Error reading from connection"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Connecting to broker"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"TCP connection established"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Connection is ready"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Resending 1 pending batches"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"WARNING","message":"Failed to write on connection"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Error reading from connection"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Connecting to broker"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"TCP connection established"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Connection is ready"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Resending 1 pending batches"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"WARNING","message":"Failed to write on connection"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Error reading from connection"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Connecting to broker"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"TCP connection established"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Connection is ready"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Resending 1 pending batches"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"WARNING","message":"Failed to write on connection"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Error reading from connection"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Connecting to broker"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"TCP connection established"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Connection is ready"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Resending 1 pending batches"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"WARNING","message":"Failed to write on connection"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Error reading from connection"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Connecting to broker"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"TCP connection established"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Connection is ready"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Resending 1 pending batches"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"WARNING","message":"Failed to write on connection"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-05T03:06:48Z","severity":"INFO","message":"Error reading from connection"}
panic: send on closed channel

goroutine 3105 [running]:
github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request.func1(0xc001e3bc00, 0x0, 0x0)
	/go/pkg/mod/github.com/apache/pulsar-client-go@v0.0.0-20200316114055-4dc7855f99dc/pulsar/internal/rpc_client.go:97 +0xb6
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).handleResponse(0xc0057945a0, 0x65, 0xc001e3bc00)
	/go/pkg/mod/github.com/apache/pulsar-client-go@v0.0.0-20200316114055-4dc7855f99dc/pulsar/internal/connection.go:493 +0xb3
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).internalReceivedCommand(0xc0057945a0, 0xc001e3bc00, 0x0, 0x0)
	/go/pkg/mod/github.com/apache/pulsar-client-go@v0.0.0-20200316114055-4dc7855f99dc/pulsar/internal/connection.go:403 +0x2e4
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).run(0xc0057945a0)
	/go/pkg/mod/github.com/apache/pulsar-client-go@v0.0.0-20200316114055-4dc7855f99dc/pulsar/internal/connection.go:325 +0x304
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1(0xc0057945a0)
	/go/pkg/mod/github.com/apache/pulsar-client-go@v0.0.0-20200316114055-4dc7855f99dc/pulsar/internal/connection.go:195 +0x59
created by github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start
	/go/pkg/mod/github.com/apache/pulsar-client-go@v0.0.0-20200316114055-4dc7855f99dc/pulsar/internal/connection.go:192 +0x3f

Steps to reproduce

Not sure how this can be reproduced on your side. I'm also reaching out to your rep's Slack channel within our company to investigate.

System configuration

Pulsar version: github.com/apache/pulsar-client-go v0.0.0-20200316114055-4dc7855f99dc
Go version: 1.12

@wolfstudy
Member

Thanks for the feedback, @jinfengnarvar. It seems that we do not close the channel properly somewhere, so the client keeps writing data to a closed channel.

@wolfstudy
Member

I will try to reproduce the issue; you are also welcome to provide code that reproduces it.

@jinfengnarvar
Author

@wolfstudy Thanks. I'm also trying to find a more deterministic way to reproduce it. Right now it only occurs in our production service, which processes multiple large files at the same time. The crash occurs frequently, but not deterministically.

I'm also wondering whether anyone has insight into what the repeated blocks of "failed to write...; connection closed...; reconnecting to broker....;" failure logs indicate. These logs apparently come from within the Pulsar Go client; you happen to use github.com/sirupsen/logrus as well, so we were able to capture and dump them.

My question is: we're in the midst of writing to producer partitions, so what usually causes the connection to be severed? An overloaded broker (due to volume or payload size)? Can anything on the Pulsar server side be configured to reveal why the connection dropped?

@jinfengnarvar
Author

A few more findings after placing some logs in our code:

func sendOrderMessage(
	ctx *msgProcessor.MsgContext, key string, orderMessage order.QueueOrderMessage,
	recordIndex, aggregatedRecordIndex int) error {
	pbBytes, _ := proto.Marshal(&orderMessage)
	if file1022LogOn(ctx, false) {
		nlog.Infof("cabelas sending pulsar: raw record index: %d, aggregated record index: %d, bytes: %d, key: %s",
			recordIndex+1, aggregatedRecordIndex+1, len(pbBytes), key)
	}
	defer func() {
		if p := recover(); p != nil {
			nlog.Infof("crashed")
			nlog.Infof("  - raw record index: %d", recordIndex+1)
			nlog.Infof("  - aggregated record index: %d", aggregatedRecordIndex+1)
			nlog.Infof("  - bytes: %d", len(pbBytes))
			nlog.Infof("  - key: %s", key)
			nlog.Infof("  - payload: %s", jsonutils.BestEffortJsonMarshal(orderMessage))
			// re-panic
			panic(p)
		}
	}()
	err := ctx.OutputClient.Send(
		context.Background(),
		npulsar.NewPulsarProducerMessage(pbBytes, key))
	if err != nil {
	...
	...

Notes:

  • Because we have many lines, file1022LogOn is a probabilistic logging decider, so we don't log too much. It is irrelevant here, though, because the panic recovery always logs.

Here is the logging we obtained:

{"timestamp":"2020-05-06T15:02:53Z","severity":"INFO","message":"cabelas sending pulsar: raw record index: 34572, aggregated record index: 458, bytes: 19008, key: 250473543"}
{"timestamp":"2020-05-06T15:02:57Z","severity":"INFO","message":"cabelas sending pulsar: raw record index: 37516, aggregated record index: 516, bytes: 3953, key: 250473623"}
{"timestamp":"2020-05-06T15:03:03Z","severity":"INFO","message":"cabelas sending pulsar: raw record index: 41463, aggregated record index: 588, bytes: 9694, key: 250473719"}
{"timestamp":"2020-05-06T15:03:03Z","severity":"INFO","message":"cabelas sending pulsar: raw record index: 41467, aggregated record index: 589, bytes: 5480, key: 250473720"}
{"timestamp":"2020-05-06T15:03:04Z","severity":"INFO","message":"cabelas sending pulsar: raw record index: 42743, aggregated record index: 603, bytes: 144220, key: 250473740"}
{"timestamp":"2020-05-06T15:03:05Z","severity":"INFO","message":"cabelas sending pulsar: raw record index: 43162, aggregated record index: 613, bytes: 8613, key: 250473756"}
{"timestamp":"2020-05-06T15:03:08Z","severity":"INFO","message":"cabelas sending pulsar: raw record index: 45655, aggregated record index: 631, bytes: 14079, key: 250473779"}
{"timestamp":"2020-05-06T15:03:16Z","severity":"INFO","message":"cabelas sending pulsar: raw record index: 51348, aggregated record index: 703, bytes: 14795, key: 250473874"}
....
{"timestamp":"2020-05-06T15:03:26Z","severity":"WARNING","message":"Failed to write on connection"}
{"timestamp":"2020-05-06T15:03:26Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-06T15:03:26Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-06T15:03:26Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-06T15:03:26Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-06T15:03:26Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-06T15:03:26Z","severity":"INFO","message":"Error reading from connection"}
{"timestamp":"2020-05-06T15:03:26Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-06T15:03:26Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-06T15:03:26Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-06T15:03:26Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"INFO","message":"Connecting to broker"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"INFO","message":"TCP connection established"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"INFO","message":"Connection is ready"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"INFO","message":"Resending 1 pending batches"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"WARNING","message":"Failed to write on connection"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"INFO","message":"Error reading from connection"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"INFO","message":"Connecting to broker"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"INFO","message":"TCP connection established"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"INFO","message":"Connection is ready"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"INFO","message":"Resending 1 pending batches"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"WARNING","message":"Failed to write on connection"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"INFO","message":"Error reading from connection"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"INFO","message":"Connecting to broker"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"INFO","message":"TCP connection established"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"INFO","message":"Connection is ready"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"INFO","message":"Resending 1 pending batches"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"INFO","message":"Reconnected producer to broker"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"WARNING","message":"Failed to write on connection"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"INFO","message":"Connection closed"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"WARNING","message":"Connection was closed"}
{"timestamp":"2020-05-06T15:03:27Z","severity":"INFO","message":"Reconnecting to broker in 100ms"}
panic: send on closed channel

goroutine 834 [running]:
github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request.func1(0xc0028fa1c0, 0x0, 0x0)
	/go/pkg/mod/github.com/apache/pulsar-client-go@v0.0.0-20200316114055-4dc7855f99dc/pulsar/internal/rpc_client.go:97 +0xb6
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).handleResponse(0xc0052b6b40, 0x1e, 0xc0028fa1c0)
	/go/pkg/mod/github.com/apache/pulsar-client-go@v0.0.0-20200316114055-4dc7855f99dc/pulsar/internal/connection.go:493 +0xb3

As you can see:

  • At a 1% logging rate, the logged payloads are not big; the biggest is about 140 KB (144,220 bytes).
  • The crash apparently isn't in our send goroutine; otherwise our deferred recover would have caught and logged the panic, and it didn't. So the crash is somewhere inside the Pulsar client.
  • Could this have something to do with capacity?
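The second point follows from how Go panics work: recover only stops a panic in the goroutine whose deferred call invokes it, and an unrecovered panic in any goroutine terminates the whole program. The first stack trace above shows the panic on the goroutine created by connection.start, so the deferred recover in sendOrderMessage can never see it. A minimal sketch of goroutine-local recovery, using a hypothetical helper goSafe that is not part of the Pulsar client:

```go
package main

import "fmt"

// goSafe is a hypothetical helper (not part of the Pulsar client). It runs f
// in a new goroutine and turns a panic in that goroutine into an error.
// The recover has to live in the goroutine that panics; a deferred recover
// in the caller, like the one in sendOrderMessage, never sees it.
func goSafe(f func()) <-chan error {
	done := make(chan error, 1) // buffered so the goroutine never blocks
	go func() {
		defer func() {
			if r := recover(); r != nil {
				done <- fmt.Errorf("recovered: %v", r)
				return
			}
			done <- nil
		}()
		f()
	}()
	return done
}

func main() {
	ch := make(chan int)
	close(ch)

	// Sending on a closed channel panics inside the new goroutine.
	err := <-goSafe(func() { ch <- 1 })
	fmt.Println(err) // recovered: send on closed channel
}
```

Because the panicking goroutine belongs to the client, application code cannot install such a recover there; the fix has to happen inside the client.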

@jinfengnarvar
Author

Update: after enabling more logging, we found that the payload we sent right before the crash was 27 MB!

That object is incorrect; it shouldn't be that big.

That said, should we make the Pulsar Go client more robust, so that it fails gracefully and returns an error instead of panicking?

@wolfstudy
Member

wolfstudy commented May 7, 2020

connection closed...; reconnecting to broker....;

This means that the current connection between the client and the broker is closed, and the client attempts to establish a connection with the broker again within the connection timeout.

{"timestamp":"2020-05-06T15:03:27Z","severity":"INFO","message":"Connection closed"}

The log level is INFO, which means that the user program triggered the Go client to close the current connection. When the connection reaches the connectionClosed state, the Go client reconnects to the broker.

func (c *connection) internalWriteData(data []byte) {
	c.log.Debug("Write data: ", len(data))
	if _, err := c.cnx.Write(data); err != nil {
		c.log.WithError(err).Warn("Failed to write on connection")
		c.Close()
	}
}

So the root of the problem is the failure of c.cnx.Write(data). Can you enable debug logging and check what the data looks like when the error occurs?

@wolfstudy
Member

wolfstudy commented May 7, 2020

Update: after enabling more logging, we found that the payload we sent right before the crash was 27 MB!

By default, the broker allows messages of at most 5 MB, as follows:

# Max size of messages.
maxMessageSize=5242880

The best approach is to check the message size in the Go client and reject any message that exceeds 5 MB, following the Java client logic: https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java#L294-L302
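A client-side size check could look roughly like the sketch below. It is hypothetical: checkMessageSize and defaultMaxMessageSize are illustrative names, not the Go client's API, and the sketch compares only the raw payload length against the broker default quoted above. A real check would also have to account for framing overhead and use the limit the broker is actually configured with.

```go
package main

import "fmt"

// defaultMaxMessageSize mirrors the broker default quoted above
// (maxMessageSize=5242880, i.e. 5 MiB).
const defaultMaxMessageSize = 5242880

// checkMessageSize is a hypothetical client-side guard: it rejects a payload
// larger than maxSize with an error instead of letting it reach the wire.
// It compares only the raw payload length.
func checkMessageSize(payload []byte, maxSize int) error {
	if len(payload) > maxSize {
		return fmt.Errorf("message of %d bytes exceeds max message size of %d bytes",
			len(payload), maxSize)
	}
	return nil
}

func main() {
	oversized := make([]byte, 27<<20) // roughly the 27 MB payload from the report
	typical := make([]byte, 300<<10)  // upper end of the usual 150-300 KB records

	fmt.Println(checkMessageSize(oversized, defaultMaxMessageSize))
	fmt.Println(checkMessageSize(typical, defaultMaxMessageSize)) // <nil>
}
```

With such a guard, the oversized record would surface as an error returned from the send path rather than as a broken connection.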

@jinfengnarvar
Copy link
Author

Update: after enabling more logging, we found that the payload we sent right before the crash was 27 MB!

By default, the broker allows messages of at most 5 MB, as follows:

# Max size of messages.
maxMessageSize=5242880

The best approach is to check the message size in the Go client and reject any message that exceeds 5 MB.

Yes, having a size check in the client library is good, but I'm still concerned about the crash. Even without the check, when our code asks the client to send something illegal (in this case an over-the-limit payload), it should fail gracefully instead of panicking. It seems like a race condition in the client caused this panic.

@wolfstudy
Member

It seems like a race condition in the client caused this panic.

Yes, I agree. Even if we check the message size, "panic: send on closed channel" should never happen, so we may not be closing the channel properly somewhere.

rueian added a commit to rueian/pulsar-client-go that referenced this issue May 13, 2020
rueian added a commit to rueian/pulsar-client-go that referenced this issue May 14, 2020
wolfstudy pushed a commit that referenced this issue Aug 17, 2020
Fixes #239

### Motivation

As @wolfstudy pointed out here #239 (comment)

There is a race on the callbacks in `pendingReqs` when the connection is closed while the run loop is still running, which can cause a callback to be called twice:
https://github.com/apache/pulsar-client-go/blob/e7f1673350f208b5063823282d14906d70d66904/pulsar/internal/connection.go#L669-L671
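Why a doubled callback surfaces as `panic: send on closed channel` can be shown in isolation: if a request callback delivers its result on a channel and then closes that channel, a second invocation sends on the closed channel and panics, matching the message raised from `rpcClient.Request.func1` in the stack traces in this issue. The sketch assumes that callback shape for illustration; `result`, `newCallback` and `callTwice` are hypothetical names, not code from `rpc_client.go`.

```go
package main

import (
	"errors"
	"fmt"
)

type result struct {
	err error
}

// newCallback returns a callback that delivers one result on ch and then
// closes ch. This shape is an assumption for illustration.
func newCallback(ch chan result) func(err error) {
	return func(err error) {
		ch <- result{err: err}
		close(ch)
	}
}

// callTwice invokes cb the way the race does: once for the broker response
// and once more from the pendingReqs cleanup. It returns the panic message,
// if any.
func callTwice(cb func(error)) (panicMsg string) {
	defer func() {
		if r := recover(); r != nil {
			panicMsg = fmt.Sprint(r)
		}
	}()
	cb(nil)                             // first call: normal response
	cb(errors.New("connection closed")) // second call: cleanup in Close()
	return ""
}

func main() {
	ch := make(chan result, 1) // buffered so the first send does not block
	fmt.Println(callTwice(newCallback(ch))) // send on closed channel
}
```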

### Modifications

Introduce a `runLoopStoppedCh` to ensure that the run loop has already stopped before `Close()` cleans up the callbacks in `pendingReqs`.
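The ordering this modification relies on can be sketched with a simplified model: the run loop closes `runLoopStoppedCh` when it exits, and `Close()` waits on that channel before draining `pendingReqs`, so the run loop can no longer be invoking callbacks while the cleanup runs. This is not the client's `connection.go`; apart from the names `pendingReqs` and `runLoopStoppedCh`, the `conn` type, its fields, and the `sync.Once` guard are assumptions made for the sketch.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type request struct {
	callback func(err error)
}

// conn is a simplified, hypothetical model of a connection.
type conn struct {
	mu               sync.Mutex
	pendingReqs      map[uint64]*request
	responseCh       chan uint64   // ids of responses from the broker
	closeCh          chan struct{} // tells the run loop to exit
	runLoopStoppedCh chan struct{} // closed by the run loop when it has exited
	closeOnce        sync.Once
}

func newConn() *conn {
	c := &conn{
		pendingReqs:      make(map[uint64]*request),
		responseCh:       make(chan uint64),
		closeCh:          make(chan struct{}),
		runLoopStoppedCh: make(chan struct{}),
	}
	go c.run()
	return c
}

func (c *conn) addPending(id uint64, cb func(error)) {
	c.mu.Lock()
	c.pendingReqs[id] = &request{callback: cb}
	c.mu.Unlock()
}

// run completes pending requests as responses arrive, until closeCh is closed.
func (c *conn) run() {
	defer close(c.runLoopStoppedCh)
	for {
		select {
		case id := <-c.responseCh:
			c.mu.Lock()
			req, ok := c.pendingReqs[id]
			delete(c.pendingReqs, id)
			c.mu.Unlock()
			if ok {
				req.callback(nil)
			}
		case <-c.closeCh:
			return
		}
	}
}

// Close waits for the run loop to stop before failing the remaining
// requests, so the run loop cannot invoke a callback during cleanup.
func (c *conn) Close() {
	c.closeOnce.Do(func() {
		close(c.closeCh)
		<-c.runLoopStoppedCh
		c.mu.Lock()
		defer c.mu.Unlock()
		for id, req := range c.pendingReqs {
			req.callback(errors.New("connection closed"))
			delete(c.pendingReqs, id)
		}
	})
}

func main() {
	c := newConn()
	c.addPending(1, func(err error) { fmt.Println("request 1:", err) })
	c.addPending(2, func(err error) { fmt.Println("request 2:", err) })
	c.responseCh <- 1 // the broker answers request 1
	c.Close()         // request 2 is failed exactly once
}
```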
wolfstudy added a commit that referenced this issue Jun 30, 2021
Signed-off-by: xiaolongran <xiaolongran@tencent.com>


### Motivation

In `internalSendRequest`, we add the request to the `pendingReqs` map even when the connection state is `connectionClosed`, which causes the request's callback to be called twice:

First:

```
func (c *connection) internalSendRequest(req *request) {
	c.pendingLock.Lock()
	if req.id != nil {
		c.pendingReqs[*req.id] = req
	}
	c.pendingLock.Unlock()
	if c.getState() == connectionClosed {
		c.log.Warnf("internalSendRequest failed for connectionClosed")
		// First call of req.callback happens here.
		if req.callback != nil {
			req.callback(req.cmd, ErrConnectionClosed)
		}
	} else {
		c.writeCommand(req.cmd)
	}
}
```

Second:

```
func (c *connection) run() {
	// All reads come from the reader goroutine
	go c.reader.readFromConnection()
	go c.runPingCheck()

	c.log.Debugf("Connection run starting with request capacity=%d queued=%d",
		cap(c.incomingRequestsCh), len(c.incomingRequestsCh))

	defer func() {
		// all accesses to pendingReqs should happen in this run loop goroutine,
		// including the final cleanup, to avoid the issue #239
		c.pendingLock.Lock()
		for id, req := range c.pendingReqs {
			// Second call of req.callback happens here.
			req.callback(nil, errors.New("connection closed"))
			delete(c.pendingReqs, id)
		}
		c.pendingLock.Unlock()
		c.Close()
	}()
	// ...
}
```

In fact, when the connection is in the `connectionClosed` state, there is no need to add the request to the `pendingReqs` map, so the closed connection never processes that request a second time.


### Modifications

When the connection is closed, the current request to be sent is not added to the `pendingReqs` map.
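The modification can be sketched as follows: a request is registered in `pendingReqs` only while the connection is open; on a closed connection its callback fails immediately, and because it never enters the map, the later cleanup cannot call it again. This is a simplified, hypothetical model, not the client's `connection` type. Checking the state under the same lock that guards `pendingReqs` is a choice made for this sketch; the snippet quoted above reads the state via `getState()` after releasing `pendingLock`.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errConnectionClosed = errors.New("connection closed")

type connState int

const (
	connectionReady connState = iota
	connectionClosed
)

type request struct {
	id       uint64
	callback func(err error)
}

// conn is a simplified, hypothetical model, not the client's connection type.
type conn struct {
	mu          sync.Mutex
	state       connState
	pendingReqs map[uint64]*request
}

// sendRequest registers req only while the connection is open. On a closed
// connection the callback fails immediately and req never enters
// pendingReqs, so a later cleanup cannot call it a second time.
func (c *conn) sendRequest(req *request) {
	c.mu.Lock()
	if c.state == connectionClosed {
		c.mu.Unlock()
		req.callback(errConnectionClosed)
		return
	}
	c.pendingReqs[req.id] = req
	c.mu.Unlock()
	// Writing the command to the socket is omitted in this sketch.
}

// close marks the connection closed and fails every pending request once.
func (c *conn) close() {
	c.mu.Lock()
	c.state = connectionClosed
	pending := c.pendingReqs
	c.pendingReqs = make(map[uint64]*request)
	c.mu.Unlock()
	for _, req := range pending {
		req.callback(errConnectionClosed)
	}
}

func main() {
	c := &conn{pendingReqs: make(map[uint64]*request)}
	c.sendRequest(&request{id: 1, callback: func(err error) { fmt.Println("request 1:", err) }})
	c.close() // request 1: connection closed
	c.sendRequest(&request{id: 2, callback: func(err error) { fmt.Println("request 2:", err) }})
	fmt.Println("pending after close:", len(c.pendingReqs)) // pending after close: 0
}
```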