Skip to content

Commit

Permalink
[CHANGED] Simplified API for listing streams and stream names, added …
Browse files Browse the repository at this point in the history
…option to filter by stream name (#1312)

Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
  • Loading branch information
piotrpio authored Jul 19, 2023
1 parent cf491db commit 03cb7ad
Show file tree
Hide file tree
Showing 6 changed files with 204 additions and 174 deletions.
52 changes: 17 additions & 35 deletions jetstream/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -165,28 +165,19 @@ js.DeleteStream(ctx, "ORDERS")
```go
// list streams
streams := js.ListStreams(ctx)
var err error
for err == nil {
select {
case s := <-streams.Info():
fmt.Println(s.Config.Name)
case err = <-streams.Err():
}
for s := range streams.Info() {
fmt.Println(s.Config.Name)
}
if err != nil && !errors.Is(err, jetstream.ErrEndOfData) {
if streams.Err() != nil {
fmt.Println("Unexpected error ocurred")
}

// list stream names
names := js.StreamNames(ctx)
for err == nil {
select {
case name := <-names.Name():
fmt.Println(name)
case err = <-names.Err():
}
for name := range names.Name() {
fmt.Println(name)
}
if err != nil && !errors.Is(err, jetstream.ErrEndOfData) {
if names.Err() != nil {
fmt.Println("Unexpected error ocurred")
}
```
Expand All @@ -202,13 +193,13 @@ Using `Stream` interface, it is also possible to:
_ = s.Purge(ctx)

// remove all messages from a stream that are stored on a specific subject
_ = s.Purge(ctx, jetstream.WithSubject("ORDERS.new"))
_ = s.Purge(ctx, jetstream.WithPurgeSubject("ORDERS.new"))

// remove all messages up to specified sequence number
_ = s.Purge(ctx, jetstream.WithSequence(100))
_ = s.Purge(ctx, jetstream.WithPurgeSequence(100))

// remove messages, but keep 10 newest
_ = s.Purge(ctx, jetstream.WithKeep(10))
_ = s.Purge(ctx, jetstream.WithPurgeKeep(10))
```

- Get and messages from stream
Expand Down Expand Up @@ -310,29 +301,20 @@ fmt.Println(cachedInfo.Config.Durable)
```go
// list consumers
consumers := s.ListConsumers(ctx)
var err error
for err != nil {
select {
case s := <-consumers.Info():
fmt.Println(s.Name)
case err = <-consumers.Err():
}
for cons := range consumers.Info() {
fmt.Println(cons.Name)
}
if err != nil && !errors.Is(err, jetstream.ErrEndOfData) {
fmt.Println("Unexpected error occured")
if consumers.Err() != nil {
fmt.Println("Unexpected error ocurred")
}

// list consumer names
names := s.ConsumerNames(ctx)
for err != nil {
select {
case name := <-names.Name():
fmt.Println(name)
case err = <-names.Err():
}
for name := range names.Name() {
fmt.Println(name)
}
if err != nil && !errors.Is(err, jetstream.ErrEndOfData) {
fmt.Println("Unexpected error occured")
if names.Err() != nil {
fmt.Println("Unexpected error ocurred")
}
```

Expand Down
92 changes: 63 additions & 29 deletions jetstream/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ type (
// DeleteStream removes a stream with given name
DeleteStream(ctx context.Context, stream string) error
// ListStreams returns StreamInfoLister enabling iterating over a channel of stream infos
ListStreams(context.Context) StreamInfoLister
ListStreams(context.Context, ...StreamListOpt) StreamInfoLister
// StreamNames returns a StreamNameLister enabling iterating over a channel of stream names
StreamNames(context.Context) StreamNameLister
StreamNames(context.Context, ...StreamListOpt) StreamNameLister
}

StreamConsumerManager interface {
Expand All @@ -96,6 +96,8 @@ type (
DeleteConsumer(ctx context.Context, stream string, consumer string) error
}

StreamListOpt func(*streamsRequest) error

// AccountInfo contains info about the JetStream usage from the current account.
AccountInfo struct {
Memory uint64 `json:"memory"`
Expand Down Expand Up @@ -158,12 +160,12 @@ type (

StreamInfoLister interface {
Info() <-chan *StreamInfo
Err() <-chan error
Err() error
}

StreamNameLister interface {
Name() <-chan string
Err() <-chan error
Err() error
}

apiPagedRequest struct {
Expand All @@ -176,7 +178,7 @@ type (

streams chan *StreamInfo
names chan string
errs chan error
err error
}

streamListResponse struct {
Expand All @@ -192,6 +194,7 @@ type (
}

streamsRequest struct {
apiPagedRequest
Subject string `json:"subject,omitempty"`
}
)
Expand Down Expand Up @@ -565,33 +568,43 @@ func (js *jetStream) AccountInfo(ctx context.Context) (*AccountInfo, error) {
}

// ListStreams returns StreamInfoLister enabling iterating over a channel of stream infos
func (js *jetStream) ListStreams(ctx context.Context) StreamInfoLister {
//
// Available options:
// [WithStreamListSubject] - allows filtering returned streams by provided subject
func (js *jetStream) ListStreams(ctx context.Context, opts ...StreamListOpt) StreamInfoLister {
l := &streamLister{
js: js,
streams: make(chan *StreamInfo),
errs: make(chan error, 1),
}
var streamsReq streamsRequest
for _, opt := range opts {
if err := opt(&streamsReq); err != nil {
l.err = err
close(l.streams)
return l
}
}
go func() {
defer close(l.streams)
ctx, cancel := wrapContextWithoutDeadline(ctx)
if cancel != nil {
defer cancel()
}
for {
page, err := l.streamInfos(ctx)
page, err := l.streamInfos(ctx, streamsReq)
if err != nil && !errors.Is(err, ErrEndOfData) {
l.errs <- err
l.err = err
return
}
for _, info := range page {
select {
case l.streams <- info:
case <-ctx.Done():
l.errs <- ctx.Err()
l.err = ctx.Err()
return
}
}
if errors.Is(err, ErrEndOfData) {
l.errs <- err
return
}
}
Expand All @@ -606,38 +619,48 @@ func (s *streamLister) Info() <-chan *StreamInfo {
}

// Err returns an error channel which will be populated with error from [ListStreams] or [StreamNames] request
func (s *streamLister) Err() <-chan error {
return s.errs
func (s *streamLister) Err() error {
return s.err
}

// StreamNames returns a [StreamNameLister] enabling iterating over a channel of stream names
func (js *jetStream) StreamNames(ctx context.Context) StreamNameLister {
//
// Available options:
// [WithStreamListSubject] - allows filtering returned streams by provided subject
func (js *jetStream) StreamNames(ctx context.Context, opts ...StreamListOpt) StreamNameLister {
l := &streamLister{
js: js,
names: make(chan string),
errs: make(chan error, 1),
}
var streamsReq streamsRequest
for _, opt := range opts {
if err := opt(&streamsReq); err != nil {
l.err = err
close(l.streams)
return l
}
}
go func() {
ctx, cancel := wrapContextWithoutDeadline(ctx)
if cancel != nil {
defer cancel()
}
defer close(l.names)
for {
page, err := l.streamNames(ctx)
page, err := l.streamNames(ctx, streamsReq)
if err != nil && !errors.Is(err, ErrEndOfData) {
l.errs <- err
l.err = err
return
}
for _, info := range page {
select {
case l.names <- info:
case <-ctx.Done():
l.errs <- ctx.Err()
l.err = ctx.Err()
return
}
}
if errors.Is(err, ErrEndOfData) {
l.errs <- err
return
}
}
Expand Down Expand Up @@ -682,21 +705,28 @@ func (s *streamLister) Name() <-chan string {
}

// infos fetches the next [StreamInfo] page
func (s *streamLister) streamInfos(ctx context.Context) ([]*StreamInfo, error) {
func (s *streamLister) streamInfos(ctx context.Context, streamsReq streamsRequest) ([]*StreamInfo, error) {
if s.pageInfo != nil && s.offset >= s.pageInfo.Total {
return nil, ErrEndOfData
}

req, err := json.Marshal(
apiPagedRequest{Offset: s.offset},
)
req := streamsRequest{
apiPagedRequest: apiPagedRequest{
Offset: s.offset,
},
Subject: streamsReq.Subject,
}
reqJSON, err := json.Marshal(req)
if err != nil {
return nil, err
}
if err != nil {
return nil, err
}

slSubj := apiSubj(s.js.apiPrefix, apiStreamListT)
var resp streamListResponse
_, err = s.js.apiRequestJSON(ctx, slSubj, &resp, req)
_, err = s.js.apiRequestJSON(ctx, slSubj, &resp, reqJSON)
if err != nil {
return nil, err
}
Expand All @@ -710,21 +740,25 @@ func (s *streamLister) streamInfos(ctx context.Context) ([]*StreamInfo, error) {
}

// streamNames fetches the next stream names page
func (s *streamLister) streamNames(ctx context.Context) ([]string, error) {
func (s *streamLister) streamNames(ctx context.Context, streamsReq streamsRequest) ([]string, error) {
if s.pageInfo != nil && s.offset >= s.pageInfo.Total {
return nil, ErrEndOfData
}

req, err := json.Marshal(
apiPagedRequest{Offset: s.offset},
)
req := streamsRequest{
apiPagedRequest: apiPagedRequest{
Offset: s.offset,
},
Subject: streamsReq.Subject,
}
reqJSON, err := json.Marshal(req)
if err != nil {
return nil, err
}

slSubj := apiSubj(s.js.apiPrefix, apiStreams)
var resp streamNamesResponse
_, err = s.js.apiRequestJSON(ctx, slSubj, &resp, req)
_, err = s.js.apiRequestJSON(ctx, slSubj, &resp, reqJSON)
if err != nil {
return nil, err
}
Expand Down
9 changes: 9 additions & 0 deletions jetstream/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,15 @@ func WithSubjectFilter(subject string) StreamInfoOpt {
}
}

// WithStreamListSubject can be used to filter results of ListStreams and StreamNames requests
// to only streams that have given subject in their configuration
func WithStreamListSubject(subject string) StreamListOpt {
return func(req *streamsRequest) error {
req.Subject = subject
return nil
}
}

// WithMsgID sets the message ID used for deduplication.
func WithMsgID(id string) PublishOpt {
return func(opts *pubOpts) error {
Expand Down
Loading

0 comments on commit 03cb7ad

Please sign in to comment.