Skip to content

Commit

Permalink
fix(conector-node): do not store sink row inside upsert iceberg sink (r…
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Mar 17, 2023
1 parent 6fd8821 commit 7cd7c9d
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.grpc.Status;
import java.util.List;
import java.util.TreeMap;
import org.apache.iceberg.data.Record;

public class SinkRowMap {
TreeMap<List<Comparable<Object>>, SinkRowOp> map = new TreeMap<>(new PkComparator());
Expand All @@ -27,7 +28,7 @@ public void clear() {
map.clear();
}

public void insert(List<Comparable<Object>> key, SinkRow row) {
public void insert(List<Comparable<Object>> key, Record row) {
if (!map.containsKey(key)) {
map.put(key, SinkRowOp.insertOp(row));
} else {
Expand All @@ -42,19 +43,20 @@ public void insert(List<Comparable<Object>> key, SinkRow row) {
}
}

public void delete(List<Comparable<Object>> key, SinkRow row) {
public void delete(List<Comparable<Object>> key, Record row) {
if (!map.containsKey(key)) {
map.put(key, SinkRowOp.deleteOp(row));
} else {
SinkRowOp sinkRowOp = map.get(key);
SinkRow insert = sinkRowOp.getInsert();
Record insert = sinkRowOp.getInsert();
if (insert == null) {
throw Status.FAILED_PRECONDITION
.withDescription("try to double delete a primary key")
.asRuntimeException();
}
assertRowValuesEqual(insert, row);
SinkRow delete = sinkRowOp.getDelete();
// TODO: may enable it again
// assertRowValuesEqual(insert, row);
Record delete = sinkRowOp.getDelete();
if (delete != null) {
map.put(key, SinkRowOp.deleteOp(delete));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@

package com.risingwave.connector;

import com.risingwave.connector.api.sink.SinkRow;
import io.grpc.Status;
import org.apache.iceberg.data.Record;

public class SinkRowOp {
private final SinkRow delete;
private final SinkRow insert;
private final Record delete;
private final Record insert;

public static SinkRowOp insertOp(SinkRow row) {
public static SinkRowOp insertOp(Record row) {
if (row == null) {
throw Status.FAILED_PRECONDITION
.withDescription("row op must not be null to initialize insertOp")
Expand All @@ -30,7 +30,7 @@ public static SinkRowOp insertOp(SinkRow row) {
return new SinkRowOp(null, row);
}

public static SinkRowOp deleteOp(SinkRow row) {
public static SinkRowOp deleteOp(Record row) {
if (row == null) {
throw Status.FAILED_PRECONDITION
.withDescription("row op must not be null to initialize deleteOp")
Expand All @@ -39,7 +39,7 @@ public static SinkRowOp deleteOp(SinkRow row) {
return new SinkRowOp(row, null);
}

public static SinkRowOp updateOp(SinkRow delete, SinkRow insert) {
public static SinkRowOp updateOp(Record delete, Record insert) {
if (delete == null || insert == null) {
throw Status.FAILED_PRECONDITION
.withDescription("row ops must not be null initialize updateOp")
Expand All @@ -48,7 +48,7 @@ public static SinkRowOp updateOp(SinkRow delete, SinkRow insert) {
return new SinkRowOp(delete, insert);
}

private SinkRowOp(SinkRow delete, SinkRow insert) {
private SinkRowOp(Record delete, Record insert) {
this.delete = delete;
this.insert = insert;
}
Expand All @@ -57,11 +57,11 @@ public boolean isDelete() {
return insert == null && delete != null;
}

public SinkRow getDelete() {
public Record getDelete() {
return delete;
}

public SinkRow getInsert() {
public Record getInsert() {
return insert;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public UpsertIcebergSink(
.collect(Collectors.toList());
}

private Record newRecord(Schema schema, SinkRow row) {
private static Record newRecord(Schema schema, SinkRow row) {
Record record = GenericRecord.create(schema);
for (int i = 0; i < schema.columns().size(); i++) {
record.set(i, row.get(i));
Expand Down Expand Up @@ -174,10 +174,10 @@ public void write(Iterator<SinkRow> rows) {
}
switch (row.getOp()) {
case INSERT:
sinkRowMap.insert(getKeyFromRow(row), row);
sinkRowMap.insert(getKeyFromRow(row), newRecord(rowSchema, row));
break;
case DELETE:
sinkRowMap.delete(getKeyFromRow(row), row);
sinkRowMap.delete(getKeyFromRow(row), newRecord(deleteRowSchema, row));
break;
case UPDATE_DELETE:
if (updateBufferExists) {
Expand All @@ -186,7 +186,7 @@ public void write(Iterator<SinkRow> rows) {
"an UPDATE_INSERT should precede an UPDATE_DELETE")
.asRuntimeException();
}
sinkRowMap.delete(getKeyFromRow(row), row);
sinkRowMap.delete(getKeyFromRow(row), newRecord(deleteRowSchema, row));
updateBufferExists = true;
break;
case UPDATE_INSERT:
Expand All @@ -196,7 +196,7 @@ public void write(Iterator<SinkRow> rows) {
"an UPDATE_INSERT should precede an UPDATE_DELETE")
.asRuntimeException();
}
sinkRowMap.insert(getKeyFromRow(row), row);
sinkRowMap.insert(getKeyFromRow(row), newRecord(rowSchema, row));
updateBufferExists = false;
break;
default:
Expand All @@ -217,13 +217,13 @@ public void sync() {
newEqualityDeleteWriter(entry.getKey());
DataWriter<Record> dataWriter = newDataWriter(entry.getKey());
for (SinkRowOp sinkRowOp : entry.getValue().map.values()) {
SinkRow insert = sinkRowOp.getInsert();
SinkRow delete = sinkRowOp.getDelete();
Record insert = sinkRowOp.getInsert();
Record delete = sinkRowOp.getDelete();
if (insert != null) {
dataWriter.write(newRecord(rowSchema, insert));
dataWriter.write(insert);
}
if (delete != null) {
equalityDeleteWriter.write(newRecord(deleteRowSchema, delete));
equalityDeleteWriter.write(delete);
}
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
import com.risingwave.proto.Data;
import java.util.ArrayList;
import java.util.List;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Test;

Expand All @@ -31,29 +35,42 @@ public void testInsert() {
SinkRow row = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1);
List<Comparable<Object>> key = new ArrayList<>();
key.add((Comparable<Object>) row.get(0));
Schema schema = new Schema(Types.NestedField.optional(0, "id", Types.IntegerType.get()));
Record r = GenericRecord.create(schema);
r.set(0, row.get(0));

sinkRowMap.insert(key, row);
sinkRowMap.insert(key, r);
assertEquals(1, sinkRowMap.map.size());
assertEquals(null, sinkRowMap.map.get(key).getDelete());
assertEquals(row, sinkRowMap.map.get(key).getInsert());
assertEquals(r, sinkRowMap.map.get(key).getInsert());
}

@Test
public void testInsertAfterDelete() {
SinkRowMap sinkRowMap = new SinkRowMap();
Schema schema =
new Schema(
Types.NestedField.optional(0, "id", Types.IntegerType.get()),
Types.NestedField.optional(1, "name", Types.StringType.get()));

SinkRow row1 = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1, "Alice");
List<Comparable<Object>> key1 = new ArrayList<>();
key1.add((Comparable<Object>) row1.get(0));
Record r1 = GenericRecord.create(schema);
r1.set(0, row1.get(0));
r1.set(1, row1.get(1));
SinkRow row2 = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1, "Bob");
List<Comparable<Object>> key2 = new ArrayList<>();
key2.add((Comparable<Object>) row2.get(0));
Record r2 = GenericRecord.create(schema);
r2.set(0, row2.get(0));
r2.set(1, row2.get(1));

sinkRowMap.delete(key1, row1);
sinkRowMap.insert(key1, row2);
sinkRowMap.delete(key1, r1);
sinkRowMap.insert(key1, r2);
assertEquals(1, sinkRowMap.map.size());
assertEquals(row1, sinkRowMap.map.get(key1).getDelete());
assertEquals(row2, sinkRowMap.map.get(key1).getInsert());
assertEquals(r1, sinkRowMap.map.get(key1).getDelete());
assertEquals(r2, sinkRowMap.map.get(key1).getInsert());
}

@Test
Expand All @@ -62,11 +79,14 @@ public void testInsertAfterInsert() {
SinkRow row = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1);
List<Comparable<Object>> key = new ArrayList<>();
key.add((Comparable<Object>) row.get(0));
Schema schema = new Schema(Types.NestedField.optional(0, "id", Types.IntegerType.get()));
Record r = GenericRecord.create(schema);
r.set(0, row.get(0));

sinkRowMap.insert(key, row);
sinkRowMap.insert(key, r);
boolean exceptionThrown = false;
try {
sinkRowMap.insert(key, row);
sinkRowMap.insert(key, r);
} catch (RuntimeException e) {
exceptionThrown = true;
Assert.assertTrue(
Expand All @@ -87,10 +107,14 @@ public void testDelete() {
List<Comparable<Object>> key = new ArrayList<>();
key.add((Comparable<Object>) row.get(0));

sinkRowMap.delete(key, row);
Schema schema = new Schema(Types.NestedField.optional(0, "id", Types.IntegerType.get()));
Record r = GenericRecord.create(schema);
r.set(0, row.get(0));

sinkRowMap.delete(key, r);
assertEquals(1, sinkRowMap.map.size());
assertEquals(null, sinkRowMap.map.get(key).getInsert());
assertEquals(row, sinkRowMap.map.get(key).getDelete());
assertEquals(r, sinkRowMap.map.get(key).getDelete());
}

@Test
Expand All @@ -100,10 +124,14 @@ public void testDeleteAfterDelete() {
List<Comparable<Object>> key = new ArrayList<>();
key.add((Comparable<Object>) row.get(0));

sinkRowMap.delete(key, row);
Schema schema = new Schema(Types.NestedField.optional(0, "id", Types.IntegerType.get()));
Record r = GenericRecord.create(schema);
r.set(0, row.get(0));

sinkRowMap.delete(key, r);
boolean exceptionThrown = false;
try {
sinkRowMap.delete(key, row);
sinkRowMap.delete(key, r);
} catch (RuntimeException e) {
exceptionThrown = true;
Assert.assertTrue(
Expand All @@ -122,28 +150,44 @@ public void testDeleteAfterInsert() {
List<Comparable<Object>> key = new ArrayList<>();
key.add((Comparable<Object>) row.get(0));

sinkRowMap.insert(key, row);
sinkRowMap.delete(key, row);
Schema schema = new Schema(Types.NestedField.optional(0, "id", Types.IntegerType.get()));
Record r = GenericRecord.create(schema);
r.set(0, row.get(0));

sinkRowMap.insert(key, r);
sinkRowMap.delete(key, r);
assertEquals(0, sinkRowMap.map.size());
}

@Test
public void testDeleteAfterUpdate() {
SinkRowMap sinkRowMap = new SinkRowMap();

Schema schema =
new Schema(
Types.NestedField.optional(0, "id", Types.IntegerType.get()),
Types.NestedField.optional(1, "name", Types.StringType.get()));

SinkRow row1 = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1, "Alice");
List<Comparable<Object>> key1 = new ArrayList<>();
key1.add((Comparable<Object>) row1.get(0));
Record r1 = GenericRecord.create(schema);
r1.set(0, row1.get(0));
r1.set(1, row1.get(1));

SinkRow row2 = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1, "Clare");
List<Comparable<Object>> key2 = new ArrayList<>();
key2.add((Comparable<Object>) row2.get(0));
Record r2 = GenericRecord.create(schema);
r2.set(0, row2.get(0));
r2.set(1, row2.get(1));

sinkRowMap.delete(key1, row1);
sinkRowMap.insert(key2, row2);
sinkRowMap.delete(key2, row2);
sinkRowMap.delete(key1, r1);
sinkRowMap.insert(key2, r2);
sinkRowMap.delete(key2, r2);
assertEquals(1, sinkRowMap.map.size());
assertEquals(null, sinkRowMap.map.get(key1).getInsert());
assertEquals(row1, sinkRowMap.map.get(key1).getDelete());
assertEquals(r1, sinkRowMap.map.get(key1).getDelete());
}

@Test
Expand All @@ -153,7 +197,10 @@ public void testClear() {
SinkRow row = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1);
List<Comparable<Object>> key = new ArrayList<>();
key.add((Comparable<Object>) row.get(0));
sinkRowMap.insert(key, row);
Schema schema = new Schema(Types.NestedField.optional(0, "id", Types.IntegerType.get()));
Record r = GenericRecord.create(schema);
r.set(0, row.get(0));
sinkRowMap.insert(key, r);

sinkRowMap.clear();
assertEquals(0, sinkRowMap.map.size());
Expand Down

0 comments on commit 7cd7c9d

Please sign in to comment.