Commit

Merge pull request #15178 from rockwotj/wasm-txn
rockwotj authored Nov 30, 2023
2 parents 6f49d28 + 2b50891 commit 53f12fe
Showing 6 changed files with 161 additions and 23 deletions.
12 changes: 11 additions & 1 deletion src/v/transform/tests/transform_processor_test.cc
@@ -75,7 +75,7 @@ class ProcessorTestFixture : public ::testing::Test {

model::record_batch make_tiny_batch() {
return model::test::make_random_batch(model::test::record_batch_spec{
.offset = kafka::offset_cast(++_offset),
.offset = kafka::offset_cast(_offset++),
.allow_compression = false,
.count = 1});
}
@@ -194,4 +194,14 @@ TEST_F(ProcessorTestFixture, LagOffByOne) {
EXPECT_EQ(lag(), 0);
}

TEST_F(ProcessorTestFixture, LagOverflowBug) {
stop();
auto batch_one = make_tiny_batch();
push_batch(batch_one.copy());
start();
wait_for_committed_offset(batch_one.last_offset());
EXPECT_EQ(read_batch(), batch_one);
EXPECT_EQ(lag(), 0);
}

} // namespace transform
12 changes: 11 additions & 1 deletion src/v/transform/transform_processor.cc
@@ -189,7 +189,17 @@ ss::future<kafka::offset> processor::load_start_offset() {
}
// The latest committed offset is inclusive of the last processed record, so
// we want to start reading from the following record.
auto last_processed_offset = latest_committed.value_or(latest);
auto last_processed_offset = latest_committed.value();
if (
latest != kafka::offset::min()
&& last_processed_offset == kafka::offset::min()) {
// In cases where we committed the start of the log before any records
// existed and records have since been added, we would overflow computing
// small_offset - min_offset. Instead, normalize last processed to -1 so
// that the computed lag is correct (these ranges are inclusive).
// For example: latest(1) - last_processed(-1) = lag(2)
last_processed_offset = kafka::offset(-1);
}
report_lag(latest - last_processed_offset);
co_return kafka::next_offset(last_processed_offset);
}
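
For context, a minimal standalone sketch of the arithmetic this hunk guards against, assuming kafka::offset behaves like a signed 64-bit integer whose minimum value is the "nothing committed yet" sentinel (illustrative only, written here in Go rather than the C++ of the actual code):

package main

import (
	"fmt"
	"math"
)

// computeLag mirrors the inclusive-range subtraction in load_start_offset.
func computeLag(latest, lastProcessed int64) int64 {
	const offsetMin = math.MinInt64
	if latest != offsetMin && lastProcessed == offsetMin {
		// Committed at the start of an empty log: normalize to -1 so the
		// subtraction below cannot wrap around.
		lastProcessed = -1
	}
	return latest - lastProcessed
}

func main() {
	// Without the guard, 1 - math.MinInt64 wraps; with it,
	// latest(1) - last_processed(-1) = lag(2): offsets 0 and 1 are unread.
	fmt.Println(computeLag(1, math.MinInt64))
}
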
2 changes: 1 addition & 1 deletion tests/go/transform-verifier/consume/consumer.go
@@ -100,7 +100,7 @@ func createRecordValidator() recordValidator {
// The real thing we can guarantee is that once we get a new seqno, there are no
// gaps we've seen.
if seqno > latestSeqno && latestSeqno+1 != seqno {
return latestSeqno, fmt.Errorf("detected missing seqno: seqno=%d latest=%d", seqno, latestSeqno)
return latestSeqno, fmt.Errorf("detected missing seqno: partition=%d seqno=%d latest=%d", r.Partition, seqno, latestSeqno)
}
latestSeqno = max(seqno, latestSeqno)
return latestSeqno, nil
112 changes: 105 additions & 7 deletions tests/go/transform-verifier/produce/producer.go
@@ -4,7 +4,9 @@ import (
"context"
"fmt"
"log/slog"
"math"
"math/rand"
"os"
"sync"
"time"

@@ -18,10 +20,11 @@ import (
)

var (
bps = 512 * bytesize.KB
totalBytes = 10 * bytesize.MB
messageSize = 1 * bytesize.KB
maxBatchSize = 1 * bytesize.MB
bps = 512 * bytesize.KB
totalBytes = 10 * bytesize.MB
messageSize = 1 * bytesize.KB
maxBatchSize = 1 * bytesize.MB
transactional = false
)

type ProduceStatus struct {
@@ -64,6 +67,7 @@ func NewCommand() *cobra.Command {
cmd.Flags().Var(&totalBytes, "max-bytes", "How many bytes to send overall")
cmd.Flags().Var(&messageSize, "message-size", "How many bytes to send per message")
cmd.Flags().Var(&maxBatchSize, "max-batch-size", "How many bytes to send per batch")
cmd.Flags().BoolVar(&transactional, "transactional", false, "Use transactions to produce, and occasionally abort transactions")

return cmd
}
@@ -87,7 +91,9 @@ func createValueGenerator() func(size int) ([]byte, error) {
}
}

func createRecordGenerator(partition int32) func() (*kgo.Record, error) {
type recordGenerator = func() (*kgo.Record, error)

func createRecordGenerator(partition int32) recordGenerator {
seqno := uint64(0)
keyGen := createKeyGenerator()
valGen := createValueGenerator()
@@ -149,12 +155,18 @@ func produce(ctx context.Context) error {
partition := p // prevent silly golang loop variable bounding issues
wg.Go(func() error {
slog.Info("starting to produce", "partition", partition)
err := produceForPartition(ctx, partitionProduceConfig{
config := partitionProduceConfig{
maxBytes,
partition,
reporter,
rateLimiter,
})
}
var err error
if transactional {
err = produceTransactionallyForPartition(ctx, config)
} else {
err = produceForPartition(ctx, config)
}
slog.Info("finished producing", "partition", partition, "err", err)
return err
})
@@ -223,3 +235,89 @@ func produceForPartition(ctx context.Context, config partitionProduceConfig) err
wg.Wait()
return nil
}

func createInvalidRecordGenerator(partition int32) recordGenerator {
// Create an empty record with the highest seqno that would cause the checks in the consumer to fail.
return func() (*kgo.Record, error) {
return &kgo.Record{
Key: nil,
Value: nil,
Timestamp: time.Now(),
Partition: partition,
Headers: []kgo.RecordHeader{
common.MakeSeqnoHeader(math.MaxUint64),
},
}, nil
}
}

func produceTransactionallyForPartition(ctx context.Context, config partitionProduceConfig) error {
client, err := common.NewClient(
kgo.RecordPartitioner(kgo.ManualPartitioner()),
kgo.TransactionalID(fmt.Sprintf("%d-%s-txn-producer-%d", os.Getpid(), os.Args[0], config.partition)),
)
if err != nil {
return fmt.Errorf("unable to create client: %v", err)
}
defer client.Close()
generator := createRecordGenerator(config.partition)
invalidGenerator := createInvalidRecordGenerator(config.partition)
bytesSent := 0
produce := func(r *kgo.Record, commit kgo.TransactionEndTry) ProduceStatus {
if err := client.BeginTransaction(); err != nil {
slog.Warn("error starting txn", "err", err)
return ProduceStatus{ErrorCount: 1}
}
err := client.ProduceSync(ctx, r).FirstErr()
if ctx.Err() != nil {
// we're cancelled, do nothing
return ProduceStatus{}
} else if err != nil {
slog.Warn("error producing record", "err", err)
return ProduceStatus{ErrorCount: 1}
}
if err := client.Flush(ctx); err != nil {
slog.Warn("error flushing record", "err", err)
return ProduceStatus{ErrorCount: 1}
}
if err := client.EndTransaction(ctx, commit); err != nil {
slog.Warn("error ending txn", "err", err)
return ProduceStatus{ErrorCount: 1}
}
seqnos := make(map[int]uint64)
h, err := common.FindSeqnoHeader(r)
if err != nil {
slog.Warn("error finding seqno header", "err", err)
return ProduceStatus{ErrorCount: 1}
}
seqnos[int(r.Partition)] = h
return ProduceStatus{LatestSeqnos: seqnos}
}

for bytesSent < config.maxBytes && ctx.Err() == nil {
r, err := generator()
if err != nil {
return fmt.Errorf("unable to create record: %v", err)
}
size := common.RecordSize(r)
if err := config.rateLimiter.WaitN(ctx, size); err != nil {
// Only return the error if we were not cancelled
if ctx.Err() == nil {
return fmt.Errorf("unable to rate limit: %v", err)
}
return nil
}
status := produce(r, kgo.TryCommit)
bytesSent += size
status.BytesSent += size
config.reporter(status)

// Every time we commit something successfully, also abort a record that would break the consumer to ensure those records aren't surfaced.
r, err = invalidGenerator()
if err != nil {
return fmt.Errorf("unable to create record: %v", err)
}
produce(r, kgo.TryAbort)
}
return nil
}
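
The produce/abort cycle above follows the standard franz-go transactional flow; a condensed, self-contained sketch of the same pattern (hypothetical broker address, topic name, and transactional ID, minimal error handling):

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	ctx := context.Background()
	// Hypothetical broker and topic; the verifier wires these in via flags.
	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.DefaultProduceTopic("verifier-input"),
		kgo.TransactionalID("example-txn-producer"),
	)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	produce := func(r *kgo.Record, end kgo.TransactionEndTry) error {
		if err := client.BeginTransaction(); err != nil {
			return err
		}
		if err := client.ProduceSync(ctx, r).FirstErr(); err != nil {
			return err
		}
		// TryCommit makes the record visible; TryAbort means read_committed
		// consumers should never see it, which is what the verifier checks.
		return client.EndTransaction(ctx, end)
	}

	good := &kgo.Record{Value: []byte("visible"), Timestamp: time.Now()}
	bad := &kgo.Record{Value: []byte("never surfaced"), Timestamp: time.Now()}
	fmt.Println(produce(good, kgo.TryCommit), produce(bad, kgo.TryAbort))
}
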
30 changes: 22 additions & 8 deletions tests/rptest/services/transform_verifier_service.py
@@ -67,16 +67,20 @@ class TransformVerifierProduceConfig(typing.NamedTuple):
# Maximum size to batch records on the client
# i.e. 1MB
max_batch_size: str
transactional: bool = False

def serialize_cmd(self) -> str:
return " ".join([
cmd = [
"produce",
f"--topic={self.topic}",
f"--max-bytes={self.max_bytes}",
f"--bytes-per-second={self.bytes_per_second}",
f"--message-size={self.message_size}",
f"--max-batch-size={self.max_batch_size}",
])
]
if self.transactional:
cmd.append("--transactional")
return " ".join(cmd)

def deserialize_status(self, buf: str | bytes | bytearray):
return TransformVerifierProduceStatus.deserialize(buf)
@@ -135,6 +139,8 @@ def deserialize_status(self, buf: str | bytes | bytearray):
return TransformVerifierConsumeStatus.deserialize(buf)

def is_done(self, status: TransformVerifierConsumeStatus) -> bool:
if status.invalid_records > 0:
return True
for partition, amt in self.validate.latest_seqnos.items():
if status.latest_seqnos.get(partition, 0) < amt:
return False
@@ -170,16 +176,19 @@ class TransformVerifierService(Service):
_pid: typing.Optional[int]

@classmethod
def oneshot(cls, context: TestContext, redpanda: RedpandaService,
def oneshot(cls,
context: TestContext,
redpanda: RedpandaService,
config: TransformVerifierConsumeConfig
| TransformVerifierProduceConfig):
| TransformVerifierProduceConfig,
timeout_sec=300):
"""
Common pattern to use the service
"""
service = cls(context, redpanda, config)
service.start()
try:
service.wait()
service.wait(timeout_sec=timeout_sec)
final_status = service.get_status()
service.stop()
service.free()
@@ -252,18 +261,23 @@ def stop_node(self, node):
return

# Attempt a graceful stop
self._execute_cmd(node, "stop")
try:
self._execute_cmd(node, "stop")
except Exception as e:
self.logger.warn("unable to request /stop {self.who_am_i()}: {e}")

try:
wait_until(lambda: not node.account.exists(f"/proc/{self._pid}"),
timeout_sec=10,
backoff_sec=0.5)
self._pid = None
return
except TimeoutError as e:
self.logger.warn("gracefully stopping service failed: {e}")
self.logger.warn(
f"gracefully stopping {self.who_am_i()} failed: {e}")

# Gracefully stop did not work, try a hard kill
self.logger.debug(f"Killing pid {self._pid}")
self.logger.debug(f"Killing pid for {self.who_am_i()}")
try:
node.account.signal(self._pid, signal.SIGKILL, allow_fail=False)
except RemoteCommandError as e:
16 changes: 11 additions & 5 deletions tests/rptest/tests/data_transforms_test.py
@@ -9,6 +9,7 @@

import typing

from ducktape.mark import matrix
from rptest.clients.rpk import RpkTool
from rptest.services.cluster import cluster
from ducktape.utils.util import wait_until
@@ -126,7 +127,8 @@ def test_lifecycle(self):
self._deploy_wasm("identity-xform")
self._delete_wasm("identity-xform")

def _produce_input_topic(self) -> TransformVerifierProduceStatus:
def _produce_input_topic(
self, transactional: bool) -> TransformVerifierProduceStatus:
input_topic = self.topics[0]

status = TransformVerifierService.oneshot(
@@ -138,6 +140,7 @@ def _produce_input_topic(self) -> TransformVerifierProduceStatus:
max_bytes='1MB',
message_size='1KB',
topic=input_topic.name,
transactional=transactional,
))
return typing.cast(TransformVerifierProduceStatus, status)

@@ -153,16 +156,19 @@ def _consume_output_topic(
topic=output_topic.name,
bytes_per_second='1MB',
validate=status,
))
),
timeout_sec=10)
return typing.cast(TransformVerifierConsumeStatus, result)

@cluster(num_nodes=4)
def test_identity(self):
@matrix(transactional=[False, True])
def test_identity(self, transactional):
"""
Test that a transform that only copies records from the input to the output topic works as intended.
"""
self._deploy_wasm("identity-xform")
producer_status = self._produce_input_topic()
producer_status = self._produce_input_topic(
transactional=transactional)
consumer_status = self._consume_output_topic(producer_status)
self.logger.info(f"{consumer_status}")
assert consumer_status.invalid_records == 0, "transform verification failed with invalid records: {consumer_status}"
assert consumer_status.invalid_records == 0, f"transform verification failed with invalid records: {consumer_status}"
