Skip to content

Commit

Permalink
Merge pull request #542 from trigovision/feature/match-replies
Browse files Browse the repository at this point in the history
Match replies to request in `nats sub`
  • Loading branch information
ripienaar authored Aug 15, 2022
2 parents 11f064f + 3451ca0 commit 9a2b381
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 75 deletions.
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,24 @@ Header2: Two
hello headers
```

### match requests and replies
We can print matching replay-requests together
```
sub --match-replies subject.name
```
Output
```
[#48] Received on "cli.demo" with reply "_INBOX.12345"
[#48] Matched reply on "_INBOX.12345"
```

sub --match-replies --dump subject.name
```
Output
X.json
X_reply.json
```
#### JetStream

When receiving messages from a JetStream Push Consumer messages can be acknowledged when received by passing `--ack`, the
Expand Down
236 changes: 161 additions & 75 deletions cli/sub_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type subCmd struct {
raw bool
jsAck bool
inbox bool
match bool
dump string
limit uint
sseq uint64
Expand All @@ -45,6 +46,7 @@ type subCmd struct {
deliverLastPerSubject bool
headersOnly bool
stream string
jetStream bool
}

func configureSubCommand(app commandHost) {
Expand All @@ -57,6 +59,7 @@ func configureSubCommand(app commandHost) {
act.Flag("durable", "Use a durable consumer (requires JetStream)").StringVar(&c.durable)
act.Flag("raw", "Show the raw data received").Short('r').UnNegatableBoolVar(&c.raw)
act.Flag("ack", "Acknowledge JetStream message that have the correct metadata").BoolVar(&c.jsAck)
act.Flag("match-replies", "Match replies to requests").UnNegatableBoolVar(&c.match)
act.Flag("inbox", "Subscribes to a generate inbox").Short('i').UnNegatableBoolVar(&c.inbox)
act.Flag("count", "Quit after receiving this many messages").UintVar(&c.limit)
act.Flag("dump", "Dump received messages to files, 1 file per message. Specify - for null terminated STDOUT for use with xargs -0").PlaceHolder("DIRECTORY").StringVar(&c.dump)
Expand All @@ -74,7 +77,7 @@ func init() {
registerCommand("sub", 17, configureSubCommand)
}

func (c *subCmd) subscribe(_ *fisk.ParseContext) error {
func (c *subCmd) subscribe(p *fisk.ParseContext) error {
nc, err := newNatsConn("", natsOpts()...)
if err != nil {
return err
Expand All @@ -91,12 +94,12 @@ func (c *subCmd) subscribe(_ *fisk.ParseContext) error {
return fmt.Errorf("generating inboxes is not compatible with dumping to stdout using null terminated strings")
}

jetStream := c.sseq > 0 || len(c.durable) > 0 || c.deliverAll || c.deliverNew || c.deliverLast || c.deliverSince != "" || c.deliverLastPerSubject || c.stream != ""
c.jetStream = c.sseq > 0 || len(c.durable) > 0 || c.deliverAll || c.deliverNew || c.deliverLast || c.deliverSince != "" || c.deliverLastPerSubject || c.stream != ""

if c.inbox && jetStream {
if c.inbox && c.jetStream {
return fmt.Errorf("generating inboxes is not compatible with JetStream subscriptions")
}
if c.queue != "" && jetStream {
if c.queue != "" && c.jetStream {
return fmt.Errorf("queu group subscriptions are not supported with JetStream")
}

Expand All @@ -106,6 +109,9 @@ func (c *subCmd) subscribe(_ *fisk.ParseContext) error {
dump = c.dump != ""
ctr = uint(0)
ctx, cancel = context.WithCancel(ctx)

replySub *nats.Subscription
matchMap map[string]*nats.Msg
)
defer cancel()

Expand Down Expand Up @@ -139,89 +145,56 @@ func (c *subCmd) subscribe(_ *fisk.ParseContext) error {
}

ctr++
if dump {
// Output format 1/3: dumping, to stdout or files

stdout := c.dump == "-"
outFile := ""
if !stdout {
if info == nil {
outFile = filepath.Join(c.dump, fmt.Sprintf("%d.json", ctr))
} else {
outFile = filepath.Join(c.dump, fmt.Sprintf("%d.json", info.StreamSequence()))
}
}

// dont want sub etc
msg := nats.NewMsg(m.Subject)
msg.Header = m.Header
msg.Data = m.Data
msg.Reply = m.Reply

jm, err := json.Marshal(msg)
if err != nil {
log.Printf("Could not JSON encode message: %s", err)
} else if stdout {
os.Stdout.WriteString(fmt.Sprintf("%s\000", jm))
} else {
err = os.WriteFile(outFile, jm, 0600)
if err != nil {
log.Printf("Could not save message: %s", err)
}

if ctr%100 == 0 {
fmt.Print(".")
}
}

} else if c.raw {
// Output format 2/3: raw

fmt.Println(string(m.Data))

if c.match && m.Reply != "" {
matchMap[m.Reply] = m
} else {
// Output format 3/3: pretty
printMsg(c, m, nil, ctr)
}

if info == nil {
if m.Reply != "" {
fmt.Printf("[#%d] Received on %q with reply %q\n", ctr, m.Subject, m.Reply)
} else {
fmt.Printf("[#%d] Received on %q\n", ctr, m.Subject)
}
} else if jetStream {
fmt.Printf("[#%d] Received JetStream message: stream: %s seq %d / subject: %s / time: %v\n", ctr, info.Stream(), info.StreamSequence(), m.Subject, info.TimeStamp().Format(time.RFC3339))
} else {
fmt.Printf("[#%d] Received JetStream message: consumer: %s > %s / subject: %s / delivered: %d / consumer seq: %d / stream seq: %d\n", ctr, info.Stream(), info.Consumer(), m.Subject, info.Delivered(), info.ConsumerSequence(), info.StreamSequence())
if ctr == c.limit {
sub.Unsubscribe()
// if no reply matching, or if didn't yet get all replies
if !c.match || len(matchMap) == 0 {
cancel()
}
}
}

if len(m.Header) > 0 {
for h, vals := range m.Header {
for _, val := range vals {
fmt.Printf("%s: %s\n", h, val)
}
}

fmt.Println()
}
matchHandler := func(reply *nats.Msg) {
mu.Lock()
defer mu.Unlock()

if !c.headersOnly {
fmt.Println(string(m.Data))
if !strings.HasSuffix(string(m.Data), "\n") {
fmt.Println()
}
}
request, ok := matchMap[reply.Subject]
if !ok {
return
}

} // output format type dispatch
printMsg(c, request, reply, ctr)
delete(matchMap, reply.Subject)

if ctr == c.limit {
sub.Unsubscribe()
// if reached limit and matched all requests
if ctr == c.limit && len(matchMap) == 0 {
replySub.Unsubscribe()
cancel()
}
}

if c.match {
inSubj := "_INBOX.>"
if opts.InboxPrefix != "" {
inSubj = fmt.Sprintf("%v.>", opts.InboxPrefix)
}

matchMap = make(map[string]*nats.Msg)
replySub, err = nc.Subscribe(inSubj, matchHandler)
if err != nil {
return err
}
}

if (!c.raw && c.dump == "") || c.inbox {
switch {
case jetStream:
case c.jetStream:
// logs later depending on settings
case c.jsAck:
log.Printf("Subscribing on %s with acknowledgement of JetStream messages", c.subject)
Expand All @@ -231,7 +204,7 @@ func (c *subCmd) subscribe(_ *fisk.ParseContext) error {
}

switch {
case jetStream:
case c.jetStream:
var js nats.JetStream
js, err = nc.JetStream()
if err != nil {
Expand Down Expand Up @@ -309,3 +282,116 @@ func (c *subCmd) subscribe(_ *fisk.ParseContext) error {

return nil
}

func printMsg(c *subCmd, msg *nats.Msg, reply *nats.Msg, ctr uint) {
var info *jsm.MsgInfo
if msg.Reply != "" {
info, _ = jsm.ParseJSMsgMetadata(msg)
}

if c.dump != "" {
// Output format 1/3: dumping, to stdout or files

var (
stdout = c.dump == "-"
requestFile string
replyFile string
)
if !stdout {
if info == nil {
requestFile = filepath.Join(c.dump, fmt.Sprintf("%d.json", ctr))
replyFile = filepath.Join(c.dump, fmt.Sprintf("%d_reply.json", ctr))
} else {
requestFile = filepath.Join(c.dump, fmt.Sprintf("%d.json", info.StreamSequence()))
replyFile = filepath.Join(c.dump, fmt.Sprintf("%d_reply.json", info.StreamSequence()))
}
}

dumpMsg(msg, stdout, requestFile, ctr)
if reply != nil {
dumpMsg(reply, stdout, replyFile, ctr)
}

} else if c.raw {
// Output format 2/3: raw

fmt.Println(string(msg.Data))
if reply != nil {
fmt.Println(string(reply.Data))
}

} else {
// Output format 3/3: pretty

if info == nil {
if msg.Reply != "" {
fmt.Printf("[#%d] Received on %q with reply %q\n", ctr, msg.Subject, msg.Reply)
} else {
fmt.Printf("[#%d] Received on %q\n", ctr, msg.Subject)
}
} else if c.jetStream {
fmt.Printf("[#%d] Received JetStream message: stream: %s seq %d / subject: %s / time: %v\n", ctr, info.Stream(), info.StreamSequence(), msg.Subject, info.TimeStamp().Format(time.RFC3339))
} else {
fmt.Printf("[#%d] Received JetStream message: consumer: %s > %s / subject: %s / delivered: %d / consumer seq: %d / stream seq: %d\n", ctr, info.Stream(), info.Consumer(), msg.Subject, info.Delivered(), info.ConsumerSequence(), info.StreamSequence())
}

prettyPrintMsg(msg, c.headersOnly)

if reply != nil {
if info == nil {
fmt.Printf("[#%d] Matched reply on %q\n", ctr, reply.Subject)
} else if c.jetStream {
fmt.Printf("[#%d] Matched reply JetStream message: stream: %s seq %d / subject: %s / time: %v\n", ctr, info.Stream(), info.StreamSequence(), reply.Subject, info.TimeStamp().Format(time.RFC3339))
} else {
fmt.Printf("[#%d] Matched reply JetStream message: consumer: %s > %s / subject: %s / delivered: %d / consumer seq: %d / stream seq: %d\n", ctr, info.Stream(), info.Consumer(), reply.Subject, info.Delivered(), info.ConsumerSequence(), info.StreamSequence())
}

prettyPrintMsg(reply, c.headersOnly)

}

} // output format type dispatch
}

func dumpMsg(msg *nats.Msg, stdout bool, filepath string, ctr uint) {
// dont want sub etc
serMsg := nats.NewMsg(msg.Subject)
serMsg.Header = msg.Header
serMsg.Data = msg.Data
serMsg.Reply = msg.Reply

jm, err := json.Marshal(serMsg)
if err != nil {
log.Printf("Could not JSON encode message: %s", err)
} else if stdout {
os.Stdout.WriteString(fmt.Sprintf("%s\000", jm))
} else {
err = os.WriteFile(filepath, jm, 0600)
if err != nil {
log.Printf("Could not save message: %s", err)
}

if ctr%100 == 0 {
fmt.Print(".")
}
}
}

func prettyPrintMsg(msg *nats.Msg, headersOnly bool) {
if len(msg.Header) > 0 {
for h, vals := range msg.Header {
for _, val := range vals {
fmt.Printf("%s: %s\n", h, val)
}
}

fmt.Println()
}

if !headersOnly {
fmt.Println(string(msg.Data))
if !strings.HasSuffix(string(msg.Data), "\n") {
fmt.Println()
}
}
}

0 comments on commit 9a2b381

Please sign in to comment.