diff --git a/proto/data.proto b/proto/data.proto index 4b8510ccc4cb3..60a5d1c1675a4 100644 --- a/proto/data.proto +++ b/proto/data.proto @@ -99,6 +99,8 @@ enum ArrayType { BYTEA = 15; JSONB = 16; SERIAL = 17; + INT256 = 18; + UINT256 = 19; } message Array { diff --git a/src/common/src/array/arrow.rs b/src/common/src/array/arrow.rs index 53102dd60f3b5..fe869ce91bcba 100644 --- a/src/common/src/array/arrow.rs +++ b/src/common/src/array/arrow.rs @@ -438,6 +438,7 @@ impl From<&ListArray> for arrow_array::ListArray { ArrayImpl::Int64(a) => build(array, a, Int64Builder::with_capacity(a.len()), |b, v| { b.append_option(v) }), + ArrayImpl::Float32(a) => { build(array, a, Float32Builder::with_capacity(a.len()), |b, v| { b.append_option(v.map(|f| f.0)) @@ -454,6 +455,8 @@ impl From<&ListArray> for arrow_array::ListArray { StringBuilder::with_capacity(a.len(), a.data().len()), |b, v| b.append_option(v), ), + ArrayImpl::Int256(_a) => todo!(), + ArrayImpl::Uint256(_a) => todo!(), ArrayImpl::Bool(a) => { build(array, a, BooleanBuilder::with_capacity(a.len()), |b, v| { b.append_option(v) diff --git a/src/common/src/array/mod.rs b/src/common/src/array/mod.rs index 9ce4505397c3c..8bf63b781494b 100644 --- a/src/common/src/array/mod.rs +++ b/src/common/src/array/mod.rs @@ -29,6 +29,7 @@ mod iterator; mod jsonb_array; pub mod list_array; mod macros; +mod num256_array; mod primitive_array; pub mod serial_array; pub mod stream_chunk; @@ -66,6 +67,9 @@ pub use utf8_array::*; pub use vis::{Vis, VisRef}; pub use self::error::ArrayError; +pub use crate::array::num256_array::{ + Int256Array, Int256ArrayBuilder, Uint256Array, Uint256ArrayBuilder, +}; use crate::buffer::Bitmap; use crate::types::*; use crate::util::iter_util::ZipEqFast; @@ -338,6 +342,8 @@ macro_rules! for_all_variants { { Int16, int16, I16Array, I16ArrayBuilder }, { Int32, int32, I32Array, I32ArrayBuilder }, { Int64, int64, I64Array, I64ArrayBuilder }, + { Int256, int256, Int256Array, Int256ArrayBuilder }, + { Uint256, uint256, Uint256Array, Uint256ArrayBuilder }, { Float32, float32, F32Array, F32ArrayBuilder }, { Float64, float64, F64Array, F64ArrayBuilder }, { Utf8, utf8, Utf8Array, Utf8ArrayBuilder }, @@ -375,6 +381,18 @@ impl From> for ArrayImpl { } } +impl From for ArrayImpl { + fn from(arr: Int256Array) -> Self { + Self::Int256(arr) + } +} + +impl From for ArrayImpl { + fn from(arr: Uint256Array) -> Self { + Self::Uint256(arr) + } +} + impl From for ArrayImpl { fn from(arr: BoolArray) -> Self { Self::Bool(arr) @@ -695,6 +713,8 @@ impl ArrayImpl { PbArrayType::Bytea => { read_string_array::(array, cardinality)? } + PbArrayType::Int256 => Int256Array::from_protobuf(array, cardinality)?, + PbArrayType::Uint256 => Uint256Array::from_protobuf(array, cardinality)?, }; Ok(array) } diff --git a/src/common/src/array/num256_array.rs b/src/common/src/array/num256_array.rs new file mode 100644 index 0000000000000..0c7574fba87e8 --- /dev/null +++ b/src/common/src/array/num256_array.rs @@ -0,0 +1,206 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::io::{Cursor, Read}; + +use ethnum::{I256, U256}; +use risingwave_pb::common::buffer::CompressionType; +use risingwave_pb::common::Buffer; +use risingwave_pb::data::PbArray; + +use crate::array::{Array, ArrayBuilder, ArrayImpl, ArrayResult}; +use crate::buffer::{Bitmap, BitmapBuilder}; +use crate::types::num256::{Int256, Int256Ref, Uint256, Uint256Ref}; +use crate::types::Scalar; + +#[derive(Debug)] +pub struct Int256ArrayBuilder { + bitmap: BitmapBuilder, + data: Vec, +} + +#[derive(Debug, Clone)] +pub struct Int256Array { + bitmap: Bitmap, + data: Vec, +} + +#[derive(Debug)] +pub struct Uint256ArrayBuilder { + bitmap: BitmapBuilder, + data: Vec, +} + +#[derive(Debug, Clone)] +pub struct Uint256Array { + bitmap: Bitmap, + data: Vec, +} + +#[rustfmt::skip] +macro_rules! impl_array_for_num256 { + ( + $array:ty, + $array_builder:ty, + $scalar:ident, + $scalar_ref:ident < $gen:tt > , + $variant_name:ident + ) => { + impl Array for $array { + type Builder = $array_builder; + type OwnedItem = $scalar; + type RefItem<$gen> = $scalar_ref<$gen>; + + unsafe fn raw_value_at_unchecked(&self, idx: usize) -> Self::RefItem<'_> { + $scalar_ref(self.data.get_unchecked(idx)) + } + + fn len(&self) -> usize { + self.data.len() + } + + fn to_protobuf(&self) -> PbArray { + let mut output_buffer = Vec::::with_capacity(self.len() * $scalar::size()); + + for v in self.iter() { + v.map(|node| node.to_protobuf(&mut output_buffer)); + } + + let buffer = Buffer { + compression: CompressionType::None as i32, + body: output_buffer, + }; + + PbArray { + null_bitmap: Some(self.null_bitmap().to_protobuf()), + values: vec![buffer], + array_type: $scalar::array_type() as i32, + struct_array_data: None, + list_array_data: None, + } + } + + fn null_bitmap(&self) -> &Bitmap { + &self.bitmap + } + + fn into_null_bitmap(self) -> Bitmap { + self.bitmap + } + + fn set_bitmap(&mut self, bitmap: Bitmap) { + self.bitmap = bitmap; + } + + fn create_builder(&self, capacity: usize) -> super::ArrayBuilderImpl { + let array_builder = Self::Builder::new(capacity); + super::ArrayBuilderImpl::$variant_name(array_builder) + } + } + + impl ArrayBuilder for $array_builder { + type ArrayType = $array; + + fn with_meta(capacity: usize, _meta: super::ArrayMeta) -> Self { + Self { + bitmap: BitmapBuilder::with_capacity(capacity), + data: Vec::with_capacity(capacity), + } + } + + fn append_n( + &mut self, + n: usize, + value: Option<::RefItem<'_>>, + ) { + match value { + Some(x) => { + self.bitmap.append_n(n, true); + self.data + .extend(std::iter::repeat(x).take(n).map(|x| x.0.clone())); + } + None => { + self.bitmap.append_n(n, false); + self.data + .extend(std::iter::repeat($scalar::default().into_inner()).take(n)); + } + } + } + + fn append_array(&mut self, other: &Self::ArrayType) { + for bit in other.bitmap.iter() { + self.bitmap.append(bit); + } + self.data.extend_from_slice(&other.data); + } + + fn pop(&mut self) -> Option<()> { + self.data.pop().map(|_| self.bitmap.pop().unwrap()) + } + + fn finish(self) -> Self::ArrayType { + Self::ArrayType { + bitmap: self.bitmap.finish(), + data: self.data, + } + } + } + + impl $array { + pub fn from_protobuf(array: &PbArray, cardinality:usize) -> ArrayResult { + ensure!( + array.get_values().len() == 1, + "Must have only 1 buffer in array" + ); + + let buf = array.get_values()[0].get_body().as_slice(); + + let mut builder = <$array_builder>::new(cardinality); + let bitmap: Bitmap = array.get_null_bitmap()?.into(); + let mut cursor = Cursor::new(buf); + for not_null in bitmap.iter() { + if not_null { + let mut buf = [0u8; $scalar::size()]; + cursor.read_exact(&mut buf)?; + let item = <$scalar>::from_be_bytes(buf); + builder.append(Some(item.as_scalar_ref())); + } else { + builder.append(None); + } + } + let arr = builder.finish(); + ensure_eq!(arr.len(), cardinality); + + Ok(arr.into()) + } + } + + }; +} + +impl_array_for_num256!( + Uint256Array, + Uint256ArrayBuilder, + Uint256, + Uint256Ref<'a>, + Uint256 +); + +impl_array_for_num256!( + Int256Array, + Int256ArrayBuilder, + Int256, + Int256Ref<'a>, + Int256 +); diff --git a/src/common/src/hash/key.rs b/src/common/src/hash/key.rs index 928cb3cbc4873..ebeeb5fc62ea7 100644 --- a/src/common/src/hash/key.rs +++ b/src/common/src/hash/key.rs @@ -37,6 +37,7 @@ use crate::array::{ }; use crate::collection::estimate_size::EstimateSize; use crate::row::{OwnedRow, RowDeserializer}; +use crate::types::num256::{Int256Ref, Uint256Ref}; use crate::types::{DataType, Date, Decimal, ScalarRef, Time, Timestamp, F32, F64}; use crate::util::hash_util::Crc32FastBuilder; use crate::util::iter_util::ZipEqFast; @@ -365,6 +366,30 @@ impl<'a> HashKeySerDe<'a> for &'a str { } } +impl<'a> HashKeySerDe<'a> for Int256Ref<'a> { + type S = [u8; 32]; + + fn serialize(self) -> Self::S { + unimplemented!("HashKeySerDe cannot be implemented for non-primitive types") + } + + fn deserialize(_source: &mut R) -> Self { + unimplemented!("HashKeySerDe cannot be implemented for non-primitive types") + } +} + +impl<'a> HashKeySerDe<'a> for Uint256Ref<'a> { + type S = [u8; 32]; + + fn serialize(self) -> Self::S { + unimplemented!("HashKeySerDe cannot be implemented for non-primitive types") + } + + fn deserialize(_source: &mut R) -> Self { + unimplemented!("HashKeySerDe cannot be implemented for non-primitive types") + } +} + /// Same as str. impl<'a> HashKeySerDe<'a> for &'a [u8] { type S = Vec; diff --git a/src/common/src/test_utils/rand_array.rs b/src/common/src/test_utils/rand_array.rs index 4ad5d0359522c..d45eebaf7572d 100644 --- a/src/common/src/test_utils/rand_array.rs +++ b/src/common/src/test_utils/rand_array.rs @@ -27,6 +27,7 @@ use rand::{Rng, SeedableRng}; use crate::array::serial_array::Serial; use crate::array::{Array, ArrayBuilder, ArrayRef, JsonbVal, ListValue, StructValue}; +use crate::types::num256::{Int256, Uint256}; use crate::types::{Date, Decimal, Interval, NativeType, Scalar, Time, Timestamp}; pub trait RandValue { @@ -118,6 +119,22 @@ impl RandValue for Serial { } } +impl RandValue for Int256 { + fn rand_value(rand: &mut R) -> Self { + let mut bytes = [0u8; 32]; + rand.fill_bytes(&mut bytes); + Int256::from_ne_bytes(bytes) + } +} + +impl RandValue for Uint256 { + fn rand_value(rand: &mut R) -> Self { + let mut bytes = [0u8; 32]; + rand.fill_bytes(&mut bytes); + Uint256::from_ne_bytes(bytes) + } +} + impl RandValue for JsonbVal { fn rand_value(_rand: &mut R) -> Self { JsonbVal::dummy() diff --git a/src/common/src/types/mod.rs b/src/common/src/types/mod.rs index 5dfbc83c4df9a..9f35a305948f4 100644 --- a/src/common/src/types/mod.rs +++ b/src/common/src/types/mod.rs @@ -47,7 +47,7 @@ pub mod struct_type; pub mod to_binary; pub mod to_text; -mod num256; +pub mod num256; mod ordered_float; use chrono::{Datelike, NaiveDate, NaiveDateTime, Timelike}; diff --git a/src/common/src/types/num256.rs b/src/common/src/types/num256.rs index c140762a0a6fd..d05b610dc3e00 100644 --- a/src/common/src/types/num256.rs +++ b/src/common/src/types/num256.rs @@ -19,24 +19,26 @@ use std::mem; use bytes::Bytes; use ethnum::{I256, U256}; use postgres_types::{ToSql, Type}; +use risingwave_pb::data::ArrayType; use serde::{Serialize, Serializer}; use to_text::ToText; +use crate::array::ArrayResult; use crate::types::to_binary::ToBinary; use crate::types::{to_text, DataType, Scalar, ScalarRef}; #[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Default, Hash)] pub struct Uint256(Box); #[derive(Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd)] -pub struct Uint256Ref<'a>(&'a U256); +pub struct Uint256Ref<'a>(pub &'a U256); #[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Default, Hash)] pub struct Int256(Box); #[derive(Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd)] -pub struct Int256Ref<'a>(&'a I256); +pub struct Int256Ref<'a>(pub &'a I256); macro_rules! impl_common_for_num256 { - ($scalar:ident, $scalar_ref:ident < $gen:tt > , $inner:ty) => { + ($scalar:ident, $scalar_ref:ident < $gen:tt > , $inner:ty, $array_type:ident) => { impl Scalar for $scalar { type ScalarRefType<$gen> = $scalar_ref<$gen>; @@ -67,11 +69,52 @@ macro_rules! impl_common_for_num256 { } } + impl $scalar { + #[inline] + pub fn into_inner(self) -> $inner { + *self.0 + } + + #[inline] + pub const fn size() -> usize { + mem::size_of::<$inner>() + } + + #[inline] + pub fn array_type() -> ArrayType { + ArrayType::$array_type + } + + #[inline] + pub fn from_ne_bytes(bytes: [u8; mem::size_of::<$inner>()]) -> Self { + Self(Box::new(<$inner>::from_ne_bytes(bytes))) + } + + #[inline] + pub fn from_be_bytes(bytes: [u8; mem::size_of::<$inner>()]) -> Self { + Self(Box::new(<$inner>::from_be_bytes(bytes))) + } + } + impl $scalar_ref<'_> { #[inline] pub fn to_le_bytes(self) -> [u8; mem::size_of::<$inner>()] { self.0.to_le_bytes() } + + #[inline] + pub fn to_be_bytes(self) -> [u8; mem::size_of::<$inner>()] { + self.0.to_be_bytes() + } + + #[inline] + pub fn to_ne_bytes(self) -> [u8; mem::size_of::<$inner>()] { + self.0.to_ne_bytes() + } + + pub fn to_protobuf(self, output: &mut T) -> ArrayResult { + output.write(&self.to_be_bytes()).map_err(Into::into) + } } impl ToText for $scalar_ref<'_> { @@ -98,5 +141,5 @@ macro_rules! impl_common_for_num256 { }; } -impl_common_for_num256!(Uint256, Uint256Ref<'a>, U256); -impl_common_for_num256!(Int256, Int256Ref<'a>, I256); +impl_common_for_num256!(Uint256, Uint256Ref<'a>, U256, Uint256); +impl_common_for_num256!(Int256, Int256Ref<'a>, I256, Int256);