Skip to content

Commit

Permalink
Topic reader implementation (apache#158)
Browse files Browse the repository at this point in the history
* Topic reader implementation

* Fixed formatting

* Fixed function name

* Fixed the HasNext()

* Fixed formatting stuff

* Fixed queue draining on reconnection

* Removed space

* Fixed deadlock on start

* Added reader example
  • Loading branch information
merlimat authored Jan 6, 2020
1 parent e7b779c commit 71d81b6
Show file tree
Hide file tree
Showing 10 changed files with 837 additions and 52 deletions.
54 changes: 54 additions & 0 deletions examples/reader/reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package main

import (
"context"
"fmt"
"log"

"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
if err != nil {
log.Fatal(err)
}

defer client.Close()

reader, err := client.CreateReader(pulsar.ReaderOptions{
Topic: "topic-1",
StartMessageID: pulsar.EarliestMessageID(),
})
if err != nil {
log.Fatal(err)
}
defer reader.Close()

for reader.HasNext() {
msg, err := reader.Next(context.Background())
if err != nil {
log.Fatal(err)
}

fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))
}
}
8 changes: 6 additions & 2 deletions pulsar/client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,12 @@ func (c *client) Subscribe(options ConsumerOptions) (Consumer, error) {
}

func (c *client) CreateReader(options ReaderOptions) (Reader, error) {
// TODO: Implement reader
return nil, nil
reader, err := newReader(c, options)
if err != nil {
return nil, err
}
c.handlers.Add(reader)
return reader, nil
}

func (c *client) TopicPartitions(topic string) ([]string, error) {
Expand Down
3 changes: 3 additions & 0 deletions pulsar/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ type ConsumerOptions struct {
// failure or exclusive subscriptions). Attempting to enable it on subscriptions to a non-persistent topics or on a
// shared subscription, will lead to the subscription call throwing a PulsarClientException.
ReadCompacted bool

// Mark the subscription as replicated to keep it in sync across clusters
ReplicateSubscriptionState bool
}

// Consumer is an interface that abstracts behavior of Pulsar's consumer
Expand Down
22 changes: 13 additions & 9 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,15 +156,19 @@ func internalTopicSubscribe(client *client, options ConsumerOptions, topic strin
nackRedeliveryDelay = options.NackRedeliveryDelay
}
opts := &partitionConsumerOpts{
topic: pt,
consumerName: consumerName,
subscription: options.SubscriptionName,
subscriptionType: options.Type,
subscriptionInitPos: options.SubscriptionInitialPosition,
partitionIdx: idx,
receiverQueueSize: receiverQueueSize,
nackRedeliveryDelay: nackRedeliveryDelay,
metadata: metadata,
topic: pt,
consumerName: consumerName,
subscription: options.SubscriptionName,
subscriptionType: options.Type,
subscriptionInitPos: options.SubscriptionInitialPosition,
partitionIdx: idx,
receiverQueueSize: receiverQueueSize,
nackRedeliveryDelay: nackRedeliveryDelay,
metadata: metadata,
replicateSubscriptionState: options.ReplicateSubscriptionState,
startMessageID: nil,
subscriptionMode: durable,
readCompacted: options.ReadCompacted,
}
cons, err := newPartitionConsumer(consumer, client, opts, messageCh)
ch <- ConsumerError{
Expand Down
Loading

0 comments on commit 71d81b6

Please sign in to comment.