Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpk: add support for tombstone records in produce and consume #23264

Merged
merged 5 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions src/go/rpk/pkg/cli/topic/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,10 +271,16 @@ func (c *consumer) writeRecordJSON(r *kgo.Record, toStdErr bool) {
Value string `json:"value"`
}

var valuePtr *string
if r.Value != nil {
valueStr := string(r.Value)
valuePtr = &valueStr
}

m := struct {
Topic string `json:"topic"`
Key string `json:"key,omitempty"`
Value string `json:"value,omitempty"`
Value *string `json:"value,omitempty"`
ValueSize *int `json:"value_size,omitempty"` // non-nil if --meta-only
Headers []Header `json:"headers,omitempty"`
Timestamp int64 `json:"timestamp"` // millis
Expand All @@ -284,7 +290,7 @@ func (c *consumer) writeRecordJSON(r *kgo.Record, toStdErr bool) {
}{
Topic: r.Topic,
Key: string(r.Key),
Value: string(r.Value),
Value: valuePtr,
Headers: make([]Header, 0, len(r.Headers)),
Timestamp: r.Timestamp.UnixNano() / 1e6,

Expand All @@ -293,8 +299,8 @@ func (c *consumer) writeRecordJSON(r *kgo.Record, toStdErr bool) {
}

if c.metaOnly {
size := len(m.Value)
m.Value = ""
size := len(r.Value)
m.Value = nil
m.ValueSize = &size
}

Expand Down Expand Up @@ -1045,6 +1051,13 @@ equals sign between the key and value, and with the value hex encoded. Header
formatting actually just parses the internal format as a record format, so all
of the above rules about %K, %V, text, and numbers apply.

VALUES

Values for consumed records can be omitted by using the '--meta-only' flag.
Tombstone records (records with a 'null' value) have their value omitted
from the JSON output by default. All other records, including those with
an empty-string value (""), will have their values printed.

EXAMPLES

A key and value, separated by a space and ending in newline:
Expand Down
29 changes: 27 additions & 2 deletions src/go/rpk/pkg/cli/topic/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,16 @@ func newProduceCommand(fs afero.Fs, p *config.Params) *cobra.Command {
if topicName == "" {
topicName = r.Topic
}
if tombstone && len(r.Value) == 0 {
r.Value = nil
if len(r.Value) == 0 {
if tombstone {
// `null` value
r.Value = nil
} else {
// empty byte-slice
r.Value = []byte{}
}
}

if defaultKeySerde != nil || isKeyTopicName {
keySerde := defaultKeySerde
if isKeyTopicName && keySerde == nil {
Expand Down Expand Up @@ -444,6 +451,24 @@ message name is unnecessary. For example:
Produce to 'foo', using schema ID 1, message FQN Person.Name
rpk topic produce foo --schema-id 1 --schema-type Person.Name

TOMBSTONES

By default, records produced without a value will have an empty-string value, "".
The below example produces a record with the key 'not_a_tombstone_record' and the
value "":
rpk topic produce foo -k not_a_tombstone_record
[ret]

Tombstone records (records with a 'null' value) can be produced by using the '-Z'
flag and creating empty-string value records. Using the same example from above,
but adding the '-Z' flag will produce a record with the key 'tombstone_record'
and the value 'null':
rpk topic produce foo -k tombstone_record -Z
[ret]

It is important to note that records produced with values of string "null" are
not considered tombstones by Redpanda.

EXAMPLES

In the below examples, we can parse many records at once. The produce command
Expand Down
17 changes: 14 additions & 3 deletions tests/rptest/clients/rpk.py
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,8 @@ def produce(self,
schema_id=None,
schema_key_id=None,
proto_msg=None,
proto_key_msg=None):
proto_key_msg=None,
tombstone=False):

if timeout is None:
# For produce, we use a lower timeout than the general
Expand All @@ -599,8 +600,16 @@ def produce(self,

cmd = [
'produce', '--key', key, '-z', f'{compression_type}',
'--delivery-timeout', f'{timeout}s', '-f', '%v', topic
'--delivery-timeout', f'{timeout}s', topic
]

# An empty msg needs to be a newline for stdin purposes.
if msg == '':
msg = '\n'

if msg not in ['\n']:
cmd += ['-f', '%v']

use_schema_registry = False
if headers:
cmd += ['-H ' + h for h in headers]
Expand All @@ -618,6 +627,8 @@ def produce(self,
if proto_key_msg is not None:
cmd += ["--schema-key-type", proto_key_msg]
use_schema_registry = True
if tombstone:
cmd += ["--tombstone"]

# Run remote process with a slightly higher timeout than the
# rpk delivery timeout, so that we get a clean-ish rpk timeout
Expand Down Expand Up @@ -2018,4 +2029,4 @@ def connect_version(self):
raise RpkException(f"unable to parse connect version from {out}")

def run_connect_arbitrary(self, cmd):
return self._run_connect(cmd)
return self._run_connect(cmd)
43 changes: 43 additions & 0 deletions tests/rptest/tests/rpk_topic_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,3 +240,46 @@ def cond():

raise ducktape.errors.TimeoutError(
f'Message in {topic} never appeared.')

@cluster(num_nodes=4)
def test_produce_and_consume_tombstones(self):
topic = 'rp_dt_test_produce_and_consume_tombstones'
self._rpk.create_topic(topic)

num_messages = 2

tombstone_key = 'ISTOMBSTONE'
tombstone_value = ''

# Producing a record with an empty value and -Z flag results in a tombstone.
self._rpk.produce(topic,
tombstone_key,
tombstone_value,
tombstone=True)

not_tombstone_key = 'ISNOTTOMBSTONE'

# Producing a record with an empty value without the -Z flag results in an empty value.
self._rpk.produce(topic,
not_tombstone_key,
tombstone_value,
tombstone=False)

c = RpkConsumer(self._ctx, self.redpanda, topic)
c.start()

def cond():
return len(c.messages) == num_messages

wait_until(cond,
timeout_sec=30,
backoff_sec=2,
err_msg=f'Messages in {topic} never appeared.')

# Tombstone records do not have a value in the returned JSON
assert c.messages[0]['key'] == tombstone_key
assert 'value' not in c.messages[0]

# Records with an empty string have a value of `""` in the returned JSON
assert c.messages[1]['key'] == not_tombstone_key
assert c.messages[1]['value'] == ""
Loading