Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

InventoryTaskSplitter compatible with large integer primary key #32604

Merged
merged 1 commit into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.core.preparer.inventory;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.Range;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
import org.apache.shardingsphere.data.pipeline.core.util.IntervalToRangeIterator;

import java.math.BigInteger;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;

/**
 * Inventory position calculator.
 *
 * <p>Splits the value range of an integer unique key into consecutive ingest positions.</p>
 */
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class InventoryPositionCalculator {
    
    private static final BigInteger MAX_INTERVAL = BigInteger.valueOf(Long.MAX_VALUE);
    
    /**
     * Get position by integer unique key range.
     *
     * <p>The number of splits is the records count divided by the sharding size, rounded up.
     * The unique key values range is then cut into ranges produced by {@link IntervalToRangeIterator},
     * each one wrapped into an {@link IntegerPrimaryKeyIngestPosition}.</p>
     *
     * @param tableRecordsCount table records count
     * @param uniqueKeyValuesRange unique key values range
     * @param shardingSize sharding size, a zero value fails with {@link ArithmeticException} on the division
     * @return position collection, a single {@code (0, 0)} position when the table records count is zero
     */
    public static Collection<IngestPosition> getPositionByIntegerUniqueKeyRange(final long tableRecordsCount, final Range<Long> uniqueKeyValuesRange, final long shardingSize) {
        if (0 == tableRecordsCount) {
            return Collections.singletonList(new IntegerPrimaryKeyIngestPosition(0, 0));
        }
        // Ceiling division: one extra split for the remainder.
        long splitCount = tableRecordsCount / shardingSize + (tableRecordsCount % shardingSize > 0 ? 1 : 0);
        long interval = calculateInterval(uniqueKeyValuesRange, splitCount);
        Collection<IngestPosition> result = new LinkedList<>();
        IntervalToRangeIterator rangeIterator = new IntervalToRangeIterator(uniqueKeyValuesRange.getMinimum(), uniqueKeyValuesRange.getMaximum(), interval);
        while (rangeIterator.hasNext()) {
            Range<Long> range = rangeIterator.next();
            result.add(new IntegerPrimaryKeyIngestPosition(range.getMinimum(), range.getMaximum()));
        }
        return result;
    }
    
    /**
     * Calculate the interval between the lower and upper bound of every split.
     *
     * @param uniqueKeyValuesRange unique key values range
     * @param splitCount split count
     * @return interval, never greater than {@link Long#MAX_VALUE}
     */
    private static long calculateInterval(final Range<Long> uniqueKeyValuesRange, final long splitCount) {
        // maximum - minimum may not fit into a signed long, so the subtraction is done with BigInteger.
        BigInteger span = BigInteger.valueOf(uniqueKeyValuesRange.getMaximum()).subtract(BigInteger.valueOf(uniqueKeyValuesRange.getMinimum()));
        // With a single split the quotient can still exceed Long.MAX_VALUE; longValue() would then wrap to a negative
        // number, which IntervalToRangeIterator rejects. Clamping keeps the interval valid, the iterator simply yields one more range.
        return span.divide(BigInteger.valueOf(splitCount)).min(MAX_INTERVAL).longValue();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,16 @@
import org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.StringPrimaryKeyIngestPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtils;
import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.core.preparer.inventory.InventoryPositionCalculator;
import org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.InventoryRecordsCountCalculator;
import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.IntervalToRangeIterator;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;

Expand Down Expand Up @@ -122,7 +121,9 @@ private Collection<IngestPosition> getInventoryPositions(final InventoryDumperCo
if (1 == uniqueKeyColumns.size()) {
int firstColumnDataType = uniqueKeyColumns.get(0).getDataType();
if (PipelineJdbcUtils.isIntegerColumn(firstColumnDataType)) {
return getPositionByIntegerUniqueKeyRange(dumperContext, tableRecordsCount, jobItemContext);
Range<Long> uniqueKeyValuesRange = getUniqueKeyValuesRange(jobItemContext, dumperContext);
int shardingSize = jobItemContext.getJobProcessContext().getProcessConfiguration().getRead().getShardingSize();
return InventoryPositionCalculator.getPositionByIntegerUniqueKeyRange(tableRecordsCount, uniqueKeyValuesRange, shardingSize);
}
if (PipelineJdbcUtils.isStringColumn(firstColumnDataType)) {
// TODO Support string unique key table splitting. Ascii characters ordering are different in different versions of databases.
Expand All @@ -132,23 +133,6 @@ private Collection<IngestPosition> getInventoryPositions(final InventoryDumperCo
return Collections.singleton(new UnsupportedKeyIngestPosition());
}

/**
 * Get positions by integer unique key range.
 *
 * <p>Returns a single {@code (0, 0)} position for an empty table without querying the unique key range.
 * Otherwise the unique key values range is cut into ranges produced by {@code IntervalToRangeIterator}.</p>
 *
 * @param dumperContext inventory dumper context
 * @param tableRecordsCount table records count
 * @param jobItemContext job item context
 * @return position collection
 */
private Collection<IngestPosition> getPositionByIntegerUniqueKeyRange(final InventoryDumperContext dumperContext, final long tableRecordsCount, final TransmissionJobItemContext jobItemContext) {
    if (0L == tableRecordsCount) {
        return Collections.singletonList(new IntegerPrimaryKeyIngestPosition(0L, 0L));
    }
    Collection<IngestPosition> result = new LinkedList<>();
    Range<Long> uniqueKeyValuesRange = getUniqueKeyValuesRange(jobItemContext, dumperContext);
    int shardingSize = jobItemContext.getJobProcessContext().getProcessConfiguration().getRead().getShardingSize();
    // Ceiling division: one extra split for the remainder.
    long splitCount = tableRecordsCount / shardingSize + (tableRecordsCount % shardingSize > 0L ? 1 : 0);
    // maximum - minimum may not fit into a signed long (e.g. a negative minimum with a very large maximum),
    // so the span is computed with BigInteger and the quotient is clamped to Long.MAX_VALUE before narrowing.
    long interval = java.math.BigInteger.valueOf(uniqueKeyValuesRange.getMaximum())
            .subtract(java.math.BigInteger.valueOf(uniqueKeyValuesRange.getMinimum()))
            .divide(java.math.BigInteger.valueOf(splitCount))
            .min(java.math.BigInteger.valueOf(Long.MAX_VALUE))
            .longValue();
    IntervalToRangeIterator rangeIterator = new IntervalToRangeIterator(uniqueKeyValuesRange.getMinimum(), uniqueKeyValuesRange.getMaximum(), interval);
    while (rangeIterator.hasNext()) {
        Range<Long> range = rangeIterator.next();
        result.add(new IntegerPrimaryKeyIngestPosition(range.getMinimum(), range.getMaximum()));
    }
    return result;
}

private Range<Long> getUniqueKeyValuesRange(final TransmissionJobItemContext jobItemContext, final InventoryDumperContext dumperContext) {
String uniqueKey = dumperContext.getUniqueKeyColumns().get(0).getName();
PipelinePrepareSQLBuilder pipelineSQLBuilder = new PipelinePrepareSQLBuilder(jobItemContext.getJobConfig().getSourceDatabaseType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.commons.lang3.Range;

import java.math.BigInteger;
import java.util.Iterator;
import java.util.NoSuchElementException;

Expand All @@ -30,11 +31,11 @@
*/
public final class IntervalToRangeIterator implements Iterator<Range<Long>> {

private final long maximum;
private final BigInteger maximum;

private final long interval;
private final BigInteger interval;

private long current;
private BigInteger current;

public IntervalToRangeIterator(final long minimum, final long maximum, final long interval) {
if (minimum > maximum) {
Expand All @@ -43,24 +44,28 @@ public IntervalToRangeIterator(final long minimum, final long maximum, final lon
if (interval < 0L) {
throw new IllegalArgumentException("interval is less than zero");
}
this.maximum = maximum;
this.interval = interval;
current = minimum;
this.maximum = BigInteger.valueOf(maximum);
this.interval = BigInteger.valueOf(interval);
this.current = BigInteger.valueOf(minimum);
}

/**
 * Check whether another range is available.
 *
 * @return {@code true} while the current lower bound has not moved past the maximum
 */
@Override
public boolean hasNext() {
    // current and maximum are BigInteger, so they are compared with compareTo rather than the <= operator.
    return current.compareTo(maximum) <= 0;
}

/**
 * Get the next range.
 *
 * <p>The range starts at the current lower bound and ends at the current lower bound plus the interval,
 * capped at the maximum. The following range starts right after the returned upper bound.</p>
 *
 * @return next closed range
 * @throws NoSuchElementException if no range is left
 */
@Override
public Range<Long> next() {
    if (!hasNext()) {
        throw new NoSuchElementException("");
    }
    // The addition is done with BigInteger so that current + interval cannot wrap around near Long.MAX_VALUE.
    BigInteger upperLimit = min(maximum, current.add(interval));
    Range<Long> result = Range.between(current.longValue(), upperLimit.longValue());
    // May step one past the maximum, which is what makes hasNext() return false afterwards.
    current = upperLimit.add(BigInteger.ONE);
    return result;
}

/**
 * Get the smaller one of two values.
 *
 * @param one first value
 * @param another second value
 * @return {@code one} if it is strictly less than {@code another}, otherwise {@code another}
 */
private BigInteger min(final BigInteger one, final BigInteger another) {
    return one.min(another);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.core.preparer.inventory;

import org.apache.commons.lang3.Range;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
import org.junit.jupiter.api.Test;

import java.util.List;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

class InventoryPositionCalculatorTest {
    
    // 200 records with sharding size 100 give two splits over the key range [1, 600].
    @Test
    void assertGetPositionByIntegerUniqueKeyRange() {
        List<IngestPosition> actual = (List<IngestPosition>) InventoryPositionCalculator.getPositionByIntegerUniqueKeyRange(200L, Range.between(1L, 600L), 100L);
        assertThat(actual.size(), is(2));
        assertIntegerPosition(actual.get(0), new IntegerPrimaryKeyIngestPosition(1L, 300L));
        assertIntegerPosition(actual.get(1), new IntegerPrimaryKeyIngestPosition(301L, 600L));
    }
    
    // An empty table yields the single (0, 0) position.
    @Test
    void assertGetPositionByIntegerUniqueKeyRangeWithZeroTotalRecordsCount() {
        List<IngestPosition> actual = (List<IngestPosition>) InventoryPositionCalculator.getPositionByIntegerUniqueKeyRange(0L, Range.between(0L, 0L), 1L);
        assertThat(actual.size(), is(1));
        assertIntegerPosition(actual.get(0), new IntegerPrimaryKeyIngestPosition(0L, 0L));
    }
    
    // A key range holding one value cannot be split further.
    @Test
    void assertGetPositionByIntegerUniqueKeyRangeWithTheSameMinMax() {
        List<IngestPosition> actual = (List<IngestPosition>) InventoryPositionCalculator.getPositionByIntegerUniqueKeyRange(200L, Range.between(5L, 5L), 100L);
        assertThat(actual.size(), is(1));
        assertIntegerPosition(actual.get(0), new IntegerPrimaryKeyIngestPosition(5L, 5L));
    }
    
    // The key span is wider than Long.MAX_VALUE, the split must not overflow.
    @Test
    void assertGetPositionByIntegerUniqueKeyRangeOverflow() {
        long recordsCount = Long.MAX_VALUE - 1L;
        long shardingSize = recordsCount / 2L;
        long lowerBound = Long.MIN_VALUE + 1L;
        long upperBound = Long.MAX_VALUE;
        Range<Long> keyRange = Range.between(lowerBound, upperBound);
        List<IngestPosition> actual = (List<IngestPosition>) InventoryPositionCalculator.getPositionByIntegerUniqueKeyRange(recordsCount, keyRange, shardingSize);
        assertThat(actual.size(), is(2));
        assertIntegerPosition(actual.get(0), new IntegerPrimaryKeyIngestPosition(lowerBound, 0L));
        assertIntegerPosition(actual.get(1), new IntegerPrimaryKeyIngestPosition(1L, upperBound));
    }
    
    // Checks the concrete position type first, then compares both bounds with the expected position.
    private void assertIntegerPosition(final IngestPosition actual, final IntegerPrimaryKeyIngestPosition expected) {
        assertThat(actual, instanceOf(IntegerPrimaryKeyIngestPosition.class));
        IntegerPrimaryKeyIngestPosition actualPosition = (IntegerPrimaryKeyIngestPosition) actual;
        assertThat(actualPosition.getBeginValue(), is(expected.getBeginValue()));
        assertThat(actualPosition.getEndValue(), is(expected.getEndValue()));
    }
}