Skip to content

Commit

Permalink
Merge pull request #16978 from rockwotj/golang-sdk-mot
Browse files Browse the repository at this point in the history
transform-sdk/go: add support for multiple output topics
  • Loading branch information
rockwotj authored Mar 13, 2024
2 parents 5afeaba + 076eec4 commit 9631be1
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 11 deletions.
26 changes: 25 additions & 1 deletion src/transform-sdk/go/transform/abi.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

// An imported function to ensure that the broker supports this ABI version.
//
//go:wasmimport redpanda_transform check_abi_version_1
//go:wasmimport redpanda_transform check_abi_version_2
func checkAbiVersion()

// readRecordHeader reads all the data from the batch header into memory.
Expand Down Expand Up @@ -93,3 +93,27 @@ func readNextRecord(
//
//go:wasmimport redpanda_transform write_record
func writeRecord(data unsafe.Pointer, length int32) int32

// writeRecordWithOptions writes a new record by copying the data buffer.
//
// The data buffer here is the same format as `writeRecord`, but additionally it is
// possible to pass options for the write.
//
// At the time of writing the options that are supported are only setting a different
// output topic. The format for this options object is a series of keys with key specific
// data.
//
// Supported Options:
//
// - key=0x01 - Set output topic.
// The value is as follows:
// topicNameLength: varint
// topicName: byte[]
//
//go:wasmimport redpanda_transform write_record_with_options
func writeRecordWithOptions(
data unsafe.Pointer,
dataLength int32,
opts unsafe.Pointer,
optsLength int32,
) int32
5 changes: 3 additions & 2 deletions src/transform-sdk/go/transform/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@
// limitations under the License.

/*
Package redpanda is the SDK for Redpanda's inline Data Transforms, based on WebAssembly.
Package transform is the SDK for Redpanda's inline Data Transforms, based on WebAssembly.
This library provides a framework for transforming records written within Redpanda from
an input to an output topic.
an input to an output topic. This version of the SDK is compatible with Redpanda 24.1 or
greater.
*/
package transform
40 changes: 40 additions & 0 deletions src/transform-sdk/go/transform/example_validation_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2023 Redpanda Data, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package transform_test

import (
"encoding/json"

"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
)

// This example shows the basic usage of the package:
// This is a transform that validates the data is valid JSON,
// and outputs invalid JSON to a dead letter queue.
func Example_validation() {
transform.OnRecordWritten(jsonValidate)
}

// This will be called for each record in the source topic.
func jsonValidate(e transform.WriteEvent, w transform.RecordWriter) error {
if json.Valid(e.Record().Value) {
// Write the valid records to the "default" output topic, this is the
// first output topic specified in the configuration.
return w.Write(e.Record())
}
// If a record does not contain valid JSON then route it to another topic for
// triage and debugging.
return w.Write(e.Record(), transform.ToTopic("invalid_json"))
}
36 changes: 30 additions & 6 deletions src/transform-sdk/go/transform/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,40 @@ func (e *writeEvent) Record() Record {

type recordWriter struct {
outbuf *rwbuf.RWBuf
optbuf *rwbuf.RWBuf
}

func (w *recordWriter) Write(r Record) error {
func (w *recordWriter) Write(r Record, opts ...WriteOpt) error {
// Serialize the record
w.outbuf.Reset()
r.serializePayload(w.outbuf)
b := w.outbuf.ReadAll()
// Write the record back out to the broker
amt := int(writeRecord(unsafe.Pointer(&b[0]), int32(len(b))))
if amt != len(b) {
return errors.New("writing record failed with errno: " + strconv.Itoa(amt))

// Apply write options
wo := writeOpts{}
for _, opt := range opts {
opt.apply(&wo)
}

// Do the write
var amt int32
if wo.topic == "" {
// Directly write the record to the default output topic.
amt = writeRecord(unsafe.Pointer(&b[0]), int32(len(b)))
} else {
// Serialize the options
w.optbuf.Reset()
wo.serialize(w.optbuf)
o := w.optbuf.ReadAll()
amt = writeRecordWithOptions(
unsafe.Pointer(&b[0]),
int32(len(b)),
unsafe.Pointer(&o[0]),
int32(len(o)),
)
}
if int(amt) != len(b) {
return errors.New("writing record failed with errno: " + strconv.Itoa(int(amt)))
}
return nil
}
Expand All @@ -65,7 +89,7 @@ var (
currentHeader batchHeader = batchHeader{}
inbuf *rwbuf.RWBuf = rwbuf.New(128)
e writeEvent
w recordWriter = recordWriter{rwbuf.New(128)}
w recordWriter = recordWriter{rwbuf.New(128), rwbuf.New(32)}
)

// run our transformation loop
Expand Down
27 changes: 26 additions & 1 deletion src/transform-sdk/go/transform/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,39 @@ type WriteEvent interface {
Record() Record
}

type (
// writeOps is the internal struct carrying the options available for writes.
writeOpts struct {
topic string
}
// WriteOpt is an option to modify a Write.
WriteOpt interface{ apply(*writeOpts) }
// writeOptFunc is a closure implemenation of WriteOpt.
writeOptFunc func(*writeOpts)
)

// Implment WriteOpt for writeOptFunc
func (f writeOptFunc) apply(opts *writeOpts) {
f(opts)
}

// ToTopic specifies the output topic that the record will be written to.
func ToTopic(topic string) WriteOpt {
return writeOptFunc(func(o *writeOpts) {
o.topic = topic
})
}

// RecordWriter is an interface for writing transformed records to the destination topic.
type RecordWriter interface {
// Write writes a record to the output topic.
//
// When writing a record, only the key, value and headers are
// used other information like the timestamp will be overridden
// by the broker.
Write(Record) error
//
// WriteOpts can be added to control where records go, for example to another topic.
Write(Record, ...WriteOpt) error
}

// Headers are optional key/value pairs that are passed along with
Expand Down
19 changes: 19 additions & 0 deletions src/transform-sdk/go/transform/serialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package transform

import (
"encoding/binary"
"errors"

"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform/internal/rwbuf"
)
Expand Down Expand Up @@ -78,3 +79,21 @@ func (r Record) serializePayload(b *rwbuf.RWBuf) {
b.WriteVarint(0)
}
}

// Serialize the output topic option into an options buffer.
func (o writeOpts) serialize(b *rwbuf.RWBuf) {
_ = b.WriteByte(0x01)
b.WriteStringWithSize(o.topic)
}

func (o *writeOpts) deserialize(b *rwbuf.RWBuf) error {
k, err := b.ReadByte()
if err != nil {
return err
}
if k != 0x01 {
return errors.New("unknown options key")
}
o.topic, err = b.ReadSizedStringCopy()
return err
}
16 changes: 15 additions & 1 deletion src/transform-sdk/go/transform/serialize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func makeRandomRecord() Record {
}
}

func TestRoundTrip(t *testing.T) {
func TestRecordRoundTrip(t *testing.T) {
r := makeRandomRecord()
b := rwbuf.New(0)
r.serializePayload(b)
Expand All @@ -86,3 +86,17 @@ func TestRoundTrip(t *testing.T) {
t.Fatalf("%#v != %#v", r, output)
}
}

func TestWriteOptionsRoundTrip(t *testing.T) {
original := writeOpts{topic: "foobar"}
b := rwbuf.New(0)
original.serialize(b)
got := writeOpts{}
err := got.deserialize(b)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(original, got) {
t.Fatalf("%#v != %#v", original, got)
}
}
4 changes: 4 additions & 0 deletions src/transform-sdk/go/transform/stub_abi.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,7 @@ func readNextRecord(
func writeRecord(buf unsafe.Pointer, len int32) int32 {
panic("stub")
}

func writeRecordWithOptions(data unsafe.Pointer, dataLength int32, opts unsafe.Pointer, optsLength int32) int32 {
panic("stub")
}

0 comments on commit 9631be1

Please sign in to comment.