Skip to content

Commit

Permalink
chore(datastore): use revison instead of timestamp as record versions (
Browse files Browse the repository at this point in the history
  • Loading branch information
xuchuan authored Sep 26, 2023
1 parent 1390f25 commit 3977573
Show file tree
Hide file tree
Showing 10 changed files with 74 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ private RecordList queryRecordList(QueryTableRequest request) {
.rawResult(request.isRawResult())
.encodeWithType(request.isEncodeWithType())
.ignoreNonExistingTable(request.isIgnoreNonExistingTable())
.timestamp(StringUtils.hasText(request.getRevision()) ? Long.parseLong(request.getRevision()) : 0)
.revision(StringUtils.hasText(request.getRevision()) ? Long.parseLong(request.getRevision()) : 0)
.build());
}

Expand Down Expand Up @@ -502,13 +502,13 @@ private RecordList scanRecordList(ScanTableRequest request) {
"table name should not be null or empty: " + x
);
}
var ts = StringUtils.hasText(x.getRevision()) ? Long.parseLong(x.getRevision()) : 0;
var revision = StringUtils.hasText(x.getRevision()) ? Long.parseLong(x.getRevision()) : 0;
return DataStoreScanRequest.TableInfo.builder()
.tableName(x.getTableName())
.columnPrefix(x.getColumnPrefix())
.columns(DataStoreController.convertColumns(x.getColumns()))
.keepNone(x.isKeepNone())
.timestamp(ts)
.revision(revision)
.build();
})
.collect(Collectors.toList()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,12 +238,12 @@ public RecordList query(DataStoreQueryRequest req) {
var schema = table.getSchema();
var columns = this.getColumnAliases(schema, req.getColumns());
var results = new ArrayList<RecordResult>();
var timestamp = req.getTimestamp();
if (timestamp == 0) {
timestamp = System.currentTimeMillis();
var revision = req.getRevision();
if (revision == 0) {
revision = table.getLastRevision();
}
var iterator = table.query(
timestamp,
revision,
columns,
req.getOrderBy(),
req.getFilter(),
Expand Down Expand Up @@ -324,30 +324,28 @@ public RecordList scan(DataStoreScanRequest req) {
class TableMeta {

String tableName;
long timestamp;
long revision;
MemoryTable table;
TableSchema schema;
Map<String, String> columns;
Map<String, ColumnSchema> columnSchemaMap;
boolean keepNone;
}

var currentTimestamp = System.currentTimeMillis();

var tables = req.getTables().stream().map(info -> {
var ret = new TableMeta();
ret.tableName = info.getTableName();
if (info.getTimestamp() > 0) {
ret.timestamp = info.getTimestamp();
} else if (req.getTimestamp() > 0) {
ret.timestamp = req.getTimestamp();
} else {
ret.timestamp = currentTimestamp;
}
ret.table = this.getTable(info.getTableName(), req.isIgnoreNonExistingTable(), false);
if (ret.table == null) {
return null;
}
if (info.getRevision() > 0) {
ret.revision = info.getRevision();
} else if (req.getRevision() > 0) {
ret.revision = req.getRevision();
} else {
ret.revision = ret.table.getLastRevision();
}
ret.schema = ret.table.getSchema();
ret.columns = this.getColumnAliases(ret.schema, info.getColumns());
if (info.getColumnPrefix() != null) {
Expand Down Expand Up @@ -424,7 +422,7 @@ class TableRecords {
for (var table : tables) {
var r = new TableRecords();
r.meta = table;
r.iterator = table.table.scan(table.timestamp,
r.iterator = table.table.scan(table.revision,
table.columns,
req.getStart(),
req.getStartType(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class DataStoreQueryRequest {

private String tableName;
// timestamp in milliseconds, used to filter out the data that is older than the timestamp for this table
private long timestamp;
private long revision;
private Map<String, String> columns;
private List<OrderByDesc> orderBy;
private boolean descending;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,14 @@ public static class TableInfo {

private String tableName;
// timestamp in milliseconds, used to filter out the data that is older than the timestamp for this table
private long timestamp;
private long revision;
private String columnPrefix;
private Map<String, String> columns;
private boolean keepNone;
}

private List<TableInfo> tables;
// timestamp in milliseconds, used to filter out the data that is older than the timestamp for all tables
private long timestamp;
private long revision;
private String start;
private String startType;
@Builder.Default
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,7 @@ Iterator<RecordResult> scan(

long getLastUpdateTime();

long getLastRevision();

Map<String, ColumnStatistics> getColumnStatistics(Map<String, String> columnMapping);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
@Builder
public class MemoryRecord {

private long timestamp;
private long revision;
private boolean deleted;
private Map<String, BaseValue> values;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,14 @@
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;

@Slf4j
public class MemoryTableImpl implements MemoryTable {

private static final String TIMESTAMP_COLUMN_NAME = "^";
private static final String REVISION_COLUMN_NAME = "^";

static final String DELETED_FLAG_COLUMN_NAME = "-";

Expand All @@ -101,6 +102,15 @@ public class MemoryTableImpl implements MemoryTable {
@Getter
private long lastUpdateTime = 0;

@Getter
private transient long lastRevision = 0;

@Setter
private boolean useTimestampAsRevision = false; // unittest only

private static final long MIN_TIMESTAMP = 86400L * 1000 * 365 * 50;
private static final long MAX_TIMESTAMP = 86400L * 1000 * 365 * 100;

private final TreeMap<BaseValue, List<MemoryRecord>> recordMap = new TreeMap<>();

private final Lock lock = new ReentrantLock();
Expand Down Expand Up @@ -145,18 +155,19 @@ var record = reader.read();
var metadata = metaBuilder.build();
this.lastWalLogId = metadata.getLastWalLogId();
this.lastUpdateTime = metadata.getLastUpdateTime();
this.lastRevision = metadata.getLastRevision();
}
if (record == null) {
break;
}
var key = record.remove(this.schema.getKeyColumn());
this.statisticsMap.computeIfAbsent(this.schema.getKeyColumn(), k -> new ColumnStatistics())
.update(key);
var timestamp = (Int64Value) record.remove(TIMESTAMP_COLUMN_NAME);
var revision = (Int64Value) record.remove(REVISION_COLUMN_NAME);
var deletedFlag = (BoolValue) record.remove(DELETED_FLAG_COLUMN_NAME);
this.recordMap.computeIfAbsent(key, k -> new ArrayList<>())
.add(MemoryRecord.builder()
.timestamp(timestamp.getValue())
.revision(revision.getValue())
.deleted(deletedFlag.isValue())
.values(record)
.build());
Expand Down Expand Up @@ -184,6 +195,7 @@ public void save() throws IOException {
metadata = JsonFormat.printer().print(TableMeta.MetaData.newBuilder()
.setLastWalLogId(this.lastWalLogId)
.setLastUpdateTime(this.lastUpdateTime)
.setLastRevision(this.lastRevision)
.build());
} catch (InvalidProtocolBufferException e) {
throw new SwProcessException(ErrorType.DATASTORE, "failed to print table meta", e);
Expand All @@ -193,11 +205,11 @@ public void save() throws IOException {
for (var entry : new TreeMap<>(this.statisticsMap).entrySet()) {
columnSchema.put(entry.getKey(), entry.getValue().createSchema(entry.getKey(), index++));
}
var timestampColumnSchema = new ColumnSchema(TIMESTAMP_COLUMN_NAME, index++);
timestampColumnSchema.setType(ColumnType.INT64);
var revisionColumnSchema = new ColumnSchema(REVISION_COLUMN_NAME, index++);
revisionColumnSchema.setType(ColumnType.INT64);
var deletedFlagColumnSchema = new ColumnSchema(DELETED_FLAG_COLUMN_NAME, index);
deletedFlagColumnSchema.setType(ColumnType.BOOL);
columnSchema.put(TIMESTAMP_COLUMN_NAME, timestampColumnSchema);
columnSchema.put(REVISION_COLUMN_NAME, revisionColumnSchema);
columnSchema.put(DELETED_FLAG_COLUMN_NAME, deletedFlagColumnSchema);

try {
Expand All @@ -217,7 +229,7 @@ public void save() throws IOException {
recordMap.putAll(record.getValues());
}
recordMap.put(this.schema.getKeyColumn(), entry.getKey());
recordMap.put(TIMESTAMP_COLUMN_NAME, new Int64Value(record.getTimestamp()));
recordMap.put(REVISION_COLUMN_NAME, new Int64Value(record.getRevision()));
recordMap.put(DELETED_FLAG_COLUMN_NAME, BaseValue.valueOf(record.isDeleted()));
list.add(recordMap);
}
Expand Down Expand Up @@ -250,7 +262,7 @@ public void updateFromWal(Wal.WalEntry entry) {
}
var recordList = entry.getRecordsList();
if (!recordList.isEmpty()) {
this.insertRecords(entry.getTimestamp(),
this.insertRecords(entry.getRevision(),
recordList.stream()
.map(r -> WalRecordDecoder.decodeRecord(this.schema, r))
.collect(Collectors.toList()));
Expand Down Expand Up @@ -293,11 +305,11 @@ public long update(TableSchemaDesc schema, @NonNull List<Map<String, Object>> re
}
decodedRecords.add(decodedRecord);
}
var timestamp = System.currentTimeMillis();
var revision = this.useTimestampAsRevision ? System.currentTimeMillis() : this.lastRevision + 1;
var logEntryBuilder = Wal.WalEntry.newBuilder()
.setEntryType(Wal.WalEntry.Type.UPDATE)
.setTableName(this.tableName)
.setTimestamp(timestamp);
.setRevision(revision);
var logSchemaBuilder = this.schema.getDiff(schema);
if (logSchemaBuilder != null) {
logEntryBuilder.setTableSchema(logSchemaBuilder);
Expand All @@ -319,13 +331,20 @@ public long update(TableSchemaDesc schema, @NonNull List<Map<String, Object>> re
this.lastUpdateTime = System.currentTimeMillis();
this.schema = recordSchema;
if (!decodedRecords.isEmpty()) {
this.insertRecords(timestamp, decodedRecords);
this.insertRecords(revision, decodedRecords);
}
return revision;
}

return timestamp;
private long normalizeRevision(long revision) {
return revision >= MIN_TIMESTAMP && !this.useTimestampAsRevision ? revision - MAX_TIMESTAMP : revision;
}

private void insertRecords(long timestamp, List<Map<String, BaseValue>> records) {
private void insertRecords(long revision, List<Map<String, BaseValue>> records) {
revision = this.normalizeRevision(revision);
if (revision > this.lastRevision) {
this.lastRevision = revision;
}
for (var record : records) {
var newRecord = new HashMap<>(record);
var key = newRecord.remove(this.schema.getKeyColumn());
Expand All @@ -335,12 +354,12 @@ private void insertRecords(long timestamp, List<Map<String, BaseValue>> records)
if (deletedFlag) {
if (versions.isEmpty() || !versions.get(versions.size() - 1).isDeleted()) {
versions.add(MemoryRecord.builder()
.timestamp(timestamp)
.revision(revision)
.deleted(true)
.build());
}
} else {
var old = this.getRecordMap(key, versions, timestamp);
var old = this.getRecordMap(key, versions, revision);
if (old != null) {
for (var it = newRecord.entrySet().iterator(); it.hasNext(); ) {
var entry = it.next();
Expand All @@ -354,7 +373,7 @@ private void insertRecords(long timestamp, List<Map<String, BaseValue>> records)
}
if (versions.isEmpty() || !newRecord.isEmpty()) {
versions.add(MemoryRecord.builder()
.timestamp(timestamp)
.revision(revision)
.values(newRecord)
.build());
}
Expand All @@ -366,12 +385,14 @@ private void insertRecords(long timestamp, List<Map<String, BaseValue>> records)
}
}

private Map<String, BaseValue> getRecordMap(BaseValue key, List<MemoryRecord> versions, long timestamp) {

private Map<String, BaseValue> getRecordMap(BaseValue key, List<MemoryRecord> versions, long revision) {
revision = this.normalizeRevision(revision);
var ret = new HashMap<String, BaseValue>();
boolean deleted = false;
boolean hasVersion = false;
for (var record : versions) {
if (record.getTimestamp() <= timestamp) {
if (record.getRevision() <= revision) {
// record may be empty, use hasVersion to mark if there is a record
hasVersion = true;
if (record.isDeleted()) {
Expand All @@ -395,7 +416,7 @@ private Map<String, BaseValue> getRecordMap(BaseValue key, List<MemoryRecord> ve

@Override
public Iterator<RecordResult> query(
long timestamp,
long revision,
@NonNull Map<String, String> columns,
List<OrderByDesc> orderBy,
TableQueryFilter filter,
Expand All @@ -421,7 +442,7 @@ public Iterator<RecordResult> query(
this.checkFilter(filter);
}
var stream = this.recordMap.entrySet().stream()
.map(entry -> this.getRecordMap(entry.getKey(), entry.getValue(), timestamp))
.map(entry -> this.getRecordMap(entry.getKey(), entry.getValue(), revision))
.filter(record -> record != null && (filter == null || this.match(filter, record)));
if (orderBy != null) {
stream = stream.sorted((a, b) -> {
Expand All @@ -445,7 +466,7 @@ public Iterator<RecordResult> query(

@Override
public Iterator<RecordResult> scan(
long timestamp,
long revision,
@NonNull Map<String, String> columns,
String start,
String startType,
Expand Down Expand Up @@ -510,7 +531,7 @@ public Iterator<RecordResult> scan(
return Collections.emptyIterator();
}
var iterator = this.recordMap.subMap(startKey, startInclusive, endKey, endInclusive).entrySet().stream()
.map(entry -> this.getRecordMap(entry.getKey(), entry.getValue(), timestamp))
.map(entry -> this.getRecordMap(entry.getKey(), entry.getValue(), revision))
.filter(Objects::nonNull)
.iterator();
return new Iterator<>() {
Expand Down
1 change: 1 addition & 0 deletions server/controller/src/main/protobuf/table_meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ option java_package = "ai.starwhale.mlops.datastore";
message MetaData {
int64 last_wal_log_id = 1;
int64 last_update_time = 2;
int64 last_revision = 3;
}
2 changes: 1 addition & 1 deletion server/controller/src/main/protobuf/wal.proto
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,5 @@ message WalEntry {
TableSchema table_schema = 3;
repeated Record records = 4;
int64 id = 5;
int64 timestamp = 6;
int64 revision = 6;
}
Loading

0 comments on commit 3977573

Please sign in to comment.