Skip to content

Commit

Permalink
fix ByteBuffer jdk version error and support unique table and largeint
Browse files Browse the repository at this point in the history
  • Loading branch information
bingquanzhao committed Aug 17, 2023
1 parent efa135f commit e0cf5a6
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 14 deletions.
2 changes: 1 addition & 1 deletion docs/en/connector-v2/source/Doris.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ directly return the data
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| BIGINT | BIGINT |
| LARGEINT | `NOT SUPPORT` |
| LARGEINT | STRING |
| BOOLEAN | BOOLEAN |
| DECIMAL | DECIMAL((Get the designated column's specified column size)+1,<br/>(Gets the designated column's number of digits to right of the decimal point.))) |
| FLOAT | FLOAT |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
Expand Down Expand Up @@ -68,15 +69,15 @@ public void stopBufferData() throws IOException {
// add Empty buffer as finish flag.
boolean isEmpty = false;
if (currentWriteBuffer != null) {
currentWriteBuffer.flip();
((Buffer) currentWriteBuffer).flip();
// check if the current write buffer is empty.
isEmpty = currentWriteBuffer.limit() == 0;
readQueue.put(currentWriteBuffer);
currentWriteBuffer = null;
}
if (!isEmpty) {
ByteBuffer byteBuffer = writeQueue.take();
byteBuffer.flip();
((Buffer) byteBuffer).flip();
checkState(byteBuffer.limit() == 0);
readQueue.put(byteBuffer);
}
Expand All @@ -96,7 +97,7 @@ public void write(byte[] buf) throws InterruptedException {
currentWriteBuffer.put(buf, wPos, nWrite);
wPos += nWrite;
if (currentWriteBuffer.remaining() == 0) {
currentWriteBuffer.flip();
((Buffer) currentWriteBuffer).flip();
readQueue.put(currentWriteBuffer);
currentWriteBuffer = null;
}
Expand Down Expand Up @@ -125,7 +126,7 @@ public int read(byte[] buf) throws InterruptedException {
}

private void recycleBuffer(ByteBuffer buffer) throws InterruptedException {
buffer.clear();
((Buffer) buffer).clear();
writeQueue.put(buffer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
Expand Down Expand Up @@ -89,7 +88,13 @@ public RowBatch readArrow() {
this.root = arrowStreamReader.getVectorSchemaRoot();
while (arrowStreamReader.loadNextBatch()) {
fieldVectors = root.getFieldVectors();

// 适配 unique 模型隐藏列
for (int i = 0; i < fieldVectors.size(); i++) {
String fieldName = fieldVectors.get(i).getField().getName();
if (fieldName.equals("__DORIS_DELETE_SIGN__")) {
fieldVectors.remove(fieldVectors.get(i));
}
}
if (fieldVectors.size() != fieldTypes.length) {
log.error(
"Schema size '{}' is not equal to arrow field size '{}'.",
Expand Down Expand Up @@ -285,10 +290,15 @@ private void convertArrowValue(
return null;
}
byte[] bytes = fixedSizeBinaryVector.get(rowIndex);
new String(bytes, StandardCharsets.UTF_8);
BigInteger value = new BigInteger(bytes);
System.out.println(value);
return value.toString();
int left = 0, right = bytes.length - 1;
while (left < right) {
byte temp = bytes[left];
bytes[left] = bytes[right];
bytes[right] = temp;
left++;
right--;
}
return new BigInteger(bytes).toString();
});
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ public class DorisIT extends TestSuiteBase implements TestResource {
+ " F_TINYINT,\n"
+ " F_SMALLINT,\n"
+ " F_DECIMAL,\n"
+ " F_LARGEINT,\n"
+ " F_BOOLEAN,\n"
+ " F_DOUBLE,\n"
+ " F_FLOAT,\n"
Expand All @@ -108,11 +109,11 @@ public class DorisIT extends TestSuiteBase implements TestResource {
+ " F_DATETIME,\n"
+ " F_DATE\n"
+ ")values(\n"
+ "\t?,?,?,?,?,?,?,?,?,?,?,?,?,?,?\n"
+ "\t?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?\n"
+ ")";

private final String COLUMN_STRING =
"F_ID, F_INT, F_BIGINT, F_TINYINT, F_SMALLINT, F_DECIMAL, F_BOOLEAN, F_DOUBLE, F_FLOAT, "
"F_ID, F_INT, F_BIGINT, F_TINYINT, F_SMALLINT, F_DECIMAL, F_LARGEINT, F_BOOLEAN, F_DOUBLE, F_FLOAT, "
+ "F_CHAR, F_VARCHAR_11, F_STRING, F_DATETIME_P, F_DATETIME, F_DATE";

@BeforeAll
Expand Down Expand Up @@ -274,6 +275,7 @@ private String createTableForTest(String db) {
+ "F_TINYINT tinyint null,\n"
+ "F_SMALLINT smallint null,\n"
+ "F_DECIMAL decimal(18,6) null,\n"
+ "F_LARGEINT largeint null,\n"
+ "F_BOOLEAN boolean null,\n"
+ "F_DOUBLE double null,\n"
+ "F_FLOAT float null,\n"
Expand Down Expand Up @@ -326,6 +328,7 @@ private List<SeaTunnelRow> genDorisTestData(Long nums) {
GenerateTestData.genTinyint(),
GenerateTestData.genSmallint(),
GenerateTestData.genBigDecimal(18, 6),
GenerateTestData.genBigInteger(126),
GenerateTestData.genBoolean(),
GenerateTestData.genDouble(),
GenerateTestData.genFloat(0, 1000),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public static float genFloat(float min, float max) {
}

public static BigInteger genBigInteger(int bits) {
if (bits > 128) bits = 128;
if (bits > 128) bits = 127;
return new BigInteger(bits, ThreadLocalRandom.current());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ source{
F_TINYINT = "TINYINT"
F_SMALLINT = "SMALLINT"
F_DECIMAL = "DECIMAL(18,6)"
F_LARGEINT = "STRING"
F_BOOLEAN = "BOOLEAN"
F_DOUBLE = "DOUBLE"
F_FLOAT = "FLOAT"
Expand Down

0 comments on commit e0cf5a6

Please sign in to comment.