Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Borrow -> AsRef
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Dec 28, 2021
1 parent c070ad7 commit a0358be
Show file tree
Hide file tree
Showing 14 changed files with 58 additions and 59 deletions.
24 changes: 14 additions & 10 deletions arrow-parquet-integration-testing/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ use std::fs::File;
use std::sync::Arc;
use std::{collections::HashMap, io::Read};

use arrow2::array::Array;
use arrow2::io::ipc::IpcField;
use arrow2::{
columns::Columns,
datatypes::{DataType, Schema},
error::Result,
io::{
Expand All @@ -13,15 +15,17 @@ use arrow2::{
write_file, Compression, Encoding, RowGroupIterator, Version, WriteOptions,
},
},
record_batch::RecordBatch,
};

use clap::{App, Arg};

use flate2::read::GzDecoder;

/// Read gzipped JSON file
fn read_gzip_json(version: &str, file_name: &str) -> (Schema, Vec<IpcField>, Vec<RecordBatch>) {
pub fn read_gzip_json(
version: &str,
file_name: &str,
) -> Result<(Schema, Vec<IpcField>, Vec<Columns<Arc<dyn Array>>>)> {
let path = format!(
"../testing/arrow-testing/data/arrow-ipc-stream/integration/{}/{}.json.gz",
version, file_name
Expand All @@ -31,10 +35,11 @@ fn read_gzip_json(version: &str, file_name: &str) -> (Schema, Vec<IpcField>, Vec
let mut s = String::new();
gz.read_to_string(&mut s).unwrap();
// convert to Arrow JSON
let arrow_json: ArrowJson = serde_json::from_str(&s).unwrap();
let arrow_json: ArrowJson = serde_json::from_str(&s)?;

let schema = serde_json::to_value(arrow_json.schema).unwrap();
let (schema, ipc_fields) = read::deserialize_schema(&schema).unwrap();

let (schema, ipc_fields) = read::deserialize_schema(&schema)?;

// read dictionaries
let mut dictionaries = HashMap::new();
Expand All @@ -48,11 +53,10 @@ fn read_gzip_json(version: &str, file_name: &str) -> (Schema, Vec<IpcField>, Vec
let batches = arrow_json
.batches
.iter()
.map(|batch| read::to_record_batch(&schema, &ipc_fields, batch, &dictionaries))
.collect::<Result<Vec<_>>>()
.unwrap();
.map(|batch| read::deserialize_columns(&schema, &ipc_fields, batch, &dictionaries))
.collect::<Result<Vec<_>>>()?;

(schema, ipc_fields, batches)
Ok((schema, ipc_fields, batches))
}

fn main() -> Result<()> {
Expand Down Expand Up @@ -108,7 +112,7 @@ fn main() -> Result<()> {
.collect::<Vec<_>>()
});

let (schema, _, batches) = read_gzip_json("1.0.0-littleendian", json_file);
let (schema, _, batches) = read_gzip_json("1.0.0-littleendian", json_file)?;

let schema = if let Some(projection) = &projection {
let fields = schema
Expand Down Expand Up @@ -144,7 +148,7 @@ fn main() -> Result<()> {
}
})
.collect();
RecordBatch::try_new(Arc::new(schema.clone()), columns).unwrap()
Columns::try_new(columns).unwrap()
})
.collect::<Vec<_>>()
} else {
Expand Down
5 changes: 1 addition & 4 deletions examples/csv_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@ use arrow2::{
io::csv::write,
};

fn write_batch<A: std::borrow::Borrow<dyn Array>>(
path: &str,
columns: &[Columns<A>],
) -> Result<()> {
fn write_batch<A: AsRef<dyn Array>>(path: &str, columns: &[Columns<A>]) -> Result<()> {
let writer = &mut write::WriterBuilder::new().from_path(path)?;

write::write_header(writer, &["c1"])?;
Expand Down
8 changes: 8 additions & 0 deletions src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,14 @@ pub fn clone(array: &dyn Array) -> Box<dyn Array> {
}
}

// Identity `AsRef` impl: returns `self` unchanged, so a bare `&dyn Array`
// can satisfy generic bounds of the form `A: AsRef<dyn Array>` alongside
// `Box<dyn Array>` and `Arc<dyn Array>` (which get `AsRef` via their own impls).
// see https://users.rust-lang.org/t/generic-for-dyn-a-or-box-dyn-a-or-arc-dyn-a/69430/3
// for details
impl<'a> AsRef<(dyn Array + 'a)> for dyn Array {
    fn as_ref(&self) -> &(dyn Array + 'a) {
        self
    }
}

mod binary;
mod boolean;
mod dictionary;
Expand Down
14 changes: 7 additions & 7 deletions src/columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ use crate::error::{ArrowError, Result};
/// A vector of trait objects of [`Array`] where every item has
/// the same length, [`Columns::len`].
#[derive(Debug, Clone, PartialEq)]
pub struct Columns<A: std::borrow::Borrow<dyn Array>> {
pub struct Columns<A: AsRef<dyn Array>> {
arrays: Vec<A>,
}

impl<A: std::borrow::Borrow<dyn Array>> Columns<A> {
impl<A: AsRef<dyn Array>> Columns<A> {
/// Creates a new [`Columns`].
/// # Panic
/// Iff the arrays do not have the same length
Expand All @@ -24,10 +24,10 @@ impl<A: std::borrow::Borrow<dyn Array>> Columns<A> {
/// Iff the arrays do not have the same length
pub fn try_new(arrays: Vec<A>) -> Result<Self> {
if !arrays.is_empty() {
let len = arrays.first().unwrap().borrow().len();
let len = arrays.first().unwrap().as_ref().len();
if arrays
.iter()
.map(|array| array.borrow())
.map(|array| array.as_ref())
.any(|array| array.len() != len)
{
return Err(ArrowError::InvalidArgumentError(
Expand All @@ -52,7 +52,7 @@ impl<A: std::borrow::Borrow<dyn Array>> Columns<A> {
pub fn len(&self) -> usize {
self.arrays
.first()
.map(|x| x.borrow().len())
.map(|x| x.as_ref().len())
.unwrap_or_default()
}

Expand All @@ -68,13 +68,13 @@ impl<A: std::borrow::Borrow<dyn Array>> Columns<A> {
}
}

impl<A: std::borrow::Borrow<dyn Array>> From<Columns<A>> for Vec<A> {
impl<A: AsRef<dyn Array>> From<Columns<A>> for Vec<A> {
fn from(c: Columns<A>) -> Self {
c.into_arrays()
}
}

impl<A: std::borrow::Borrow<dyn Array>> std::ops::Deref for Columns<A> {
impl<A: AsRef<dyn Array>> std::ops::Deref for Columns<A> {
type Target = [A];

#[inline]
Expand Down
6 changes: 3 additions & 3 deletions src/compute/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ pub fn filter(array: &dyn Array, filter: &BooleanArray) -> Result<Box<dyn Array>

/// Returns a new [Columns] with arrays containing only values matching the filter.
/// This is a convenience function: filtering multiple columns is embarrassingly parallel.
pub fn filter_columns<A: std::borrow::Borrow<dyn Array>>(
pub fn filter_columns<A: AsRef<dyn Array>>(
columns: &Columns<A>,
filter_values: &BooleanArray,
) -> Result<Columns<Box<dyn Array>>> {
Expand All @@ -164,11 +164,11 @@ pub fn filter_columns<A: std::borrow::Borrow<dyn Array>>(

let filtered_arrays = match num_colums {
1 => {
vec![filter(columns.arrays()[0].borrow(), filter_values)?]
vec![filter(columns.arrays()[0].as_ref(), filter_values)?]
}
_ => {
let filter = build_filter(filter_values)?;
arrays.iter().map(|a| filter(a.borrow())).collect()
arrays.iter().map(|a| filter(a.as_ref())).collect()
}
};
Columns::try_new(filtered_arrays)
Expand Down
8 changes: 4 additions & 4 deletions src/io/csv/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,20 @@ use crate::error::Result;

/// Creates serializers that iterate over each column that serializes each item according
/// to `options`.
fn new_serializers<'a, A: std::borrow::Borrow<dyn Array>>(
fn new_serializers<'a, A: AsRef<dyn Array>>(
columns: &'a [A],
options: &'a SerializeOptions,
) -> Result<Vec<Box<dyn StreamingIterator<Item = [u8]> + 'a>>> {
columns
.iter()
.map(|column| new_serializer(column.borrow(), options))
.map(|column| new_serializer(column.as_ref(), options))
.collect()
}

/// Serializes [`Columns`] to a vector of `ByteRecord`.
/// The vector is guaranteed to have `columns.len()` entries.
/// Each `ByteRecord` is guaranteed to have `columns.arrays().len()` fields.
pub fn serialize<A: std::borrow::Borrow<dyn Array>>(
pub fn serialize<A: AsRef<dyn Array>>(
columns: &Columns<A>,
options: &SerializeOptions,
) -> Result<Vec<ByteRecord>> {
Expand All @@ -47,7 +47,7 @@ pub fn serialize<A: std::borrow::Borrow<dyn Array>>(
}

/// Writes [`Columns`] to `writer` according to the serialization options `options`.
pub fn write_columns<W: Write, A: std::borrow::Borrow<dyn Array>>(
pub fn write_columns<W: Write, A: AsRef<dyn Array>>(
writer: &mut Writer<W>,
columns: &Columns<A>,
options: &SerializeOptions,
Expand Down
6 changes: 3 additions & 3 deletions src/io/json/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ where
pub struct Serializer<F, A, I>
where
F: JsonFormat,
A: std::borrow::Borrow<dyn Array>,
A: AsRef<dyn Array>,
I: Iterator<Item = Result<Columns<A>>>,
{
batches: I,
Expand All @@ -47,7 +47,7 @@ where
impl<F, A, I> Serializer<F, A, I>
where
F: JsonFormat,
A: std::borrow::Borrow<dyn Array>,
A: AsRef<dyn Array>,
I: Iterator<Item = Result<Columns<A>>>,
{
/// Creates a new [`Serializer`].
Expand All @@ -64,7 +64,7 @@ where
impl<F, A, I> FallibleStreamingIterator for Serializer<F, A, I>
where
F: JsonFormat,
A: std::borrow::Borrow<dyn Array>,
A: AsRef<dyn Array>,
I: Iterator<Item = Result<Columns<A>>>,
{
type Item = [u8];
Expand Down
4 changes: 2 additions & 2 deletions src/io/json/write/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,15 +202,15 @@ fn serialize_item<F: JsonFormat>(
pub fn serialize<N, A, F>(names: &[N], columns: &Columns<A>, format: F, buffer: &mut Vec<u8>)
where
N: AsRef<str>,
A: std::borrow::Borrow<dyn Array>,
A: AsRef<dyn Array>,
F: JsonFormat,
{
let num_rows = columns.len();

let mut serializers: Vec<_> = columns
.arrays()
.iter()
.map(|array| new_serializer(array.borrow()))
.map(|array| new_serializer(array.as_ref()))
.collect();

let mut is_first_row = true;
Expand Down
13 changes: 4 additions & 9 deletions src/io/parquet/write/record_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,14 @@ use crate::{
/// An iterator adapter that converts an iterator over [`Columns`] into an iterator
/// of row groups.
/// Use it to create an iterator consumable by the parquet's API.
pub struct RowGroupIterator<
A: std::borrow::Borrow<dyn Array> + 'static,
I: Iterator<Item = Result<Columns<A>>>,
> {
pub struct RowGroupIterator<A: AsRef<dyn Array> + 'static, I: Iterator<Item = Result<Columns<A>>>> {
iter: I,
options: WriteOptions,
parquet_schema: SchemaDescriptor,
encodings: Vec<Encoding>,
}

impl<A: std::borrow::Borrow<dyn Array> + 'static, I: Iterator<Item = Result<Columns<A>>>>
RowGroupIterator<A, I>
{
impl<A: AsRef<dyn Array> + 'static, I: Iterator<Item = Result<Columns<A>>>> RowGroupIterator<A, I> {
/// Creates a new [`RowGroupIterator`] from an iterator over [`Columns`].
pub fn try_new(
iter: I,
Expand All @@ -53,7 +48,7 @@ impl<A: std::borrow::Borrow<dyn Array> + 'static, I: Iterator<Item = Result<Colu
}
}

impl<A: std::borrow::Borrow<dyn Array> + 'static, I: Iterator<Item = Result<Columns<A>>>> Iterator
impl<A: AsRef<dyn Array> + 'static, I: Iterator<Item = Result<Columns<A>>>> Iterator
for RowGroupIterator<A, I>
{
type Item = Result<RowGroupIter<'static, ArrowError>>;
Expand All @@ -71,7 +66,7 @@ impl<A: std::borrow::Borrow<dyn Array> + 'static, I: Iterator<Item = Result<Colu
.zip(self.parquet_schema.columns().to_vec().into_iter())
.zip(encodings.into_iter())
.map(move |((array, descriptor), encoding)| {
array_to_pages(array.borrow(), descriptor, options, encoding).map(
array_to_pages(array.as_ref(), descriptor, options, encoding).map(
move |pages| {
let encoded_pages = DynIter::new(pages.map(|x| Ok(x?)));
let compressed_pages =
Expand Down
7 changes: 2 additions & 5 deletions src/io/print.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@ use crate::{
use comfy_table::{Cell, Table};

/// Returns a visual representation of [`Columns`]
pub fn write<A: std::borrow::Borrow<dyn Array>, N: AsRef<str>>(
batches: &[Columns<A>],
names: &[N],
) -> String {
pub fn write<A: AsRef<dyn Array>, N: AsRef<str>>(batches: &[Columns<A>], names: &[N]) -> String {
let mut table = Table::new();
table.load_preset("||--+-++| ++++++");

Expand All @@ -26,7 +23,7 @@ pub fn write<A: std::borrow::Borrow<dyn Array>, N: AsRef<str>>(
let displayes = batch
.arrays()
.iter()
.map(|array| get_display(array.borrow()))
.map(|array| get_display(array.as_ref()))
.collect::<Vec<_>>();

for row in 0..batch.len() {
Expand Down
4 changes: 2 additions & 2 deletions tests/it/io/avro/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ fn data() -> Columns<Arc<dyn Array>> {

use super::read::read_avro;

fn write_avro<R: std::borrow::Borrow<dyn Array>>(
fn write_avro<R: AsRef<dyn Array>>(
columns: &Columns<R>,
schema: &Schema,
compression: Option<write::Compression>,
Expand All @@ -46,7 +46,7 @@ fn write_avro<R: std::borrow::Borrow<dyn Array>>(
let mut serializers = columns
.arrays()
.iter()
.map(|x| x.borrow())
.map(|x| x.as_ref())
.zip(avro_fields.iter())
.map(|(array, field)| write::new_serializer(array, &field.schema))
.collect::<Vec<_>>();
Expand Down
12 changes: 4 additions & 8 deletions tests/it/io/ipc/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ use arrow2::{

use flate2::read::GzDecoder;

type IpcRead = (Schema, Vec<IpcField>, Vec<Columns<Arc<dyn Array>>>);

/// Read gzipped JSON file
pub fn read_gzip_json(
version: &str,
file_name: &str,
) -> Result<(Schema, Vec<IpcField>, Vec<Columns<Arc<dyn Array>>>)> {
pub fn read_gzip_json(version: &str, file_name: &str) -> Result<IpcRead> {
let testdata = crate::test_util::arrow_test_data();
let file = File::open(format!(
"{}/arrow-ipc-stream/integration/{}/{}.json.gz",
Expand Down Expand Up @@ -47,10 +46,7 @@ pub fn read_gzip_json(
Ok((schema, ipc_fields, batches))
}

pub fn read_arrow_stream(
version: &str,
file_name: &str,
) -> (Schema, Vec<IpcField>, Vec<Columns<Arc<dyn Array>>>) {
pub fn read_arrow_stream(version: &str, file_name: &str) -> IpcRead {
let testdata = crate::test_util::arrow_test_data();
let mut file = File::open(format!(
"{}/arrow-ipc-stream/integration/{}/{}.stream",
Expand Down
2 changes: 1 addition & 1 deletion tests/it/io/json/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ fn read_batch(data: String, fields: &[Field]) -> Result<Columns<Arc<dyn Array>>>
json_read::deserialize(rows, fields)
}

fn write_batch<A: std::borrow::Borrow<dyn Array>>(
fn write_batch<A: AsRef<dyn Array>>(
batch: Columns<A>,
names: Vec<String>,
) -> Result<Vec<u8>> {
Expand Down
4 changes: 3 additions & 1 deletion tests/it/io/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,9 @@ fn integration_write(schema: &Schema, batches: &[Columns<Arc<dyn Array>>]) -> Re
Ok(writer.into_inner())
}

fn integration_read(data: &[u8]) -> Result<(Arc<Schema>, Vec<Columns<Arc<dyn Array>>>)> {
type IntegrationRead = (Arc<Schema>, Vec<Columns<Arc<dyn Array>>>);

fn integration_read(data: &[u8]) -> Result<IntegrationRead> {
let reader = Cursor::new(data);
let reader = RecordReader::try_new(reader, None, None, None, None)?;
let schema = reader.schema().clone();
Expand Down

0 comments on commit a0358be

Please sign in to comment.