Skip to content

Commit

Permalink
Merge pull request #72 from ditointernet/CDPCEP-631
Browse files Browse the repository at this point in the history
fix: fixing pubsub publisher
  • Loading branch information
YanToledoDito authored Feb 6, 2023
2 parents a574ef8 + 2ae69ca commit 6313d85
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 1 deletion.
64 changes: 64 additions & 0 deletions pubsub/examples/publisher_example.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package examples

import (
"context"
"encoding/json"
"fmt"
"strconv"

"cloud.google.com/go/pubsub"
godito "github.com/ditointernet/go-dito/pubsub"
)

// MessageSchema represents the message schema that will be published.
type MessageSchema struct {
Attr string
}

// ToBytes marshals itself using it's instance data.
func (ms MessageSchema) ToBytes() ([]byte, error) {
data, err := json.Marshal(ms)
if err != nil {
return nil, err
}

return data, nil
}

// Publisher_pipeline_example shows the operation of a recently instantiated Generics compatible PubsubClient publisher.
// It accepts any message schema, requiring only that it's type implements ToByteser interface. The message schema,
// along with a wrapped Pubsub Topic (created with a NewTopicWrapper), must be passed to the PubsubClient builder
// (MustNewPubSubClient).
func Publisher_pipeline_example() {
PROJECT_ID := "dito-it-tracking-dev"
TOPIC_ID := "publisher_test"

ctx := context.Background()

client, err := pubsub.NewClient(ctx, PROJECT_ID)
if err != nil {
fmt.Println(err)
return
}
defer client.Close()

topic := client.Topic(TOPIC_ID)

publisher := godito.MustNewPubSubClient[MessageSchema](godito.NewTopicWrapper(topic))

var inputList []godito.PublishInput[MessageSchema]
for i := 0; i < 2; i++ {
in := godito.PublishInput[MessageSchema]{
Data: MessageSchema{
Attr: fmt.Sprintf("fake-publish-data-%s", strconv.Itoa(i)),
},
Attributes: map[string]string{
"test": "test",
},
}

inputList = append(inputList, in)
}

publisher.Publish(ctx, inputList...)
}
7 changes: 6 additions & 1 deletion pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type PublishInput[T ToByteser] struct {
// Publish publishes messages in a pubsub topic.
func (c PubSubClient[T]) Publish(ctx context.Context, in ...PublishInput[T]) []error {
var errs []error
var results []Getter

traceID := getTraceID(trace.SpanFromContext(ctx))

Expand All @@ -65,7 +66,11 @@ func (c PubSubClient[T]) Publish(ctx context.Context, in ...PublishInput[T]) []e
}

result := c.topic.Publish(ctx, pubSubMsg)
_, err = result.Get(ctx)
results = append(results, result)
}

for _, result := range results {
_, err := result.Get(ctx)

if err != nil {
errs = append(errs, err)
Expand Down

0 comments on commit 6313d85

Please sign in to comment.