diff --git a/server/jetstream_benchmark_test.go b/server/jetstream_benchmark_test.go index 94ded0ef93..5d6ec571d1 100644 --- a/server/jetstream_benchmark_test.go +++ b/server/jetstream_benchmark_test.go @@ -29,11 +29,12 @@ import ( func BenchmarkJetStreamConsume(b *testing.B) { const ( - verbose = false - streamName = "S" - subject = "s" - seed = 12345 - publishTimeout = 30 * time.Second + verbose = false + streamName = "S" + subject = "s" + seed = 12345 + publishTimeout = 30 * time.Second + PublishBatchSize = 10000 ) runSyncPushConsumer := func(b *testing.B, js nats.JetStreamContext, streamName, subject string) (int, int, int) { @@ -81,7 +82,6 @@ func BenchmarkJetStreamConsume(b *testing.B) { uniqueConsumed++ bitset.set(index, true) - b.SetBytes(int64(len(msg.Data))) if verbose && uniqueConsumed%1000 == 0 { b.Logf("Consumed: %d/%d", bitset.count(), b.N) @@ -127,7 +127,6 @@ func BenchmarkJetStreamConsume(b *testing.B) { uniqueConsumed++ bitset.set(index, true) - b.SetBytes(int64(len(msg.Data))) if uniqueConsumed == b.N { msg.Sub.Unsubscribe() @@ -223,7 +222,6 @@ func BenchmarkJetStreamConsume(b *testing.B) { uniqueConsumed++ bitset.set(index, true) - b.SetBytes(int64(len(msg.Data))) if uniqueConsumed == b.N { msg.Sub.Unsubscribe() @@ -307,20 +305,9 @@ func BenchmarkJetStreamConsume(b *testing.B) { if verbose { b.Logf("Setting up %d nodes", bc.clusterSize) } - var connectURL string - if bc.clusterSize == 1 { - s := RunBasicJetStreamServer(b) - defer s.Shutdown() - connectURL = s.ClientURL() - } else { - cl := createJetStreamClusterExplicit(b, "BENCH_PUB", bc.clusterSize) - defer cl.shutdown() - cl.waitOnClusterReadyWithNumPeers(bc.clusterSize) - cl.waitOnLeader() - connectURL = cl.randomServer().ClientURL() - } - nc, js := jsClientConnectURL(b, connectURL) + cl, _, shutdown, nc, js := startJSClusterAndConnect(b, bc.clusterSize) + defer shutdown() defer nc.Close() if verbose { @@ -335,28 +322,39 @@ func BenchmarkJetStreamConsume(b *testing.B) { b.Fatalf("Error creating stream: %v", err) } + // If replicated resource, connect to stream leader for lower variability + if bc.replicas > 1 { + connectURL := cl.streamLeader("$G", streamName).ClientURL() + nc.Close() + _, js = jsClientConnectURL(b, connectURL) + } + rng := rand.New(rand.NewSource(int64(seed))) message := make([]byte, bc.messageSize) - publishedCount := 0 - for publishedCount < b.N { + + // Publish b.N messages to the stream (in batches) + for i := 1; i <= b.N; i++ { rng.Read(message) _, err := js.PublishAsync(subject, message) if err != nil { - continue - } else { - publishedCount++ + b.Fatalf("Failed to publish: %s", err) } - } - - select { - case <-js.PublishAsyncComplete(): - if verbose { - b.Logf("Published %d messages", b.N) + // Limit outstanding published messages to PublishBatchSize + if i%PublishBatchSize == 0 || i == b.N { + select { + case <-js.PublishAsyncComplete(): + if verbose { + b.Logf("Published %d/%d messages", i, b.N) + } + case <-time.After(publishTimeout): + b.Fatalf("Publish timed out") + } } - case <-time.After(publishTimeout): - b.Fatalf("Publish timed out") } + // Set size of each operation, for throughput calculation + b.SetBytes(int64(bc.messageSize)) + // Discard time spent during setup // Consumer may reset again further in b.ResetTimer() @@ -407,8 +405,9 @@ func BenchmarkJetStreamConsume(b *testing.B) { func BenchmarkJetStreamPublish(b *testing.B) { const ( - verbose = false - seed = 12345 + verbose = false + seed = 12345 + streamName = "S" ) runSyncPublisher := func(b *testing.B, js nats.JetStreamContext, messageSize int, subjects []string) (int, int) { @@ -426,7 +425,6 @@ func BenchmarkJetStreamPublish(b *testing.B) { errors++ } else { published++ - b.SetBytes(int64(messageSize)) } if verbose && i%1000 == 0 { @@ -443,61 +441,54 @@ func BenchmarkJetStreamPublish(b *testing.B) { const publishCompleteMaxWait = 30 * time.Second rng := rand.New(rand.NewSource(int64(seed))) message := make([]byte, messageSize) - pending := make([]nats.PubAckFuture, 0, asyncWindow) + published, errors := 0, 0 b.ResetTimer() - for i := 1; i <= b.N; i++ { - rng.Read(message) // TODO may skip this? - subject := subjects[rng.Intn(len(subjects))] - pubAckFuture, err := js.PublishAsync(subject, message) - if err != nil { - errors++ - continue + for published < b.N { + + // Normally publish a full batch (of size `asyncWindow`) + publishBatchSize := asyncWindow + // Unless fewer are left to complete the benchmark + if b.N-published < asyncWindow { + publishBatchSize = b.N - published } - pending = append(pending, pubAckFuture) - - // Regularly trim the list of pending - if i%asyncWindow == 0 { - newPending := make([]nats.PubAckFuture, 0, asyncWindow) - for _, pubAckFuture := range pending { - select { - case <-pubAckFuture.Ok(): - published++ - b.SetBytes(int64(messageSize)) - case <-pubAckFuture.Err(): - errors++ - default: - // This pubAck is still pending, keep it - newPending = append(newPending, pubAckFuture) - } + + pending := make([]nats.PubAckFuture, 0, publishBatchSize) + + for i := 0; i < publishBatchSize; i++ { + rng.Read(message) // TODO may skip this? + subject := subjects[rng.Intn(len(subjects))] + pubAckFuture, err := js.PublishAsync(subject, message) + if err != nil { + errors++ + continue } - pending = newPending + pending = append(pending, pubAckFuture) } - if verbose && i%1000 == 0 { - b.Logf("Published %d/%d, %d errors", i, b.N, errors) + // All in this batch published, wait for completed + select { + case <-js.PublishAsyncComplete(): + case <-time.After(publishCompleteMaxWait): + b.Fatalf("Publish timed out") } - } - // All published, wait for completed - select { - case <-js.PublishAsyncComplete(): - case <-time.After(publishCompleteMaxWait): - b.Fatalf("Publish timed out") - } + // Verify one by one if they were published successfully + for _, pubAckFuture := range pending { + select { + case <-pubAckFuture.Ok(): + published++ + case <-pubAckFuture.Err(): + errors++ + default: + b.Fatalf("PubAck is still pending after publish completed") + } + } - // Clear whatever is left pending - for _, pubAckFuture := range pending { - select { - case <-pubAckFuture.Ok(): - published++ - b.SetBytes(int64(messageSize)) - case <-pubAckFuture.Err(): - errors++ - default: - b.Fatalf("PubAck is still pending after publish completed") + if verbose { + b.Logf("Published %d/%d", published, b.N) } } @@ -558,11 +549,6 @@ func BenchmarkJetStreamPublish(b *testing.B) { b.Run( name, func(b *testing.B) { - // Skip short runs, benchmark gets re-executed with a larger N - if b.N < bc.minMessages { - b.ResetTimer() - return - } subjects := make([]string, bc.numSubjects) for i := 0; i < bc.numSubjects; i++ { @@ -576,24 +562,9 @@ func BenchmarkJetStreamPublish(b *testing.B) { if verbose { b.Logf("Setting up %d nodes", bc.clusterSize) } - var connectURL string - - if bc.clusterSize == 1 { - s := RunBasicJetStreamServer(b) - defer s.Shutdown() - connectURL = s.ClientURL() - } else { - cl := createJetStreamClusterExplicit(b, "BENCH_PUB", bc.clusterSize) - defer cl.shutdown() - cl.waitOnClusterReadyWithNumPeers(bc.clusterSize) - cl.waitOnLeader() - connectURL = cl.randomServer().ClientURL() - } - nc, err := nats.Connect(connectURL) - if err != nil { - b.Fatalf("Failed to create client: %v", err) - } + cl, _, shutdown, nc, _ := startJSClusterAndConnect(b, bc.clusterSize) + defer shutdown() defer nc.Close() jsOpts := []nats.JSOpt{ @@ -613,7 +584,7 @@ func BenchmarkJetStreamPublish(b *testing.B) { b.Logf("Creating stream with R=%d and %d input subjects", bc.replicas, bc.numSubjects) } streamConfig := &nats.StreamConfig{ - Name: "S", + Name: streamName, Subjects: subjects, Replicas: bc.replicas, } @@ -621,10 +592,27 @@ func BenchmarkJetStreamPublish(b *testing.B) { b.Fatalf("Error creating stream: %v", err) } + // If replicated resource, connect to stream leader for lower variability + if bc.replicas > 1 { + connectURL := cl.streamLeader("$G", streamName).ClientURL() + nc.Close() + nc, err = nats.Connect(connectURL) + if err != nil { + b.Fatalf("Failed to create client connection to stream leader: %v", err) + } + defer nc.Close() + js, err = nc.JetStream(jsOpts...) + if err != nil { + b.Fatalf("Unexpected error getting JetStream context for stream leader: %v", err) + } + } + if verbose { b.Logf("Running %v publisher with message size: %dB", pc.pType, bc.messageSize) } + b.SetBytes(int64(bc.messageSize)) + // Benchmark starts here b.ResetTimer() @@ -717,48 +705,6 @@ func BenchmarkJetStreamInterestStreamWithLimit(b *testing.B) { }, } - // Helper: Stand up in-process single node or cluster - setupCluster := func(b *testing.B, clusterSize int) (string, func()) { - var connectURL string - var shutdownFunc func() - - if clusterSize == 1 { - s := RunBasicJetStreamServer(b) - shutdownFunc = s.Shutdown - connectURL = s.ClientURL() - } else { - cl := createJetStreamClusterExplicit(b, "BENCH_PUB", clusterSize) - shutdownFunc = cl.shutdown - cl.waitOnClusterReadyWithNumPeers(clusterSize) - cl.waitOnLeader() - connectURL = cl.randomServer().ClientURL() - //connectURL = cl.leader().ClientURL() - } - - return connectURL, shutdownFunc - } - - // Helper: Create the stream - setupStream := func(b *testing.B, connectURL string, streamConfig *nats.StreamConfig) { - // Connect - nc, err := nats.Connect(connectURL) - if err != nil { - b.Fatalf("Failed to create client: %v", err) - } - defer nc.Close() - - jsOpts := []nats.JSOpt{} - - js, err := nc.JetStream(jsOpts...) - if err != nil { - b.Fatalf("Unexpected error getting JetStream context: %v", err) - } - - if _, err := js.AddStream(streamConfig); err != nil { - b.Fatalf("Error creating stream: %v", err) - } - } - // Context shared by publishers routines type PublishersContext = struct { readyWg sync.WaitGroup @@ -844,12 +790,6 @@ func BenchmarkJetStreamInterestStreamWithLimit(b *testing.B) { b.Run( limitDescription, func(b *testing.B) { - // Stop timer during setup - b.StopTimer() - b.ResetTimer() - - // Set per-iteration bytes to calculate throughput (a.k.a. speed) - b.SetBytes(messageSize) // Print benchmark parameters if verbose { @@ -863,8 +803,9 @@ func BenchmarkJetStreamInterestStreamWithLimit(b *testing.B) { } // Setup server or cluster - connectURL, shutdownFunc := setupCluster(b, benchmarkCase.clusterSize) - defer shutdownFunc() + cl, ls, shutdown, nc, js := startJSClusterAndConnect(b, benchmarkCase.clusterSize) + defer shutdown() + defer nc.Close() // Common stream configuration streamConfig := &nats.StreamConfig{ @@ -877,8 +818,11 @@ func BenchmarkJetStreamInterestStreamWithLimit(b *testing.B) { } // Configure stream limit limitConfigFunc(streamConfig) + // Create stream - setupStream(b, connectURL, streamConfig) + if _, err := js.AddStream(streamConfig); err != nil { + b.Fatalf("Error creating stream: %v", err) + } // Set up publishers shared context var pubCtx PublishersContext @@ -889,6 +833,12 @@ func BenchmarkJetStreamInterestStreamWithLimit(b *testing.B) { pubCtx.lock.Lock() pubCtx.messagesLeft = b.N + connectURL := ls.ClientURL() + // If replicated resource, connect to stream leader for lower variability + if benchmarkCase.replicas > 1 { + connectURL = cl.streamLeader("$G", "S").ClientURL() + } + // Spawn publishers routines, each with its own connection and JS context for i := 0; i < numPublishers; i++ { nc, err := nats.Connect(connectURL) @@ -906,8 +856,11 @@ func BenchmarkJetStreamInterestStreamWithLimit(b *testing.B) { // Wait for all publishers to be ready pubCtx.readyWg.Wait() + // Set size of each operation, for throughput calculation + b.SetBytes(messageSize) + // Benchmark starts here - b.StartTimer() + b.ResetTimer() // Unblock the publishers pubCtx.lock.Unlock() @@ -938,30 +891,26 @@ func BenchmarkJetStreamInterestStreamWithLimit(b *testing.B) { func BenchmarkJetStreamKV(b *testing.B) { const ( - verbose = false - kvNamePrefix = "B_" - keyPrefix = "K_" - seed = 12345 - minOps = 1_000 + verbose = false + kvName = "BUCKET" + keyPrefix = "K_" + seed = 12345 ) - runKVGet := func(b *testing.B, kvs []nats.KeyValue, keys []string) int { + runKVGet := func(b *testing.B, kv nats.KeyValue, keys []string) int { rng := rand.New(rand.NewSource(int64(seed))) errors := 0 b.ResetTimer() for i := 1; i <= b.N; i++ { - kv := kvs[rng.Intn(len(kvs))] key := keys[rng.Intn(len(keys))] - kve, err := kv.Get(key) + _, err := kv.Get(key) if err != nil { errors++ continue } - b.SetBytes(int64(len(kve.Value()))) - if verbose && i%1000 == 0 { b.Logf("Completed %d/%d Get ops", i, b.N) } @@ -971,7 +920,7 @@ func BenchmarkJetStreamKV(b *testing.B) { return errors } - runKVPut := func(b *testing.B, kvs []nats.KeyValue, keys []string, valueSize int) int { + runKVPut := func(b *testing.B, kv nats.KeyValue, keys []string, valueSize int) int { rng := rand.New(rand.NewSource(int64(seed))) value := make([]byte, valueSize) errors := 0 @@ -979,7 +928,6 @@ func BenchmarkJetStreamKV(b *testing.B) { b.ResetTimer() for i := 1; i <= b.N; i++ { - kv := kvs[rng.Intn(len(kvs))] key := keys[rng.Intn(len(keys))] rng.Read(value) _, err := kv.Put(key, value) @@ -988,8 +936,6 @@ func BenchmarkJetStreamKV(b *testing.B) { continue } - b.SetBytes(int64(valueSize)) - if verbose && i%1000 == 0 { b.Logf("Completed %d/%d Put ops", i, b.N) } @@ -999,7 +945,7 @@ func BenchmarkJetStreamKV(b *testing.B) { return errors } - runKVUpdate := func(b *testing.B, kvs []nats.KeyValue, keys []string, valueSize int) int { + runKVUpdate := func(b *testing.B, kv nats.KeyValue, keys []string, valueSize int) int { rng := rand.New(rand.NewSource(int64(seed))) value := make([]byte, valueSize) errors := 0 @@ -1007,7 +953,6 @@ func BenchmarkJetStreamKV(b *testing.B) { b.ResetTimer() for i := 1; i <= b.N; i++ { - kv := kvs[rng.Intn(len(kvs))] key := keys[rng.Intn(len(keys))] kve, getErr := kv.Get(key) @@ -1023,8 +968,6 @@ func BenchmarkJetStreamKV(b *testing.B) { continue } - b.SetBytes(int64(valueSize)) - if verbose && i%1000 == 0 { b.Logf("Completed %d/%d Update ops", i, b.N) } @@ -1044,15 +987,14 @@ func BenchmarkJetStreamKV(b *testing.B) { benchmarksCases := []struct { clusterSize int replicas int - numBuckets int numKeys int valueSize int }{ - {1, 1, 1, 100, 100}, // 1 node, 1 bucket with 100 keys, 100B values - {1, 1, 10, 1000, 100}, // 1 node, 10 buckets with 1000 keys, 100B values - {3, 3, 1, 100, 100}, // 3 nodes, 1 bucket with 100 keys, 100B values - {3, 3, 10, 1000, 100}, // 3 nodes, 10 buckets with 1000 keys, 100B values - {3, 3, 10, 1000, 1024}, // 3 nodes, 10 buckets with 1000 keys, 1KB values + {1, 1, 100, 100}, // 1 node with 100 keys, 100B values + {1, 1, 1000, 100}, // 1 node with 1000 keys, 100B values + {3, 3, 100, 100}, // 3 nodes with 100 keys, 100B values + {3, 3, 1000, 100}, // 3 nodes with 1000 keys, 100B values + {3, 3, 1000, 1024}, // 3 nodes with 1000 keys, 1KB values } workloadCases := []WorkloadType{ @@ -1064,10 +1006,9 @@ func BenchmarkJetStreamKV(b *testing.B) { for _, bc := range benchmarksCases { bName := fmt.Sprintf( - "N=%d,R=%d,B=%d,K=%d,ValSz=%db", + "N=%d,R=%d,B=1,K=%d,ValSz=%db", bc.clusterSize, bc.replicas, - bc.numBuckets, bc.numKeys, bc.valueSize, ) @@ -1080,11 +1021,6 @@ func BenchmarkJetStreamKV(b *testing.B) { b.Run( wName, func(b *testing.B) { - // Skip short runs, benchmark gets re-executed with a larger N - if b.N < minOps { - b.ResetTimer() - return - } if verbose { b.Logf("Running %s workload %s with %d messages", wName, bName, b.N) @@ -1093,21 +1029,6 @@ func BenchmarkJetStreamKV(b *testing.B) { if verbose { b.Logf("Setting up %d nodes", bc.clusterSize) } - var connectURL string - if bc.clusterSize == 1 { - s := RunBasicJetStreamServer(b) - defer s.Shutdown() - connectURL = s.ClientURL() - } else { - cl := createJetStreamClusterExplicit(b, "BENCH_KV", bc.clusterSize) - defer cl.shutdown() - cl.waitOnClusterReadyWithNumPeers(bc.clusterSize) - cl.waitOnLeader() - connectURL = cl.randomServer().ClientURL() - } - - nc, js := jsClientConnectURL(b, connectURL) - defer nc.Close() // Pre-generate all keys keys := make([]string, 0, bc.numKeys) @@ -1116,36 +1037,51 @@ func BenchmarkJetStreamKV(b *testing.B) { keys = append(keys, key) } - // Initialize all KVs - kvs := make([]nats.KeyValue, 0, bc.numBuckets) - for i := 1; i <= bc.numBuckets; i++ { - // Create bucket - kvName := fmt.Sprintf("%s%d", kvNamePrefix, i) - if verbose { - b.Logf("Creating KV %s with R=%d", kvName, bc.replicas) - } - kvConfig := &nats.KeyValueConfig{ - Bucket: kvName, - Replicas: bc.replicas, - } - kv, err := js.CreateKeyValue(kvConfig) + // Setup server or cluster + cl, _, shutdown, nc, js := startJSClusterAndConnect(b, bc.clusterSize) + defer shutdown() + defer nc.Close() + + // Create bucket + if verbose { + b.Logf("Creating KV %s with R=%d", kvName, bc.replicas) + } + kvConfig := &nats.KeyValueConfig{ + Bucket: kvName, + Replicas: bc.replicas, + } + kv, err := js.CreateKeyValue(kvConfig) + if err != nil { + b.Fatalf("Error creating KV: %v", err) + } + + // Initialize all keys + rng := rand.New(rand.NewSource(int64(seed))) + value := make([]byte, bc.valueSize) + for _, key := range keys { + rng.Read(value) + _, err := kv.Create(key, value) if err != nil { - b.Fatalf("Error creating KV: %v", err) - } - kvs = append(kvs, kv) - - // Initialize all keys - rng := rand.New(rand.NewSource(int64(seed * i))) - value := make([]byte, bc.valueSize) - for _, key := range keys { - rng.Read(value) - _, err := kv.Create(key, value) - if err != nil { - b.Fatalf("Failed to initialize %s/%s: %v", kvName, key, err) - } + b.Fatalf("Failed to initialize %s/%s: %v", kvName, key, err) } } + // If replicated resource, connect to stream leader for lower variability + if bc.replicas > 1 { + nc.Close() + connectURL := cl.streamLeader("$G", fmt.Sprintf("KV_%s", kvName)).ClientURL() + nc, js = jsClientConnectURL(b, connectURL) + defer nc.Close() + } + + kv, err = js.KeyValue(kv.Bucket()) + if err != nil { + b.Fatalf("Error binding to KV: %v", err) + } + + // Set size of each operation, for throughput calculation + b.SetBytes(int64(bc.valueSize)) + // Discard time spent during setup // May reset again further in b.ResetTimer() @@ -1154,11 +1090,11 @@ func BenchmarkJetStreamKV(b *testing.B) { switch wc { case Get: - errors = runKVGet(b, kvs, keys) + errors = runKVGet(b, kv, keys) case Put: - errors = runKVPut(b, kvs, keys, bc.valueSize) + errors = runKVPut(b, kv, keys, bc.valueSize) case Update: - errors = runKVUpdate(b, kvs, keys, bc.valueSize) + errors = runKVUpdate(b, kv, keys, bc.valueSize) default: b.Fatalf("Unknown workload type: %v", wc) } @@ -1252,7 +1188,6 @@ func BenchmarkJetStreamObjStore(b *testing.B) { minObjSz int maxObjSz int }{ - // TODO remove duplicates and fix comments {nats.MemoryStorage, 100, 1024, 102400}, // mem storage, 100 objects sized (1KB-100KB) {nats.MemoryStorage, 100, 102400, 1048576}, // mem storage, 100 objects sized (100KB-1MB) {nats.MemoryStorage, 1000, 10240, 102400}, // mem storage, 1k objects of various size (10KB - 100KB) @@ -1261,7 +1196,6 @@ func BenchmarkJetStreamObjStore(b *testing.B) { {nats.FileStorage, 100, 102400, 1048576}, // file storage, 100 objects sized (100KB-1MB) {nats.FileStorage, 100, 1048576, 10485760}, // file storage, 100 objects sized (1MB-10MB) {nats.FileStorage, 10, 10485760, 104857600}, // file storage, 10 objects sized (10MB-100MB) - } var ( @@ -1294,23 +1228,10 @@ func BenchmarkJetStreamObjStore(b *testing.B) { if verbose { b.Logf("Setting up %d nodes", replicas) } - var ( - connectURL string - cl *cluster - ) - if clusterSize == 1 { - s := RunBasicJetStreamServer(b) - defer s.Shutdown() - connectURL = s.ClientURL() - } else { - cl = createJetStreamClusterExplicit(b, "BENCH_OBJ_STORE", clusterSize) - defer cl.shutdown() - cl.waitOnClusterReadyWithNumPeers(replicas) - cl.waitOnLeader() - // connect to leader and not replicas - connectURL = cl.leader().ClientURL() - } - nc, js := jsClientConnectURL(b, connectURL) + + // Setup server or cluster + cl, _, shutdown, nc, js := startJSClusterAndConnect(b, clusterSize) + defer shutdown() defer nc.Close() // Initialize object store @@ -1327,10 +1248,10 @@ func BenchmarkJetStreamObjStore(b *testing.B) { b.Fatalf("Error creating ObjectStore: %v", err) } - // if cluster_size > 1, connect to stream leader - if cl != nil { + // If replicated resource, connect to stream leader for lower variability + if clusterSize > 1 { nc.Close() - connectURL = cl.streamLeader("$G", fmt.Sprintf("OBJ_%s", objStoreName)).ClientURL() + connectURL := cl.streamLeader("$G", fmt.Sprintf("OBJ_%s", objStoreName)).ClientURL() nc, js := jsClientConnectURL(b, connectURL) defer nc.Close() objStore, err = js.ObjectStore(objStoreName) @@ -1368,9 +1289,41 @@ func BenchmarkJetStreamObjStore(b *testing.B) { } }, ) - } }, ) } } + +// Helper function to stand up a JS-enabled single server or cluster +func startJSClusterAndConnect(b *testing.B, clusterSize int) (c *cluster, s *Server, shutdown func(), nc *nats.Conn, js nats.JetStreamContext) { + b.Helper() + var err error + + if clusterSize == 1 { + s = RunBasicJetStreamServer(b) + shutdown = func() { + s.Shutdown() + } + } else { + c = createJetStreamClusterExplicit(b, "BENCH_PUB", clusterSize) + c.waitOnClusterReadyWithNumPeers(clusterSize) + c.waitOnLeader() + s = c.leader() + shutdown = func() { + c.shutdown() + } + } + + nc, err = nats.Connect(s.ClientURL()) + if err != nil { + b.Fatalf("failed to connect: %s", err) + } + + js, err = nc.JetStream() + if err != nil { + b.Fatalf("failed to init jetstream: %s", err) + } + + return c, s, shutdown, nc, js +}