Skip to content

Commit

Permalink
A better design for getting the auto table ID (#980)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhexuany authored Aug 2, 2019
1 parent 0586fa6 commit 1973d53
Show file tree
Hide file tree
Showing 6 changed files with 275 additions and 256 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/com/pingcap/tispark/TiBatchWrite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ class TiBatchWrite(@transient val df: DataFrame,
.create(
tiDBInfo.getId,
tiTableInfo.getId,
catalog,
tiSession.getConf,
tiTableInfo.isAutoIncColUnsigned,
step
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class RowIDAllocatorSuite extends BaseTiSparkTest {
ti.tiSession.getCatalog.getTable(dbName, tableName)
// corner case allocate unsigned long's max value.
val allocator =
RowIDAllocator.create(tiDBInfo.getId, tiTableInfo.getId, ti.tiSession.getCatalog, true, -2L)
RowIDAllocator.create(tiDBInfo.getId, tiTableInfo.getId, ti.tiSession.getConf, true, -2L)
assert(allocator.getEnd - allocator.getStart == -2L)
}

Expand All @@ -35,11 +35,11 @@ class RowIDAllocatorSuite extends BaseTiSparkTest {
val tiTableInfo =
ti.tiSession.getCatalog.getTable(dbName, tableName)
var allocator =
RowIDAllocator.create(tiDBInfo.getId, tiTableInfo.getId, ti.tiSession.getCatalog, false, 1000)
RowIDAllocator.create(tiDBInfo.getId, tiTableInfo.getId, ti.tiSession.getConf, false, 1000)
assert(allocator.getEnd - allocator.getStart == 1000)

allocator = RowIDAllocator
.create(tiDBInfo.getId, tiTableInfo.getId, ti.tiSession.getCatalog, false, 10000)
.create(tiDBInfo.getId, tiTableInfo.getId, ti.tiSession.getConf, false, 10000)
assert(allocator.getEnd - allocator.getStart == 10000)
}

Expand Down
154 changes: 142 additions & 12 deletions tikv-client/src/main/java/com/pingcap/tikv/allocator/RowIDAllocator.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,20 @@
package com.pingcap.tikv.allocator;

import com.google.common.primitives.UnsignedLongs;
import com.pingcap.tikv.catalog.Catalog;
import com.google.protobuf.ByteString;
import com.pingcap.tikv.Snapshot;
import com.pingcap.tikv.TiConfiguration;
import com.pingcap.tikv.TiSession;
import com.pingcap.tikv.TiSessionCache;
import com.pingcap.tikv.TwoPhaseCommitter;
import com.pingcap.tikv.codec.CodecDataInput;
import com.pingcap.tikv.codec.CodecDataOutput;
import com.pingcap.tikv.codec.MetaCodec;
import com.pingcap.tikv.exception.TiBatchWriteException;
import com.pingcap.tikv.util.BackOffer;
import com.pingcap.tikv.util.ConcreteBackOffer;
import java.util.Arrays;
import java.util.function.Function;

/**
* RowIDAllocator read current start from TiKV and write back 'start+step' back to TiKV. It designs
Expand All @@ -26,19 +38,21 @@ public final class RowIDAllocator {
private long end;
private final long dbId;
private long step;
private final TiConfiguration conf;

/**
 * Private constructor; use {@link #create} to obtain an initialized allocator.
 *
 * @param dbId id of the database the target table belongs to
 * @param step number of ids to reserve in one allocation round
 * @param conf TiKV client configuration used to open sessions/snapshots
 */
private RowIDAllocator(long dbId, long step, TiConfiguration conf) {
  this.dbId = dbId;
  this.step = step;
  this.conf = conf;
}

public static RowIDAllocator create(
long dbId, long tableId, Catalog catalog, boolean unsigned, long step) {
RowIDAllocator allocator = new RowIDAllocator(dbId, step);
long dbId, long tableId, TiConfiguration conf, boolean unsigned, long step) {
RowIDAllocator allocator = new RowIDAllocator(dbId, step, conf);
if (unsigned) {
allocator.initUnsigned(catalog, tableId);
allocator.initUnsigned(TiSession.create(conf).createSnapshot(), tableId);
} else {
allocator.initSigned(catalog, tableId);
allocator.initSigned(TiSession.create(conf).createSnapshot(), tableId);
}
return allocator;
}
Expand All @@ -51,26 +65,142 @@ public long getEnd() {
return end;
}

private void initSigned(Catalog catalog, long tableId) {
// Writes a single key/value pair to TiKV through the two-phase-commit (2PC)
// protocol, using this key as the transaction's primary key.
// NOTE(review): the second getTimestamp() call below fetches a fresh, later
// timestamp for the commit phase (commit version must be newer than the start
// version used to construct the committer) — confirm against TwoPhaseCommitter's contract.
private void set(ByteString key, byte[] value) {
TiSession session = TiSessionCache.getSession(conf);
// Committer is bound to a start timestamp obtained here.
TwoPhaseCommitter twoPhaseCommitter =
new TwoPhaseCommitter(conf, session.getTimestamp().getVersion());

// Phase 1: prewrite the primary key with bounded retry/backoff.
twoPhaseCommitter.prewritePrimaryKey(
ConcreteBackOffer.newCustomBackOff(BackOffer.PREWRITE_MAX_BACKOFF),
key.toByteArray(),
value);

// Phase 2: commit the primary key at a newly fetched commit version.
twoPhaseCommitter.commitPrimaryKey(
ConcreteBackOffer.newCustomBackOff(BackOffer.BATCH_COMMIT_BACKOFF),
key.toByteArray(),
session.getTimestamp().getVersion());
}

/**
 * Bumps the field count stored under the hash-meta key for {@code key} when a
 * brand-new hash field was just written.
 *
 * <p>Fix: the original decoded the meta value unconditionally, which (a) did
 * useless snapshot reads when the field already existed and (b) threw on an
 * absent/empty meta value because readLong() needs 8 bytes. Now the read and
 * decode happen only when the count actually changes, and a missing meta value
 * is treated as a count of 0.
 *
 * @param key the hash key whose meta record is updated
 * @param oldVal previous value of the field; null/empty means the field is new
 * @param snapshot snapshot used to read the current meta value
 */
private void updateMeta(ByteString key, byte[] oldVal, Snapshot snapshot) {
  // Only a newly created field changes the field count.
  if (oldVal != null && oldVal.length != 0) {
    return;
  }

  CodecDataOutput cdo = new CodecDataOutput();
  ByteString metaKey = MetaCodec.encodeHashMetaKey(cdo, key.toByteArray());

  // Decode the current count (big-endian 8 bytes); absent meta record => 0.
  long fieldCount = 0;
  ByteString metaVal = snapshot.get(metaKey);
  if (metaVal != null && !metaVal.isEmpty()) {
    fieldCount = new CodecDataInput(metaVal.toByteArray()).readLong();
  }

  cdo.reset();
  cdo.writeLong(fieldCount + 1);
  set(metaKey, cdo.toBytes());
}

/**
 * Reads the hash field ({@code key}, {@code field}) from the snapshot, derives a
 * new value via {@code calculateNewVal}, and — if the value changed — writes it
 * back through 2PC and maintains the hash-meta field count.
 *
 * @return the new value parsed as a long, or 0 when no update was necessary
 */
private long updateHash(
    ByteString key,
    ByteString field,
    Function<byte[], byte[]> calculateNewVal,
    Snapshot snapshot) {
  // Encode the hash-data key for (key, field) and fetch its current value.
  CodecDataOutput keyBuf = new CodecDataOutput();
  MetaCodec.encodeHashDataKey(keyBuf, key.toByteArray(), field.toByteArray());
  ByteString dataKey = keyBuf.toByteString();
  byte[] currentVal = snapshot.get(dataKey.toByteArray());

  byte[] nextVal = calculateNewVal.apply(currentVal);
  if (Arrays.equals(nextVal, currentVal)) {
    // Nothing changed — skip the write entirely.
    return 0L;
  }

  // Persist the new value via 2PC, then keep the meta field count in sync.
  set(dataKey, nextVal);
  updateMeta(key, currentVal, snapshot);
  return Long.parseLong(new String(nextVal));
}

/** Returns true when the database's entry under the DBs hash is present and non-empty. */
private boolean isDBExisted(long dbId, Snapshot snapshot) {
  ByteString dbInfo =
      MetaCodec.hashGet(MetaCodec.KEY_DBs, MetaCodec.encodeDatabaseID(dbId), snapshot);
  return dbInfo != null && !dbInfo.isEmpty();
}

/**
 * Returns true when the table's entry under the database hash is present and non-empty.
 *
 * <p>Fix: guard against a null result from hashGet — the sibling isDBExisted
 * already checks for null, so the original unguarded {@code .isEmpty()} call
 * here was both inconsistent and an NPE risk for missing tables.
 */
private boolean isTableExisted(long dbId, long tableId, Snapshot snapshot) {
  ByteString dbKey = MetaCodec.encodeDatabaseID(dbId);
  ByteString tableKey = MetaCodec.tableKey(tableId);
  ByteString tableInfo = MetaCodec.hashGet(dbKey, tableKey, snapshot);
  return tableInfo != null && !tableInfo.isEmpty();
}
/**
 * Reads the current auto-increment row id for the table from TiKV and writes the
 * advanced value (start + step) back. The previous stored value is returned via
 * the write path's parse of the new value.
 *
 * @param dbId database the table belongs to
 * @param tableId table whose auto id is advanced
 * @param step how far to advance the id
 * @param snapshot snapshot used for all reads
 * @return the new high-water mark after adding {@code step}
 * @throws IllegalArgumentException when the database or table does not exist
 */
public long getAutoTableId(long dbId, long tableId, long step, Snapshot snapshot) {
  // Guard clause: fail fast with a message that names the offending ids
  // (original text "table or database is not existed" was ungrammatical and
  // carried no context).
  if (!isDBExisted(dbId, snapshot) || !isTableExisted(dbId, tableId, snapshot)) {
    throw new IllegalArgumentException(
        "database " + dbId + " or table " + tableId + " does not exist");
  }

  return updateHash(
      MetaCodec.encodeDatabaseID(dbId),
      MetaCodec.autoTableIDKey(tableId),
      (oldVal) -> {
        // Absent/empty value means the counter starts from 0.
        long base = 0;
        if (oldVal != null && oldVal.length != 0) {
          base = Long.parseLong(new String(oldVal));
        }
        return String.valueOf(base + step).getBytes();
      },
      snapshot);
}

/**
 * Reads the current auto-increment row id for the table (no write-back).
 *
 * <p>Fix: guard against a null hashGet result (consistent with isDBExisted) and
 * brace the single-statement {@code if} per house style.
 *
 * @return the stored id, or 0 when none has been allocated yet
 */
public long getAutoTableId(long dbId, long tableId, Snapshot snapshot) {
  ByteString dbKey = MetaCodec.encodeDatabaseID(dbId);
  ByteString tblKey = MetaCodec.autoTableIDKey(tableId);
  ByteString val = MetaCodec.hashGet(dbKey, tblKey, snapshot);
  if (val == null || val.isEmpty()) {
    return 0L;
  }
  return Long.parseLong(val.toStringUtf8());
}

/**
 * Reserves the next {@code step} signed ids: reads the current start from TiKV,
 * then writes start + step back and records it as this allocator's end.
 *
 * @throws TiBatchWriteException when fewer than {@code step} ids remain before
 *     Long.MAX_VALUE, or the counter has already reached Long.MAX_VALUE
 */
private void initSigned(Snapshot snapshot, long tableId) {
  long newStart = getAutoTableId(dbId, tableId, snapshot);
  // Clamp the step so start + step cannot overflow a signed long.
  long tmpStep = Math.min(Long.MAX_VALUE - newStart, step);
  if (tmpStep != step) {
    throw new TiBatchWriteException("cannot allocate ids for this write");
  }
  if (newStart == Long.MAX_VALUE) {
    // Original message was truncated ("...since it "); completed here.
    throw new TiBatchWriteException(
        "cannot allocate more ids since the start reaches signed long's max value");
  }
  end = getAutoTableId(dbId, tableId, tmpStep, snapshot);
}

private void initUnsigned(Catalog catalog, long tableId) {
private void initUnsigned(Snapshot snapshot, long tableId) {
long newEnd;
// get new start from TiKV, and calculate new end and set it back to TiKV.
long newStart = catalog.getAutoTableId(dbId, tableId);
long newStart = getAutoTableId(dbId, tableId, snapshot);
// for unsigned long, -1L is max value.
long tmpStep = UnsignedLongs.min(-1L - newStart, step);
if (tmpStep != step) {
Expand All @@ -81,7 +211,7 @@ private void initUnsigned(Catalog catalog, long tableId) {
throw new TiBatchWriteException(
"cannot allocate more ids since the start reaches " + "unsigned long's max value ");
}
newEnd = catalog.getAutoTableId(dbId, tableId, tmpStep);
newEnd = getAutoTableId(dbId, tableId, tmpStep, snapshot);

end = newEnd;
}
Expand Down
17 changes: 0 additions & 17 deletions tikv-client/src/main/java/com/pingcap/tikv/catalog/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,23 +156,6 @@ public Catalog(
periodUnit);
}

/**
 * read current row id from TiKV and write the calculated value back to TiKV. The calculation rule
 * is start(read from TiKV) + step.
 *
 * <p>Synchronized: takes a fresh snapshot from snapshotProvider and delegates the
 * read-modify-write to a new CatalogTransaction.
 */
public synchronized long getAutoTableId(long dbId, long tableId, long step) {
Snapshot snapshot = snapshotProvider.get();
CatalogTransaction newTrx = new CatalogTransaction(snapshot);
return newTrx.getAutoTableId(dbId, tableId, step);
}

/**
 * read current row id from TiKV according to database id and table id.
 *
 * <p>Read-only variant: no write-back; delegates to a CatalogTransaction built on
 * a fresh snapshot.
 */
public synchronized long getAutoTableId(long dbId, long tableId) {
Snapshot snapshot = snapshotProvider.get();
CatalogTransaction newTrx = new CatalogTransaction(snapshot);
return newTrx.getAutoTableId(dbId, tableId);
}

public synchronized void reloadCache(boolean loadTables) {
Snapshot snapshot = snapshotProvider.get();
CatalogTransaction newTrx = new CatalogTransaction(snapshot);
Expand Down
Loading

0 comments on commit 1973d53

Please sign in to comment.