You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Error streaming topic with cleanup policy: compact,delete
I'm trying to stream a topic with the above cleanup policy and I get this weird exception
Your environment
Operating System - Ubuntu 18
Brooklin version - 1.0.2
Kafka version - 2.7.0
ZooKeeper version - 3.5.8
Steps to reproduce
create the topic in both sites
create a stream from one cluster to another
Actual behavior
Nothing is streamed and I get this exceptions in the log (full log is attached):
[2021-03-09 14:02:36,965] INFO ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [ita-kafka01:9092, ita-kafka02:9092, ita-kafka03:9092]
buffer.memory = 33554432
client.id = datastream-producer
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
(org.apache.kafka.clients.producer.ProducerConfig)
[2021-03-09 14:02:37,362] WARN The configuration 'producer.acks' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
[2021-03-09 14:02:37,363] WARN The configuration 'zookeeper.connect' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
[2021-03-09 14:02:37,363] WARN The configuration 'producer.max.block.ms' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
[2021-03-09 14:02:37,363] WARN The configuration 'producer.max.in.flight.requests.per.connection' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
[2021-03-09 14:02:37,363] WARN The configuration 'factoryClassName' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
[2021-03-09 14:02:37,363] INFO Kafka version : 2.0.1 (org.apache.kafka.common.utils.AppInfoParser)
[2021-03-09 14:02:37,363] INFO Kafka commitId : fa14705e51bd2ce5 (org.apache.kafka.common.utils.AppInfoParser)
[2021-03-09 14:02:37,366] INFO Cluster ID: YdnuOPmjQ4KavUKJgY4aZw (org.apache.kafka.clients.Metadata)
[2021-03-09 14:02:37,375] WARN Unexpected error code: 87. (org.apache.kafka.common.protocol.Errors)
[2021-03-09 14:02:37,375] WARN sent failure, restart producer, exception: (class com.linkedin.datastream.kafka.KafkaTransportProvider:kafkaMirroringConnector:0)
org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
[2021-03-09 14:02:37,376] INFO [Producer clientId=datastream-producer] Closing the Kafka producer with timeoutMillis = 2000 ms. (org.apache.kafka.clients.producer.KafkaProducer)
[2021-03-09 14:02:37,376] WARN [Producer clientId=datastream-producer] Overriding close timeout 2000 ms to 0 ms in order to prevent useless blocking due to self-join. This means you have incorrectly invoked close with a non-zero timeout from the producer call-back. (org.apache.kafka.clients.producer.KafkaProducer)
[2021-03-09 14:02:37,376] INFO [Producer clientId=datastream-producer] Proceeding to force close the producer since pending requests could not be completed within timeout 2000 ms. (org.apache.kafka.clients.producer.KafkaProducer)
[2021-03-09 14:02:37,376] ERROR Sending a message with source checkpoint testcompact/0/1 to topic testcompact partition 0 for datastream task second-mirroring-stream_f06ad4ca-1620-4035-9186-006f3abd18d1(kafkaMirroringConnector), partitionsV2=, partitions=[0], dependencies=[] threw an exception. (KafkaTransportProvider)
com.linkedin.datastream.common.DatastreamRuntimeException: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
at com.linkedin.datastream.kafka.KafkaProducerWrapper.generateSendFailure(KafkaProducerWrapper.java:251)
at com.linkedin.datastream.kafka.KafkaProducerWrapper.lambda$null$0(KafkaProducerWrapper.java:198)
at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1235)
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:635)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:604)
at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:561)
at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:485)
at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:700)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:532)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:524)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
[2021-03-09 14:02:37,377] WARN Detect exception being thrown from callback for src partition: testcompact-0 while sending, metadata: Checkpoint: testcompact/0/1, Topic: testcompact, Partition: 0 , exception: (com.linkedin.datastream.connectors.kafka.mirrormaker.KafkaMirrorMakerConnectorTask)
com.linkedin.datastream.server.api.transport.SendFailedException: com.linkedin.datastream.common.DatastreamRuntimeException: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
at com.linkedin.datastream.server.EventProducer.onSendCallback(EventProducer.java:293)
at com.linkedin.datastream.server.EventProducer.lambda$send$0(EventProducer.java:194)
at com.linkedin.datastream.kafka.KafkaTransportProvider.doOnSendCallback(KafkaTransportProvider.java:189)
at com.linkedin.datastream.kafka.KafkaTransportProvider.lambda$send$0(KafkaTransportProvider.java:155)
at com.linkedin.datastream.kafka.KafkaProducerWrapper.lambda$null$0(KafkaProducerWrapper.java:198)
at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1235)
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:635)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:604)
at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:561)
at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:485)
at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:700)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:532)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:524)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.linkedin.datastream.common.DatastreamRuntimeException: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
at com.linkedin.datastream.kafka.KafkaProducerWrapper.generateSendFailure(KafkaProducerWrapper.java:251)
... 16 more
Caused by: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
[2021-03-09 14:02:37,378] ERROR updateErrorRate with 1. Look for error logs right before this message to see what happened (com.linkedin.datastream.connectors.kafka.mirrormaker.KafkaMirrorMakerConnectorTask)
[2021-03-09 14:02:37,378] INFO Trying to seek to previous checkpoint for partitions: [testcompact-0] (com.linkedin.datastream.connectors.kafka.mirrormaker.KafkaMirrorMakerConnectorTask)
[2021-03-09 14:02:37,378] ERROR Partition rewind failed due to (com.linkedin.datastream.connectors.kafka.mirrormaker.KafkaMirrorMakerConnectorTask)
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2215)
at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2199)
at org.apache.kafka.clients.consumer.KafkaConsumer.committed(KafkaConsumer.java:1701)
at org.apache.kafka.clients.consumer.KafkaConsumer.committed(KafkaConsumer.java:1676)
at com.linkedin.datastream.connectors.kafka.AbstractKafkaBasedConnectorTask.lambda$seekToLastCheckpoint$4(AbstractKafkaBasedConnectorTask.java:605)
at java.util.Collections$SingletonSet.forEach(Collections.java:4769)
at com.linkedin.datastream.connectors.kafka.AbstractKafkaBasedConnectorTask.seekToLastCheckpoint(AbstractKafkaBasedConnectorTask.java:604)
at com.linkedin.datastream.connectors.kafka.AbstractKafkaBasedConnectorTask.rewindAndPausePartitionOnException(AbstractKafkaBasedConnectorTask.java:251)
at com.linkedin.datastream.connectors.kafka.AbstractKafkaBasedConnectorTask.lambda$sendDatastreamProducerRecord$1(AbstractKafkaBasedConnectorTask.java:283)
at com.linkedin.datastream.server.EventProducer.onSendCallback(EventProducer.java:303)
at com.linkedin.datastream.server.EventProducer.lambda$send$0(EventProducer.java:194)
at com.linkedin.datastream.kafka.KafkaTransportProvider.doOnSendCallback(KafkaTransportProvider.java:189)
at com.linkedin.datastream.kafka.KafkaTransportProvider.lambda$send$0(KafkaTransportProvider.java:155)
at com.linkedin.datastream.kafka.KafkaProducerWrapper.lambda$null$0(KafkaProducerWrapper.java:198)
at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1235)
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:635)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:604)
at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:561)
at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:485)
at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:700)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:532)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:524)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
at java.lang.Thread.run(Thread.java:748)
[2021-03-09 14:02:41,953] INFO Trying to flush the producer and commit offsets. (com.linkedin.datastream.connectors.kafka.mirrormaker.KafkaMirrorMakerConnectorTask)
Error streaming topic with cleanup policy: compact,delete
I'm trying to stream a topic with the above cleanup policy and I get this weird exception
Your environment
Steps to reproduce
Actual behavior
Nothing is streamed and I get this exceptions in the log (full log is attached):
brooklin.log
The text was updated successfully, but these errors were encountered: