Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create topic support at sarama.Client level instead of sarama.Broker level. #1039

Closed
chandradeepak opened this issue Feb 8, 2018 · 5 comments

Comments

@chandradeepak
Copy link
Contributor

Versions

Please specify real version numbers or git SHAs, not just "Latest" since that changes fairly regularly.
Sarama Version:1.15.0
Kafka Version:0.10.1
Go Version:1.9

Configuration

What configuration values are you using for Sarama and Kafka?

Logs

When filing an issue please provide logs from Sarama and Kafka if at all
possible. You can set sarama.Logger to a log.Logger to capture Sarama debug
output.

Problem Description

How to create topics especially with sarama supporting 1.0 version of kafka. I see some pull requests which are merged has the implementation of CreateTopicsRequest etc.
But these are inside Broker .
It would be good if we we can create topics from sarama.NewClient().CreateTopic(topicConfig)
instead of acessing the lower level broker .

below is the code i am using now .

if i want to make a working example with kafka
so if i have to create a topic is this what i have to do

 ` var connected bool

     t  := &sarama.CreateTopicsRequest{} 
     t.TopicDetails = make(map[string]*sarama.TopicDetail)
     t.TopicDetails["test_topic"] = &sarama.TopicDetail{NumPartitions: 2}

var err error

c, err := sarama.NewClient([]string{"localhost:9092"}, sarama.NewConfig())

if err != nil {
	fmt.Println("new client not created", err)
	return err
}
time.Sleep(time.Second * 5)

for {
	connected, err = c.Brokers()[0].Connected()
	if err != nil {
		fmt.Println("not connected", err)
		return err
	}
	if !connected {
		fmt.Println("not connected trying again")
		err = c.Brokers()[0].Open(nil)
		if err != nil {
			fmt.Println("error opening connection to broker", err)
			return err
		}
	} else {
		break
	}
}

if connected {
	rsp, err := c.Brokers()[0].CreateTopics(t)
	if err != nil {
		fmt.Println(err)
		return err
	}
	fmt.Println(rsp.TopicErrors)

} else {
	fmt.Println("not connected")
}

return nil`

Is that how we create topics ?

@kamilash
Copy link

kamilash commented Apr 4, 2018

@chandradeepak were you able to create a topic using the code you posted here?

@chandradeepak
Copy link
Contributor Author

chandradeepak commented Apr 6, 2018

@kamilash , in the above code , I was just trying out how to create topics. But we have a pull request pending which would allow you to create topics.

see this pull request.
#1055

with that pull request code and kafka 1.0 and below client code i was able to create topics ,

config := sarama.NewConfig()
config.Version = sarama.V1_0_0_0
admin, err := sarama.NewClusterAdmin([]string{"localhost:9092"}, config)
if err != nil {
fmt.Println("error is", err)
return
}
detail := sarama.TopicDetail{NumPartitions: 5, ReplicationFactor: 1}
err = admin.CreateTopic("topic1", &detail)
if err != nil {
fmt.Println("error is", err)
}

}

waiting for @eapache @buyology to give their suggestions and comments on the pull request.

@eapache
Copy link
Contributor

eapache commented Jun 14, 2018

Sorry for the delay in getting to all of this, I'm going to actually review #1055 and focus on getting that merged which will address this issue.

@eapache eapache closed this as completed Jun 14, 2018
@ulkas
Copy link

ulkas commented Apr 11, 2019

@kamilash , in the above code , I was just trying out how to create topics. But we have a pull request pending which would allow you to create topics.

see this pull request.
#1055

with that pull request code and kafka 1.0 and below client code i was able to create topics ,

config := sarama.NewConfig()
config.Version = sarama.V1_0_0_0
admin, err := sarama.NewClusterAdmin([]string{"localhost:9092"}, config)
if err != nil {
fmt.Println("error is", err)
return
}
detail := sarama.TopicDetail{NumPartitions: 5, ReplicationFactor: 1}
err = admin.CreateTopic("topic1", &detail)
if err != nil {
fmt.Println("error is", err)
}

}

waiting for @eapache @buyology to give their suggestions and comments on the pull request.

@chandradeepak , i used your code sample but got this error:

./kafka.go:155:17: undefined: sarama.NewClusterAdmin

any idea? this is my code block:

	func createTopic(c *Config, topic string) {
		config := sarama.NewConfig()
		config.Version = sarama.V1_0_0_0
		admin, err := sarama.NewClusterAdmin(c.Kafka.Brokers, config) //c.Kafka.Brokers is of type: []string
		if err != nil {
			fmt.Println("error is", err)
			return
		}
		detail := sarama.TopicDetail{NumPartitions: 30, ReplicationFactor: 1, ConfigEntries: map[string]*string{
			"retention.ms": "259200000", //3 days
		}}
		err = admin.CreateTopic(topic, &detail)
		if err != nil {
			fmt.Println("error is", err)
		}
	}

@chandradeepak
Copy link
Contributor Author

@kamilash ,
I think you might be using old sarama in your vendor or in your go path. That is the reason it is saying that. Please update to latest sarama.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

4 participants