Skip to content

Commit

Permalink
Lighten DataFrame size and fix large futures warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
sergiimk committed Apr 17, 2024
1 parent ea2e4fc commit 0a2b633
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 70 deletions.
174 changes: 122 additions & 52 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ impl Default for DataFrameWriteOptions {
/// ```
#[derive(Debug, Clone)]
pub struct DataFrame {
session_state: SessionState,
// Box the (large) SessionState to reduce the size of DataFrame on the stack
session_state: Box<SessionState>,
plan: LogicalPlan,
}

Expand All @@ -168,7 +169,7 @@ impl DataFrame {
/// `DataFrame` from an existing datasource.
pub fn new(session_state: SessionState, plan: LogicalPlan) -> Self {
Self {
session_state,
session_state: Box::new(session_state),
plan,
}
}
Expand Down Expand Up @@ -234,7 +235,10 @@ impl DataFrame {
};
let project_plan = LogicalPlanBuilder::from(plan).project(expr_list)?.build()?;

Ok(DataFrame::new(self.session_state, project_plan))
Ok(DataFrame {
session_state: self.session_state,
plan: project_plan,
})
}

/// Expand each list element of a column to multiple rows.
Expand Down Expand Up @@ -273,7 +277,10 @@ impl DataFrame {
let plan = LogicalPlanBuilder::from(self.plan)
.unnest_column_with_options(column, options)?
.build()?;
Ok(DataFrame::new(self.session_state, plan))
Ok(DataFrame {
session_state: self.session_state,
plan,
})
}

/// Return a DataFrame with only rows for which `predicate` evaluates to
Expand All @@ -298,7 +305,10 @@ impl DataFrame {
let plan = LogicalPlanBuilder::from(self.plan)
.filter(predicate)?
.build()?;
Ok(DataFrame::new(self.session_state, plan))
Ok(DataFrame {
session_state: self.session_state,
plan,
})
}

/// Return a new `DataFrame` that aggregates the rows of the current
Expand Down Expand Up @@ -329,7 +339,10 @@ impl DataFrame {
let plan = LogicalPlanBuilder::from(self.plan)
.aggregate(group_expr, aggr_expr)?
.build()?;
Ok(DataFrame::new(self.session_state, plan))
Ok(DataFrame {
session_state: self.session_state,
plan,
})
}

/// Return a new DataFrame that adds the result of evaluating one or more
Expand All @@ -338,7 +351,10 @@ impl DataFrame {
let plan = LogicalPlanBuilder::from(self.plan)
.window(window_exprs)?
.build()?;
Ok(DataFrame::new(self.session_state, plan))
Ok(DataFrame {
session_state: self.session_state,
plan,
})
}

/// Returns a new `DataFrame` with a limited number of rows.
Expand All @@ -363,7 +379,10 @@ impl DataFrame {
let plan = LogicalPlanBuilder::from(self.plan)
.limit(skip, fetch)?
.build()?;
Ok(DataFrame::new(self.session_state, plan))
Ok(DataFrame {
session_state: self.session_state,
plan,
})
}

/// Calculate the union of two [`DataFrame`]s, preserving duplicate rows.
Expand All @@ -387,7 +406,10 @@ impl DataFrame {
let plan = LogicalPlanBuilder::from(self.plan)
.union(dataframe.plan)?
.build()?;
Ok(DataFrame::new(self.session_state, plan))
Ok(DataFrame {
session_state: self.session_state,
plan,
})
}

/// Calculate the distinct union of two [`DataFrame`]s.
Expand All @@ -409,12 +431,13 @@ impl DataFrame {
/// # }
/// ```
pub fn union_distinct(self, dataframe: DataFrame) -> Result<DataFrame> {
Ok(DataFrame::new(
self.session_state,
LogicalPlanBuilder::from(self.plan)
.union_distinct(dataframe.plan)?
.build()?,
))
let plan = LogicalPlanBuilder::from(self.plan)
.union_distinct(dataframe.plan)?
.build()?;
Ok(DataFrame {
session_state: self.session_state,
plan,
})
}

/// Return a new `DataFrame` with all duplicated rows removed.
Expand All @@ -432,10 +455,11 @@ impl DataFrame {
/// # }
/// ```
pub fn distinct(self) -> Result<DataFrame> {
Ok(DataFrame::new(
self.session_state,
LogicalPlanBuilder::from(self.plan).distinct()?.build()?,
))
let plan = LogicalPlanBuilder::from(self.plan).distinct()?.build()?;
Ok(DataFrame {
session_state: self.session_state,
plan,
})
}

/// Return a new `DataFrame` that has statistics for a DataFrame.
Expand Down Expand Up @@ -603,15 +627,18 @@ impl DataFrame {
describe_record_batch.schema(),
vec![vec![describe_record_batch]],
)?;
Ok(DataFrame::new(
self.session_state,
LogicalPlanBuilder::scan(
UNNAMED_TABLE,
provider_as_source(Arc::new(provider)),
None,
)?
.build()?,
))

let plan = LogicalPlanBuilder::scan(
UNNAMED_TABLE,
provider_as_source(Arc::new(provider)),
None,
)?
.build()?;

Ok(DataFrame {
session_state: self.session_state,
plan,
})
}

/// Sort the DataFrame by the specified sorting expressions.
Expand All @@ -637,7 +664,10 @@ impl DataFrame {
/// ```
pub fn sort(self, expr: Vec<Expr>) -> Result<DataFrame> {
let plan = LogicalPlanBuilder::from(self.plan).sort(expr)?.build()?;
Ok(DataFrame::new(self.session_state, plan))
Ok(DataFrame {
session_state: self.session_state,
plan,
})
}

/// Join this `DataFrame` with another `DataFrame` using explicitly specified
Expand Down Expand Up @@ -691,7 +721,10 @@ impl DataFrame {
filter,
)?
.build()?;
Ok(DataFrame::new(self.session_state, plan))
Ok(DataFrame {
session_state: self.session_state,
plan,
})
}

/// Join this `DataFrame` with another `DataFrame` using the specified
Expand Down Expand Up @@ -741,7 +774,10 @@ impl DataFrame {
let plan = LogicalPlanBuilder::from(self.plan)
.join_on(right.plan, join_type, expr)?
.build()?;
Ok(DataFrame::new(self.session_state, plan))
Ok(DataFrame {
session_state: self.session_state,
plan,
})
}

/// Repartition a DataFrame based on a logical partitioning scheme.
Expand All @@ -762,7 +798,10 @@ impl DataFrame {
let plan = LogicalPlanBuilder::from(self.plan)
.repartition(partitioning_scheme)?
.build()?;
Ok(DataFrame::new(self.session_state, plan))
Ok(DataFrame {
session_state: self.session_state,
plan,
})
}

/// Return the total number of rows in this `DataFrame`.
Expand Down Expand Up @@ -867,7 +906,7 @@ impl DataFrame {

/// Return a new [`TaskContext`] which would be used to execute this DataFrame
pub fn task_ctx(&self) -> TaskContext {
TaskContext::from(&self.session_state)
TaskContext::from(self.session_state.as_ref())
}

/// Executes this DataFrame and returns a stream over a single partition
Expand Down Expand Up @@ -973,7 +1012,7 @@ impl DataFrame {

/// Returns both the [`LogicalPlan`] and [`SessionState`] that comprise this [`DataFrame`]
pub fn into_parts(self) -> (SessionState, LogicalPlan) {
(self.session_state, self.plan)
(*self.session_state, self.plan)
}

/// Return the [`LogicalPlan`] represented by this DataFrame without running
Expand Down Expand Up @@ -1027,7 +1066,10 @@ impl DataFrame {
let plan = LogicalPlanBuilder::from(self.plan)
.explain(verbose, analyze)?
.build()?;
Ok(DataFrame::new(self.session_state, plan))
Ok(DataFrame {
session_state: self.session_state,
plan,
})
}

/// Return a `FunctionRegistry` used to plan udf's calls
Expand All @@ -1046,7 +1088,7 @@ impl DataFrame {
/// # }
/// ```
pub fn registry(&self) -> &dyn FunctionRegistry {
&self.session_state
self.session_state.as_ref()
}

/// Calculate the intersection of two [`DataFrame`]s. The two [`DataFrame`]s must have exactly the same schema
Expand All @@ -1066,10 +1108,11 @@ impl DataFrame {
pub fn intersect(self, dataframe: DataFrame) -> Result<DataFrame> {
let left_plan = self.plan;
let right_plan = dataframe.plan;
Ok(DataFrame::new(
self.session_state,
LogicalPlanBuilder::intersect(left_plan, right_plan, true)?,
))
let plan = LogicalPlanBuilder::intersect(left_plan, right_plan, true)?;
Ok(DataFrame {
session_state: self.session_state,
plan,
})
}

/// Calculate the exception of two [`DataFrame`]s. The two [`DataFrame`]s must have exactly the same schema
Expand All @@ -1089,11 +1132,11 @@ impl DataFrame {
pub fn except(self, dataframe: DataFrame) -> Result<DataFrame> {
let left_plan = self.plan;
let right_plan = dataframe.plan;

Ok(DataFrame::new(
self.session_state,
LogicalPlanBuilder::except(left_plan, right_plan, true)?,
))
let plan = LogicalPlanBuilder::except(left_plan, right_plan, true)?;
Ok(DataFrame {
session_state: self.session_state,
plan,
})
}

/// Execute this `DataFrame` and write the results to `table_name`.
Expand All @@ -1118,7 +1161,13 @@ impl DataFrame {
write_options.overwrite,
)?
.build()?;
DataFrame::new(self.session_state, plan).collect().await

DataFrame {
session_state: self.session_state,
plan,
}
.collect()
.await
}

/// Execute the `DataFrame` and write the results to CSV file(s).
Expand Down Expand Up @@ -1166,7 +1215,13 @@ impl DataFrame {
options.partition_by,
)?
.build()?;
DataFrame::new(self.session_state, plan).collect().await

DataFrame {
session_state: self.session_state,
plan,
}
.collect()
.await
}

/// Execute the `DataFrame` and write the results to JSON file(s).
Expand Down Expand Up @@ -1215,7 +1270,13 @@ impl DataFrame {
options.partition_by,
)?
.build()?;
DataFrame::new(self.session_state, plan).collect().await

DataFrame {
session_state: self.session_state,
plan,
}
.collect()
.await
}

/// Add an additional column to the DataFrame.
Expand Down Expand Up @@ -1261,7 +1322,10 @@ impl DataFrame {

let project_plan = LogicalPlanBuilder::from(plan).project(fields)?.build()?;

Ok(DataFrame::new(self.session_state, project_plan))
Ok(DataFrame {
session_state: self.session_state,
plan: project_plan,
})
}

/// Rename one column by applying a new projection. This is a no-op if the column to be
Expand Down Expand Up @@ -1326,7 +1390,10 @@ impl DataFrame {
let project_plan = LogicalPlanBuilder::from(self.plan)
.project(projection)?
.build()?;
Ok(DataFrame::new(self.session_state, project_plan))
Ok(DataFrame {
session_state: self.session_state,
plan: project_plan,
})
}

/// Replace all parameters in logical plan with the specified
Expand Down Expand Up @@ -1388,7 +1455,10 @@ impl DataFrame {
/// ```
pub fn with_param_values(self, query_values: impl Into<ParamValues>) -> Result<Self> {
let plan = self.plan.with_param_values(query_values)?;
Ok(Self::new(self.session_state, plan))
Ok(DataFrame {
session_state: self.session_state,
plan,
})
}

/// Cache DataFrame as a memory table.
Expand All @@ -1405,7 +1475,7 @@ impl DataFrame {
/// # }
/// ```
pub async fn cache(self) -> Result<DataFrame> {
let context = SessionContext::new_with_state(self.session_state.clone());
let context = SessionContext::new_with_state((*self.session_state).clone());
// The schema is consistent with the output
let plan = self.clone().create_physical_plan().await?;
let schema = plan.schema();
Expand Down
7 changes: 6 additions & 1 deletion datafusion/core/src/dataframe/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,12 @@ impl DataFrame {
options.partition_by,
)?
.build()?;
DataFrame::new(self.session_state, plan).collect().await
DataFrame {
session_state: self.session_state,
plan,
}
.collect()
.await
}
}

Expand Down
Loading

0 comments on commit 0a2b633

Please sign in to comment.