Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

a better design for get auto table id #980

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/main/scala/com/pingcap/tispark/TiBatchWrite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ class TiBatchWrite(@transient val df: DataFrame,
.create(
tiDBInfo.getId,
tiTableInfo.getId,
catalog,
tiSession.getConf,
tiTableInfo.isAutoIncColUnsigned,
step
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class RowIDAllocatorSuite extends BaseTiSparkTest {
ti.tiSession.getCatalog.getTable(dbName, tableName)
// corner case allocate unsigned long's max value.
val allocator =
RowIDAllocator.create(tiDBInfo.getId, tiTableInfo.getId, ti.tiSession.getCatalog, true, -2L)
RowIDAllocator.create(tiDBInfo.getId, tiTableInfo.getId, ti.tiSession.getConf, true, -2L)
assert(allocator.getEnd - allocator.getStart == -2L)
}

Expand All @@ -35,11 +35,11 @@ class RowIDAllocatorSuite extends BaseTiSparkTest {
val tiTableInfo =
ti.tiSession.getCatalog.getTable(dbName, tableName)
var allocator =
RowIDAllocator.create(tiDBInfo.getId, tiTableInfo.getId, ti.tiSession.getCatalog, false, 1000)
RowIDAllocator.create(tiDBInfo.getId, tiTableInfo.getId, ti.tiSession.getConf, false, 1000)
assert(allocator.getEnd - allocator.getStart == 1000)

allocator = RowIDAllocator
.create(tiDBInfo.getId, tiTableInfo.getId, ti.tiSession.getCatalog, false, 10000)
.create(tiDBInfo.getId, tiTableInfo.getId, ti.tiSession.getConf, false, 10000)
assert(allocator.getEnd - allocator.getStart == 10000)
}

Expand Down
154 changes: 142 additions & 12 deletions tikv-client/src/main/java/com/pingcap/tikv/allocator/RowIDAllocator.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,20 @@
package com.pingcap.tikv.allocator;

import com.google.common.primitives.UnsignedLongs;
import com.pingcap.tikv.catalog.Catalog;
import com.google.protobuf.ByteString;
import com.pingcap.tikv.Snapshot;
import com.pingcap.tikv.TiConfiguration;
import com.pingcap.tikv.TiSession;
import com.pingcap.tikv.TiSessionCache;
import com.pingcap.tikv.TwoPhaseCommitter;
import com.pingcap.tikv.codec.CodecDataInput;
import com.pingcap.tikv.codec.CodecDataOutput;
import com.pingcap.tikv.codec.MetaCodec;
import com.pingcap.tikv.exception.TiBatchWriteException;
import com.pingcap.tikv.util.BackOffer;
import com.pingcap.tikv.util.ConcreteBackOffer;
import java.util.Arrays;
import java.util.function.Function;

/**
* RowIDAllocator read current start from TiKV and write back 'start+step' back to TiKV. It designs
Expand All @@ -26,19 +38,21 @@ public final class RowIDAllocator {
private long end;
private final long dbId;
private long step;
private final TiConfiguration conf;

private RowIDAllocator(long dbId, long step) {
private RowIDAllocator(long dbId, long step, TiConfiguration conf) {
this.dbId = dbId;
this.step = step;
this.conf = conf;
}

public static RowIDAllocator create(
long dbId, long tableId, Catalog catalog, boolean unsigned, long step) {
RowIDAllocator allocator = new RowIDAllocator(dbId, step);
long dbId, long tableId, TiConfiguration conf, boolean unsigned, long step) {
RowIDAllocator allocator = new RowIDAllocator(dbId, step, conf);
if (unsigned) {
allocator.initUnsigned(catalog, tableId);
allocator.initUnsigned(TiSession.create(conf).createSnapshot(), tableId);
} else {
allocator.initSigned(catalog, tableId);
allocator.initSigned(TiSession.create(conf).createSnapshot(), tableId);
}
return allocator;
}
Expand All @@ -51,26 +65,142 @@ public long getEnd() {
return end;
}

private void initSigned(Catalog catalog, long tableId) {
// Writes a single key/value pair to TiKV using the two-phase commit (2PC)
// protocol, with `key` acting as its own primary key. A fresh timestamp is
// fetched from the session for the prewrite (start ts) and a second one for
// the commit phase (commit ts).
private void set(ByteString key, byte[] value) {
TiSession session = TiSessionCache.getSession(conf);
TwoPhaseCommitter twoPhaseCommitter =
new TwoPhaseCommitter(conf, session.getTimestamp().getVersion());

twoPhaseCommitter.prewritePrimaryKey(
ConcreteBackOffer.newCustomBackOff(BackOffer.PREWRITE_MAX_BACKOFF),
key.toByteArray(),
value);

twoPhaseCommitter.commitPrimaryKey(
ConcreteBackOffer.newCustomBackOff(BackOffer.BATCH_COMMIT_BACKOFF),
key.toByteArray(),
session.getTimestamp().getVersion());
}

/**
 * Bumps the field count stored under the hash meta key for {@code key} when a
 * brand-new field was just written (i.e. {@code oldVal} was null/empty).
 *
 * <p>Fix over the previous version: the meta value is only fetched and decoded
 * when an update is actually required, and an absent/empty meta value is
 * treated as a count of 0 instead of being fed to {@code readLong()} (which
 * cannot decode an empty buffer).
 */
private void updateMeta(ByteString key, byte[] oldVal, Snapshot snapshot) {
// only bump the field count when a new field was created
if (oldVal == null || oldVal.length == 0) {
// 1. encode the hash meta key
CodecDataOutput cdo = new CodecDataOutput();
ByteString metaKey = MetaCodec.encodeHashMetaKey(cdo, key.toByteArray());

// 2. load the current field count (big-endian 8 bytes); absent meta -> 0
long fieldCount = 0;
ByteString metaVal = snapshot.get(metaKey);
if (metaVal != null && !metaVal.isEmpty()) {
fieldCount = new CodecDataInput(metaVal.toByteArray()).readLong();
}

// 3. write the incremented count back to TiKV via 2PC
cdo.reset();
cdo.writeLong(fieldCount + 1);
set(metaKey, cdo.toBytes());
}
}

// Read-modify-write of one field in a TiKV hash structure:
//   1. encode the hash data key for (key, field)
//   2. read the current value from the snapshot
//   3. compute the candidate value via calculateNewVal
//   4. skip the write entirely when the value is unchanged (returns 0)
//   5. otherwise write the new value back via 2PC (see set())
//   6. bump the hash meta field count if a new field was created
// Returns the new value parsed as a long, or 0 when nothing was written.
// NOTE(review): the read comes from a snapshot while the write is a separate
// 2PC transaction, so concurrent allocators could race on the same field —
// confirm the upstream conflict-detection guarantees.
// NOTE(review): new String(newVal)/parseLong uses the platform default
// charset; the stored digits are ASCII so this works in practice, but an
// explicit charset would be safer — confirm.
private long updateHash(
ByteString key,
ByteString field,
Function<byte[], byte[]> calculateNewVal,
Snapshot snapshot) {
CodecDataOutput cdo = new CodecDataOutput();
MetaCodec.encodeHashDataKey(cdo, key.toByteArray(), field.toByteArray());
ByteString dataKey = cdo.toByteString();
byte[] oldVal = snapshot.get(dataKey.toByteArray());

byte[] newVal = calculateNewVal.apply(oldVal);
if (Arrays.equals(newVal, oldVal)) {
// no need to update
return 0L;
}

set(dataKey, newVal);
updateMeta(key, oldVal, snapshot);
return Long.parseLong(new String(newVal));
}

/** Returns whether a database with the given id is recorded in TiKV's meta. */
private boolean isDBExisted(long dbId, Snapshot snapshot) {
ByteString encodedId = MetaCodec.encodeDatabaseID(dbId);
ByteString json = MetaCodec.hashGet(MetaCodec.KEY_DBs, encodedId, snapshot);
return json != null && !json.isEmpty();
}

/**
 * Returns whether the table exists under the given database in TiKV's meta.
 *
 * <p>Fix: null-safe on the {@code hashGet} result, mirroring the defensive
 * check in {@code isDBExisted}; previously a null return would have thrown a
 * NullPointerException here.
 */
private boolean isTableExisted(long dbId, long tableId, Snapshot snapshot) {
ByteString dbKey = MetaCodec.encodeDatabaseID(dbId);
ByteString tableKey = MetaCodec.tableKey(tableId);
ByteString value = MetaCodec.hashGet(dbKey, tableKey, snapshot);
return value != null && !value.isEmpty();
}
/**
 * Reads the current row id from TiKV and writes the calculated value back.
 * The calculation rule is {@code start (read from TiKV) + step}.
 *
 * @param dbId database id the table belongs to
 * @param tableId table whose auto-increment counter is advanced
 * @param step amount to advance the counter by
 * @param snapshot snapshot used for the existence checks and the read
 * @return the new counter value after adding {@code step}
 * @throws IllegalArgumentException if the database or table does not exist
 */
public long getAutoTableId(long dbId, long tableId, long step, Snapshot snapshot) {
if (isDBExisted(dbId, snapshot) && isTableExisted(dbId, tableId, snapshot)) {
return updateHash(
MetaCodec.encodeDatabaseID(dbId),
MetaCodec.autoTableIDKey(tableId),
oldVal -> {
// absent counter starts from 0
long base = 0;
if (oldVal != null && oldVal.length != 0) {
base = Long.parseLong(new String(oldVal));
}
return String.valueOf(base + step).getBytes();
},
snapshot);
}

throw new IllegalArgumentException("database or table does not exist");
}

/**
 * Reads the current row id from TiKV for the given database and table.
 *
 * <p>Fix: null-safe on the {@code hashGet} result (previously a null return
 * would have thrown a NullPointerException), and the single-statement
 * {@code if} is braced.
 *
 * @return the stored counter value, or 0 when no counter has been written yet
 */
public long getAutoTableId(long dbId, long tableId, Snapshot snapshot) {
ByteString dbKey = MetaCodec.encodeDatabaseID(dbId);
ByteString tblKey = MetaCodec.autoTableIDKey(tableId);
ByteString val = MetaCodec.hashGet(dbKey, tblKey, snapshot);
if (val == null || val.isEmpty()) {
return 0L;
}
return Long.parseLong(val.toStringUtf8());
}

private void initSigned(Snapshot snapshot, long tableId) {
long newEnd;
// get new start from TiKV, and calculate new end and set it back to TiKV.
long newStart = catalog.getAutoTableId(dbId, tableId);
long newStart = getAutoTableId(dbId, tableId, snapshot);
long tmpStep = Math.min(Long.MAX_VALUE - newStart, step);
if (tmpStep != step) {
throw new TiBatchWriteException("cannot allocate ids for this write");
}
if (newStart == Long.MAX_VALUE) {
throw new TiBatchWriteException("cannot allocate more ids since it ");
}
newEnd = catalog.getAutoTableId(dbId, tableId, tmpStep);
newEnd = getAutoTableId(dbId, tableId, tmpStep, snapshot);

end = newEnd;
}

private void initUnsigned(Catalog catalog, long tableId) {
private void initUnsigned(Snapshot snapshot, long tableId) {
long newEnd;
// get new start from TiKV, and calculate new end and set it back to TiKV.
long newStart = catalog.getAutoTableId(dbId, tableId);
long newStart = getAutoTableId(dbId, tableId, snapshot);
// for unsigned long, -1L is max value.
long tmpStep = UnsignedLongs.min(-1L - newStart, step);
if (tmpStep != step) {
Expand All @@ -81,7 +211,7 @@ private void initUnsigned(Catalog catalog, long tableId) {
throw new TiBatchWriteException(
"cannot allocate more ids since the start reaches " + "unsigned long's max value ");
}
newEnd = catalog.getAutoTableId(dbId, tableId, tmpStep);
newEnd = getAutoTableId(dbId, tableId, tmpStep, snapshot);

end = newEnd;
}
Expand Down
17 changes: 0 additions & 17 deletions tikv-client/src/main/java/com/pingcap/tikv/catalog/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,23 +156,6 @@ public Catalog(
periodUnit);
}

/**
* read current row id from TiKV and write the calculated value back to TiKV. The calculation rule
* is start(read from TiKV) + step.
*/
public synchronized long getAutoTableId(long dbId, long tableId, long step) {
Snapshot snapshot = snapshotProvider.get();
CatalogTransaction newTrx = new CatalogTransaction(snapshot);
return newTrx.getAutoTableId(dbId, tableId, step);
}

/** read current row id from TiKV according to database id and table id. */
public synchronized long getAutoTableId(long dbId, long tableId) {
Snapshot snapshot = snapshotProvider.get();
CatalogTransaction newTrx = new CatalogTransaction(snapshot);
return newTrx.getAutoTableId(dbId, tableId);
}

public synchronized void reloadCache(boolean loadTables) {
Snapshot snapshot = snapshotProvider.get();
CatalogTransaction newTrx = new CatalogTransaction(snapshot);
Expand Down
Loading