From 3977573e004556b21b15766d928f09348a967478 Mon Sep 17 00:00:00 2001 From: xuchuan Date: Tue, 26 Sep 2023 20:52:07 +0800 Subject: [PATCH] chore(datastore): use revision instead of timestamp as record versions (#2804) --- .../mlops/api/DataStoreController.java | 6 +- .../starwhale/mlops/datastore/DataStore.java | 28 ++++---- .../datastore/DataStoreQueryRequest.java | 2 +- .../mlops/datastore/DataStoreScanRequest.java | 5 +- .../mlops/datastore/MemoryTable.java | 2 + .../mlops/datastore/impl/MemoryRecord.java | 2 +- .../mlops/datastore/impl/MemoryTableImpl.java | 65 ++++++++++++------- .../src/main/protobuf/table_meta.proto | 1 + server/controller/src/main/protobuf/wal.proto | 2 +- .../datastore/impl/MemoryTableImplTest.java | 19 ++---- 10 files changed, 74 insertions(+), 58 deletions(-) diff --git a/server/controller/src/main/java/ai/starwhale/mlops/api/DataStoreController.java b/server/controller/src/main/java/ai/starwhale/mlops/api/DataStoreController.java index 029ffbacd3..5e27186003 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/api/DataStoreController.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/api/DataStoreController.java @@ -311,7 +311,7 @@ private RecordList queryRecordList(QueryTableRequest request) { .rawResult(request.isRawResult()) .encodeWithType(request.isEncodeWithType()) .ignoreNonExistingTable(request.isIgnoreNonExistingTable()) - .timestamp(StringUtils.hasText(request.getRevision()) ? Long.parseLong(request.getRevision()) : 0) + .revision(StringUtils.hasText(request.getRevision()) ? Long.parseLong(request.getRevision()) : 0) .build()); } @@ -502,13 +502,13 @@ private RecordList scanRecordList(ScanTableRequest request) { "table name should not be null or empty: " + x ); } - var ts = StringUtils.hasText(x.getRevision()) ? Long.parseLong(x.getRevision()) : 0; + var revision = StringUtils.hasText(x.getRevision()) ? 
Long.parseLong(x.getRevision()) : 0; return DataStoreScanRequest.TableInfo.builder() .tableName(x.getTableName()) .columnPrefix(x.getColumnPrefix()) .columns(DataStoreController.convertColumns(x.getColumns())) .keepNone(x.isKeepNone()) - .timestamp(ts) + .revision(revision) .build(); }) .collect(Collectors.toList())) diff --git a/server/controller/src/main/java/ai/starwhale/mlops/datastore/DataStore.java b/server/controller/src/main/java/ai/starwhale/mlops/datastore/DataStore.java index 8f228a9ec1..cbdf2d1777 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/datastore/DataStore.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/datastore/DataStore.java @@ -238,12 +238,12 @@ public RecordList query(DataStoreQueryRequest req) { var schema = table.getSchema(); var columns = this.getColumnAliases(schema, req.getColumns()); var results = new ArrayList(); - var timestamp = req.getTimestamp(); - if (timestamp == 0) { - timestamp = System.currentTimeMillis(); + var revision = req.getRevision(); + if (revision == 0) { + revision = table.getLastRevision(); } var iterator = table.query( - timestamp, + revision, columns, req.getOrderBy(), req.getFilter(), @@ -324,7 +324,7 @@ public RecordList scan(DataStoreScanRequest req) { class TableMeta { String tableName; - long timestamp; + long revision; MemoryTable table; TableSchema schema; Map columns; @@ -332,22 +332,20 @@ class TableMeta { boolean keepNone; } - var currentTimestamp = System.currentTimeMillis(); - var tables = req.getTables().stream().map(info -> { var ret = new TableMeta(); ret.tableName = info.getTableName(); - if (info.getTimestamp() > 0) { - ret.timestamp = info.getTimestamp(); - } else if (req.getTimestamp() > 0) { - ret.timestamp = req.getTimestamp(); - } else { - ret.timestamp = currentTimestamp; - } ret.table = this.getTable(info.getTableName(), req.isIgnoreNonExistingTable(), false); if (ret.table == null) { return null; } + if (info.getRevision() > 0) { + ret.revision = 
info.getRevision(); + } else if (req.getRevision() > 0) { + ret.revision = req.getRevision(); + } else { + ret.revision = ret.table.getLastRevision(); + } ret.schema = ret.table.getSchema(); ret.columns = this.getColumnAliases(ret.schema, info.getColumns()); if (info.getColumnPrefix() != null) { @@ -424,7 +422,7 @@ class TableRecords { for (var table : tables) { var r = new TableRecords(); r.meta = table; - r.iterator = table.table.scan(table.timestamp, + r.iterator = table.table.scan(table.revision, table.columns, req.getStart(), req.getStartType(), diff --git a/server/controller/src/main/java/ai/starwhale/mlops/datastore/DataStoreQueryRequest.java b/server/controller/src/main/java/ai/starwhale/mlops/datastore/DataStoreQueryRequest.java index 4877bfbf39..3095708ff0 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/datastore/DataStoreQueryRequest.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/datastore/DataStoreQueryRequest.java @@ -31,7 +31,7 @@ public class DataStoreQueryRequest { private String tableName; // timestamp in milliseconds, used to filter out the data that is older than the timestamp for this table - private long timestamp; + private long revision; private Map columns; private List orderBy; private boolean descending; diff --git a/server/controller/src/main/java/ai/starwhale/mlops/datastore/DataStoreScanRequest.java b/server/controller/src/main/java/ai/starwhale/mlops/datastore/DataStoreScanRequest.java index acdd160aa6..7203ac0573 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/datastore/DataStoreScanRequest.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/datastore/DataStoreScanRequest.java @@ -37,15 +37,14 @@ public static class TableInfo { private String tableName; // timestamp in milliseconds, used to filter out the data that is older than the timestamp for this table - private long timestamp; + private long revision; private String columnPrefix; private Map columns; private boolean 
keepNone; } private List tables; - // timestamp in milliseconds, used to filter out the data that is older than the timestamp for all tables - private long timestamp; + private long revision; private String start; private String startType; @Builder.Default diff --git a/server/controller/src/main/java/ai/starwhale/mlops/datastore/MemoryTable.java b/server/controller/src/main/java/ai/starwhale/mlops/datastore/MemoryTable.java index 832af82ee6..4d2e063560 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/datastore/MemoryTable.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/datastore/MemoryTable.java @@ -58,5 +58,7 @@ Iterator scan( long getLastUpdateTime(); + long getLastRevision(); + Map getColumnStatistics(Map columnMapping); } diff --git a/server/controller/src/main/java/ai/starwhale/mlops/datastore/impl/MemoryRecord.java b/server/controller/src/main/java/ai/starwhale/mlops/datastore/impl/MemoryRecord.java index 8c30c5d75c..091a37b385 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/datastore/impl/MemoryRecord.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/datastore/impl/MemoryRecord.java @@ -25,7 +25,7 @@ @Builder public class MemoryRecord { - private long timestamp; + private long revision; private boolean deleted; private Map values; diff --git a/server/controller/src/main/java/ai/starwhale/mlops/datastore/impl/MemoryTableImpl.java b/server/controller/src/main/java/ai/starwhale/mlops/datastore/impl/MemoryTableImpl.java index b7ef01879d..a68be725ff 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/datastore/impl/MemoryTableImpl.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/datastore/impl/MemoryTableImpl.java @@ -68,13 +68,14 @@ import java.util.stream.Collectors; import lombok.Getter; import lombok.NonNull; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.hadoop.conf.Configuration; @Slf4j public class MemoryTableImpl implements MemoryTable { - private 
static final String TIMESTAMP_COLUMN_NAME = "^"; + private static final String REVISION_COLUMN_NAME = "^"; static final String DELETED_FLAG_COLUMN_NAME = "-"; @@ -101,6 +102,15 @@ public class MemoryTableImpl implements MemoryTable { @Getter private long lastUpdateTime = 0; + @Getter + private transient long lastRevision = 0; + + @Setter + private boolean useTimestampAsRevision = false; // unittest only + + private static final long MIN_TIMESTAMP = 86400L * 1000 * 365 * 50; + private static final long MAX_TIMESTAMP = 86400L * 1000 * 365 * 100; + private final TreeMap> recordMap = new TreeMap<>(); private final Lock lock = new ReentrantLock(); @@ -145,6 +155,7 @@ var record = reader.read(); var metadata = metaBuilder.build(); this.lastWalLogId = metadata.getLastWalLogId(); this.lastUpdateTime = metadata.getLastUpdateTime(); + this.lastRevision = metadata.getLastRevision(); } if (record == null) { break; @@ -152,11 +163,11 @@ var record = reader.read(); var key = record.remove(this.schema.getKeyColumn()); this.statisticsMap.computeIfAbsent(this.schema.getKeyColumn(), k -> new ColumnStatistics()) .update(key); - var timestamp = (Int64Value) record.remove(TIMESTAMP_COLUMN_NAME); + var revision = (Int64Value) record.remove(REVISION_COLUMN_NAME); var deletedFlag = (BoolValue) record.remove(DELETED_FLAG_COLUMN_NAME); this.recordMap.computeIfAbsent(key, k -> new ArrayList<>()) .add(MemoryRecord.builder() - .timestamp(timestamp.getValue()) + .revision(revision.getValue()) .deleted(deletedFlag.isValue()) .values(record) .build()); @@ -184,6 +195,7 @@ public void save() throws IOException { metadata = JsonFormat.printer().print(TableMeta.MetaData.newBuilder() .setLastWalLogId(this.lastWalLogId) .setLastUpdateTime(this.lastUpdateTime) + .setLastRevision(this.lastRevision) .build()); } catch (InvalidProtocolBufferException e) { throw new SwProcessException(ErrorType.DATASTORE, "failed to print table meta", e); @@ -193,11 +205,11 @@ public void save() throws IOException { for 
(var entry : new TreeMap<>(this.statisticsMap).entrySet()) { columnSchema.put(entry.getKey(), entry.getValue().createSchema(entry.getKey(), index++)); } - var timestampColumnSchema = new ColumnSchema(TIMESTAMP_COLUMN_NAME, index++); - timestampColumnSchema.setType(ColumnType.INT64); + var revisionColumnSchema = new ColumnSchema(REVISION_COLUMN_NAME, index++); + revisionColumnSchema.setType(ColumnType.INT64); var deletedFlagColumnSchema = new ColumnSchema(DELETED_FLAG_COLUMN_NAME, index); deletedFlagColumnSchema.setType(ColumnType.BOOL); - columnSchema.put(TIMESTAMP_COLUMN_NAME, timestampColumnSchema); + columnSchema.put(REVISION_COLUMN_NAME, revisionColumnSchema); columnSchema.put(DELETED_FLAG_COLUMN_NAME, deletedFlagColumnSchema); try { @@ -217,7 +229,7 @@ public void save() throws IOException { recordMap.putAll(record.getValues()); } recordMap.put(this.schema.getKeyColumn(), entry.getKey()); - recordMap.put(TIMESTAMP_COLUMN_NAME, new Int64Value(record.getTimestamp())); + recordMap.put(REVISION_COLUMN_NAME, new Int64Value(record.getRevision())); recordMap.put(DELETED_FLAG_COLUMN_NAME, BaseValue.valueOf(record.isDeleted())); list.add(recordMap); } @@ -250,7 +262,7 @@ public void updateFromWal(Wal.WalEntry entry) { } var recordList = entry.getRecordsList(); if (!recordList.isEmpty()) { - this.insertRecords(entry.getTimestamp(), + this.insertRecords(entry.getRevision(), recordList.stream() .map(r -> WalRecordDecoder.decodeRecord(this.schema, r)) .collect(Collectors.toList())); @@ -293,11 +305,11 @@ public long update(TableSchemaDesc schema, @NonNull List> re } decodedRecords.add(decodedRecord); } - var timestamp = System.currentTimeMillis(); + var revision = this.useTimestampAsRevision ? 
System.currentTimeMillis() : this.lastRevision + 1; var logEntryBuilder = Wal.WalEntry.newBuilder() .setEntryType(Wal.WalEntry.Type.UPDATE) .setTableName(this.tableName) - .setTimestamp(timestamp); + .setRevision(revision); var logSchemaBuilder = this.schema.getDiff(schema); if (logSchemaBuilder != null) { logEntryBuilder.setTableSchema(logSchemaBuilder); @@ -319,13 +331,20 @@ public long update(TableSchemaDesc schema, @NonNull List> re this.lastUpdateTime = System.currentTimeMillis(); this.schema = recordSchema; if (!decodedRecords.isEmpty()) { - this.insertRecords(timestamp, decodedRecords); + this.insertRecords(revision, decodedRecords); } + return revision; + } - return timestamp; + private long normalizeRevision(long revision) { + return revision >= MIN_TIMESTAMP && !this.useTimestampAsRevision ? revision - MAX_TIMESTAMP : revision; } - private void insertRecords(long timestamp, List> records) { + private void insertRecords(long revision, List> records) { + revision = this.normalizeRevision(revision); + if (revision > this.lastRevision) { + this.lastRevision = revision; + } for (var record : records) { var newRecord = new HashMap<>(record); var key = newRecord.remove(this.schema.getKeyColumn()); @@ -335,12 +354,12 @@ private void insertRecords(long timestamp, List> records) if (deletedFlag) { if (versions.isEmpty() || !versions.get(versions.size() - 1).isDeleted()) { versions.add(MemoryRecord.builder() - .timestamp(timestamp) + .revision(revision) .deleted(true) .build()); } } else { - var old = this.getRecordMap(key, versions, timestamp); + var old = this.getRecordMap(key, versions, revision); if (old != null) { for (var it = newRecord.entrySet().iterator(); it.hasNext(); ) { var entry = it.next(); @@ -354,7 +373,7 @@ private void insertRecords(long timestamp, List> records) } if (versions.isEmpty() || !newRecord.isEmpty()) { versions.add(MemoryRecord.builder() - .timestamp(timestamp) + .revision(revision) .values(newRecord) .build()); } @@ -366,12 +385,14 @@ 
private void insertRecords(long timestamp, List> records) } } - private Map getRecordMap(BaseValue key, List versions, long timestamp) { + + private Map getRecordMap(BaseValue key, List versions, long revision) { + revision = this.normalizeRevision(revision); var ret = new HashMap(); boolean deleted = false; boolean hasVersion = false; for (var record : versions) { - if (record.getTimestamp() <= timestamp) { + if (record.getRevision() <= revision) { // record may be empty, use hasVersion to mark if there is a record hasVersion = true; if (record.isDeleted()) { @@ -395,7 +416,7 @@ private Map getRecordMap(BaseValue key, List ve @Override public Iterator query( - long timestamp, + long revision, @NonNull Map columns, List orderBy, TableQueryFilter filter, @@ -421,7 +442,7 @@ public Iterator query( this.checkFilter(filter); } var stream = this.recordMap.entrySet().stream() - .map(entry -> this.getRecordMap(entry.getKey(), entry.getValue(), timestamp)) + .map(entry -> this.getRecordMap(entry.getKey(), entry.getValue(), revision)) .filter(record -> record != null && (filter == null || this.match(filter, record))); if (orderBy != null) { stream = stream.sorted((a, b) -> { @@ -445,7 +466,7 @@ public Iterator query( @Override public Iterator scan( - long timestamp, + long revision, @NonNull Map columns, String start, String startType, @@ -510,7 +531,7 @@ public Iterator scan( return Collections.emptyIterator(); } var iterator = this.recordMap.subMap(startKey, startInclusive, endKey, endInclusive).entrySet().stream() - .map(entry -> this.getRecordMap(entry.getKey(), entry.getValue(), timestamp)) + .map(entry -> this.getRecordMap(entry.getKey(), entry.getValue(), revision)) .filter(Objects::nonNull) .iterator(); return new Iterator<>() { diff --git a/server/controller/src/main/protobuf/table_meta.proto b/server/controller/src/main/protobuf/table_meta.proto index 0a6fb8e706..6eeaedd31f 100644 --- a/server/controller/src/main/protobuf/table_meta.proto +++ 
b/server/controller/src/main/protobuf/table_meta.proto @@ -5,4 +5,5 @@ option java_package = "ai.starwhale.mlops.datastore"; message MetaData { int64 last_wal_log_id = 1; int64 last_update_time = 2; + int64 last_revision = 3; } \ No newline at end of file diff --git a/server/controller/src/main/protobuf/wal.proto b/server/controller/src/main/protobuf/wal.proto index 41adb6a799..2b398d6a85 100644 --- a/server/controller/src/main/protobuf/wal.proto +++ b/server/controller/src/main/protobuf/wal.proto @@ -50,5 +50,5 @@ message WalEntry { TableSchema table_schema = 3; repeated Record records = 4; int64 id = 5; - int64 timestamp = 6; + int64 revision = 6; } diff --git a/server/controller/src/test/java/ai/starwhale/mlops/datastore/impl/MemoryTableImplTest.java b/server/controller/src/test/java/ai/starwhale/mlops/datastore/impl/MemoryTableImplTest.java index 155c175499..88465871c7 100644 --- a/server/controller/src/test/java/ai/starwhale/mlops/datastore/impl/MemoryTableImplTest.java +++ b/server/controller/src/test/java/ai/starwhale/mlops/datastore/impl/MemoryTableImplTest.java @@ -637,6 +637,7 @@ public void testUpdateWalError() { @Test public void testUpdateFromWal() throws IOException { + this.memoryTable.setUseTimestampAsRevision(true); this.memoryTable.update( new TableSchemaDesc("key", List.of( ColumnSchemaDesc.builder().name("key").type("STRING").build(), @@ -776,6 +777,7 @@ public void testUpdateFromWal() throws IOException { } }); } + this.memoryTable.setUseTimestampAsRevision(false); this.memoryTable.update(desc, records); MemoryTableImplTest.this.walManager.terminate(); MemoryTableImplTest.this.walManager = new WalManager(MemoryTableImplTest.this.storageAccessService, @@ -3085,24 +3087,17 @@ public void testScanUnknown() { } @Test - public void testQueryScanTimestamp() throws Exception { - var t1 = System.currentTimeMillis(); - Thread.sleep(100); - this.memoryTable.update(this.memoryTable.getSchema().toTableSchemaDesc(), + public void testQueryScanVersion() { + 
var t1 = this.memoryTable.getLastRevision(); + var t2 = this.memoryTable.update(this.memoryTable.getSchema().toTableSchemaDesc(), List.of(Map.of("key", "0", "d", "7", "e", "8"), Map.of("key", "9", "d", "6"))); - var t2 = System.currentTimeMillis(); - Thread.sleep(100); - this.memoryTable.update(this.memoryTable.getSchema().toTableSchemaDesc(), + var t3 = this.memoryTable.update(this.memoryTable.getSchema().toTableSchemaDesc(), List.of(Map.of("key", "0", "d", "8", "h", "t"), Map.of("key", "9", "-", "1"))); - var t3 = System.currentTimeMillis(); - Thread.sleep(100); - this.memoryTable.update(this.memoryTable.getSchema().toTableSchemaDesc(), + var t4 = this.memoryTable.update(this.memoryTable.getSchema().toTableSchemaDesc(), List.of(Map.of("key", "0", "d", "9"), Map.of("key", "9", "d", "8"))); - var t4 = System.currentTimeMillis(); - Thread.sleep(100); this.memoryTable.update(this.memoryTable.getSchema().toTableSchemaDesc(), List.of(Map.of("key", "a", "d", "7"))); var columns = Map.of("d", "d", "e", "e", "h", "h");