Skip to content

Commit

Permalink
[Issue:53] Fix concurrent map write (apache#54)
Browse files Browse the repository at this point in the history
* [Issue:53]Fix concurrent map write

Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
  • Loading branch information
wolfstudy authored Aug 14, 2019
1 parent 448387d commit 8774688
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 30 deletions.
47 changes: 19 additions & 28 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,15 @@

# How to contribute

If you would like to contribute code to this project you can do so through GitHub by forking the repository and sending a pull request.
If you would like to contribute code to this project, fork the repository and send a pull request.

This document outlines some of the conventions on development workflow, commit message formatting, contact points and other resources to make it easier to get your contribution accepted.
## Prerequisite

## Steps to Contribute
If you have not installed Go, install it according to the [installation instruction](http://golang.org/doc/install).

Since the `go mod` package management tool is used in this project, your go version is required at **Go1.11+**.
Since the `go mod` package management tool is used in this project, **Go 1.11 or higher** version is required.

### Fork

Before you start contributing, you need to fork [pulsar-client-go](https://github.com/apache/pulsar) to your github repository.

### Installation

If you don't currently have a go environment installed,install Go according to the installation instructions here: http://golang.org/doc/install

##### mac os && linux
### Install Go on Mac OS and Linux

```bash
$ mkdir -p $HOME/github.com/apache/
Expand All @@ -47,19 +39,21 @@ $ cd pulsar-client-go
$ go mod download
```

When you execute `go mod download`, there may be some libs that cannot be downloaded. You can download them by referring to the proxy provided by [GOPROXY.io](https://goproxy.io/).
If some libs cannot be downloaded when you enter the `go mod download` command, download them by referring to the proxy provided by [GOPROXY.io](https://goproxy.io/).

## Fork

### Contribution flow
Before contributing, you need to fork [pulsar-client-go](https://github.com/apache/pulsar) to your github repository.

## Contribution flow

```bash
$ git remote add apache git@github.com:apache/pulsar-client-go.git

// sync with remote master
$ git checkout master
$ git fetch apache
$ git rebase apache/master
$ git push origin master

// create PR branch
$ git checkout -b your_branch
# do your work, and then
Expand All @@ -68,19 +62,16 @@ $ git commit -sm "xxx"
$ git push origin your_branch
```

Thanks for your contributions!

#### Code style

The coding style suggested by the Golang community is used in Apache pulsar-client-go. See the [style doc](https://github.com/golang/go/wiki/CodeReviewComments) for details.
## Code style

Please follow this style to make your pull request easy to review, maintain and develop.
The coding style suggested by the Golang community is used in Apache pulsar-client-go. For details, refer to [style doc](https://github.com/golang/go/wiki/CodeReviewComments).
Follow the style, make your pull request easy to review, maintain and develop.

#### Create new file
## Create new files

The project uses the open source protocol of Apache License 2.0. When you need to create a new file when developing new features,
please add it at the beginning of the file. The location of the header file: [header file](.header).
The project uses the open source protocol of Apache License 2.0. If you need to create a new file when developing new features,
add the license at the beginning of each file. The location of the header file: [header file](.header).

#### Updating dependencies
## Update dependencies

Apache `pulsar-client-go` uses [Go 1.11 module](https://github.com/golang/go/wiki/Modules) to manage dependencies. To add or update a dependency: use the `go mod edit` command to change the dependency.
Apache `pulsar-client-go` uses [Go 1.11 module](https://github.com/golang/go/wiki/Modules) to manage dependencies. To add or update a dependency, use the `go mod edit` command to change the dependency.
82 changes: 81 additions & 1 deletion pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package pulsar
import (
"context"
"fmt"
"github.com/apache/pulsar-client-go/util"
"github.com/stretchr/testify/assert"
"log"
"net/http"
Expand Down Expand Up @@ -361,7 +362,7 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) {
topic := "persistent://public/default/testGetPartitions"
testURL := adminURL + "/" + "admin/v2/persistent/public/default/testGetPartitions/partitions"

makeHTTPCall(t, http.MethodPut, testURL, "5")
makeHTTPCall(t, http.MethodPut, testURL, "64")

// create producer
producer, err := client.CreateProducer(ProducerOptions{
Expand Down Expand Up @@ -635,3 +636,82 @@ func TestConsumer_ReceiveAsyncWithCallback(t *testing.T) {
})
}
}

func TestConsumer_Shared(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})

assert.Nil(t, err)
defer client.Close()

topic := "persistent://public/default/testMultiPartitionConsumerShared"
testURL := adminURL + "/" + "admin/v2/persistent/public/default/testMultiPartitionConsumerShared/partitions"

makeHTTPCall(t, http.MethodPut, testURL, "3")

sub := "sub-shared-1"
consumer1, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: sub,
Type: Shared,
})
assert.Nil(t, err)
defer consumer1.Close()

consumer2, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: sub,
Type: Shared,
})
assert.Nil(t, err)
defer consumer2.Close()

// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: true,
})
assert.Nil(t, err)
defer producer.Close()

// send 10 messages
for i := 0; i < 10; i++ {
if err := producer.Send(context.Background(), &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
}); err != nil {
log.Fatal(err)
}
}

msgList := make([]string, 0, 5)
for i := 0; i < 5; i++ {
msg, err := consumer1.Receive(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("consumer1 msg id is: %v, value is: %s\n", msg.ID(), string(msg.Payload()))
msgList = append(msgList, string(msg.Payload()))
if err := consumer1.Ack(msg); err != nil {
log.Fatal(err)
}
}

assert.Equal(t, 5, len(msgList))

for i := 0; i < 5; i++ {
msg, err := consumer2.Receive(context.Background())
if err != nil {
log.Fatal(err)
}
if err := consumer2.Ack(msg); err != nil {
log.Fatal(err)
}
fmt.Printf("consumer2 msg id is: %v, value is: %s\n", msg.ID(), string(msg.Payload()))
msgList = append(msgList, string(msg.Payload()))
}

assert.Equal(t, 10, len(msgList))
res := util.RemoveDuplicateElement(msgList)
assert.Equal(t, 10, len(res))
}
1 change: 1 addition & 0 deletions pulsar/impl_partition_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ func (pc *partitionConsumer) ReceiveAsync(ctx context.Context, msgs chan<- Consu
if err != nil {
return err
}
receivedSinceFlow = 0
continue
}
break
Expand Down
2 changes: 1 addition & 1 deletion pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,13 +382,13 @@ func (c *connection) internalSendRequest(req *request) {
func (c *connection) handleResponse(requestID uint64, response *pb.BaseCommand) {
c.mapMutex.RLock()
request, ok := c.pendingReqs[requestID]
c.mapMutex.RUnlock()
if !ok {
c.log.Warnf("Received unexpected response for request %d of type %s", requestID, response.Type)
return
}

delete(c.pendingReqs, requestID)
c.mapMutex.RUnlock()
request.callback(response)
}

Expand Down

0 comments on commit 8774688

Please sign in to comment.