Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

alibabacloud recommends against using Sarama? #2036

Closed
KeenCN opened this issue Sep 22, 2021 · 10 comments
Closed

alibabacloud recommends against using Sarama? #2036

KeenCN opened this issue Sep 22, 2021 · 10 comments
Labels
stale Issues and pull requests without any recent activity

Comments

@KeenCN
Copy link

KeenCN commented Sep 22, 2021

I saw from alibabacloud that Sarama library is not recommended by them. Do we have a repair plan for these three problems?

source:
https://www.alibabacloud.com/help/faq-detail/266782.htm

Quote:

The following known problems exist with the Go client that is developed with the Sarama library:

  1. When a partition is added to a topic, the Go client developed with the Sarama library cannot detect the new partition or consume messages from the partition. Only after the client is restarted, it can consume messages from the new partition.

  2. When the client subscribes to more than two topics at the same time, the client may fail to consume messages from some partitions.

  3. If the policy for resetting consumer offsets of the Go client developed with the Sarama library is set to Oldest(earliest), the client may start to consume messages from the earliest offset when the client breaks down or the broker version is upgraded. This is because the out_of_range class is implemented in the client.

@KeenCN KeenCN changed the title https://www.alibabacloud.com/help/faq-detail/266782.htm I saw from alibabacloud that Sarama library is not recommended. Do we have a repair plan for these three problems? Sep 22, 2021
@dnwe dnwe changed the title I saw from alibabacloud that Sarama library is not recommended. Do we have a repair plan for these three problems? alibabacloud recommends against using Sarama? Sep 22, 2021
@dnwe
Copy link
Collaborator

dnwe commented Sep 22, 2021

@KeenCN this is the first time I have seen this document, and I'd have to search to see if whoever is responsible for it ever raised any issues for these problems, but it seems somewhat out-of-date and it's unclear for the other two what is being described.

RE: 1., I believe that issue was fixed in Version 1.24.1 (2019-10-31) under #1525

RE: 2, I'm not sure what issue they are referring to here, I'm assuming they saw some behaviour where the fairness of message consumption between topics wasn't as balanced as they would like? It's not something I've personally seen before, but we could investigate if someone can put together a functional test or example that reproduces whatever they were seeing.

RE: 3., again I'm not sure exactly what they are describing here. If the broker is restarted and truncates a partition such that the consumer gets an out of range error, then the broker consumer will stop due to the ErrOffsetOutOfRange and if Consumer.Return.Errors is set then it will return that error to the user to decide what to do. Then when they reconnect the offset reset configuration should only be used in the event that their consumer group didn't have a committed offset on the __consumer_offsets topic that is still valid post-truncation.

@onlyice
Copy link

onlyice commented Oct 13, 2021

I can confirm that Sarama can detect new partition(s). But it happened after a metadata refresh, which by default is by a 10 minutes interval. Could be the cause of the misunderstanding of the first issue.

@1998729
Copy link

1998729 commented Nov 1, 2021

You can implement this function yourself, here is a simple example:

type topicList map[string]struct{}

func (t topicList) fetch(client sarama.Client, prefix string) (bool, error) {
	if t == nil {
		return false, errors.New("topic list must not be empty")
	}

	topics, err := client.Topics()
	if err != nil {
		return false, err
	}

	changed := false
	for _, topic := range topics {
		if strings.Contains(topic, prefix) {
			if _, ok := t[topic]; !ok {
				changed = true
			}
			t[topic] = struct{}{}
		}
	}

	return changed, nil
}

func (t *topicList) poll(c sarama.Client, prefix string, interval time.Duration, handler func(), done <-chan struct{}) {
	for {
		select {
		case <-done:
			klog.Infof("poll stopped because consumer group was closed\n")
			return
		case <-time.After(interval):
			klog.Infof("start polling new topic list, interval is %v\n", interval)
			if c.Closed() {
				klog.Infof("poll stopped because client was closed\n")
				return
			}

			changed, err := t.fetch(c, prefix)
			if err != nil {
				klog.Errorf("failed to fetch new topic list! error: %v\n", err)
				continue
			}

			if changed {
				klog.Infof("new topics found, calling handler\n")
				handler()
				return
			}
		}
	}
}

And then poll it

for {
		consumerGroup, err := sarama.NewConsumerGroup(input.config.Servers, input.config.GroupID, input.saramaConf)

		go input.topicList.poll(client, input.config.TopicPrefix, input.config.TopicPollInterval, cancelFn, donePoll)

topics := make([]string, 0, len(input.topicList))
		for t, _ := range input.topicList {
			topics = append(topics, t)
		}
		klog.Infof("initial topic list (len %d) is: %v\n", len(topics), topics)

		err = consumerGroup.Consume(ctx, topics, input.handler)
}

@erenming
Copy link

erenming commented Sep 2, 2022

I can confirm that Sarama can detect new partition(s). But it happened after a metadata refresh, which by default is by a 10 minutes interval. Could be the cause of the misunderstanding of the first issue.

Yes, you're right. And we should notice, in that case(eg. detect new partitions), the session will exit, so users should always retry the Consume method. Here explained detailly

@cheaster062211
Copy link

cheaster062211 commented Mar 30, 2023

我可以确认 Sarama 可以检测到新分区。但它发生在元数据刷新之后,默认情况下间隔为 10 分钟。可能是对第一个问题的误解的原因。

你是对的。我们应该注意到,在那种情况下(例如检测到新分区),会话将退出,因此用户应该始终重试 Consume 方法。这里有详细解释

if oldTopicToPartitionNum, err = c.topicToPartitionNumbers(topics); err != nil {
		return
	} 

When this oldTopicToPartitionNum already changed , and subsequent checks based on the changed number, then cannot trigger the rebalance.
In extreme cases, this oldTopicToPartitionNum may not be the number of sessions that were created, making subsequent checks invalid.
The root cause is no reset timer in backgroundMetadataUpdater function, executing refreshMetadata before loopCheckPartitionNumbers ,so refreshed the partition number

@cheaster062211
Copy link

cheaster062211 commented Mar 30, 2023

@KeenCN this is the first time I have seen this document, and I'd have to search to see if whoever is responsible for it ever raised any issues for these problems, but it seems somewhat out-of-date and it's unclear for the other two what is being described.

RE: 1., I believe that issue was fixed in Version 1.24.1 (2019-10-31) under #1525

RE: 2, I'm not sure what issue they are referring to here, I'm assuming they saw some behaviour where the fairness of message consumption between topics wasn't as balanced as they would like? It's not something I've personally seen before, but we could investigate if someone can put together a functional test or example that reproduces whatever they were seeing.

RE: 3., again I'm not sure exactly what they are describing here. If the broker is restarted and truncates a partition such that the consumer gets an out of range error, then the broker consumer will stop due to the ErrOffsetOutOfRange and if Consumer.Return.Errors is set then it will return that error to the user to decide what to do. Then when they reconnect the offset reset configuration should only be used in the event that their consumer group didn't have a committed offset on the __consumer_offsets topic that is still valid post-truncation.

When this oldTopicToPartitionNum already changed , and subsequent checks based on the changed number, then cannot trigger the rebalance.
In extreme cases, this oldTopicToPartitionNum may not be the number of sessions that were created, making subsequent checks invalid.
The root cause is no reset timer in backgroundMetadataUpdater function, executing refreshMetadata before loopCheckPartitionNumbers ,so refreshed the partition number

@github-actions

This comment was marked as outdated.

@github-actions github-actions bot added the stale Issues and pull requests without any recent activity label Aug 20, 2023
@dnwe
Copy link
Collaborator

dnwe commented Aug 20, 2023

PR #2563 is also relevant here

Copy link

Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur.
Please check if the main branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.

@github-actions github-actions bot added the stale Issues and pull requests without any recent activity label Nov 19, 2023
@github-actions github-actions bot closed this as not planned Won't fix, can't repro, duplicate, stale Jan 5, 2024
@nick887
Copy link

nick887 commented Nov 25, 2024

the 2 happens to me, we use sarama consumer group, 1 groupID and 4 topic

we have 3 pods, each pod have the same config for consumerGroup & topic

When deploying, we often encounter that a certain topic cannot be consumed normally.

the kafka meta shows that this topic doesn't have any consumer, and we think there is no exit from Consume function, the sarama just stuck

another question is that when we deploy service, it takes a long time to wait for the Consume function exit, even if the context is canceled, we doubt that this may be one of the reasons for not consume

is there any help, i will be grateful

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
stale Issues and pull requests without any recent activity
Projects
None yet
Development

No branches or pull requests

7 participants