From 5118fe0eba98fb81392b87a716f0ceb9d4305be7 Mon Sep 17 00:00:00 2001 From: Zhexuan Yang Date: Fri, 2 Aug 2019 16:16:50 +0800 Subject: [PATCH] a better design for get auto table id --- .../com/pingcap/tispark/TiBatchWrite.scala | 2 +- .../datasource/RowIDAllocatorSuite.scala | 6 +- .../tikv/allocator/RowIDAllocator.java | 156 +++++++++++- .../com/pingcap/tikv/catalog/Catalog.java | 17 -- .../tikv/catalog/CatalogTransaction.java | 236 +----------------- .../com/pingcap/tikv/codec/MetaCodec.java | 116 +++++++++ 6 files changed, 277 insertions(+), 256 deletions(-) create mode 100644 tikv-client/src/main/java/com/pingcap/tikv/codec/MetaCodec.java diff --git a/core/src/main/scala/com/pingcap/tispark/TiBatchWrite.scala b/core/src/main/scala/com/pingcap/tispark/TiBatchWrite.scala index 82506e1b40..44ddcd846b 100644 --- a/core/src/main/scala/com/pingcap/tispark/TiBatchWrite.scala +++ b/core/src/main/scala/com/pingcap/tispark/TiBatchWrite.scala @@ -387,7 +387,7 @@ class TiBatchWrite(@transient val df: DataFrame, .create( tiDBInfo.getId, tiTableInfo.getId, - catalog, + tiSession.getConf, tiTableInfo.isAutoIncColUnsigned, step ) diff --git a/core/src/test/scala/com/pingcap/tispark/datasource/RowIDAllocatorSuite.scala b/core/src/test/scala/com/pingcap/tispark/datasource/RowIDAllocatorSuite.scala index ed4f0ac79b..defd518d0f 100644 --- a/core/src/test/scala/com/pingcap/tispark/datasource/RowIDAllocatorSuite.scala +++ b/core/src/test/scala/com/pingcap/tispark/datasource/RowIDAllocatorSuite.scala @@ -18,7 +18,7 @@ class RowIDAllocatorSuite extends BaseTiSparkTest { ti.tiSession.getCatalog.getTable(dbName, tableName) // corner case allocate unsigned long's max value. val allocator = - RowIDAllocator.create(tiDBInfo.getId, tiTableInfo.getId, ti.tiSession.getCatalog, true, -2L) + RowIDAllocator.create(tiDBInfo.getId, tiTableInfo.getId, ti.tiSession.getConf, true, -2L) assert(allocator.getEnd - allocator.getStart == -2L) } @@ -35,11 +35,11 @@ class RowIDAllocatorSuite extends BaseTiSparkTest { val tiTableInfo = ti.tiSession.getCatalog.getTable(dbName, tableName) var allocator = - RowIDAllocator.create(tiDBInfo.getId, tiTableInfo.getId, ti.tiSession.getCatalog, false, 1000) + RowIDAllocator.create(tiDBInfo.getId, tiTableInfo.getId, ti.tiSession.getConf, false, 1000) assert(allocator.getEnd - allocator.getStart == 1000) allocator = RowIDAllocator - .create(tiDBInfo.getId, tiTableInfo.getId, ti.tiSession.getCatalog, false, 10000) + .create(tiDBInfo.getId, tiTableInfo.getId, ti.tiSession.getConf, false, 10000) assert(allocator.getEnd - allocator.getStart == 10000) } diff --git a/tikv-client/src/main/java/com/pingcap/tikv/allocator/RowIDAllocator.java b/tikv-client/src/main/java/com/pingcap/tikv/allocator/RowIDAllocator.java index 2c08d0f95e..bddd66e631 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/allocator/RowIDAllocator.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/allocator/RowIDAllocator.java @@ -15,8 +15,20 @@ package com.pingcap.tikv.allocator; import com.google.common.primitives.UnsignedLongs; -import com.pingcap.tikv.catalog.Catalog; +import com.google.protobuf.ByteString; +import com.pingcap.tikv.Snapshot; +import com.pingcap.tikv.TiConfiguration; +import com.pingcap.tikv.TiSession; +import com.pingcap.tikv.TiSessionCache; +import com.pingcap.tikv.TwoPhaseCommitter; +import com.pingcap.tikv.codec.CodecDataInput; +import com.pingcap.tikv.codec.CodecDataOutput; +import com.pingcap.tikv.codec.MetaCodec; import com.pingcap.tikv.exception.TiBatchWriteException; +import 
com.pingcap.tikv.util.BackOffer; +import com.pingcap.tikv.util.ConcreteBackOffer; +import java.util.Arrays; +import java.util.function.Function; /** * RowIDAllocator read current start from TiKV and write back 'start+step' back to TiKV. It designs @@ -26,19 +38,21 @@ public final class RowIDAllocator { private long end; private final long dbId; private long step; + private final TiConfiguration conf; - private RowIDAllocator(long dbId, long step) { + private RowIDAllocator(long dbId, long step, TiConfiguration conf) { this.dbId = dbId; this.step = step; + this.conf = conf; } public static RowIDAllocator create( - long dbId, long tableId, Catalog catalog, boolean unsigned, long step) { - RowIDAllocator allocator = new RowIDAllocator(dbId, step); + long dbId, long tableId, TiConfiguration conf, boolean unsigned, long step) { + RowIDAllocator allocator = new RowIDAllocator(dbId, step, conf); if (unsigned) { - allocator.initUnsigned(catalog, tableId); + allocator.initUnsigned(TiSession.create(conf).createSnapshot(), tableId); } else { - allocator.initSigned(catalog, tableId); + allocator.initSigned(TiSession.create(conf).createSnapshot(), tableId); } return allocator; } @@ -50,11 +64,129 @@ public long getStart() { public long getEnd() { return end; } + /** + * read current row id from TiKV and write the calculated value back to TiKV. The calculation rule + * is start(read from TiKV) + step. + */ - private void initSigned(Catalog catalog, long tableId) { + /** read current row id from TiKV according to database id and table id. */ + + // set key value pair to tikv via two phase committer protocol. + private void set(ByteString key, byte[] value) { + TiSession session = TiSessionCache.getSession(conf); + TwoPhaseCommitter twoPhaseCommitter = + new TwoPhaseCommitter(conf, session.getTimestamp().getVersion()); + + twoPhaseCommitter.prewritePrimaryKey( + ConcreteBackOffer.newCustomBackOff(BackOffer.PREWRITE_MAX_BACKOFF), + key.toByteArray(), + value); + + twoPhaseCommitter.commitPrimaryKey( + ConcreteBackOffer.newCustomBackOff(BackOffer.BATCH_COMMIT_BACKOFF), + key.toByteArray(), + session.getTimestamp().getVersion()); + } + + private void updateMeta(ByteString key, byte[] oldVal, Snapshot snapshot) { + // 1. encode hash meta key + // 2. load meta via hash meta key from TiKV + // 3. update meta's filed count and set it back to TiKV + CodecDataOutput cdo = new CodecDataOutput(); + ByteString metaKey = MetaCodec.encodeHashMetaKey(cdo, key.toByteArray()); + long fieldCount; + ByteString metaVal = snapshot.get(metaKey); + + // decode long from bytes + // big endian the 8 bytes + fieldCount = new CodecDataInput(metaVal.toByteArray()).readLong(); + + // update meta field count only oldVal is null + if (oldVal == null || oldVal.length == 0) { + fieldCount++; + cdo.reset(); + cdo.writeLong(fieldCount); + + set(metaKey, cdo.toBytes()); + } + } + + private long updateHash( + ByteString key, + ByteString field, + Function calculateNewVal, + Snapshot snapshot) { + // 1. encode hash data key + // 2. get value in byte from get operation + // 3. calculate new value via calculateNewVal + // 4. check old value equals to new value or not + // 5. set the new value back to TiKV via 2pc + // 6. encode a hash meta key + // 7. 
update a hash meta field count if needed + + CodecDataOutput cdo = new CodecDataOutput(); + MetaCodec.encodeHashDataKey(cdo, key.toByteArray(), field.toByteArray()); + ByteString dataKey = cdo.toByteString(); + byte[] oldVal = snapshot.get(dataKey.toByteArray()); + + byte[] newVal = calculateNewVal.apply(oldVal); + if (Arrays.equals(newVal, oldVal)) { + // not need to update + return 0L; + } + + set(dataKey, newVal); + updateMeta(key, oldVal, snapshot); + return Long.parseLong(new String(newVal)); + } + + private boolean isDBExisted(long dbId, Snapshot snapshot) { + ByteString dbKey = MetaCodec.encodeDatabaseID(dbId); + ByteString json = MetaCodec.hashGet(MetaCodec.KEY_DBs, dbKey, snapshot); + if (json == null || json.isEmpty()) { + return false; + } + return true; + } + + private boolean isTableExisted(long dbId, long tableId, Snapshot snapshot) { + ByteString dbKey = MetaCodec.encodeDatabaseID(dbId); + ByteString tableKey = MetaCodec.tableKey(tableId); + return !MetaCodec.hashGet(dbKey, tableKey, snapshot).isEmpty(); + } + + public long getAutoTableId(long dbId, long tableId, long step, Snapshot snapshot) { + if (isDBExisted(dbId, snapshot) && isTableExisted(dbId, tableId, snapshot)) { + return updateHash( + MetaCodec.encodeDatabaseID(dbId), + MetaCodec.autoTableIDKey(tableId), + (oldVal) -> { + long base = 0; + if (oldVal != null && oldVal.length != 0) { + base = Long.parseLong(new String(oldVal)); + } + + base += step; + return String.valueOf(base).getBytes(); + }, + snapshot); + } + + throw new IllegalArgumentException("table or database is not existed"); + } + + public long getAutoTableId(long dbId, long tableId, Snapshot snapshot) { + ByteString dbKey = MetaCodec.encodeDatabaseID(dbId); + ByteString tblKey = MetaCodec.autoTableIDKey(tableId); + ByteString val = MetaCodec.hashGet(dbKey, tblKey, snapshot); + if (val.isEmpty()) return 0L; + return Long.parseLong(val.toStringUtf8()); + } + + private void initSigned(Snapshot snapshot, long tableId) { long newEnd; // get new start from TiKV, and calculate new end and set it back to TiKV. - long newStart = catalog.getAutoTableId(dbId, tableId); + long newStart = getAutoTableId(dbId, tableId, snapshot); long tmpStep = Math.min(Long.MAX_VALUE - newStart, step); if (tmpStep != step) { throw new TiBatchWriteException("cannot allocate ids for this write"); @@ -62,15 +194,15 @@ private void initSigned(Catalog catalog, long tableId) { if (newStart == Long.MAX_VALUE) { throw new TiBatchWriteException("cannot allocate more ids since it "); } - newEnd = catalog.getAutoTableId(dbId, tableId, tmpStep); + newEnd = getAutoTableId(dbId, tableId, tmpStep, snapshot); end = newEnd; } - private void initUnsigned(Catalog catalog, long tableId) { + private void initUnsigned(Snapshot snapshot, long tableId) { long newEnd; // get new start from TiKV, and calculate new end and set it back to TiKV. - long newStart = catalog.getAutoTableId(dbId, tableId); + long newStart = getAutoTableId(dbId, tableId, snapshot); // for unsigned long, -1L is max value. 
long tmpStep = UnsignedLongs.min(-1L - newStart, step); if (tmpStep != step) { @@ -81,7 +213,7 @@ private void initUnsigned(Catalog catalog, long tableId) { throw new TiBatchWriteException( "cannot allocate more ids since the start reaches " + "unsigned long's max value "); } - newEnd = catalog.getAutoTableId(dbId, tableId, tmpStep); + newEnd = getAutoTableId(dbId, tableId, tmpStep, snapshot); end = newEnd; } diff --git a/tikv-client/src/main/java/com/pingcap/tikv/catalog/Catalog.java b/tikv-client/src/main/java/com/pingcap/tikv/catalog/Catalog.java index 05179ab609..a0d093110e 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/catalog/Catalog.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/catalog/Catalog.java @@ -156,23 +156,6 @@ public Catalog( periodUnit); } - /** - * read current row id from TiKV and write the calculated value back to TiKV. The calculation rule - * is start(read from TiKV) + step. - */ - public synchronized long getAutoTableId(long dbId, long tableId, long step) { - Snapshot snapshot = snapshotProvider.get(); - CatalogTransaction newTrx = new CatalogTransaction(snapshot); - return newTrx.getAutoTableId(dbId, tableId, step); - } - - /** read current row id from TiKV according to database id and table id. */ - public synchronized long getAutoTableId(long dbId, long tableId) { - Snapshot snapshot = snapshotProvider.get(); - CatalogTransaction newTrx = new CatalogTransaction(snapshot); - return newTrx.getAutoTableId(dbId, tableId); - } - public synchronized void reloadCache(boolean loadTables) { Snapshot snapshot = snapshotProvider.get(); CatalogTransaction newTrx = new CatalogTransaction(snapshot); diff --git a/tikv-client/src/main/java/com/pingcap/tikv/catalog/CatalogTransaction.java b/tikv-client/src/main/java/com/pingcap/tikv/catalog/CatalogTransaction.java index d0788710b1..24b372cbc8 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/catalog/CatalogTransaction.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/catalog/CatalogTransaction.java @@ -15,253 +15,43 @@ package com.pingcap.tikv.catalog; -import static com.google.common.base.Preconditions.checkArgument; +import static com.pingcap.tikv.codec.MetaCodec.KEY_DBs; import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.protobuf.ByteString; -import com.pingcap.tikv.*; -import com.pingcap.tikv.codec.Codec.BytesCodec; -import com.pingcap.tikv.codec.Codec.IntegerCodec; +import com.pingcap.tikv.Snapshot; import com.pingcap.tikv.codec.CodecDataInput; -import com.pingcap.tikv.codec.CodecDataOutput; import com.pingcap.tikv.codec.KeyUtils; +import com.pingcap.tikv.codec.MetaCodec; import com.pingcap.tikv.exception.TiClientInternalException; import com.pingcap.tikv.meta.TiDBInfo; import com.pingcap.tikv.meta.TiTableInfo; -import com.pingcap.tikv.util.BackOffer; -import com.pingcap.tikv.util.ConcreteBackOffer; import com.pingcap.tikv.util.Pair; import java.nio.charset.StandardCharsets; -import java.util.*; -import java.util.function.Function; +import java.util.List; +import java.util.Objects; import org.apache.log4j.Logger; -import org.tikv.kvproto.Kvrpcpb; public class CatalogTransaction { protected static final Logger logger = Logger.getLogger(CatalogTransaction.class); private final Snapshot snapshot; - private final byte[] prefix; - private final TiConfiguration conf; - - private static final byte[] META_PREFIX = new byte[] 
{'m'}; - - private static final byte HASH_DATA_FLAG = 'h'; - private static final byte HASH_META_FLAG = 'H'; - private static final byte STR_DATA_FLAG = 's'; - - private static ByteString KEY_DBs = ByteString.copyFromUtf8("DBs"); - private static String KEY_TABLE = "Table"; - private static ByteString KEY_SCHEMA_VERSION = ByteString.copyFromUtf8("SchemaVersionKey"); - - private static final String ENCODED_DB_PREFIX = "DB"; - private static final String KEY_TID = "TID"; CatalogTransaction(Snapshot snapshot) { this.snapshot = snapshot; - this.conf = snapshot.getConf(); - this.prefix = META_PREFIX; - } - - private void encodeStringDataKey(CodecDataOutput cdo, byte[] key) { - cdo.write(prefix); - BytesCodec.writeBytes(cdo, key); - IntegerCodec.writeULong(cdo, STR_DATA_FLAG); - } - - private void encodeHashDataKey(CodecDataOutput cdo, byte[] key, byte[] field) { - cdo.write(prefix); - BytesCodec.writeBytes(cdo, key); - IntegerCodec.writeULong(cdo, HASH_DATA_FLAG); - BytesCodec.writeBytes(cdo, field); - } - - private ByteString encodeHashMetaKey(CodecDataOutput cdo, byte[] key) { - cdo.write(prefix); - BytesCodec.writeBytes(cdo, key); - IntegerCodec.writeULong(cdo, HASH_META_FLAG); - return cdo.toByteString(); - } - - private void encodeHashDataKeyPrefix(CodecDataOutput cdo, byte[] key) { - cdo.write(prefix); - BytesCodec.writeBytes(cdo, key); - IntegerCodec.writeULong(cdo, HASH_DATA_FLAG); - } - - private Pair decodeHashDataKey(ByteString rawKey) { - checkArgument( - KeyUtils.hasPrefix(rawKey, ByteString.copyFrom(prefix)), - "invalid encoded hash data key prefix: " + new String(prefix)); - CodecDataInput cdi = new CodecDataInput(rawKey.toByteArray()); - cdi.skipBytes(prefix.length); - byte[] key = BytesCodec.readBytes(cdi); - long typeFlag = IntegerCodec.readULong(cdi); - if (typeFlag != HASH_DATA_FLAG) { - throw new TiClientInternalException("Invalid hash data flag: " + typeFlag); - } - byte[] field = BytesCodec.readBytes(cdi); - return Pair.create(ByteString.copyFrom(key), ByteString.copyFrom(field)); - } - - private static ByteString autoTableIDKey(long tableId) { - return ByteString.copyFrom(String.format("%s:%d", KEY_TID, tableId).getBytes()); - } - - private static ByteString tableKey(long tableId) { - return ByteString.copyFrom(String.format("%s:%d", KEY_TABLE, tableId).getBytes()); - } - - private static ByteString encodeDatabaseID(long id) { - return ByteString.copyFrom(String.format("%s:%d", ENCODED_DB_PREFIX, id).getBytes()); - } - - private boolean isDBExisted(long dbId) { - return getDatabase(dbId) != null; - } - - private boolean isTableExisted(long dbId, long tableId) { - ByteString dbKey = encodeDatabaseID(dbId); - ByteString tableKey = tableKey(tableId); - return !hashGet(dbKey, tableKey).isEmpty(); - } - - // set key value pair to tikv via two phase committer protocol. - private void set(ByteString key, byte[] value) { - TiSession session = TiSessionCache.getSession(conf); - TwoPhaseCommitter twoPhaseCommitter = - new TwoPhaseCommitter(conf, session.getTimestamp().getVersion()); - - twoPhaseCommitter.prewritePrimaryKey( - ConcreteBackOffer.newCustomBackOff(BackOffer.PREWRITE_MAX_BACKOFF), - key.toByteArray(), - value); - - twoPhaseCommitter.commitPrimaryKey( - ConcreteBackOffer.newCustomBackOff(BackOffer.BATCH_COMMIT_BACKOFF), - key.toByteArray(), - session.getTimestamp().getVersion()); - } - - private void updateMeta(ByteString key, byte[] oldVal) { - // 1. encode hash meta key - // 2. load meta via hash meta key from TiKV - // 3. 
update meta's filed count and set it back to TiKV - CodecDataOutput cdo = new CodecDataOutput(); - ByteString metaKey = encodeHashMetaKey(cdo, key.toByteArray()); - long fieldCount; - ByteString metaVal = snapshot.get(metaKey); - - // decode long from bytes - // big endian the 8 bytes - fieldCount = new CodecDataInput(metaVal.toByteArray()).readLong(); - - // update meta field count only oldVal is null - if (oldVal == null || oldVal.length == 0) { - fieldCount++; - cdo.reset(); - cdo.writeLong(fieldCount); - - set(metaKey, cdo.toBytes()); - } - } - - private long updateHash( - ByteString key, ByteString field, Function calculateNewVal) { - // 1. encode hash data key - // 2. get value in byte from get operation - // 3. calculate new value via calculateNewVal - // 4. check old value equals to new value or not - // 5. set the new value back to TiKV via 2pc - // 6. encode a hash meta key - // 7. update a hash meta field count if needed - - CodecDataOutput cdo = new CodecDataOutput(); - encodeHashDataKey(cdo, key.toByteArray(), field.toByteArray()); - ByteString dataKey = cdo.toByteString(); - byte[] oldVal = snapshot.get(dataKey.toByteArray()); - - byte[] newVal = calculateNewVal.apply(oldVal); - if (Arrays.equals(newVal, oldVal)) { - // not need to update - return 0L; - } - - set(dataKey, newVal); - updateMeta(key, oldVal); - return Long.parseLong(new String(newVal)); - } - - public long getAutoTableId(long dbId, long tableId, long step) { - if (isDBExisted(dbId) && isTableExisted(dbId, tableId)) { - return updateHash( - encodeDatabaseID(dbId), - autoTableIDKey(tableId), - (oldVal) -> { - long base = 0; - if (oldVal != null && oldVal.length != 0) { - base = Long.parseLong(new String(oldVal)); - } - - base += step; - return String.valueOf(base).getBytes(); - }); - } - - throw new IllegalArgumentException("table or database is not existed"); - } - - public long getAutoTableId(long dbId, long tableId) { - ByteString dbKey = encodeDatabaseID(dbId); - ByteString tblKey = autoTableIDKey(tableId); - ByteString val = hashGet(dbKey, tblKey); - if (val.isEmpty()) return 0L; - return Long.parseLong(val.toStringUtf8()); - } - - private ByteString hashGet(ByteString key, ByteString field) { - CodecDataOutput cdo = new CodecDataOutput(); - encodeHashDataKey(cdo, key.toByteArray(), field.toByteArray()); - return snapshot.get(cdo.toByteString()); - } - - private ByteString bytesGet(ByteString key) { - CodecDataOutput cdo = new CodecDataOutput(); - encodeStringDataKey(cdo, key.toByteArray()); - return snapshot.get(cdo.toByteString()); - } - - private List> hashGetFields(ByteString key) { - CodecDataOutput cdo = new CodecDataOutput(); - encodeHashDataKeyPrefix(cdo, key.toByteArray()); - ByteString encodedKey = cdo.toByteString(); - - Iterator iterator = snapshot.scan(encodedKey); - List> fields = new ArrayList<>(); - while (iterator.hasNext()) { - Kvrpcpb.KvPair kv = iterator.next(); - if (kv == null || kv.getKey() == null) { - continue; - } - if (!KeyUtils.hasPrefix(kv.getKey(), encodedKey)) { - break; - } - fields.add(Pair.create(decodeHashDataKey(kv.getKey()).second, kv.getValue())); - } - - return fields; } long getLatestSchemaVersion() { - ByteString versionBytes = bytesGet(KEY_SCHEMA_VERSION); + ByteString versionBytes = MetaCodec.bytesGet(MetaCodec.KEY_SCHEMA_VERSION, this.snapshot); CodecDataInput cdi = new CodecDataInput(versionBytes.toByteArray()); return Long.parseLong(new String(cdi.toByteArray(), StandardCharsets.UTF_8)); } public List getDatabases() { - List> fields = hashGetFields(KEY_DBs); + 
List> fields = + MetaCodec.hashGetFields(MetaCodec.KEY_DBs, this.snapshot); ImmutableList.Builder builder = ImmutableList.builder(); for (Pair pair : fields) { builder.add(parseFromJson(pair.second, TiDBInfo.class)); @@ -270,8 +60,8 @@ public List getDatabases() { } TiDBInfo getDatabase(long id) { - ByteString dbKey = encodeDatabaseID(id); - ByteString json = hashGet(KEY_DBs, dbKey); + ByteString dbKey = MetaCodec.encodeDatabaseID(id); + ByteString json = MetaCodec.hashGet(KEY_DBs, dbKey, this.snapshot); if (json == null || json.isEmpty()) { return null; } @@ -279,11 +69,11 @@ TiDBInfo getDatabase(long id) { } List getTables(long dbId) { - ByteString dbKey = encodeDatabaseID(dbId); - List> fields = hashGetFields(dbKey); + ByteString dbKey = MetaCodec.encodeDatabaseID(dbId); + List> fields = MetaCodec.hashGetFields(dbKey, this.snapshot); ImmutableList.Builder builder = ImmutableList.builder(); for (Pair pair : fields) { - if (KeyUtils.hasPrefix(pair.first, ByteString.copyFromUtf8(KEY_TABLE))) { + if (KeyUtils.hasPrefix(pair.first, ByteString.copyFromUtf8(MetaCodec.KEY_TABLE))) { builder.add(parseFromJson(pair.second, TiTableInfo.class)); } } diff --git a/tikv-client/src/main/java/com/pingcap/tikv/codec/MetaCodec.java b/tikv-client/src/main/java/com/pingcap/tikv/codec/MetaCodec.java new file mode 100644 index 0000000000..0d9f4c963f --- /dev/null +++ b/tikv-client/src/main/java/com/pingcap/tikv/codec/MetaCodec.java @@ -0,0 +1,116 @@ +package com.pingcap.tikv.codec; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.protobuf.ByteString; +import com.pingcap.tikv.Snapshot; +import com.pingcap.tikv.codec.Codec.BytesCodec; +import com.pingcap.tikv.codec.Codec.IntegerCodec; +import com.pingcap.tikv.exception.TiClientInternalException; +import com.pingcap.tikv.util.Pair; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import org.tikv.kvproto.Kvrpcpb; +import org.tikv.kvproto.Kvrpcpb.KvPair; + +public class MetaCodec { + private static final byte[] META_PREFIX = new byte[] {'m'}; + private static final byte HASH_DATA_FLAG = 'h'; + private static final byte HASH_META_FLAG = 'H'; + private static final byte STR_DATA_FLAG = 's'; + + public static ByteString KEY_DBs = ByteString.copyFromUtf8("DBs"); + public static String KEY_TABLE = "Table"; + public static ByteString KEY_SCHEMA_VERSION = ByteString.copyFromUtf8("SchemaVersionKey"); + + public static final String ENCODED_DB_PREFIX = "DB"; + public static final String KEY_TID = "TID"; + + public static void encodeStringDataKey(CodecDataOutput cdo, byte[] key) { + cdo.write(META_PREFIX); + BytesCodec.writeBytes(cdo, key); + IntegerCodec.writeULong(cdo, STR_DATA_FLAG); + } + + public static void encodeHashDataKey(CodecDataOutput cdo, byte[] key, byte[] field) { + cdo.write(META_PREFIX); + BytesCodec.writeBytes(cdo, key); + IntegerCodec.writeULong(cdo, HASH_DATA_FLAG); + BytesCodec.writeBytes(cdo, field); + } + + public static ByteString encodeHashMetaKey(CodecDataOutput cdo, byte[] key) { + cdo.write(META_PREFIX); + BytesCodec.writeBytes(cdo, key); + IntegerCodec.writeULong(cdo, HASH_META_FLAG); + return cdo.toByteString(); + } + + public static void encodeHashDataKeyPrefix(CodecDataOutput cdo, byte[] key) { + cdo.write(META_PREFIX); + BytesCodec.writeBytes(cdo, key); + IntegerCodec.writeULong(cdo, HASH_DATA_FLAG); + } + + public static Pair decodeHashDataKey(ByteString rawKey) { + checkArgument( + KeyUtils.hasPrefix(rawKey, ByteString.copyFrom(META_PREFIX)), + "invalid encoded hash 
data key prefix: " + new String(META_PREFIX)); + CodecDataInput cdi = new CodecDataInput(rawKey.toByteArray()); + cdi.skipBytes(META_PREFIX.length); + byte[] key = BytesCodec.readBytes(cdi); + long typeFlag = IntegerCodec.readULong(cdi); + if (typeFlag != HASH_DATA_FLAG) { + throw new TiClientInternalException("Invalid hash data flag: " + typeFlag); + } + byte[] field = BytesCodec.readBytes(cdi); + return Pair.create(ByteString.copyFrom(key), ByteString.copyFrom(field)); + } + + public static ByteString autoTableIDKey(long tableId) { + return ByteString.copyFrom(String.format("%s:%d", KEY_TID, tableId).getBytes()); + } + + public static ByteString tableKey(long tableId) { + return ByteString.copyFrom(String.format("%s:%d", KEY_TABLE, tableId).getBytes()); + } + + public static ByteString encodeDatabaseID(long id) { + return ByteString.copyFrom(String.format("%s:%d", ENCODED_DB_PREFIX, id).getBytes()); + } + + public static ByteString hashGet(ByteString key, ByteString field, Snapshot snapshot) { + CodecDataOutput cdo = new CodecDataOutput(); + MetaCodec.encodeHashDataKey(cdo, key.toByteArray(), field.toByteArray()); + return snapshot.get(cdo.toByteString()); + } + + public static ByteString bytesGet(ByteString key, Snapshot snapshot) { + CodecDataOutput cdo = new CodecDataOutput(); + MetaCodec.encodeStringDataKey(cdo, key.toByteArray()); + return snapshot.get(cdo.toByteString()); + } + + public static List> hashGetFields( + ByteString key, Snapshot snapshot) { + CodecDataOutput cdo = new CodecDataOutput(); + MetaCodec.encodeHashDataKeyPrefix(cdo, key.toByteArray()); + ByteString encodedKey = cdo.toByteString(); + + Iterator iterator = snapshot.scan(encodedKey); + List> fields = new ArrayList<>(); + while (iterator.hasNext()) { + Kvrpcpb.KvPair kv = iterator.next(); + if (kv == null || kv.getKey() == null) { + continue; + } + if (!KeyUtils.hasPrefix(kv.getKey(), encodedKey)) { + break; + } + fields.add(Pair.create(MetaCodec.decodeHashDataKey(kv.getKey()).second, kv.getValue())); + } + + return fields; + } +}
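
For reference, a minimal sketch of how the refactored entry point is expected to be called once this patch lands. It mirrors the updated call sites in TiBatchWrite.scala and RowIDAllocatorSuite.scala above, but the PD address, database id, table id, and step are placeholders rather than values taken from this patch, and `TiConfiguration.createDefault` is assumed to be the existing tikv-client helper for building a configuration from a PD endpoint.

```java
import com.pingcap.tikv.TiConfiguration;
import com.pingcap.tikv.allocator.RowIDAllocator;

public class RowIDAllocatorExample {
  public static void main(String[] args) {
    // Placeholder PD endpoint; replace with the cluster's real PD address.
    TiConfiguration conf = TiConfiguration.createDefault("127.0.0.1:2379");

    long dbId = 1L;      // hypothetical database id
    long tableId = 42L;  // hypothetical table id
    long step = 1000L;   // number of row ids to reserve in one allocation

    // The allocator now takes a TiConfiguration instead of a Catalog; it obtains
    // its own Snapshot internally via TiSession.create(conf).createSnapshot().
    RowIDAllocator allocator =
        RowIDAllocator.create(dbId, tableId, conf, false /* unsigned */, step);

    // On success the reserved range spans exactly `step` ids, as the updated
    // RowIDAllocatorSuite asserts (allocator.getEnd - allocator.getStart == step).
    System.out.println(
        "allocated range start=" + allocator.getStart() + " end=" + allocator.getEnd());
  }
}
```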
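
The new MetaCodec class carries over TiDB's meta-key layout ('m' prefix, hash data/meta flags) from CatalogTransaction. As a quick illustration of the encoding it performs, here is a sketch that builds the hash data key under which a table's auto-increment counter is stored; the ids are again hypothetical placeholders.

```java
import com.google.protobuf.ByteString;
import com.pingcap.tikv.codec.CodecDataOutput;
import com.pingcap.tikv.codec.MetaCodec;

public class MetaKeyExample {
  public static void main(String[] args) {
    long dbId = 1L;     // hypothetical database id
    long tableId = 42L; // hypothetical table id

    // "DB:1" is the hash key, "TID:42" the hash field that holds the auto table id.
    ByteString dbKey = MetaCodec.encodeDatabaseID(dbId);
    ByteString tidField = MetaCodec.autoTableIDKey(tableId);

    // Layout per encodeHashDataKey: 'm' prefix, bytes-encoded "DB:1",
    // the 'h' data flag written as an unsigned long, then bytes-encoded "TID:42".
    CodecDataOutput cdo = new CodecDataOutput();
    MetaCodec.encodeHashDataKey(cdo, dbKey.toByteArray(), tidField.toByteArray());
    ByteString dataKey = cdo.toByteString();

    // RowIDAllocator reads this key from a Snapshot and writes value + step back via 2PC.
    System.out.println("encoded meta key length: " + dataKey.size());
  }
}
```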