
KafkaContainer transactions: Timeout expired while initializing transactional state in 60000ms. #1816

Open
ernesthill opened this issue Aug 30, 2019 · 31 comments · Fixed by #1984

Comments

@ernesthill

I'm trying to create a test that uses Kafka transactions. If I use a local instance of Kafka instead of KafkaContainer, everything works fine, but when I use KafkaContainer it fails and I see the following:
Timeout expired while initializing transactional state in 60000ms.
org.apache.kafka.common.errors.TimeoutException: Timeout expired while initializing transactional state in 60000ms.
Attached are the source of my test and the log file from the run.
kafka.log
Test.java.txt

@kiview
Member

kiview commented Sep 9, 2019

I just had a quick check with our KafkaContainerTest, added a transaction example, and I see the same behaviour, but with a slightly different error message:

Timeout expired after 60000milliseconds while awaiting InitProducerId
org.apache.kafka.common.errors.TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId

It happens on producer.initTransactions().

@kiview kiview changed the title Timeout expired while initializing transactional state in 60000ms. KafkaContainer transactions: Timeout expired while initializing transactional state in 60000ms. Sep 9, 2019
@bsideup
Member

bsideup commented Sep 9, 2019

@gAmUssA any idea? :)

@gAmUssA

gAmUssA commented Sep 10, 2019

@bsideup @kiview
Gents
Let me take a look

@gAmUssA

gAmUssA commented Sep 10, 2019

@ernesthill

you need to configure some of the broker parameters so that the transaction state store is initialized correctly.
Here's a corrected test:

NOTE: it's always good to enable log output from the container when debugging this kind of issue.
I use Slf4jLogConsumer from Testcontainers.

import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.TRANSACTIONAL_ID_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;

import java.util.Properties;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.BeforeClass;
import org.junit.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;

@Slf4j
public class ProducerTransactionTest {

  public static KafkaContainer kafka = new KafkaContainer("5.2.1")
      .withLogConsumer(new Slf4jLogConsumer(log));

  @BeforeClass
  public static void prep() {
    // see https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging#KIP-98-ExactlyOnceDeliveryandTransactionalMessaging-NewConfigurations
    kafka.addEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1");
    kafka.addEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1");
    kafka.start();
  }

  @Test
  public void testIt() {
    Properties props = new Properties();
    props.put(BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
    props.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ENABLE_IDEMPOTENCE_CONFIG, "true");
    props.put(TRANSACTIONAL_ID_CONFIG, "prod-0");
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    ProducerRecord<String, String> record = new ProducerRecord<>("something", "A message");
    producer.initTransactions();
    producer.beginTransaction();
    producer.send(record);
    producer.commitTransaction();
  }
}

Let me know if you have any questions.

@bsideup @kiview do you want me to send PR with a test for this issue?

@ernesthill
Author

Thanks for the information.

@debop

debop commented Sep 23, 2019

Thanks for this information !!!

@seglo
Contributor

seglo commented Oct 11, 2019

This should be part of the default Kafka test container config, since (AFAIK) it always runs in a single-broker configuration.

@zenglian

zenglian commented Nov 5, 2019

The default value of transaction.state.log.replication.factor is 3 and transaction.state.log.min.isr is 2. So if the broker count in your cluster is less than 3, the Kafka server fails to (automatically) create the topic __transaction_state, and the client gets a timeout error.
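The arithmetic behind this explanation can be sketched in a few lines. This is a hypothetical illustration, not Kafka's actual implementation: with the defaults, a single broker can never host three replicas of __transaction_state, so the topic is never created and the client's InitProducerId request hangs until it times out.

```java
// Hypothetical sketch of the broker-side constraint described above;
// class and method names are illustrative, not Kafka's actual code.
public class TransactionStateTopicCheck {

    // The internal __transaction_state topic can only be created when at
    // least `replicationFactor` brokers are available to host its replicas.
    static boolean canCreateTransactionStateTopic(int brokerCount, int replicationFactor) {
        return brokerCount >= replicationFactor;
    }

    public static void main(String[] args) {
        // Default: transaction.state.log.replication.factor=3 -> impossible on one broker.
        System.out.println(canCreateTransactionStateTopic(1, 3)); // prints "false"
        // Override for a single-broker test container: replication.factor=1.
        System.out.println(canCreateTransactionStateTopic(1, 1)); // prints "true"
    }
}
```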

@zaanwer

zaanwer commented Nov 11, 2019

I did set transaction.state.log.replication.factor to 1, but I'm still seeing this error in producer.initTransactions():

"Timeout expired while initializing transactional state in 60000ms"

@zaanwer

zaanwer commented Nov 11, 2019

It goes into the following code: transactionManager.initializeTransactions() returns a result, but the last line, result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS), times out.

public void initTransactions() {
    throwIfNoTransactionManager();
    throwIfProducerClosed();
    TransactionalRequestResult result = transactionManager.initializeTransactions();
    sender.wakeup();
    result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
}
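That last await is just a bounded wait: nothing on the client side is broken, the broker simply never answers the InitProducerId request, so the result never completes and the call gives up after max.block.ms. A stdlib-only sketch of that shape, with CountDownLatch standing in for TransactionalRequestResult (the class and method names here are illustrative assumptions, not Kafka code):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Stdlib-only sketch of the bounded wait inside initTransactions():
// if nothing ever completes the result, await() returns false after the
// timeout, which the real producer surfaces as a TimeoutException.
public class InitTransactionsTimeoutSketch {

    static boolean awaitInit(CountDownLatch result, long maxBlockTimeMs) throws InterruptedException {
        // Mirrors result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS).
        return result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch neverCompleted = new CountDownLatch(1); // broker never responds
        System.out.println(awaitInit(neverCompleted, 100));    // prints "false" -> client-side timeout

        CountDownLatch completed = new CountDownLatch(1);
        completed.countDown();                                 // broker responded
        System.out.println(awaitInit(completed, 100));         // prints "true"
    }
}
```

So when this timeout fires, the thing to debug is the broker's ability to create and serve __transaction_state, not the client.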

@zenglian

I did have this transaction.state.log.replication.factor set to 1 but still seeing this error in producer.initTransaction()

"Timeout expired while initializing transactional state in 60000ms"

Note there are two props to change. Do not try to debug the client code; it's a server-side error.

@zaanwer

zaanwer commented Nov 13, 2019

I have both properties set and I have numBroker=1.

properties.put("transaction.state.log.replication.factor", Short.valueOf("1"));
properties.put("transaction.state.log.min.isr", 1);

@ivanprostran

I have a 10-node cluster with the following configuration:

Kafka Streams log (exactly-once enabled, static group membership enabled):

transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2

2020-01-30 11:23:00 [Processor-StreamThread-1] ERROR o.a.k.s.p.internals.StreamTask - stream-thread [Processor-StreamThread-1] task [0_0] Timeout exception caught when initializing transactions for task 0_0. This might happen if the broker is slow to respond, if the network connection to the broker was interrupted, or if similar circumstances arise. You can increase producer parameter max.block.ms to increase this timeout.
..
org.apache.kafka.common.errors.TimeoutException: Timeout expired after 240000milliseconds while awaiting InitProducerId

org.apache.kafka.streams.errors.StreamsException: stream-thread [Processor-StreamThread-1] Failed to rebalance.
..
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired after 240000milliseconds while awaiting InitProducerId

This is a big problem, and I don't know what to do.
The only solution is to restart the cluster.

This happens occasionally, and the Kafka Streams application (client) cannot be started (i.e. transition to the RUNNING state) until the brokers are restarted manually.
(I tried restarting the client application several times, but the problem was not resolved until the broker restart.)

Additional info:

Kafka client/broker: 2.4.0
Nodes are up and running (alive)

[zk: localhost:2181(CONNECTED) 0] ls /kafka_cluster/brokers/ids
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

I appreciate any help!

Thank you

@bsideup
Member

bsideup commented Jan 30, 2020

@ivanprostran this sounds like a problem with Kafka / your app and does not seem to be related to Testcontainers' Kafka module.

@ivanprostran

Thank you for the info.

I saw the same error and I am desperate.

@ivanprostran

I will post it to a different group (sorry for the inconvenience).

@pancudaniel7

pancudaniel7 commented May 1, 2020

@gAmUssA I've tried your setup for KafkaContainer, adding all the necessary env variables for transactional mode, but I still get the same Timeout expired while initializing transactional state in 60000ms. problem.

I debugged the Spring library code, and on my side it gets stuck on this.producerFactory.createProducer(), line 275 of KafkaTemplate.

@gAmUssA

gAmUssA commented Jun 8, 2020

@pancudaniel7 could you share a reproducer?
Thank you

@cemkayan

cemkayan commented Aug 3, 2020

I am also getting the same exception. I am using KafkaTemplate with executeInTransaction.

props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "prod-0"); // I am really not clear what this is for. I set it once in the producerConfig. Then create KafkaTemplate and use it many times...

This is how I call Kafka template:

kafkaTemplate.setTransactionIdPrefix(message.getGuid()); // I am also not sure about this line. I tried to put a unique value to prevent the same message from being created twice.

kafkaTemplate.executeInTransaction(kafkaTemplate -> {

    ListenableFuture<SendResult<String, Event>> future = kafkaTemplate.send(topic, message);

    future.addCallback(new ListenableFutureCallback<SendResult<String, Event>>() {

        @Override
        public void onSuccess(SendResult<String, Event> result) {
        }

        @Override
        public void onFailure(Throwable ex) {
        }
    });

    return null;
});

This is what I receive

org.apache.kafka.common.errors.TimeoutException: Timeout expired after 10000milliseconds while awaiting InitProducerId

@maver1ck

maver1ck commented Sep 29, 2020

Same problem here :(

I'm using Flink job with FlinkKafkaProducer.Semantic.EXACTLY_ONCE.

When using the same job without Testcontainers, everything works fine.

@bsideup
Member

bsideup commented Sep 29, 2020

@maver1ck the transactions are not enabled by default, see @gAmUssA's answer:
#1816 (comment)

@maver1ck

maver1ck commented Sep 29, 2020

@bsideup checking this

EDIT: It works. Thanks.

@MrMikeFloyd

> Quoting @gAmUssA's fix above: set KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR and KAFKA_TRANSACTION_STATE_LOG_MIN_ISR to 1 before starting the container.

This did the trick for me 👆❤️

In case anyone has the same issue using EmbeddedKafka in Spring Boot test, the values can be set when defining the config for the embedded Kafka instance as follows:

@EmbeddedKafka(topics = "kafka-test", ports = 9099, brokerProperties = {
    "transaction.state.log.replication.factor=1",
    "transaction.state.log.min.isr=1"
})
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
    properties = {
        "spring.kafka.bootstrap-servers=localhost:9099"
    })
class KafkaEmbeddedIT {
...
}

@kcotzen

kcotzen commented Nov 26, 2020

Hi everyone, unfortunately I have the same error:

2020-11-26 11:27:08.246  INFO [poc-test,,,] 506391 --- [ producer-tx-3] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-tx-3, transactionalId=tx-3] Cluster ID: -nEH5zcySOSTk7pnaSsZOg
2020-11-26 11:28:08.238 ERROR [poc-test,,,] 506391 --- [           main] o.s.c.s.b.k.p.KafkaTopicProvisioner      : Failed to obtain partition information

org.apache.kafka.common.errors.TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId 

In the console output I see this:

2020-11-26 11:27:08.218  INFO [poc-test,,,] 506391 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
	acks = -1
	batch.size = 16384
	bootstrap.servers = [PLAINTEXT://localhost:33193]
	buffer.memory = 33554432
	client.dns.lookup = default
	client.id = producer-tx-3
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
	enable.idempotence = true
	interceptor.classes = []
	key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metadata.max.idle.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 1
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.2
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = tx-3
	value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

My configuration of KafkaContainer :

        lateinit var kafka: KafkaContainer

        init {
            configureKafka()
        }

        private fun configureKafka() {
            kafka = KafkaContainer("5.3.2-1")
            kafka.addEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "2")
            kafka.addEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
            kafka.start()
        }

And my configuration in the application.yml looks like this:

spring:
  cloud:
    stream:
      kafka:
        binder:
          transaction:
            transaction-id-prefix: tx-
          producer-properties:
            retries: 1
            acks: all

I'm not getting it to start up successfully. Has anyone been able to overcome this?
Thanks in advance.

@maver1ck

kafka.addEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "2")
Why 2?

@kcotzen

kcotzen commented Nov 26, 2020

kafka.addEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "2")
Why 2?

If I use :

            kafka.addEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "3")
            kafka.addEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "2")

The error is the same. If I use 1 for both, I get this:

2020-11-26 13:15:01.309  WARN [poc-test,,,] 523026 --- [| producer-tx-0] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-tx-0, transactionalId=tx-0] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2020-11-26 13:15:01.309  WARN [poc-test,,,] 523026 --- [| producer-tx-0] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-tx-0, transactionalId=tx-0] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected

@bsideup
Member

bsideup commented Nov 26, 2020

@kcotzen it should be 1, unless you start & connect two brokers

@kcotzen

kcotzen commented Nov 26, 2020

I used value 1 for both, and I got this error:

2020-11-26 13:15:01.309  WARN [poc-test,,,] 523026 --- [| producer-tx-0] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-tx-0, transactionalId=tx-0] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2020-11-26 13:15:01.309  WARN [poc-test,,,] 523026 --- [| producer-tx-0] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-tx-0, transactionalId=tx-0] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected

@kcotzen

kcotzen commented Nov 26, 2020

I made some modifications (I removed the use of chainedTransactionManager), and it works: zero errors in the console output. But when I use chainedTransactionManager:

2020-11-26 13:15:01.309  WARN [poc-test,,,] 523026 --- [| producer-tx-0] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-tx-0, transactionalId=tx-0] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2020-11-26 13:15:01.309  WARN [poc-test,,,] 523026 --- [| producer-tx-0] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-tx-0, transactionalId=tx-0] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected

sounds weird. Any ideas please?

@kcotzen

kcotzen commented Nov 26, 2020

Well, for some reason the problem was caused by the chainedTransactionManager definition. I think the console output is very misleading and led me to confusion.
Thanks anyway.

@daoanhvu

I got the same issue while trying to use transactional producers.
I fixed it by overriding these two settings on the server side:
transaction.state.log.min.isr = 1
transaction.state.log.replication.factor = 1

If you are running the Kafka container with Docker, you will most likely use:
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
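For anyone running the image via plain Docker Compose rather than Testcontainers, the same two overrides can be set in the compose file. This is a sketch; the service name and image tag below are placeholder assumptions, not taken from this thread:

```yaml
# Hypothetical single-broker compose fragment; adjust the image tag to your setup.
services:
  kafka:
    image: confluentinc/cp-kafka:5.2.1
    environment:
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
```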
