Skip to content

Commit

Permalink
feat(common): introduce ArrayBuilder and Array for Int256/Uint256 (ri…
Browse files Browse the repository at this point in the history
…singwavelabs#8989)

Co-authored-by: lmatz <lmatz823@gmail.com>
  • Loading branch information
shanicky and lmatz authored Apr 6, 2023
1 parent 83b7c79 commit 29389e1
Show file tree
Hide file tree
Showing 8 changed files with 322 additions and 6 deletions.
2 changes: 2 additions & 0 deletions proto/data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ enum ArrayType {
BYTEA = 15;
JSONB = 16;
SERIAL = 17;
INT256 = 18;
UINT256 = 19;
}

message Array {
Expand Down
3 changes: 3 additions & 0 deletions src/common/src/array/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ impl From<&ListArray> for arrow_array::ListArray {
ArrayImpl::Int64(a) => build(array, a, Int64Builder::with_capacity(a.len()), |b, v| {
b.append_option(v)
}),

ArrayImpl::Float32(a) => {
build(array, a, Float32Builder::with_capacity(a.len()), |b, v| {
b.append_option(v.map(|f| f.0))
Expand All @@ -454,6 +455,8 @@ impl From<&ListArray> for arrow_array::ListArray {
StringBuilder::with_capacity(a.len(), a.data().len()),
|b, v| b.append_option(v),
),
ArrayImpl::Int256(_a) => todo!(),
ArrayImpl::Uint256(_a) => todo!(),
ArrayImpl::Bool(a) => {
build(array, a, BooleanBuilder::with_capacity(a.len()), |b, v| {
b.append_option(v)
Expand Down
20 changes: 20 additions & 0 deletions src/common/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ mod iterator;
mod jsonb_array;
pub mod list_array;
mod macros;
mod num256_array;
mod primitive_array;
pub mod serial_array;
pub mod stream_chunk;
Expand Down Expand Up @@ -66,6 +67,9 @@ pub use utf8_array::*;
pub use vis::{Vis, VisRef};

pub use self::error::ArrayError;
pub use crate::array::num256_array::{
Int256Array, Int256ArrayBuilder, Uint256Array, Uint256ArrayBuilder,
};
use crate::buffer::Bitmap;
use crate::types::*;
use crate::util::iter_util::ZipEqFast;
Expand Down Expand Up @@ -338,6 +342,8 @@ macro_rules! for_all_variants {
{ Int16, int16, I16Array, I16ArrayBuilder },
{ Int32, int32, I32Array, I32ArrayBuilder },
{ Int64, int64, I64Array, I64ArrayBuilder },
{ Int256, int256, Int256Array, Int256ArrayBuilder },
{ Uint256, uint256, Uint256Array, Uint256ArrayBuilder },
{ Float32, float32, F32Array, F32ArrayBuilder },
{ Float64, float64, F64Array, F64ArrayBuilder },
{ Utf8, utf8, Utf8Array, Utf8ArrayBuilder },
Expand Down Expand Up @@ -375,6 +381,18 @@ impl<T: PrimitiveArrayItemType> From<PrimitiveArray<T>> for ArrayImpl {
}
}

impl From<Int256Array> for ArrayImpl {
fn from(arr: Int256Array) -> Self {
Self::Int256(arr)
}
}

impl From<Uint256Array> for ArrayImpl {
fn from(arr: Uint256Array) -> Self {
Self::Uint256(arr)
}
}

impl From<BoolArray> for ArrayImpl {
fn from(arr: BoolArray) -> Self {
Self::Bool(arr)
Expand Down Expand Up @@ -695,6 +713,8 @@ impl ArrayImpl {
PbArrayType::Bytea => {
read_string_array::<BytesArrayBuilder, BytesValueReader>(array, cardinality)?
}
PbArrayType::Int256 => Int256Array::from_protobuf(array, cardinality)?,
PbArrayType::Uint256 => Uint256Array::from_protobuf(array, cardinality)?,
};
Ok(array)
}
Expand Down
206 changes: 206 additions & 0 deletions src/common/src/array/num256_array.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::io::{Cursor, Read};

use ethnum::{I256, U256};
use risingwave_pb::common::buffer::CompressionType;
use risingwave_pb::common::Buffer;
use risingwave_pb::data::PbArray;

use crate::array::{Array, ArrayBuilder, ArrayImpl, ArrayResult};
use crate::buffer::{Bitmap, BitmapBuilder};
use crate::types::num256::{Int256, Int256Ref, Uint256, Uint256Ref};
use crate::types::Scalar;

#[derive(Debug)]
pub struct Int256ArrayBuilder {
bitmap: BitmapBuilder,
data: Vec<I256>,
}

#[derive(Debug, Clone)]
pub struct Int256Array {
bitmap: Bitmap,
data: Vec<I256>,
}

#[derive(Debug)]
pub struct Uint256ArrayBuilder {
bitmap: BitmapBuilder,
data: Vec<U256>,
}

#[derive(Debug, Clone)]
pub struct Uint256Array {
bitmap: Bitmap,
data: Vec<U256>,
}

#[rustfmt::skip]
macro_rules! impl_array_for_num256 {
(
$array:ty,
$array_builder:ty,
$scalar:ident,
$scalar_ref:ident < $gen:tt > ,
$variant_name:ident
) => {
impl Array for $array {
type Builder = $array_builder;
type OwnedItem = $scalar;
type RefItem<$gen> = $scalar_ref<$gen>;

unsafe fn raw_value_at_unchecked(&self, idx: usize) -> Self::RefItem<'_> {
$scalar_ref(self.data.get_unchecked(idx))
}

fn len(&self) -> usize {
self.data.len()
}

fn to_protobuf(&self) -> PbArray {
let mut output_buffer = Vec::<u8>::with_capacity(self.len() * $scalar::size());

for v in self.iter() {
v.map(|node| node.to_protobuf(&mut output_buffer));
}

let buffer = Buffer {
compression: CompressionType::None as i32,
body: output_buffer,
};

PbArray {
null_bitmap: Some(self.null_bitmap().to_protobuf()),
values: vec![buffer],
array_type: $scalar::array_type() as i32,
struct_array_data: None,
list_array_data: None,
}
}

fn null_bitmap(&self) -> &Bitmap {
&self.bitmap
}

fn into_null_bitmap(self) -> Bitmap {
self.bitmap
}

fn set_bitmap(&mut self, bitmap: Bitmap) {
self.bitmap = bitmap;
}

fn create_builder(&self, capacity: usize) -> super::ArrayBuilderImpl {
let array_builder = Self::Builder::new(capacity);
super::ArrayBuilderImpl::$variant_name(array_builder)
}
}

impl ArrayBuilder for $array_builder {
type ArrayType = $array;

fn with_meta(capacity: usize, _meta: super::ArrayMeta) -> Self {
Self {
bitmap: BitmapBuilder::with_capacity(capacity),
data: Vec::with_capacity(capacity),
}
}

fn append_n(
&mut self,
n: usize,
value: Option<<Self::ArrayType as Array>::RefItem<'_>>,
) {
match value {
Some(x) => {
self.bitmap.append_n(n, true);
self.data
.extend(std::iter::repeat(x).take(n).map(|x| x.0.clone()));
}
None => {
self.bitmap.append_n(n, false);
self.data
.extend(std::iter::repeat($scalar::default().into_inner()).take(n));
}
}
}

fn append_array(&mut self, other: &Self::ArrayType) {
for bit in other.bitmap.iter() {
self.bitmap.append(bit);
}
self.data.extend_from_slice(&other.data);
}

fn pop(&mut self) -> Option<()> {
self.data.pop().map(|_| self.bitmap.pop().unwrap())
}

fn finish(self) -> Self::ArrayType {
Self::ArrayType {
bitmap: self.bitmap.finish(),
data: self.data,
}
}
}

impl $array {
pub fn from_protobuf(array: &PbArray, cardinality:usize) -> ArrayResult<ArrayImpl> {
ensure!(
array.get_values().len() == 1,
"Must have only 1 buffer in array"
);

let buf = array.get_values()[0].get_body().as_slice();

let mut builder = <$array_builder>::new(cardinality);
let bitmap: Bitmap = array.get_null_bitmap()?.into();
let mut cursor = Cursor::new(buf);
for not_null in bitmap.iter() {
if not_null {
let mut buf = [0u8; $scalar::size()];
cursor.read_exact(&mut buf)?;
let item = <$scalar>::from_be_bytes(buf);
builder.append(Some(item.as_scalar_ref()));
} else {
builder.append(None);
}
}
let arr = builder.finish();
ensure_eq!(arr.len(), cardinality);

Ok(arr.into())
}
}

};
}

impl_array_for_num256!(
Uint256Array,
Uint256ArrayBuilder,
Uint256,
Uint256Ref<'a>,
Uint256
);

impl_array_for_num256!(
Int256Array,
Int256ArrayBuilder,
Int256,
Int256Ref<'a>,
Int256
);
25 changes: 25 additions & 0 deletions src/common/src/hash/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use crate::array::{
};
use crate::collection::estimate_size::EstimateSize;
use crate::row::{OwnedRow, RowDeserializer};
use crate::types::num256::{Int256Ref, Uint256Ref};
use crate::types::{DataType, Date, Decimal, ScalarRef, Time, Timestamp, F32, F64};
use crate::util::hash_util::Crc32FastBuilder;
use crate::util::iter_util::ZipEqFast;
Expand Down Expand Up @@ -365,6 +366,30 @@ impl<'a> HashKeySerDe<'a> for &'a str {
}
}

impl<'a> HashKeySerDe<'a> for Int256Ref<'a> {
type S = [u8; 32];

fn serialize(self) -> Self::S {
unimplemented!("HashKeySerDe cannot be implemented for non-primitive types")
}

fn deserialize<R: Read>(_source: &mut R) -> Self {
unimplemented!("HashKeySerDe cannot be implemented for non-primitive types")
}
}

impl<'a> HashKeySerDe<'a> for Uint256Ref<'a> {
type S = [u8; 32];

fn serialize(self) -> Self::S {
unimplemented!("HashKeySerDe cannot be implemented for non-primitive types")
}

fn deserialize<R: Read>(_source: &mut R) -> Self {
unimplemented!("HashKeySerDe cannot be implemented for non-primitive types")
}
}

/// Same as str.
impl<'a> HashKeySerDe<'a> for &'a [u8] {
type S = Vec<u8>;
Expand Down
17 changes: 17 additions & 0 deletions src/common/src/test_utils/rand_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use rand::{Rng, SeedableRng};

use crate::array::serial_array::Serial;
use crate::array::{Array, ArrayBuilder, ArrayRef, JsonbVal, ListValue, StructValue};
use crate::types::num256::{Int256, Uint256};
use crate::types::{Date, Decimal, Interval, NativeType, Scalar, Time, Timestamp};

pub trait RandValue {
Expand Down Expand Up @@ -118,6 +119,22 @@ impl RandValue for Serial {
}
}

impl RandValue for Int256 {
fn rand_value<R: Rng>(rand: &mut R) -> Self {
let mut bytes = [0u8; 32];
rand.fill_bytes(&mut bytes);
Int256::from_ne_bytes(bytes)
}
}

impl RandValue for Uint256 {
fn rand_value<R: Rng>(rand: &mut R) -> Self {
let mut bytes = [0u8; 32];
rand.fill_bytes(&mut bytes);
Uint256::from_ne_bytes(bytes)
}
}

impl RandValue for JsonbVal {
fn rand_value<R: rand::Rng>(_rand: &mut R) -> Self {
JsonbVal::dummy()
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub mod struct_type;
pub mod to_binary;
pub mod to_text;

mod num256;
pub mod num256;
mod ordered_float;

use chrono::{Datelike, NaiveDate, NaiveDateTime, Timelike};
Expand Down
Loading

0 comments on commit 29389e1

Please sign in to comment.