summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJack Wright <56345+ayax79@users.noreply.github.com>2024-09-23 04:43:43 -0700
committerGitHub <noreply@github.com>2024-09-23 06:43:43 -0500
commit2541a712e4091a334c66d750e7246c77413ce76d (patch)
tree2f2cec7f8d91baf1deb39291ab1719aa0e1afd97
parentee877607fb5500d378e261a4a4f3495db8deec03 (diff)
Added `polars concat` to allow concatenation of multiple dataframes (#13879)
# Description
Provides the ability to concatenate multiple dataframes together.

# User-Facing Changes
- Introduces new command `polars concat`.
-rw-r--r--crates/nu_plugin_polars/src/dataframe/command/data/concat.rs162
-rw-r--r--crates/nu_plugin_polars/src/dataframe/command/data/mod.rs2
2 files changed, 164 insertions, 0 deletions
diff --git a/crates/nu_plugin_polars/src/dataframe/command/data/concat.rs b/crates/nu_plugin_polars/src/dataframe/command/data/concat.rs
new file mode 100644
index 000000000..afb2a1733
--- /dev/null
+++ b/crates/nu_plugin_polars/src/dataframe/command/data/concat.rs
@@ -0,0 +1,162 @@
+use crate::{
+ values::{CustomValueSupport, NuLazyFrame},
+ PolarsPlugin,
+};
+
+use crate::values::NuDataFrame;
+use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand};
+use nu_protocol::{
+ Category, Example, LabeledError, PipelineData, ShellError, Signature, Span, SyntaxShape, Type,
+ Value,
+};
+use polars::{
+ df,
+ prelude::{LazyFrame, UnionArgs},
+};
+
+/// Plugin command backing `polars concat`.
+///
+/// The type carries no state; everything it does is defined by its
+/// [`PluginCommand`] implementation.
+#[derive(Clone)]
+pub struct ConcatDF;
+
+impl PluginCommand for ConcatDF {
+    type Plugin = PolarsPlugin;
+
+    /// Name under which the command is exposed.
+    fn name(&self) -> &str {
+        "polars concat"
+    }
+
+    /// One-line summary of the command.
+    fn description(&self) -> &str {
+        "Concatenate two or more dataframes."
+    }
+
+    /// Declares five boolean switches (each one feeds a `UnionArgs` field in
+    /// `command_lazy`) and a rest parameter holding the dataframes to concatenate.
+    fn signature(&self) -> Signature {
+        let with_switches = Signature::build(self.name())
+            .switch("no-parallel", "Disable parallel execution", None)
+            .switch("rechunk", "Rechunk the resulting dataframe", None)
+            .switch("to-supertypes", "Cast to supertypes", None)
+            .switch("diagonal", "Concatenate dataframes diagonally", None)
+            .switch(
+                "from-partitioned-ds",
+                "Concatenate dataframes from a partitioned dataset",
+                None,
+            );
+
+        let with_rest = with_switches.rest(
+            "dataframes",
+            SyntaxShape::Any,
+            "The dataframes to concatenate",
+        );
+
+        with_rest
+            .input_output_type(Type::Any, Type::Custom("dataframe".into()))
+            .category(Category::Custom("dataframe".into()))
+    }
+
+    /// Two usage examples: one with a dataframe arriving through the pipeline,
+    /// one with every dataframe passed as an argument.
+    fn examples(&self) -> Vec<Example> {
+        // Both examples sort their output and end up with the same rows, so the
+        // expected value is produced by a single closure.
+        let sorted_rows = || {
+            let frame = df!(
+                "a" => [1, 3, 5],
+                "b" => [2, 4, 6],
+            )
+            .expect("simple df for test should not fail");
+            NuDataFrame::from(frame).into_value(Span::test_data())
+        };
+
+        let piped = Example {
+            description: "Concatenates two dataframes with the dataframe in the pipeline.",
+            example: "[[a b]; [1 2]] | polars into-df
+ | polars concat ([[a b]; [3 4]] | polars into-df) ([[a b]; [5 6]] | polars into-df)
+ | polars collect
+ | polars sort-by [a b]",
+            result: Some(sorted_rows()),
+        };
+
+        let arguments_only = Example {
+            description: "Concatenates three dataframes together",
+            example: "polars concat ([[a b]; [1 2]] | polars into-df) ([[a b]; [3 4]] | polars into-df) ([[a b]; [5 6]] | polars into-df)
+ | polars collect
+ | polars sort-by [a b]",
+            result: Some(sorted_rows()),
+        };
+
+        vec![piped, arguments_only]
+    }
+
+    /// Coerces the pipeline input into a lazy frame when possible and hands
+    /// the call over to `command_lazy`.
+    fn run(
+        &self,
+        plugin: &Self::Plugin,
+        engine: &EngineInterface,
+        call: &EvaluatedCall,
+        input: PipelineData,
+    ) -> Result<PipelineData, LabeledError> {
+        // `.ok()` drops any conversion error, so input that cannot be coerced
+        // is treated exactly like having no pipeline dataframe at all.
+        let piped_frame = NuLazyFrame::try_from_pipeline_coerce(plugin, input, call.head).ok();
+        let output = command_lazy(plugin, engine, call, piped_frame)?;
+        Ok(output)
+    }
+}
+
+/// Concatenates the optional pipeline frame with the frames given as rest
+/// arguments and returns the result as pipeline data.
+///
+/// The flags on `call` populate the `UnionArgs` passed to polars. When
+/// `maybe_lazy` is `Some`, that frame is placed ahead of the rest arguments.
+///
+/// # Errors
+///
+/// Returns a `ShellError` when a flag or rest argument cannot be read, when a
+/// rest argument cannot be coerced into a lazy frame, when no rest argument is
+/// supplied at all, or when polars rejects the concatenation.
+fn command_lazy(
+    plugin: &PolarsPlugin,
+    engine: &EngineInterface,
+    call: &EvaluatedCall,
+    maybe_lazy: Option<NuLazyFrame>,
+) -> Result<PipelineData, ShellError> {
+    // Flags are read before the rest arguments, in declaration order.
+    let args = UnionArgs {
+        parallel: !call.has_flag("no-parallel")?,
+        rechunk: call.has_flag("rechunk")?,
+        to_supertypes: call.has_flag("to-supertypes")?,
+        diagonal: call.has_flag("diagonal")?,
+        from_partitioned_ds: call.has_flag("from-partitioned-ds")?,
+    };
+
+    let rest_values: Vec<Value> = call.rest(0)?;
+    let mut rest_frames: Vec<LazyFrame> = Vec::with_capacity(rest_values.len());
+    for value in &rest_values {
+        let lazy = NuLazyFrame::try_from_value_coerce(plugin, value)?;
+        rest_frames.push(lazy.to_polars());
+    }
+
+    // The pipeline frame alone is not enough: at least one argument frame is required.
+    if rest_frames.is_empty() {
+        return Err(ShellError::GenericError {
+            error: "At least one other dataframe must be provided".into(),
+            msg: "".into(),
+            span: Some(call.head),
+            help: None,
+            inner: vec![],
+        });
+    }
+
+    // Pipeline frame (if any) first, then the argument frames in the order given.
+    let frames: Vec<LazyFrame> = maybe_lazy
+        .iter()
+        .map(|lazy| lazy.to_polars())
+        .chain(rest_frames)
+        .collect();
+
+    let combined: NuLazyFrame = polars::prelude::concat(&frames, args)
+        .map_err(|e| ShellError::GenericError {
+            error: format!("Failed to concatenate dataframes: {e}"),
+            msg: "".into(),
+            span: Some(call.head),
+            help: None,
+            inner: vec![],
+        })?
+        .into();
+
+    combined.to_pipeline_data(plugin, engine, call.head)
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use crate::test::test_polars_plugin_command;
+
+    /// Passes `ConcatDF` to the crate's plugin-command test helper and
+    /// propagates whatever result it reports.
+    #[test]
+    fn test_examples() -> Result<(), ShellError> {
+        let command = ConcatDF;
+        test_polars_plugin_command(&command)
+    }
+}
diff --git a/crates/nu_plugin_polars/src/dataframe/command/data/mod.rs b/crates/nu_plugin_polars/src/dataframe/command/data/mod.rs
index 302b5f468..f11ac31fe 100644
--- a/crates/nu_plugin_polars/src/dataframe/command/data/mod.rs
+++ b/crates/nu_plugin_polars/src/dataframe/command/data/mod.rs
@@ -4,6 +4,7 @@ mod arg_where;
mod cast;
mod col;
mod collect;
+mod concat;
mod drop;
mod drop_duplicates;
mod drop_nulls;
@@ -74,6 +75,7 @@ pub(crate) fn data_commands() -> Vec<Box<dyn PluginCommand<Plugin = PolarsPlugin
Box::new(AppendDF),
Box::new(CastDF),
Box::new(DropDF),
+ Box::new(concat::ConcatDF),
Box::new(DropDuplicates),
Box::new(DropNulls),
Box::new(Dummies),