diff --git a/src/transform-sdk/go/transform/abi.go b/src/transform-sdk/go/transform/abi.go index 13503aa955e4c..9e0c5aaf5c039 100644 --- a/src/transform-sdk/go/transform/abi.go +++ b/src/transform-sdk/go/transform/abi.go @@ -25,7 +25,7 @@ import ( // An imported function to ensure that the broker supports this ABI version. // -//go:wasmimport redpanda_transform check_abi_version_1 +//go:wasmimport redpanda_transform check_abi_version_2 func checkAbiVersion() // readRecordHeader reads all the data from the batch header into memory. @@ -93,3 +93,27 @@ func readNextRecord( // //go:wasmimport redpanda_transform write_record func writeRecord(data unsafe.Pointer, length int32) int32 + +// writeRecordWithOptions writes a new record by copying the data buffer. +// +// The data buffer here is the same format as `writeRecord`, but additionally it is +// possible to pass options for the write. +// +// At the time of writing the options that are supported are only setting a different +// output topic. The format for this options object is a series of keys with key specific +// data. +// +// Supported Options: +// +// - key=0x01 - Set output topic. +// The value is as follows: +// topicNameLength: varint +// topicName: byte[] +// +//go:wasmimport redpanda_transform write_record_with_options +func writeRecordWithOptions( + data unsafe.Pointer, + dataLength int32, + opts unsafe.Pointer, + optsLength int32, +) int32 diff --git a/src/transform-sdk/go/transform/doc.go b/src/transform-sdk/go/transform/doc.go index e8df7ace44d28..e29293482397b 100644 --- a/src/transform-sdk/go/transform/doc.go +++ b/src/transform-sdk/go/transform/doc.go @@ -13,9 +13,10 @@ // limitations under the License. /* -Package redpanda is the SDK for Redpanda's inline Data Transforms, based on WebAssembly. +Package transform is the SDK for Redpanda's inline Data Transforms, based on WebAssembly. This library provides a framework for transforming records written within Redpanda from -an input to an output topic. +an input to an output topic. This version of the SDK is compatible with Redpanda 24.1 or +greater. */ package transform diff --git a/src/transform-sdk/go/transform/example_validation_test.go b/src/transform-sdk/go/transform/example_validation_test.go new file mode 100644 index 0000000000000..16ed839c20af1 --- /dev/null +++ b/src/transform-sdk/go/transform/example_validation_test.go @@ -0,0 +1,40 @@ +// Copyright 2023 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package transform_test + +import ( + "encoding/json" + + "github.com/redpanda-data/redpanda/src/transform-sdk/go/transform" +) + +// This example shows the basic usage of the package: +// This is a transform that validates the data is valid JSON, +// and outputs invalid JSON to a dead letter queue. +func Example_validation() { + transform.OnRecordWritten(jsonValidate) +} + +// This will be called for each record in the source topic. +func jsonValidate(e transform.WriteEvent, w transform.RecordWriter) error { + if json.Valid(e.Record().Value) { + // Write the valid records to the "default" output topic, this is the + // first output topic specified in the configuration. + return w.Write(e.Record()) + } + // If a record does not contain valid JSON then route it to another topic for + // triage and debugging. + return w.Write(e.Record(), transform.ToTopic("invalid_json")) +} diff --git a/src/transform-sdk/go/transform/processor.go b/src/transform-sdk/go/transform/processor.go index 4c7ce5ec6a61c..f55f2c4b5de86 100644 --- a/src/transform-sdk/go/transform/processor.go +++ b/src/transform-sdk/go/transform/processor.go @@ -46,16 +46,40 @@ func (e *writeEvent) Record() Record { type recordWriter struct { outbuf *rwbuf.RWBuf + optbuf *rwbuf.RWBuf } -func (w *recordWriter) Write(r Record) error { +func (w *recordWriter) Write(r Record, opts ...WriteOpt) error { + // Serialize the record w.outbuf.Reset() r.serializePayload(w.outbuf) b := w.outbuf.ReadAll() - // Write the record back out to the broker - amt := int(writeRecord(unsafe.Pointer(&b[0]), int32(len(b)))) - if amt != len(b) { - return errors.New("writing record failed with errno: " + strconv.Itoa(amt)) + + // Apply write options + wo := writeOpts{} + for _, opt := range opts { + opt.apply(&wo) + } + + // Do the write + var amt int32 + if wo.topic == "" { + // Directly write the record to the default output topic. + amt = writeRecord(unsafe.Pointer(&b[0]), int32(len(b))) + } else { + // Serialize the options + w.optbuf.Reset() + wo.serialize(w.optbuf) + o := w.optbuf.ReadAll() + amt = writeRecordWithOptions( + unsafe.Pointer(&b[0]), + int32(len(b)), + unsafe.Pointer(&o[0]), + int32(len(o)), + ) + } + if int(amt) != len(b) { + return errors.New("writing record failed with errno: " + strconv.Itoa(int(amt))) } return nil } @@ -65,7 +89,7 @@ var ( currentHeader batchHeader = batchHeader{} inbuf *rwbuf.RWBuf = rwbuf.New(128) e writeEvent - w recordWriter = recordWriter{rwbuf.New(128)} + w recordWriter = recordWriter{rwbuf.New(128), rwbuf.New(32)} ) // run our transformation loop diff --git a/src/transform-sdk/go/transform/sdk.go b/src/transform-sdk/go/transform/sdk.go index 2ea83d4d75a12..8b3b9499c8ad7 100644 --- a/src/transform-sdk/go/transform/sdk.go +++ b/src/transform-sdk/go/transform/sdk.go @@ -35,6 +35,29 @@ type WriteEvent interface { Record() Record } +type ( + // writeOps is the internal struct carrying the options available for writes. + writeOpts struct { + topic string + } + // WriteOpt is an option to modify a Write. + WriteOpt interface{ apply(*writeOpts) } + // writeOptFunc is a closure implemenation of WriteOpt. + writeOptFunc func(*writeOpts) +) + +// Implment WriteOpt for writeOptFunc +func (f writeOptFunc) apply(opts *writeOpts) { + f(opts) +} + +// ToTopic specifies the output topic that the record will be written to. +func ToTopic(topic string) WriteOpt { + return writeOptFunc(func(o *writeOpts) { + o.topic = topic + }) +} + // RecordWriter is an interface for writing transformed records to the destination topic. type RecordWriter interface { // Write writes a record to the output topic. @@ -42,7 +65,9 @@ type RecordWriter interface { // When writing a record, only the key, value and headers are // used other information like the timestamp will be overridden // by the broker. - Write(Record) error + // + // WriteOpts can be added to control where records go, for example to another topic. + Write(Record, ...WriteOpt) error } // Headers are optional key/value pairs that are passed along with diff --git a/src/transform-sdk/go/transform/serialize.go b/src/transform-sdk/go/transform/serialize.go index 9fe5fef777623..ce8b203830aff 100644 --- a/src/transform-sdk/go/transform/serialize.go +++ b/src/transform-sdk/go/transform/serialize.go @@ -16,6 +16,7 @@ package transform import ( "encoding/binary" + "errors" "github.com/redpanda-data/redpanda/src/transform-sdk/go/transform/internal/rwbuf" ) @@ -78,3 +79,21 @@ func (r Record) serializePayload(b *rwbuf.RWBuf) { b.WriteVarint(0) } } + +// Serialize the output topic option into an options buffer. +func (o writeOpts) serialize(b *rwbuf.RWBuf) { + _ = b.WriteByte(0x01) + b.WriteStringWithSize(o.topic) +} + +func (o *writeOpts) deserialize(b *rwbuf.RWBuf) error { + k, err := b.ReadByte() + if err != nil { + return err + } + if k != 0x01 { + return errors.New("unknown options key") + } + o.topic, err = b.ReadSizedStringCopy() + return err +} diff --git a/src/transform-sdk/go/transform/serialize_test.go b/src/transform-sdk/go/transform/serialize_test.go index d4edd2f588d8e..3bc3b31df3b9a 100644 --- a/src/transform-sdk/go/transform/serialize_test.go +++ b/src/transform-sdk/go/transform/serialize_test.go @@ -69,7 +69,7 @@ func makeRandomRecord() Record { } } -func TestRoundTrip(t *testing.T) { +func TestRecordRoundTrip(t *testing.T) { r := makeRandomRecord() b := rwbuf.New(0) r.serializePayload(b) @@ -86,3 +86,17 @@ func TestRoundTrip(t *testing.T) { t.Fatalf("%#v != %#v", r, output) } } + +func TestWriteOptionsRoundTrip(t *testing.T) { + original := writeOpts{topic: "foobar"} + b := rwbuf.New(0) + original.serialize(b) + got := writeOpts{} + err := got.deserialize(b) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(original, got) { + t.Fatalf("%#v != %#v", original, got) + } +} diff --git a/src/transform-sdk/go/transform/stub_abi.go b/src/transform-sdk/go/transform/stub_abi.go index 63d9ad07b5290..3d3cce188d03c 100644 --- a/src/transform-sdk/go/transform/stub_abi.go +++ b/src/transform-sdk/go/transform/stub_abi.go @@ -56,3 +56,7 @@ func readNextRecord( func writeRecord(buf unsafe.Pointer, len int32) int32 { panic("stub") } + +func writeRecordWithOptions(data unsafe.Pointer, dataLength int32, opts unsafe.Pointer, optsLength int32) int32 { + panic("stub") +}