Rebase multifile merge fix (#66)
- Consider the `sorted` option for WINDOW functions as well as for ORDER BY (see the condensed sketch after the src/parquet_impl.cpp diff below).

- Fix memory usage by `DefaultParquetReader` when reading MAP data (returned as jsonb) from Parquet files (see the sketch after the src/reader.cpp diff below).

- Remove the `force_multifile_merge` GUC variable and use the estimated number of rows to calculate the merge cost. Until now parquet_fdw used one row count to calculate the merge cost and a different one when passing the path to the PostgreSQL planner; this PR uses the same row count that is passed to the planner (see the excerpt immediately below).
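For context, the merge-cost part of this change reduces to the following simplified excerpt from parquetGetForeignPaths() (condensed from the diff below; not a complete function):

    /* Before: the merge cost was driven by a row count kept privately by the
     * FDW (matched_rows), which could differ from the estimate the path
     * reports to the planner. */
    cost_merge((Path *) path, list_length(private_sort->filenames),
               startup_cost, total_cost, private_sort->matched_rows);

    /* After: the estimate the planner sees (path->rows) also drives the
     * merge cost, so both calculations agree. */
    cost_merge((Path *) path, list_length(private_sort->filenames),
               startup_cost, total_cost, path->rows);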
za-arthur authored Aug 27, 2024
1 parent e6afbfd commit d15664e
Showing 10 changed files with 2,153 additions and 254 deletions.
95 changes: 60 additions & 35 deletions src/parquet_impl.cpp
@@ -69,6 +69,10 @@ extern "C"
#else
#include "catalog/pg_am_d.h"
#endif

#if PG_VERSION_NUM >= 120000
#include "nodes/pathnodes.h"
#endif
}


@@ -1203,9 +1207,10 @@ parquetGetForeignPaths(PlannerInfo *root,
Cost run_cost;
bool is_sorted, is_multi;
List *pathkeys = NIL;
List *source_pathkeys = NIL;
std::list<RowGroupFilter> filters;
ListCell *lc;
ListCell *lc2;
ListCell *lc_sorted;
ListCell *lc_rootsort;

fdw_private = (ParquetFdwPlanState *) baserel->fdw_private;

@@ -1221,7 +1226,7 @@

/*
* Build pathkeys for the foreign table based on attrs_sorted and ORDER BY
* clause passed by user.
* (or WINDOW FUNCTION) clause passed by user.
*
* pathkeys is used by Postgres to sort the result. After we build pathkeys
* for the foreign table Postgres will assume that the returned data is
@@ -1233,19 +1238,21 @@
* that an attribute on ORDER BY and "sorted" doesn't match, since in that
* case Postgres will need to sort by remaining attributes by itself.
*/
forboth (lc, fdw_private->attrs_sorted, lc2, root->sort_pathkeys)
source_pathkeys = root->query_pathkeys;
lc_rootsort = list_head(source_pathkeys);

foreach (lc_sorted, fdw_private->attrs_sorted)
{
PathKey *root_pathkey = (PathKey *) lfirst(lc2);
Oid relid = root->simple_rte_array[baserel->relid]->relid;
int attnum = lfirst_int(lc);
int attnum = lfirst_int(lc_sorted);
Oid typid,
collid;
int32 typmod;
Oid sort_op;
Var *var;
List *attr_pathkeys;

if (root_pathkey->pk_eclass->ec_has_volatile)
if (lc_rootsort == NULL)
break;

/* Build an expression (simple var) for the attribute */
@@ -1262,29 +1269,52 @@
#if PG_VERSION_NUM >= 160000
attr_pathkeys = build_expression_pathkey(root, (Expr *) var,
sort_op, baserel->relids,
true);
false);
#else
attr_pathkeys = build_expression_pathkey(root, (Expr *) var, NULL,
sort_op, baserel->relids,
true);
false);
#endif

if (attr_pathkeys != NIL)
if (attr_pathkeys == NIL)
break;
else
{
PathKey *attr_pathkey = (PathKey *) linitial(attr_pathkeys);
bool is_redundant = false;

/*
* Compare the attribute from "sorted" option and the attribute from
* ORDER BY clause ("root"). If they don't match stop here and use
* whatever pathkeys we've built so far. Postgres will use remaining
* attributes from ORDER BY clause to sort data on higher level of
* execution.
*/
if (!equal(attr_pathkey, root_pathkey))
break;
}
if (EC_MUST_BE_REDUNDANT(attr_pathkey->pk_eclass))
is_redundant = true;

if (lc_rootsort != NULL)
{
PathKey *root_pathkey = (PathKey *) lfirst(lc_rootsort);

/*
* Compare the attribute from "sorted" option and the attribute from
* ORDER BY clause ("root"). If they don't match stop here and use
* whatever pathkeys we've built so far. Postgres will use remaining
* attributes from ORDER BY clause to sort data on higher level of
* execution.
*/
if (!equal(attr_pathkey, root_pathkey))
{
if (!is_redundant)
break;
}
else
{
#if PG_VERSION_NUM < 130000
lc_rootsort = lnext(lc_rootsort);
#else
lc_rootsort = lnext(source_pathkeys, lc_rootsort);
#endif
}
}

pathkeys = list_concat(pathkeys, attr_pathkeys);
if (!is_redundant)
pathkeys = list_concat(pathkeys, attr_pathkeys);
}
}

foreign_path = (Path *) create_foreignscan_path(root, baserel,
@@ -1299,7 +1329,7 @@ parquetGetForeignPaths(PlannerInfo *root,
if (!enable_multifile && is_multi)
foreign_path->total_cost += disable_cost;

add_path(baserel, (Path *) foreign_path);
add_path(baserel, foreign_path);

if (fdw_private->type == RT_TRIVIAL)
return;
@@ -1330,7 +1360,7 @@ parquetGetForeignPaths(PlannerInfo *root,
RT_CACHING_MULTI_MERGE : RT_MULTI_MERGE;

cost_merge((Path *) path, list_length(private_sort->filenames),
startup_cost, total_cost, private_sort->matched_rows);
startup_cost, total_cost, path->rows);

if (!enable_multifile_merge)
path->total_cost += disable_cost;
@@ -1342,7 +1372,9 @@ parquetGetForeignPaths(PlannerInfo *root,
if (baserel->consider_parallel > 0)
{
ParquetFdwPlanState *private_parallel;
bool use_pathkeys = false;
bool use_pathkeys = false;
int num_workers = max_parallel_workers_per_gather;
double rows_per_worker = baserel->rows / (num_workers + 1);

private_parallel = (ParquetFdwPlanState *) palloc(sizeof(ParquetFdwPlanState));
memcpy(private_parallel, fdw_private, sizeof(ParquetFdwPlanState));
@@ -1354,18 +1386,14 @@ parquetGetForeignPaths(PlannerInfo *root,
Path *path = (Path *)
create_foreignscan_path(root, baserel,
NULL, /* default pathtarget */
baserel->rows,
rows_per_worker,
startup_cost,
total_cost,
startup_cost + run_cost / (num_workers + 1),
use_pathkeys ? pathkeys : NULL,
NULL, /* no outer rel either */
NULL, /* no extra plan */
(List *) private_parallel);

int num_workers = max_parallel_workers_per_gather;

path->rows = path->rows / (num_workers + 1);
path->total_cost = startup_cost + run_cost / (num_workers + 1);
path->parallel_workers = num_workers;
path->parallel_aware = true;
path->parallel_safe = true;
@@ -1389,20 +1417,17 @@ parquetGetForeignPaths(PlannerInfo *root,
path = (Path *)
create_foreignscan_path(root, baserel,
NULL, /* default pathtarget */
baserel->rows,
rows_per_worker,
startup_cost,
total_cost,
pathkeys,
NULL, /* no outer rel either */
NULL, /* no extra plan */
(List *) private_parallel_merge);

num_workers = max_parallel_workers_per_gather;

cost_merge(path, list_length(private_parallel_merge->filenames),
startup_cost, total_cost, private_parallel_merge->matched_rows);
startup_cost, total_cost, path->rows);

path->rows = path->rows / (num_workers + 1);
path->total_cost = path->startup_cost + path->total_cost / (num_workers + 1);
path->parallel_workers = num_workers;
path->parallel_aware = true;
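The pathkey-matching part of the change above can be read as the following condensed sketch (simplified from the diff: the redundant-pathkey handling and the PostgreSQL-version branches are omitted, and var/sort_op stand for the per-column expression built a few lines earlier). The loop now walks root->query_pathkeys, which reflects window-function ordering as well as ORDER BY, instead of root->sort_pathkeys, and stops at the first "sorted" column the query cannot use:

    source_pathkeys = root->query_pathkeys;
    lc_rootsort = list_head(source_pathkeys);

    foreach (lc_sorted, fdw_private->attrs_sorted)
    {
        List    *attr_pathkeys;
        PathKey *attr_pathkey;

        if (lc_rootsort == NULL)
            break;              /* the query needs no further ordering */

        /* build a pathkey for this "sorted" column (a simple Var) */
        attr_pathkeys = build_expression_pathkey(root, (Expr *) var,
                                                 sort_op, baserel->relids,
                                                 false);
        if (attr_pathkeys == NIL)
            break;

        attr_pathkey = (PathKey *) linitial(attr_pathkeys);

        /* first mismatch: stop here; PostgreSQL sorts by the rest itself */
        if (!equal(attr_pathkey, (PathKey *) lfirst(lc_rootsort)))
            break;

        lc_rootsort = lnext(source_pathkeys, lc_rootsort);
        pathkeys = list_concat(pathkeys, attr_pathkeys);
    }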
32 changes: 22 additions & 10 deletions src/reader.cpp
@@ -1003,10 +1003,21 @@ class DefaultParquetReader : public ParquetReader
}
case arrow::Type::MAP:
{
arrow::MapArray* maparray = (arrow::MapArray*) array;
arrow::MapArray *maparray = (arrow::MapArray*) array;
Datum jsonb = this->map_to_datum(maparray, chunkInfo.pos, typinfo);

slot->tts_values[attr] =
this->map_to_datum(maparray, chunkInfo.pos, typinfo);
/*
* Copy the jsonb into a memory block allocated by
* FastAllocator so that it is not destroyed prematurely,
* while still allowing the block to be recycled once it
* has served its purpose.
*/
void *jsonb_val = allocator->fast_alloc(VARSIZE_ANY(jsonb));

memcpy(jsonb_val, DatumGetPointer(jsonb), VARSIZE_ANY(jsonb));
pfree(DatumGetPointer(jsonb));

slot->tts_values[attr] = PointerGetDatum(jsonb_val);
break;
}
default:
@@ -1255,20 +1266,21 @@ class CachingParquetReader : public ParquetReader
case arrow::Type::MAP:
{
arrow::MapArray* maparray = (arrow::MapArray*) array;

Datum jsonb =
this->map_to_datum(maparray, j, typinfo);
Datum jsonb = this->map_to_datum(maparray, j, typinfo);

/*
* Copy the jsonb into a memory block allocated by
* FastAllocator so that it is not destroyed prematurely,
* while still allowing the block to be recycled once it
* has served its purpose.
*/
void *res = allocator->fast_alloc(VARSIZE_ANY(jsonb));
memcpy(res, (Jsonb *) jsonb, VARSIZE_ANY(jsonb));
((Datum *) data)[row] = (Datum) res;
pfree((Jsonb *) jsonb);
void *jsonb_val = allocator->fast_alloc(VARSIZE_ANY(jsonb));

memcpy(jsonb_val, DatumGetPointer(jsonb), VARSIZE_ANY(jsonb));
pfree(DatumGetPointer(jsonb));

((Datum *) data)[row] = PointerGetDatum(jsonb_val);

break;
}
default:
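The memory fix in both readers follows the same copy-and-release pattern. Factored out as a hypothetical helper (not part of the patch, only a sketch of the idea), it looks like this: the jsonb datum built by map_to_datum() is copied into a block owned by the reader's FastAllocator and the original palloc'd value is freed immediately. Presumably this is what keeps memory usage in check while MAP columns are read.

    /* Hypothetical helper, assuming FastAllocator::fast_alloc() as used above. */
    static Datum
    copy_jsonb_into_fast_allocator(FastAllocator *allocator, Datum jsonb)
    {
        void *jsonb_val = allocator->fast_alloc(VARSIZE_ANY(jsonb));

        memcpy(jsonb_val, DatumGetPointer(jsonb), VARSIZE_ANY(jsonb));
        pfree(DatumGetPointer(jsonb));

        return PointerGetDatum(jsonb_val);
    }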
32 changes: 18 additions & 14 deletions test/data/generate.py
@@ -1,10 +1,8 @@
#!/usr/bin/env python3

import pyarrow.parquet as pq
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from datetime import datetime, date, timedelta

# example1.parquet file
@@ -24,9 +22,9 @@
df2 = pd.DataFrame({'one': [4, 5, 6],
'two': [[10, 11, 12], [13, 14, 15], [16, 17, 18]],
'three': ['uno', 'dos', 'tres'],
'four': [datetime(2018, 1, 4) + timedelta(seconds=10),
datetime(2018, 1, 5) + timedelta(milliseconds=10),
datetime(2018, 1, 6) + timedelta(microseconds=10)],
'four': [datetime(2018, 1, 4)+timedelta(seconds=10),
datetime(2018, 1, 5)+timedelta(milliseconds=10),
datetime(2018, 1, 6)+timedelta(microseconds=10)],
'five': [date(2018, 1, 4),
date(2018, 1, 5),
date(2018, 1, 6)],
@@ -69,7 +67,7 @@
df = pd.DataFrame({
'one': pd.Series([
[(1, 'foo'), (2, 'bar'), (3, 'baz')],
[(4, 'test1'), (5,'test2')],
[(4, 'test1'), (5, 'test2')],
]),
'two': pd.Series([
[(date(2018, 1, 1), 10), (date(2018, 1, 2), 15)],
@@ -89,22 +87,28 @@
writer.write_table(table)

# Parquet files for partitions
df_part1 = pd.DataFrame({'id': [1, 1, 2],
df_part1 = pd.DataFrame({'id': [1, 1, 2, 3],
'token': [1, 1, 2, 2],
'date': [datetime(2018, 1, 1),
datetime(2018, 1, 2),
datetime(2018, 1, 3)],
'num': [10, 23, 9]})
datetime(2018, 1, 3),
datetime(2018, 1, 4)],
'num': [10, 23, 9, 38]})
table_part1 = pa.Table.from_pandas(df_part1)

with pq.ParquetWriter('partition/example_part1.parquet', table_part1.schema) as writer:
with pq.ParquetWriter(
'partition/example_part1.parquet', table_part1.schema) as writer:
writer.write_table(table_part1)

df_part2 = pd.DataFrame({'id': [1, 2, 2],
df_part2 = pd.DataFrame({'id': [1, 2, 2, 3],
'token': [1, 2, 2, 2],
'date': [datetime(2018, 2, 1),
datetime(2018, 2, 2),
datetime(2018, 2, 3)],
'num': [59, 1, 32]})
datetime(2018, 2, 3),
datetime(2018, 2, 4)],
'num': [59, 1, 32, 96]})
table_part2 = pa.Table.from_pandas(df_part2)

with pq.ParquetWriter('partition/example_part2.parquet', table_part2.schema) as writer:
with pq.ParquetWriter(
'partition/example_part2.parquet', table_part2.schema) as writer:
writer.write_table(table_part2)
Binary file modified test/data/partition/example_part1.parquet
Binary file not shown.
Binary file modified test/data/partition/example_part2.parquet
Binary file not shown.