author     Jack Wright <56345+ayax79@users.noreply.github.com>    2024-08-08 09:46:45 -0700
committer  GitHub <noreply@github.com>    2024-08-08 09:46:45 -0700
commit     4ff33933dd7701240c37e4eee24929ebc8dccd92 (patch)
tree       59130508378d46547cb456cc3d8785ea7a05e705
parent     035308bb1df550fb1a651642e271cb7a196154e7 (diff)
Merge `polars sink` and `polars to-*` to `polars save` (#13568)
# Description

This pull request merges `polars sink` and `polars to-*` into one command, `polars save`.

# User-Facing Changes

- `polars to-*` commands have all been replaced with `polars save`. When saving a lazy frame to a file type that supports a polars sink operation, a sink operation is performed. Sink operations are much more performant, collecting while streaming the result to the file system.
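The examples added in this PR give a quick sketch of the new interface. The file names below are illustrative, and the streaming sink path applies only to lazy frames saved as parquet, arrow/ipc, csv, or ndjson:

```nushell
# Eager dataframe: written with the regular polars writers
[[a b]; [1 2] [3 4]] | polars into-df | polars save test.csv --csv-delimiter '|'

# Lazy dataframe: a streaming sink collect is performed when the format supports it
[[a b]; [1 2] [3 4]] | polars into-lazy | polars save test.parquet

# The format is derived from the file extension, or can be forced with --type
[[a b]; [1 2] [3 4]] | polars into-df | polars save test.arrow --type arrow
```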
-rw-r--r--  crates/nu_plugin_polars/src/dataframe/eager/mod.rs | 17
-rw-r--r--  crates/nu_plugin_polars/src/dataframe/eager/open.rs | 76
-rw-r--r--  crates/nu_plugin_polars/src/dataframe/eager/save/arrow.rs | 62
-rw-r--r--  crates/nu_plugin_polars/src/dataframe/eager/save/avro.rs | 74
-rw-r--r--  crates/nu_plugin_polars/src/dataframe/eager/save/csv.rs | 111
-rw-r--r--  crates/nu_plugin_polars/src/dataframe/eager/save/mod.rs | 308
-rw-r--r--  crates/nu_plugin_polars/src/dataframe/eager/save/ndjson.rs | 63
-rw-r--r--  crates/nu_plugin_polars/src/dataframe/eager/save/parquet.rs | 62
-rw-r--r--  crates/nu_plugin_polars/src/dataframe/eager/to_arrow.rs | 134
-rw-r--r--  crates/nu_plugin_polars/src/dataframe/eager/to_avro.rs | 163
-rw-r--r--  crates/nu_plugin_polars/src/dataframe/eager/to_csv.rs | 181
-rw-r--r--  crates/nu_plugin_polars/src/dataframe/eager/to_json_lines.rs | 135
-rw-r--r--  crates/nu_plugin_polars/src/dataframe/eager/to_parquet.rs | 135
-rw-r--r--  crates/nu_plugin_polars/src/dataframe/lazy/mod.rs | 2
-rw-r--r--  crates/nu_plugin_polars/src/dataframe/lazy/sink.rs | 205
-rw-r--r--  crates/nu_plugin_polars/src/dataframe/values/file_type.rs | 59
-rw-r--r--  crates/nu_plugin_polars/src/dataframe/values/mod.rs | 16
17 files changed, 795 insertions, 1008 deletions
diff --git a/crates/nu_plugin_polars/src/dataframe/eager/mod.rs b/crates/nu_plugin_polars/src/dataframe/eager/mod.rs
index 6aa37d1ed..1dea968ef 100644
--- a/crates/nu_plugin_polars/src/dataframe/eager/mod.rs
+++ b/crates/nu_plugin_polars/src/dataframe/eager/mod.rs
@@ -14,6 +14,7 @@ mod pivot;
mod query_df;
mod rename;
mod sample;
+mod save;
mod schema;
mod shape;
mod slice;
@@ -21,13 +22,8 @@ mod sql_context;
mod sql_expr;
mod summary;
mod take;
-mod to_arrow;
-mod to_avro;
-mod to_csv;
mod to_df;
-mod to_json_lines;
mod to_nu;
-mod to_parquet;
mod unpivot;
mod with_column;
@@ -55,13 +51,8 @@ pub use slice::SliceDF;
pub use sql_context::SQLContext;
pub use summary::Summary;
pub use take::TakeDF;
-pub use to_arrow::ToArrow;
-pub use to_avro::ToAvro;
-pub use to_csv::ToCSV;
pub use to_df::ToDataFrame;
-pub use to_json_lines::ToJsonLines;
pub use to_nu::ToNu;
-pub use to_parquet::ToParquet;
pub use unpivot::UnpivotDF;
pub use with_column::WithColumn;
@@ -89,13 +80,9 @@ pub(crate) fn eager_commands() -> Vec<Box<dyn PluginCommand<Plugin = PolarsPlugi
Box::new(SchemaCmd),
Box::new(TakeDF),
Box::new(ToNu),
- Box::new(ToArrow),
- Box::new(ToAvro),
Box::new(ToDataFrame),
- Box::new(ToCSV),
- Box::new(ToJsonLines),
- Box::new(ToParquet),
Box::new(QueryDf),
Box::new(WithColumn),
+ Box::new(save::SaveDF),
]
}
diff --git a/crates/nu_plugin_polars/src/dataframe/eager/open.rs b/crates/nu_plugin_polars/src/dataframe/eager/open.rs
index 6c4246497..77fbe583a 100644
--- a/crates/nu_plugin_polars/src/dataframe/eager/open.rs
+++ b/crates/nu_plugin_polars/src/dataframe/eager/open.rs
@@ -1,6 +1,6 @@
use crate::{
dataframe::values::NuSchema,
- values::{CustomValueSupport, NuLazyFrame},
+ values::{CustomValueSupport, NuLazyFrame, PolarsFileType},
EngineWrapper, PolarsPlugin,
};
use nu_path::expand_path_with;
@@ -46,7 +46,7 @@ impl PluginCommand for OpenDataFrame {
}
fn usage(&self) -> &str {
- "Opens CSV, JSON, JSON lines, arrow, avro, or parquet file to create dataframe. A lazy dataframe will be created by default, if supported."
+ "Opens CSV, JSON, NDJSON/JSON lines, arrow, avro, or parquet file to create dataframe. A lazy dataframe will be created by default, if supported."
}
fn signature(&self) -> Signature {
@@ -130,33 +130,37 @@ fn command(
let file_path = expand_path_with(&spanned_file.item, engine.get_current_dir()?, true);
let file_span = spanned_file.span;
- let type_option: Option<Spanned<String>> = call.get_flag("type")?;
-
- let type_id = match &type_option {
- Some(ref t) => Some((t.item.to_owned(), "Invalid type", t.span)),
- None => file_path.extension().map(|e| {
- (
- e.to_string_lossy().into_owned(),
- "Invalid extension",
- spanned_file.span,
- )
- }),
- };
-
- match type_id {
- Some((e, msg, blamed)) => match e.as_str() {
- "csv" | "tsv" => from_csv(plugin, engine, call, &file_path, file_span),
- "parquet" | "parq" => from_parquet(plugin, engine, call, &file_path, file_span),
- "ipc" | "arrow" => from_ipc(plugin, engine, call, &file_path, file_span),
- "json" => from_json(plugin, engine, call, &file_path, file_span),
- "jsonl" => from_jsonl(plugin, engine, call, &file_path, file_span),
- "avro" => from_avro(plugin, engine, call, &file_path, file_span),
- _ => Err(ShellError::FileNotFoundCustom {
- msg: format!(
- "{msg}. Supported values: csv, tsv, parquet, ipc, arrow, json, jsonl, avro"
- ),
- span: blamed,
- }),
+ let type_option: Option<(String, Span)> = call
+ .get_flag("type")?
+ .map(|t: Spanned<String>| (t.item, t.span))
+ .or_else(|| {
+ file_path
+ .extension()
+ .map(|e| (e.to_string_lossy().into_owned(), spanned_file.span))
+ });
+
+ match type_option {
+ Some((ext, blamed)) => match PolarsFileType::from(ext.as_str()) {
+ PolarsFileType::Csv | PolarsFileType::Tsv => {
+ from_csv(plugin, engine, call, &file_path, file_span)
+ }
+ PolarsFileType::Parquet => from_parquet(plugin, engine, call, &file_path, file_span),
+ PolarsFileType::Arrow => from_arrow(plugin, engine, call, &file_path, file_span),
+ PolarsFileType::Json => from_json(plugin, engine, call, &file_path, file_span),
+ PolarsFileType::NdJson => from_ndjson(plugin, engine, call, &file_path, file_span),
+ PolarsFileType::Avro => from_avro(plugin, engine, call, &file_path, file_span),
+ _ => Err(PolarsFileType::build_unsupported_error(
+ &ext,
+ &[
+ PolarsFileType::Csv,
+ PolarsFileType::Tsv,
+ PolarsFileType::Parquet,
+ PolarsFileType::Arrow,
+ PolarsFileType::NdJson,
+ PolarsFileType::Avro,
+ ],
+ blamed,
+ )),
},
None => Err(ShellError::FileNotFoundCustom {
msg: "File without extension".into(),
@@ -268,7 +272,7 @@ fn from_avro(
df.cache_and_to_value(plugin, engine, call.head)
}
-fn from_ipc(
+fn from_arrow(
plugin: &PolarsPlugin,
engine: &nu_plugin::EngineInterface,
call: &nu_plugin::EvaluatedCall,
@@ -370,7 +374,7 @@ fn from_json(
df.cache_and_to_value(plugin, engine, call.head)
}
-fn from_jsonl(
+fn from_ndjson(
plugin: &PolarsPlugin,
engine: &nu_plugin::EngineInterface,
call: &nu_plugin::EvaluatedCall,
@@ -397,18 +401,14 @@ fn from_jsonl(
.with_schema(maybe_schema.map(|s| s.into()))
.finish()
.map_err(|e| ShellError::GenericError {
- error: format!("Json lines reader error: {e}"),
+ error: format!("NDJSON reader error: {e}"),
msg: "".into(),
span: Some(call.head),
help: None,
inner: vec![],
})?;
- perf!(
- "Lazy json lines dataframe open",
- start_time,
- engine.use_color()
- );
+ perf!("Lazy NDJSON dataframe open", start_time, engine.use_color());
let df = NuLazyFrame::new(false, df);
df.cache_and_to_value(plugin, engine, call.head)
@@ -444,7 +444,7 @@ fn from_jsonl(
.into();
perf!(
- "Eager json lines dataframe open",
+ "Eager NDJSON dataframe open",
start_time,
engine.use_color()
);
diff --git a/crates/nu_plugin_polars/src/dataframe/eager/save/arrow.rs b/crates/nu_plugin_polars/src/dataframe/eager/save/arrow.rs
new file mode 100644
index 000000000..2c972c4d4
--- /dev/null
+++ b/crates/nu_plugin_polars/src/dataframe/eager/save/arrow.rs
@@ -0,0 +1,62 @@
+use std::{fs::File, path::Path};
+
+use nu_plugin::EvaluatedCall;
+use nu_protocol::{ShellError, Span};
+use polars::prelude::{IpcWriter, SerWriter};
+use polars_io::ipc::IpcWriterOptions;
+
+use crate::values::{NuDataFrame, NuLazyFrame};
+
+use super::polars_file_save_error;
+
+pub(crate) fn command_lazy(
+ _call: &EvaluatedCall,
+ lazy: &NuLazyFrame,
+ file_path: &Path,
+ file_span: Span,
+) -> Result<(), ShellError> {
+ lazy.to_polars()
+ .sink_ipc(file_path, IpcWriterOptions::default())
+ .map_err(|e| polars_file_save_error(e, file_span))
+}
+
+pub(crate) fn command_eager(
+ df: &NuDataFrame,
+ file_path: &Path,
+ file_span: Span,
+) -> Result<(), ShellError> {
+ let mut file = File::create(file_path).map_err(|e| ShellError::GenericError {
+ error: format!("Error with file name: {e}"),
+ msg: "".into(),
+ span: Some(file_span),
+ help: None,
+ inner: vec![],
+ })?;
+
+ IpcWriter::new(&mut file)
+ .finish(&mut df.to_polars())
+ .map_err(|e| ShellError::GenericError {
+ error: "Error saving file".into(),
+ msg: e.to_string(),
+ span: Some(file_span),
+ help: None,
+ inner: vec![],
+ })?;
+ Ok(())
+}
+
+#[cfg(test)]
+pub mod test {
+
+ use crate::eager::save::test::{test_eager_save, test_lazy_save};
+
+ #[test]
+ pub fn test_arrow_eager_save() -> Result<(), Box<dyn std::error::Error>> {
+ test_eager_save("arrow")
+ }
+
+ #[test]
+ pub fn test_arrow_lazy_save() -> Result<(), Box<dyn std::error::Error>> {
+ test_lazy_save("arrow")
+ }
+}
diff --git a/crates/nu_plugin_polars/src/dataframe/eager/save/avro.rs b/crates/nu_plugin_polars/src/dataframe/eager/save/avro.rs
new file mode 100644
index 000000000..58463ec36
--- /dev/null
+++ b/crates/nu_plugin_polars/src/dataframe/eager/save/avro.rs
@@ -0,0 +1,74 @@
+use std::fs::File;
+use std::path::Path;
+
+use nu_plugin::EvaluatedCall;
+use nu_protocol::{ShellError, Span};
+use polars_io::avro::{AvroCompression, AvroWriter};
+use polars_io::SerWriter;
+
+use crate::values::NuDataFrame;
+
+fn get_compression(call: &EvaluatedCall) -> Result<Option<AvroCompression>, ShellError> {
+ if let Some((compression, span)) = call
+ .get_flag_value("avro-compression")
+ .map(|e| e.as_str().map(|s| (s.to_owned(), e.span())))
+ .transpose()?
+ {
+ match compression.as_ref() {
+ "snappy" => Ok(Some(AvroCompression::Snappy)),
+ "deflate" => Ok(Some(AvroCompression::Deflate)),
+ _ => Err(ShellError::IncorrectValue {
+ msg: "compression must be one of deflate or snappy".to_string(),
+ val_span: span,
+ call_span: span,
+ }),
+ }
+ } else {
+ Ok(None)
+ }
+}
+
+pub(crate) fn command_eager(
+ call: &EvaluatedCall,
+ df: &NuDataFrame,
+ file_path: &Path,
+ file_span: Span,
+) -> Result<(), ShellError> {
+ let compression = get_compression(call)?;
+
+ let file = File::create(file_path).map_err(|e| ShellError::GenericError {
+ error: format!("Error with file name: {e}"),
+ msg: "".into(),
+ span: Some(file_span),
+ help: None,
+ inner: vec![],
+ })?;
+
+ AvroWriter::new(file)
+ .with_compression(compression)
+ .finish(&mut df.to_polars())
+ .map_err(|e| ShellError::GenericError {
+ error: "Error saving file".into(),
+ msg: e.to_string(),
+ span: Some(file_span),
+ help: None,
+ inner: vec![],
+ })?;
+
+ Ok(())
+}
+
+#[cfg(test)]
+pub mod test {
+ use crate::eager::save::test::{test_eager_save, test_lazy_save};
+
+ #[test]
+ pub fn test_avro_eager_save() -> Result<(), Box<dyn std::error::Error>> {
+ test_eager_save("avro")
+ }
+
+ #[test]
+ pub fn test_avro_lazy_save() -> Result<(), Box<dyn std::error::Error>> {
+ test_lazy_save("avro")
+ }
+}
diff --git a/crates/nu_plugin_polars/src/dataframe/eager/save/csv.rs b/crates/nu_plugin_polars/src/dataframe/eager/save/csv.rs
new file mode 100644
index 000000000..f43b1b023
--- /dev/null
+++ b/crates/nu_plugin_polars/src/dataframe/eager/save/csv.rs
@@ -0,0 +1,111 @@
+use std::{fs::File, path::Path};
+
+use nu_plugin::EvaluatedCall;
+use nu_protocol::{ShellError, Span, Spanned};
+use polars::prelude::{CsvWriter, SerWriter};
+use polars_io::csv::write::{CsvWriterOptions, SerializeOptions};
+
+use crate::values::{NuDataFrame, NuLazyFrame};
+
+use super::polars_file_save_error;
+
+pub(crate) fn command_lazy(
+ call: &EvaluatedCall,
+ lazy: &NuLazyFrame,
+ file_path: &Path,
+ file_span: Span,
+) -> Result<(), ShellError> {
+ let delimiter: Option<Spanned<String>> = call.get_flag("csv-delimiter")?;
+ let separator = delimiter
+ .and_then(|d| d.item.chars().next().map(|c| c as u8))
+ .unwrap_or(b',');
+
+ let no_header: bool = call.has_flag("csv-no-header")?;
+
+ let options = CsvWriterOptions {
+ include_header: !no_header,
+ serialize_options: SerializeOptions {
+ separator,
+ ..SerializeOptions::default()
+ },
+ ..CsvWriterOptions::default()
+ };
+
+ lazy.to_polars()
+ .sink_csv(file_path, options)
+ .map_err(|e| polars_file_save_error(e, file_span))
+}
+
+pub(crate) fn command_eager(
+ call: &EvaluatedCall,
+ df: &NuDataFrame,
+ file_path: &Path,
+ file_span: Span,
+) -> Result<(), ShellError> {
+ let delimiter: Option<Spanned<String>> = call.get_flag("csv-delimiter")?;
+ let no_header: bool = call.has_flag("csv-no-header")?;
+
+ let mut file = File::create(file_path).map_err(|e| ShellError::GenericError {
+ error: format!("Error with file name: {e}"),
+ msg: "".into(),
+ span: Some(file_span),
+ help: None,
+ inner: vec![],
+ })?;
+
+ let writer = CsvWriter::new(&mut file);
+
+ let writer = if no_header {
+ writer.include_header(false)
+ } else {
+ writer.include_header(true)
+ };
+
+ let mut writer = match delimiter {
+ None => writer,
+ Some(d) => {
+ if d.item.len() != 1 {
+ return Err(ShellError::GenericError {
+ error: "Incorrect delimiter".into(),
+ msg: "Delimiter has to be one char".into(),
+ span: Some(d.span),
+ help: None,
+ inner: vec![],
+ });
+ } else {
+ let delimiter = match d.item.chars().next() {
+ Some(d) => d as u8,
+ None => unreachable!(),
+ };
+
+ writer.with_separator(delimiter)
+ }
+ }
+ };
+
+ writer
+ .finish(&mut df.to_polars())
+ .map_err(|e| ShellError::GenericError {
+ error: format!("Error writing to file: {e}"),
+ msg: e.to_string(),
+ span: Some(file_span),
+ help: None,
+ inner: vec![],
+ })?;
+ Ok(())
+}
+
+#[cfg(test)]
+pub mod test {
+ use crate::eager::save::test::{test_eager_save, test_lazy_save};
+
+ #[test]
+ pub fn test_csv_eager_save() -> Result<(), Box<dyn std::error::Error>> {
+ test_eager_save("csv")
+ }
+
+ #[test]
+ pub fn test_csv_lazy_save() -> Result<(), Box<dyn std::error::Error>> {
+ test_lazy_save("csv")
+ }
+}
diff --git a/crates/nu_plugin_polars/src/dataframe/eager/save/mod.rs b/crates/nu_plugin_polars/src/dataframe/eager/save/mod.rs
new file mode 100644
index 000000000..ce49fc54d
--- /dev/null
+++ b/crates/nu_plugin_polars/src/dataframe/eager/save/mod.rs
@@ -0,0 +1,308 @@
+mod arrow;
+mod avro;
+mod csv;
+mod ndjson;
+mod parquet;
+
+use std::path::PathBuf;
+
+use crate::{
+ values::{cant_convert_err, PolarsFileType, PolarsPluginObject, PolarsPluginType},
+ PolarsPlugin,
+};
+
+use nu_path::expand_path_with;
+use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand};
+use nu_protocol::{
+ Category, Example, LabeledError, PipelineData, ShellError, Signature, Span, Spanned,
+ SyntaxShape, Type, Value,
+};
+use polars::error::PolarsError;
+
+#[derive(Clone)]
+pub struct SaveDF;
+
+impl PluginCommand for SaveDF {
+ type Plugin = PolarsPlugin;
+
+ fn name(&self) -> &str {
+ "polars save"
+ }
+
+ fn usage(&self) -> &str {
+ "Saves a dataframe to disk. For lazy dataframes a sink operation will be used if the file type supports it (parquet, ipc/arrow, csv, and ndjson)."
+ }
+
+ fn signature(&self) -> Signature {
+ Signature::build(self.name())
+ .required("path", SyntaxShape::Filepath, "Path to write to.")
+ .named(
+ "type",
+ SyntaxShape::String,
+                "File type: csv, ndjson, parquet, arrow/ipc, avro. If omitted, derive from file extension",
+ Some('t'),
+ )
+ .named(
+ "avro-compression",
+ SyntaxShape::String,
+ "Compression for avro supports deflate or snappy",
+ None,
+ )
+ .named(
+ "csv-delimiter",
+ SyntaxShape::String,
+ "file delimiter character",
+ None,
+ )
+ .switch(
+ "csv-no-header",
+ "Indicates to exclude a header row for CSV files.",
+ None,
+ )
+ .input_output_type(Type::Any, Type::String)
+ .category(Category::Custom("lazyframe".into()))
+ }
+
+ fn examples(&self) -> Vec<Example> {
+ vec![
+ Example {
+ description:
+                    "Performs a streaming collect and saves the output to the specified file",
+ example: "[[a b];[1 2] [3 4]] | polars into-lazy | polars save test.parquet",
+ result: None,
+ },
+ Example {
+ description: "Saves dataframe to parquet file",
+ example: "[[a b]; [1 2] [3 4]] | polars into-df | polars save test.parquet",
+ result: None,
+ },
+ Example {
+ description: "Saves dataframe to arrow file",
+ example: "[[a b]; [1 2] [3 4]] | polars into-df | polars save test.arrow",
+ result: None,
+ },
+ Example {
+ description: "Saves dataframe to NDJSON file",
+ example: "[[a b]; [1 2] [3 4]] | polars into-df | polars save test.ndjson",
+ result: None,
+ },
+ Example {
+ description: "Saves dataframe to avro file",
+ example: "[[a b]; [1 2] [3 4]] | polars into-df | polars save test.avro",
+ result: None,
+ },
+ Example {
+ description: "Saves dataframe to CSV file",
+                example: "[[a b]; [1 2] [3 4]] | polars into-df | polars save test.csv",
+ result: None,
+ },
+ Example {
+ description: "Saves dataframe to CSV file using other delimiter",
+                example: "[[a b]; [1 2] [3 4]] | polars into-df | polars save test.csv --csv-delimiter '|'",
+ result: None,
+ },
+ ]
+ }
+
+ fn run(
+ &self,
+ plugin: &Self::Plugin,
+ engine: &EngineInterface,
+ call: &EvaluatedCall,
+ input: PipelineData,
+ ) -> Result<PipelineData, LabeledError> {
+ let value = input.into_value(call.head)?;
+
+ match PolarsPluginObject::try_from_value(plugin, &value)? {
+ po @ PolarsPluginObject::NuDataFrame(_) | po @ PolarsPluginObject::NuLazyFrame(_) => {
+ command(plugin, engine, call, po)
+ }
+ _ => Err(cant_convert_err(
+ &value,
+ &[PolarsPluginType::NuDataFrame, PolarsPluginType::NuLazyFrame],
+ )),
+ }
+ .map_err(LabeledError::from)
+ }
+}
+
+fn command(
+ _plugin: &PolarsPlugin,
+ engine: &EngineInterface,
+ call: &EvaluatedCall,
+ polars_object: PolarsPluginObject,
+) -> Result<PipelineData, ShellError> {
+ let spanned_file: Spanned<PathBuf> = call.req(0)?;
+ let file_path = expand_path_with(&spanned_file.item, engine.get_current_dir()?, true);
+ let file_span = spanned_file.span;
+ let type_option: Option<(String, Span)> = call
+ .get_flag("type")?
+ .map(|t: Spanned<String>| (t.item, t.span))
+ .or_else(|| {
+ file_path
+ .extension()
+ .map(|e| (e.to_string_lossy().into_owned(), spanned_file.span))
+ });
+
+ match type_option {
+ Some((ext, blamed)) => match PolarsFileType::from(ext.as_str()) {
+ PolarsFileType::Parquet => match polars_object {
+ PolarsPluginObject::NuLazyFrame(ref lazy) => {
+ parquet::command_lazy(call, lazy, &file_path, file_span)
+ }
+ PolarsPluginObject::NuDataFrame(ref df) => {
+ parquet::command_eager(df, &file_path, file_span)
+ }
+ _ => Err(unknown_file_save_error(file_span)),
+ },
+ PolarsFileType::Arrow => match polars_object {
+ PolarsPluginObject::NuLazyFrame(ref lazy) => {
+ arrow::command_lazy(call, lazy, &file_path, file_span)
+ }
+ PolarsPluginObject::NuDataFrame(ref df) => {
+ arrow::command_eager(df, &file_path, file_span)
+ }
+ _ => Err(unknown_file_save_error(file_span)),
+ },
+ PolarsFileType::NdJson => match polars_object {
+ PolarsPluginObject::NuLazyFrame(ref lazy) => {
+ ndjson::command_lazy(call, lazy, &file_path, file_span)
+ }
+ PolarsPluginObject::NuDataFrame(ref df) => {
+ ndjson::command_eager(df, &file_path, file_span)
+ }
+ _ => Err(unknown_file_save_error(file_span)),
+ },
+ PolarsFileType::Avro => match polars_object {
+ PolarsPluginObject::NuLazyFrame(lazy) => {
+ let df = lazy.collect(call.head)?;
+ avro::command_eager(call, &df, &file_path, file_span)
+ }
+ PolarsPluginObject::NuDataFrame(ref df) => {
+ avro::command_eager(call, df, &file_path, file_span)
+ }
+ _ => Err(unknown_file_save_error(file_span)),
+ },
+ PolarsFileType::Csv => match polars_object {
+ PolarsPluginObject::NuLazyFrame(ref lazy) => {
+ csv::command_lazy(call, lazy, &file_path, file_span)
+ }
+ PolarsPluginObject::NuDataFrame(ref df) => {
+ csv::command_eager(call, df, &file_path, file_span)
+ }
+ _ => Err(unknown_file_save_error(file_span)),
+ },
+ _ => Err(PolarsFileType::build_unsupported_error(
+ &ext,
+ &[
+ PolarsFileType::Parquet,
+ PolarsFileType::Csv,
+ PolarsFileType::Arrow,
+ PolarsFileType::NdJson,
+ PolarsFileType::Avro,
+ ],
+ blamed,
+ )),
+ },
+ None => Err(ShellError::FileNotFoundCustom {
+ msg: "File without extension".into(),
+ span: spanned_file.span,
+ }),
+ }?;
+ let file_value = Value::string(format!("saved {:?}", &file_path), file_span);
+
+ Ok(PipelineData::Value(
+ Value::list(vec![file_value], call.head),
+ None,
+ ))
+}
+
+pub(crate) fn polars_file_save_error(e: PolarsError, span: Span) -> ShellError {
+ ShellError::GenericError {
+ error: format!("Error saving file: {e}"),
+ msg: "".into(),
+ span: Some(span),
+ help: None,
+ inner: vec![],
+ }
+}
+
+pub fn unknown_file_save_error(span: Span) -> ShellError {
+ ShellError::GenericError {
+ error: "Could not save file for unknown reason".into(),
+ msg: "".into(),
+ span: Some(span),
+ help: None,
+ inner: vec![],
+ }
+}
+
+#[cfg(test)]
+pub(crate) mod test {
+ use nu_plugin_test_support::PluginTest;
+ use nu_protocol::{Span, Value};
+ use uuid::Uuid;
+
+ use crate::PolarsPlugin;
+
+ fn test_save(cmd: &'static str, extension: &str) -> Result<(), Box<dyn std::error::Error>> {
+ let tmp_dir = tempfile::tempdir()?;
+ let mut tmp_file = tmp_dir.path().to_owned();
+ tmp_file.push(format!("{}.{}", Uuid::new_v4(), extension));
+ let tmp_file_str = tmp_file.to_str().expect("should be able to get file path");
+
+ let cmd = format!("{cmd} {tmp_file_str}");
+ let mut plugin_test = PluginTest::new("polars", PolarsPlugin::default().into())?;
+ plugin_test.engine_state_mut().add_env_var(
+ "PWD".to_string(),
+ Value::string(
+ tmp_dir
+ .path()
+ .to_str()
+ .expect("should be able to get path")
+ .to_owned(),
+ Span::test_data(),
+ ),
+ );
+ let pipeline_data = plugin_test.eval(&cmd)?;
+
+ assert!(tmp_file.exists());
+
+ let value = pipeline_data.into_value(Span::test_data())?;
+ let list = value.as_list()?;
+ assert_eq!(list.len(), 1);
+ let msg = list.first().expect("should have a value").as_str()?;
+ assert!(msg.contains("saved"));
+
+ Ok(())
+ }
+
+ pub fn test_lazy_save(extension: &str) -> Result<(), Box<dyn std::error::Error>> {
+ test_save(
+ "[[a b]; [1 2] [3 4]] | polars into-lazy | polars save",
+ extension,
+ )
+ }
+
+ pub fn test_eager_save(extension: &str) -> Result<(), Box<dyn std::error::Error>> {
+ test_save(
+ "[[a b]; [1 2] [3 4]] | polars into-df | polars save",
+ extension,
+ )
+ }
+
+ // #[test]
+ // pub fn test_to_ipc() -> Result<(), Box<dyn std::error::Error>> {
+ // test_sink("ipc")
+ // }
+ //
+ // #[test]
+ // pub fn test_to_csv() -> Result<(), Box<dyn std::error::Error>> {
+ // test_sink("csv")
+ // }
+ //
+ // #[test]
+ // pub fn test_to_json() -> Result<(), Box<dyn std::error::Error>> {
+ // test_sink("ndjson")
+ // }
+}
diff --git a/crates/nu_plugin_polars/src/dataframe/eager/save/ndjson.rs b/crates/nu_plugin_polars/src/dataframe/eager/save/ndjson.rs
new file mode 100644
index 000000000..784010307
--- /dev/null
+++ b/crates/nu_plugin_polars/src/dataframe/eager/save/ndjson.rs
@@ -0,0 +1,63 @@
+use std::{fs::File, io::BufWriter, path::Path};
+
+use nu_plugin::EvaluatedCall;
+use nu_protocol::{ShellError, Span};
+use polars::prelude::{JsonWriter, SerWriter};
+use polars_io::json::JsonWriterOptions;
+
+use crate::values::{NuDataFrame, NuLazyFrame};
+
+use super::polars_file_save_error;
+
+pub(crate) fn command_lazy(
+ _call: &EvaluatedCall,
+ lazy: &NuLazyFrame,
+ file_path: &Path,
+ file_span: Span,
+) -> Result<(), ShellError> {
+ lazy.to_polars()
+ .sink_json(file_path, JsonWriterOptions::default())
+ .map_err(|e| polars_file_save_error(e, file_span))
+}
+
+pub(crate) fn command_eager(
+ df: &NuDataFrame,
+ file_path: &Path,
+ file_span: Span,
+) -> Result<(), ShellError> {
+ let file = File::create(file_path).map_err(|e| ShellError::GenericError {
+ error: format!("Error with file name: {e}"),
+ msg: "".into(),
+ span: Some(file_span),
+ help: None,
+ inner: vec![],
+ })?;
+ let buf_writer = BufWriter::new(file);
+
+ JsonWriter::new(buf_writer)
+ .finish(&mut df.to_polars())
+ .map_err(|e| ShellError::GenericError {
+ error: "Error saving file".into(),
+ msg: e.to_string(),
+ span: Some(file_span),
+ help: None,
+ inner: vec![],
+ })?;
+
+ Ok(())
+}
+
+#[cfg(test)]
+pub mod test {
+ use crate::eager::save::test::{test_eager_save, test_lazy_save};
+
+ #[test]
+    pub fn test_ndjson_eager_save() -> Result<(), Box<dyn std::error::Error>> {
+ test_eager_save("ndjson")
+ }
+
+ #[test]
+    pub fn test_ndjson_lazy_save() -> Result<(), Box<dyn std::error::Error>> {
+ test_lazy_save("ndjson")
+ }
+}
diff --git a/crates/nu_plugin_polars/src/dataframe/eager/save/parquet.rs b/crates/nu_plugin_polars/src/dataframe/eager/save/parquet.rs
new file mode 100644
index 000000000..7200e2350
--- /dev/null
+++ b/crates/nu_plugin_polars/src/dataframe/eager/save/parquet.rs
@@ -0,0 +1,62 @@
+use std::{fs::File, path::Path};
+
+use nu_plugin::EvaluatedCall;
+use nu_protocol::{ShellError, Span};
+use polars::prelude::ParquetWriter;
+use polars_io::parquet::write::ParquetWriteOptions;
+
+use crate::values::{NuDataFrame, NuLazyFrame};
+
+use super::polars_file_save_error;
+
+pub(crate) fn command_lazy(
+ _call: &EvaluatedCall,
+ lazy: &NuLazyFrame,
+ file_path: &Path,
+ file_span: Span,
+) -> Result<(), ShellError> {
+ lazy.to_polars()
+ .sink_parquet(file_path, ParquetWriteOptions::default())
+ .map_err(|e| polars_file_save_error(e, file_span))
+}
+
+pub(crate) fn command_eager(
+ df: &NuDataFrame,
+ file_path: &Path,
+ file_span: Span,
+) -> Result<(), ShellError> {
+ let file = File::create(file_path).map_err(|e| ShellError::GenericError {
+ error: "Error with file name".into(),
+ msg: e.to_string(),
+ span: Some(file_span),
+ help: None,
+ inner: vec![],
+ })?;
+ let mut polars_df = df.to_polars();
+ ParquetWriter::new(file)
+ .finish(&mut polars_df)
+ .map_err(|e| ShellError::GenericError {
+ error: "Error saving file".into(),
+ msg: e.to_string(),
+ span: Some(file_span),
+ help: None,
+ inner: vec![],
+ })?;
+ Ok(())
+}
+
+#[cfg(test)]
+pub(crate) mod test {
+
+ use crate::eager::save::test::{test_eager_save, test_lazy_save};
+
+ #[test]
+ pub fn test_parquet_eager_save() -> Result<(), Box<dyn std::error::Error>> {
+ test_eager_save("parquet")
+ }
+
+ #[test]
+ pub fn test_parquet_lazy_save() -> Result<(), Box<dyn std::error::Error>> {
+ test_lazy_save("parquet")
+ }
+}
diff --git a/crates/nu_plugin_polars/src/dataframe/eager/to_arrow.rs b/crates/nu_plugin_polars/src/dataframe/eager/to_arrow.rs
deleted file mode 100644
index dfb331ac4..000000000
--- a/crates/nu_plugin_polars/src/dataframe/eager/to_arrow.rs
+++ /dev/null
@@ -1,134 +0,0 @@
-use std::{fs::File, path::PathBuf};
-
-use nu_path::expand_path_with;
-use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand};
-use nu_protocol::{
- Category, Example, LabeledError, PipelineData, ShellError, Signature, Spanned, SyntaxShape,
- Type, Value,
-};
-use polars::prelude::{IpcWriter, SerWriter};
-
-use crate::PolarsPlugin;
-
-use super::super::values::NuDataFrame;
-
-#[derive(Clone)]
-pub struct ToArrow;
-
-impl PluginCommand for ToArrow {
- type Plugin = PolarsPlugin;
-
- fn name(&self) -> &str {
- "polars to-arrow"
- }
-
- fn usage(&self) -> &str {
- "Saves dataframe to arrow file."
- }
-
- fn signature(&self) -> Signature {
- Signature::build(self.name())
- .required("file", SyntaxShape::Filepath, "file path to save dataframe")
- .input_output_type(Type::Custom("dataframe".into()), Type::Any)
- .category(Category::Custom("dataframe".into()))
- }
-
- fn examples(&self) -> Vec<Example> {
- vec![Example {
- description: "Saves dataframe to arrow file",
- example: "[[a b]; [1 2] [3 4]] | polars into-df | polars to-arrow test.arrow",
- result: None,
- }]
- }
-
- fn run(
- &self,
- plugin: &Self::Plugin,
- engine: &EngineInterface,
- call: &EvaluatedCall,
- input: PipelineData,
- ) -> Result<PipelineData, LabeledError> {
- command(plugin, engine, call, input).map_err(|e| e.into())
- }
-}
-
-fn command(
- plugin: &PolarsPlugin,
- engine: &EngineInterface,
- call: &EvaluatedCall,
- input: PipelineData,
-) -> Result<PipelineData, ShellError> {
- let file_name: Spanned<PathBuf> = call.req(0)?;
- let file_path = expand_path_with(&file_name.item, engine.get_current_dir()?, true);
-
- let df = NuDataFrame::try_from_pipeline_coerce(plugin, input, call.head)?;
-
- let mut file = File::create(file_path).map_err(|e| ShellError::GenericError {
- error: "Error with file name".into(),
- msg: e.to_string(),
- span: Some(file_name.span),
- help: None,
- inner: vec![],
- })?;
-
- IpcWriter::new(&mut file)
- .finish(&mut df.to_polars())
- .map_err(|e| ShellError::GenericError {
- error: "Error saving file".into(),
- msg: e.to_string(),
- span: Some(file_name.span),
- help: None,
- inner: vec![],
- })?;
-
- let file_value = Value::string(format!("saved {:?}", &file_name.item), file_name.span);
-
- Ok(PipelineData::Value(
- Value::list(vec![file_value], call.head),
- None,
- ))
-}
-
-#[cfg(test)]
-pub mod test {
- use nu_plugin_test_support::PluginTest;
- use nu_protocol::{Span, Value};
- use uuid::Uuid;
-
- use crate::PolarsPlugin;
-
- #[test]
- pub fn test_to_arrow() -> Result<(), Box<dyn std::error::Error>> {
- let tmp_dir = tempfile::tempdir()?;
- let mut tmp_file = tmp_dir.path().to_owned();
- tmp_file.push(format!("{}.arrow", Uuid::new_v4()));
- let tmp_file_str = tmp_file.to_str().expect("should be able to get file path");
-
- let cmd = format!(
- "[[a b]; [1 2] [3 4]] | polars into-df | polars to-arrow {}",
- tmp_file_str
- );
- let mut plugin_test = PluginTest::new("polars", PolarsPlugin::default().into())?;
- plugin_test.engine_state_mut().add_env_var(
- "PWD".to_string(),
- Value::string(
- tmp_dir
- .path()
- .to_str()
- .expect("should be able to get path")
- .to_owned(),
- Span::test_data(),
- ),
- );
- let pipeline_data = plugin_test.eval(&cmd)?;
-
- assert!(tmp_file.exists());
-
- let value = pipeline_data.into_value(Span::test_data())?;
- let list = value.as_list()?;
- assert_eq!(list.len(), 1);
- let msg = list.first().expect("should have a value").as_str()?;
- assert!(msg.contains("saved"));
- Ok(())
- }
-}
diff --git a/crates/nu_plugin_polars/src/dataframe/eager/to_avro.rs b/crates/nu_plugin_polars/src/dataframe/eager/to_avro.rs
deleted file mode 100644
index 3a5dc317e..000000000
--- a/crates/nu_plugin_polars/src/dataframe/eager/to_avro.rs
+++ /dev/null
@@ -1,163 +0,0 @@
-use std::{fs::File, path::PathBuf};
-
-use nu_path::expand_path_with;
-use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand};
-use nu_protocol::{
- Category, Example, LabeledError, PipelineData, ShellError, Signature, Spanned, SyntaxShape,
- Type, Value,
-};
-use polars_io::avro::{AvroCompression, AvroWriter};
-use polars_io::SerWriter;
-
-use crate::PolarsPlugin;
-
-use super::super::values::NuDataFrame;
-
-#[derive(Clone)]
-pub struct ToAvro;
-
-impl PluginCommand for ToAvro {
- type Plugin = PolarsPlugin;
-
- fn name(&self) -> &str {
- "polars to-avro"
- }
-
- fn usage(&self) -> &str {
- "Saves dataframe to avro file."
- }
-
- fn signature(&self) -> Signature {
- Signature::build(self.name())
- .named(
- "compression",
- SyntaxShape::String,
- "use compression, supports deflate or snappy",
- Some('c'),
- )
- .required("file", SyntaxShape::Filepath, "file path to save dataframe")
- .input_output_type(Type::Custom("dataframe".into()), Type::Any)
- .category(Category::Custom("dataframe".into()))
- }
-
- fn examples(&self) -> Vec<Example> {
- vec![Example {
- description: "Saves dataframe to avro file",
- example: "[[a b]; [1 2] [3 4]] | polars into-df | polars to-avro test.avro",
- result: None,
- }]
- }
-
- fn run(
- &self,
- plugin: &Self::Plugin,
- engine: &EngineInterface,
- call: &EvaluatedCall,
- input: PipelineData,
- ) -> Result<PipelineData, LabeledError> {
- command(plugin, engine, call, input).map_err(LabeledError::from)
- }
-}
-
-fn get_compression(call: &EvaluatedCall) -> Result<Option<AvroCompression>, ShellError> {
- if let Some((compression, span)) = call
- .get_flag_value("compression")
- .map(|e| e.as_str().map(|s| (s.to_owned(), e.span())))
- .transpose()?
- {
- match compression.as_ref() {
- "snappy" => Ok(Some(AvroCompression::Snappy)),
- "deflate" => Ok(Some(AvroCompression::Deflate)),
- _ => Err(ShellError::IncorrectValue {
- msg: "compression must be one of deflate or snappy".to_string(),
- val_span: span,
- call_span: span,
- }),
- }
- } else {
- Ok(None)
- }
-}
-
-fn command(
- plugin: &PolarsPlugin,
- engine: &EngineInterface,
- call: &EvaluatedCall,
- input: PipelineData,
-) -> Result<PipelineData, ShellError> {
- let file_name: Spanned<PathBuf> = call.req(0)?;
- let file_path = expand_path_with(&file_name.item, engine.get_current_dir()?, true);
- let compression = get_compression(call)?;
-
- let df = NuDataFrame::try_from_pipeline_coerce(plugin, input, call.head)?;
-
- let file = File::create(file_path).map_err(|e| ShellError::GenericError {
- error: "Error with file name".into(),
- msg: e.to_string(),
- span: Some(file_name.span),
- help: None,
- inner: vec![],
- })?;
-
- AvroWriter::new(file)
- .with_compression(compression)
- .finish(&mut df.to_polars())
- .map_err(|e| ShellError::GenericError {
- error: "Error saving file".into(),
- msg: e.to_string(),
- span: Some(file_name.span),
- help: None,
- inner: vec![],
- })?;
-
- let file_value = Value::string(format!("saved {:?}", &file_name.item), file_name.span);
-
- Ok(PipelineData::Value(
- Value::list(vec![file_value], call.head),
- None,
- ))
-}
-
-#[cfg(test)]
-pub mod test {
- use nu_plugin_test_support::PluginTest;
- use nu_protocol::{Span, Value};
- use uuid::Uuid;
-
- use crate::PolarsPlugin;
-
- #[test]
- pub fn test_to_avro() -> Result<(), Box<dyn std::error::Error>> {
- let tmp_dir = tempfile::tempdir()?;
- let mut tmp_file = tmp_dir.path().to_owned();
- tmp_file.push(format!("{}.avro", Uuid::new_v4()));
- let tmp_file_str = tmp_file.to_str().expect("should be able to get file path");
-
- let cmd = format!(
- "[[a b]; [1 2] [3 4]] | polars into-df | polars to-avro {}",
- tmp_file_str
- );
- let mut plugin_test = PluginTest::new("polars", PolarsPlugin::default().into())?;
- plugin_test.engine_state_mut().add_env_var(
- "PWD".to_string(),
- Value::string(
- tmp_dir
- .path()
- .to_str()
- .expect("should be able to get path")
- .to_owned(),
- Span::test_data(),
- ),
- );
- let pipeline_data = plugin_test.eval(&cmd)?;
-
- assert!(tmp_file.exists());
-
- let value = pipeline_data.into_value(Span::test_data())?;
- let list = value.as_list()?;
- assert_eq!(list.len(), 1);
- let msg = list.first().expect("should have a value").as_str()?;
- assert!(msg.contains("saved"));
- Ok(())
- }
-}
diff --git a/crates/nu_plugin_polars/src/dataframe/eager/to_csv.rs b/crates/nu_plugin_polars/src/dataframe/eager/to_csv.rs
deleted file mode 100644
index d55a53f1f..000000000
--- a/crates/nu_plugin_polars/src/dataframe/eager/to_csv.rs
+++ /dev/null
@@ -1,181 +0,0 @@
-use std::{fs::File, path::PathBuf};
-
-use nu_path::expand_path_with;
-use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand};
-use nu_protocol::{
- Category, Example, LabeledError, PipelineData, ShellError, Signature, Spanned, SyntaxShape,
- Type, Value,
-};
-use polars::prelude::{CsvWriter, SerWriter};
-
-use crate::PolarsPlugin;
-
-use super::super::values::NuDataFrame;
-
-#[derive(Clone)]
-pub struct ToCSV;
-
-impl PluginCommand for ToCSV {
- type Plugin = PolarsPlugin;
-
- fn name(&self) -> &str {
- "polars to-csv"
- }
-
- fn usage(&self) -> &str {
- "Saves dataframe to CSV file."
- }
-
- fn signature(&self) -> Signature {
- Signature::build(self.name())
- .required("file", SyntaxShape::Filepath, "file path to save dataframe")
- .named(
- "delimiter",
- SyntaxShape::String,
- "file delimiter character",
- Some('d'),
- )
- .switch("no-header", "Indicates if file doesn't have header", None)
- .input_output_type(Type::Custom("dataframe".into()), Type::Any)
- .category(Category::Custom("dataframe".into()))
- }
-
- fn examples(&self) -> Vec<Example> {
- vec![
- Example {
- description: "Saves dataframe to CSV file",
- example: "[[a b]; [1 2] [3 4]] | dfr into-df | dfr to-csv test.csv",
- result: None,
- },
- Example {
- description: "Saves dataframe to CSV file using other delimiter",
- example: "[[a b]; [1 2] [3 4]] | dfr into-df | dfr to-csv test.csv --delimiter '|'",
- result: None,
- },
- ]
- }
-
- fn run(
- &self,
- plugin: &Self::Plugin,
- engine: &EngineInterface,
- call: &EvaluatedCall,
- input: PipelineData,
- ) -> Result<PipelineData, LabeledError> {
- command(plugin, engine, call, input).map_err(|e| e.into())
- }
-}
-
-fn command(
- plugin: &PolarsPlugin,
- engine: &EngineInterface,
- call: &EvaluatedCall,
- input: PipelineData,
-) -> Result<PipelineData, ShellError> {
- let file_name: Spanned<PathBuf> = call.req(0)?;
- let file_path = expand_path_with(&file_name.item, engine.get_current_dir()?, true);
- let delimiter: Option<Spanned<String>> = call.get_flag("delimiter")?;
- let no_header: bool = call.has_flag("no-header")?;
-
- let df = NuDataFrame::try_from_pipeline_coerce(plugin, input, call.head)?;
-
- let mut file = File::create(file_path).map_err(|e| ShellError::GenericError {
- error: "Error with file name".into(),
- msg: e.to_string(),
- span: Some(file_name.span),
- help: None,
- inner: vec![],
- })?;
-
- let writer = CsvWriter::new(&mut file);
-
- let writer = if no_header {
- writer.include_header(false)
- } else {
- writer.include_header(true)
- };
-
- let mut writer = match delimiter {
- None => writer,
- Some(d) => {
- if d.item.len() != 1 {
- return Err(ShellError::GenericError {
- error: "Incorrect delimiter".into(),
- msg: "Delimiter has to be one char".into(),
- span: Some(d.span),
- help: None,
- inner: vec![],
- });
- } else {
- let delimiter = match d.item.chars().next() {
- Some(d) => d as u8,
- None => unreachable!(),
- };
-
- writer.with_separator(delimiter)
- }
- }
- };
-
- writer
- .finish(&mut df.to_polars())
- .map_err(|e| ShellError::GenericError {
- error: "Error writing to file".into(),
- msg: e.to_string(),
- span: Some(file_name.span),
- help: None,
- inner: vec![],
- })?;
-
- let file_value = Value::string(format!("saved {:?}", &file_name.item), file_name.span);
-
- Ok(PipelineData::Value(
- Value::list(vec![file_value], call.head),
- None,
- ))
-}
-
-#[cfg(test)]
-pub mod test {
- use nu_plugin_test_support::PluginTest;
- use nu_protocol::{Span, Value};
- use uuid::Uuid;
-
- use crate::PolarsPlugin;
-
- #[test]
- pub fn test_to_csv() -> Result<(), Box<dyn std::error::Error>> {
- let tmp_dir = tempfile::tempdir()?;
- let mut tmp_file = tmp_dir.path().to_owned();
- tmp_file.push(format!("{}.csv", Uuid::new_v4()));
- let tmp_file_str = tmp_file.to_str().expect("should be able to get file path");
-
- let cmd = format!(
- "[[a b]; [1 2] [3 4]] | polars into-df | polars to-csv {}",
- tmp_file_str
- );
- println!("cmd: {}", cmd);
- let mut plugin_test = PluginTest::new("polars", PolarsPlugin::default().into())?;
- plugin_test.engine_state_mut().add_env_var(
- "PWD".to_string(),
- Value::string(
- tmp_dir
- .path()
- .to_str()
- .expect("should be able to get path")
- .to_owned(),
- Span::test_data(),
- ),
- );
- let pipeline_data = plugin_test.eval(&cmd)?;
-
- assert!(tmp_file.exists());
-
- let value = pipeline_data.into_value(Span::test_data())?;
- let list = value.as_list()?;
- assert_eq!(list.len(), 1);
- let msg = list.first().expect("should have a value").as_str()?;
- assert!(msg.contains("saved"));
- Ok(())
- }
-}
diff --git a/crates/nu_plugin_polars/src/dataframe/eager/to_json_lines.rs b/crates/nu_plugin_polars/src/dataframe/eager/to_json_lines.rs
deleted file mode 100644
index 88b4a61bb..000000000
--- a/crates/nu_plugin_polars/src/dataframe/eager/to_json_lines.rs
+++ /dev/null
@@ -1,135 +0,0 @@
-use std::{fs::File, io::BufWriter, path::PathBuf};
-
-use nu_path::expand_path_with;
-use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand};
-use nu_protocol::{
- Category, Example, LabeledError, PipelineData, ShellError, Signature, Spanned, SyntaxShape,
- Type, Value,
-};
-use polars::prelude::{JsonWriter, SerWriter};
-
-use crate::PolarsPlugin;
-
-use super::super::values::NuDataFrame;
-
-#[derive(Clone)]
-pub struct ToJsonLines;
-
-impl PluginCommand for ToJsonLines {
- type Plugin = PolarsPlugin;
-
- fn name(&self) -> &str {
- "polars to-jsonl"
- }
-
- fn usage(&self) -> &str {
- "Saves dataframe to a JSON lines file."
- }
-
- fn signature(&self) -> Signature {
- Signature::build(self.name())
- .required("file", SyntaxShape::Filepath, "file path to save dataframe")
- .input_output_type(Type::Custom("dataframe".into()), Type::Any)
- .category(Category::Custom("dataframe".into()))
- }
-
- fn examples(&self) -> Vec<Example> {
- vec![Example {
- description: "Saves dataframe to JSON lines file",
- example: "[[a b]; [1 2] [3 4]] | polars into-df | polars to-jsonl test.jsonl",
- result: None,
- }]
- }
-
- fn run(
- &self,
- plugin: &Self::Plugin,
- engine: &EngineInterface,
- call: &EvaluatedCall,
- input: PipelineData,
- ) -> Result<PipelineData, LabeledError> {
- command(plugin, engine, call, input).map_err(LabeledError::from)
- }
-}
-
-fn command(
- plugin: &PolarsPlugin,
- engine: &EngineInterface,
- call: &EvaluatedCall,
- input: PipelineData,
-) -> Result<PipelineData, ShellError> {
- let file_name: Spanned<PathBuf> = call.req(0)?;
- let file_path = expand_path_with(&file_name.item, engine.get_current_dir()?, true);
-
- let df = NuDataFrame::try_from_pipeline_coerce(plugin, input, call.head)?;
-
- let file = File::create(file_path).map_err(|e| ShellError::GenericError {
- error: "Error with file name".into(),
- msg: e.to_string(),
- span: Some(file_name.span),
- help: None,
- inner: vec![],
- })?;
- let buf_writer = BufWriter::new(file);
-
- JsonWriter::new(buf_writer)
- .finish(&mut df.to_polars())
- .map_err(|e| ShellError::GenericError {
- error: "Error saving file".into(),
- msg: e.to_string(),
- span: Some(file_name.span),
- help: None,
- inner: vec![],
- })?;
-
- let file_value = Value::string(format!("saved {:?}", &file_name.item), file_name.span);
-
- Ok(PipelineData::Value(
- Value::list(vec![file_value], call.head),
- None,
- ))
-}
-
-#[cfg(test)]
-pub mod test {
- use nu_plugin_test_support::PluginTest;
- use nu_protocol::{Span, Value};
- use uuid::Uuid;
-
- use crate::PolarsPlugin;
-
- #[test]
- pub fn test_to_jsonl() -> Result<(), Box<dyn std::error::Error>> {
- let tmp_dir = tempfile::tempdir()?;
- let mut tmp_file = tmp_dir.path().to_owned();
- tmp_file.push(format!("{}.jsonl", Uuid::new_v4()));
- let tmp_file_str = tmp_file.to_str().expect("should be able to get file path");
-
- let cmd = format!(
- "[[a b]; [1 2] [3 4]] | polars into-df | polars to-jsonl {}",
- tmp_file_str
- );
- let mut plugin_test = PluginTest::new("polars", PolarsPlugin::default().into())?;
- plugin_test.engine_state_mut().add_env_var(
- "PWD".to_string(),
- Value::string(
- tmp_dir
- .path()
- .to_str()
- .expect("should be able to get path")
- .to_owned(),
- Span::test_data(),
- ),
- );
- let pipeline_data = plugin_test.eval(&cmd)?;
-
- assert!(tmp_file.exists());
-
- let value = pipeline_data.into_value(Span::test_data())?;
- let list = value.as_list()?;
- assert_eq!(list.len(), 1);
- let msg = list.first().expect("should have a value").as_str()?;
- assert!(msg.contains("saved"));
- Ok(())
- }
-}
diff --git a/crates/nu_plugin_polars/src/dataframe/eager/to_parquet.rs b/crates/nu_plugin_polars/src/dataframe/eager/to_parquet.rs
deleted file mode 100644
index 4a8208ae1..000000000
--- a/crates/nu_plugin_polars/src/dataframe/eager/to_parquet.rs
+++ /dev/null
@@ -1,135 +0,0 @@
-use std::{fs::File, path::PathBuf};
-
-use nu_path::expand_path_with;
-use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand};
-use nu_protocol::{
- Category, Example, LabeledError, PipelineData, ShellError, Signature, Spanned, SyntaxShape,
- Type, Value,
-};
-use polars::prelude::ParquetWriter;
-
-use crate::PolarsPlugin;
-
-use super::super::values::NuDataFrame;
-
-#[derive(Clone)]
-pub struct ToParquet;
-
-impl PluginCommand for ToParquet {
- type Plugin = PolarsPlugin;
-
- fn name(&self) -> &str {
- "polars to-parquet"
- }
-
- fn usage(&self) -> &str {
- "Saves dataframe to parquet file."
- }
-
- fn signature(&self) -> Signature {
- Signature::build(self.name())
- .required("file", SyntaxShape::Filepath, "file path to save dataframe")
- .input_output_type(Type::Custom("dataframe".into()), Type::Any)
- .category(Category::Custom("dataframe".into()))
- }
-
- fn examples(&self) -> Vec<Example> {
- vec![Example {
- description: "Saves dataframe to parquet file",
- example: "[[a b]; [1 2] [3 4]] | polars into-df | polars to-parquet test.parquet",
- result: None,
- }]
- }
-
- fn run(
- &self,
- plugin: &Self::Plugin,
- engine: &EngineInterface,
- call: &EvaluatedCall,
- input: PipelineData,
- ) -> Result<PipelineData, LabeledError> {
- command(plugin, engine, call, input).map_err(LabeledError::from)
- }
-}
-
-fn command(
- plugin: &PolarsPlugin,
- engine: &EngineInterface,
- call: &EvaluatedCall,
- input: PipelineData,
-) -> Result<PipelineData, ShellError> {
- let file_name: Spanned<PathBuf> = call.req(0)?;
- let file_path = expand_path_with(&file_name.item, engine.get_current_dir()?, true);
-
- let df = NuDataFrame::try_from_pipeline_coerce(plugin, input, call.head)?;
-
- let file = File::create(file_path).map_err(|e| ShellError::GenericError {
- error: "Error with file name".into(),
- msg: e.to_string(),
- span: Some(file_name.span),
- help: None,
- inner: vec![],
- })?;
- let mut polars_df = df.to_polars();
- ParquetWriter::new(file)
- .finish(&mut polars_df)
- .map_err(|e| ShellError::GenericError {
- error: "Error saving file".into(),
- msg: e.to_string(),
- span: Some(file_name.span),
- help: None,
- inner: vec![],
- })?;
-
- let file_value = Value::string(format!("saved {:?}", &file_name.item), file_name.span);
-
- Ok(PipelineData::Value(
- Value::list(vec![file_value], call.head),
- None,
- ))
-}
-
-#[cfg(test)]
-pub mod test {
- use nu_plugin_test_support::PluginTest;
- use nu_protocol::{Span, Value};
- use uuid::Uuid;
-
- use crate::PolarsPlugin;
-
- #[test]
- pub fn test_to_parquet() -> Result<(), Box<dyn std::error::Error>> {
- let tmp_dir = tempfile::tempdir()?;
- let mut tmp_file = tmp_dir.path().to_owned();
- tmp_file.push(format!("{}.parquet", Uuid::new_v4()));
- let tmp_file_str = tmp_file.to_str().expect("should be able to get file path");
-
- let cmd = format!(
- "[[a b]; [1 2] [3 4]] | polars into-df | polars to-parquet {}",
- tmp_file_str
- );
- let mut plugin_test = PluginTest::new("polars", PolarsPlugin::default().into())?;
- plugin_test.engine_state_mut().add_env_var(
- "PWD".to_string(),
- Value::string(
- tmp_dir
- .path()
- .to_str()
- .expect("should be able to get path")
- .to_owned(),
- Span::test_data(),
- ),
- );
- let pipeline_data = plugin_test.eval(&cmd)?;
-
- assert!(tmp_file.exists());
-
- let value = pipeline_data.into_value(Span::test_data())?;
- let list = value.as_list()?;
- assert_eq!(list.len(), 1);
- let msg = list.first().expect("should have a value").as_str()?;
- assert!(msg.contains("saved"));
-
- Ok(())
- }
-}
diff --git a/crates/nu_plugin_polars/src/dataframe/lazy/mod.rs b/crates/nu_plugin_polars/src/dataframe/lazy/mod.rs
index e1944e06a..e70143e6c 100644
--- a/crates/nu_plugin_polars/src/dataframe/lazy/mod.rs
+++ b/crates/nu_plugin_polars/src/dataframe/lazy/mod.rs
@@ -12,7 +12,6 @@ mod macro_commands;
mod median;
mod quantile;
mod select;
-mod sink;
mod sort_by_expr;
mod to_lazy;
@@ -54,6 +53,5 @@ pub(crate) fn lazy_commands() -> Vec<Box<dyn PluginCommand<Plugin = PolarsPlugin
Box::new(LazyQuantile),
Box::new(ToLazyFrame),
Box::new(ToLazyGroupBy),
- Box::new(sink::Sink),
]
}
diff --git a/crates/nu_plugin_polars/src/dataframe/lazy/sink.rs b/crates/nu_plugin_polars/src/dataframe/lazy/sink.rs
deleted file mode 100644
index 0fdf0200f..000000000
--- a/crates/nu_plugin_polars/src/dataframe/lazy/sink.rs
+++ /dev/null
@@ -1,205 +0,0 @@
-use std::path::PathBuf;
-
-use crate::{
- values::{cant_convert_err, PolarsPluginObject, PolarsPluginType},
- PolarsPlugin,
-};
-
-use super::super::values::NuLazyFrame;
-
-use nu_path::expand_path_with;
-use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand};
-use nu_protocol::{
- Category, Example, LabeledError, PipelineData, ShellError, Signature, Span, Spanned,
- SyntaxShape, Type, Value,
-};
-use polars::error::PolarsError;
-use polars_io::{
- csv::write::CsvWriterOptions, ipc::IpcWriterOptions, json::JsonWriterOptions,
- parquet::write::ParquetWriteOptions,
-};
-
-#[derive(Clone)]
-pub struct Sink;
-
-impl PluginCommand for Sink {
- type Plugin = PolarsPlugin;
-
- fn name(&self) -> &str {
- "polars sink"
- }
-
- fn usage(&self) -> &str {
- "Streams a collect result to a file. This is useful if the result is too large for memory. Supports parquet, ipc/arrow, csv, and json formats."
- }
-
- fn signature(&self) -> Signature {
- Signature::build(self.name())
- .required("path", SyntaxShape::Filepath, "Path to write to.")
- .named(
- "type",
- SyntaxShape::String,
- "File type: csv, json, parquet, arrow/ipc. If omitted, derive from file extension",
- Some('t'),
- )
- .input_output_type(Type::Any, Type::String)
- .category(Category::Custom("lazyframe".into()))
- }
-
- fn examples(&self) -> Vec<Example> {
- vec![Example {
- description: "Collect and save the output to the specified file",
- example: "[[a b];[1 2] [3 4]] | polars into-lazy | polars sink /tmp/foo.parquet",
- result: None,
- }]
- }
-
- fn run(
- &self,
- plugin: &Self::Plugin,
- engine: &EngineInterface,
- call: &EvaluatedCall,
- input: PipelineData,
- ) -> Result<PipelineData, LabeledError> {
- let value = input.into_value(call.head)?;
-
- match PolarsPluginObject::try_from_value(plugin, &value)? {
- PolarsPluginObject::NuDataFrame(df) => command(plugin, engine, call, df.lazy()),
- PolarsPluginObject::NuLazyFrame(lazy) => command(plugin, engine, call, lazy),
- _ => Err(cant_convert_err(
- &value,
- &[PolarsPluginType::NuDataFrame, PolarsPluginType::NuLazyFrame],
- )),
- }
- .map_err(LabeledError::from)
- }
-}
-
-fn command(
- _plugin: &PolarsPlugin,
- engine: &EngineInterface,
- call: &EvaluatedCall,
- lazy: NuLazyFrame,
-) -> Result<PipelineData, ShellError> {
- let spanned_file: Spanned<PathBuf> = call.req(0)?;
- let file_path = expand_path_with(&spanned_file.item, engine.get_current_dir()?, true);
- let file_span = spanned_file.span;
- let type_option: Option<Spanned<String>> = call.get_flag("type")?;
- let type_id = match &type_option {
- Some(ref t) => Some((t.item.to_owned(), "Invalid type", t.span)),
- None => file_path.extension().map(|e| {
- (
- e.to_string_lossy().into_owned(),
- "Invalid extension",
- spanned_file.span,
- )
- }),
- };
-
- let polars_df = lazy.to_polars();
-
- match type_id {
- Some((e, msg, blamed)) => match e.as_str() {
- "parquet" | "parq" => polars_df
- .sink_parquet(&file_path, ParquetWriteOptions::default())
- .map_err(|e| file_save_error(e, file_span))?,
- "csv" => polars_df
- .sink_csv(&file_path, CsvWriterOptions::default())
- .map_err(|e| file_save_error(e, file_span))?,
- "ipc" | "arrow" => polars_df
- .sink_ipc(&file_path, IpcWriterOptions::default())
- .map_err(|e| file_save_error(e, file_span))?,
- "json" | "jsonl" | "ndjson" => polars_df
- .sink_json(&file_path, JsonWriterOptions::default())
- .map_err(|e| file_save_error(e, file_span))?,
- _ => Err(ShellError::FileNotFoundCustom {
- msg: format!("{msg}. Supported values: csv, tsv, parquet, ipc, arrow, json, jsonl"),
- span: blamed,
- })?,
- },
- None => Err(ShellError::FileNotFoundCustom {
- msg: "File without extension".into(),
- span: spanned_file.span,
- })?,
- };
- let file_value = Value::string(format!("saved {:?}", &file_path), file_span);
-
- Ok(PipelineData::Value(
- Value::list(vec![file_value], call.head),
- None,
- ))
-}
-
-fn file_save_error(e: PolarsError, span: Span) -> ShellError {
- ShellError::GenericError {
- error: "Error saving file".into(),
- msg: e.to_string(),
- span: Some(span),
- help: None,
- inner: vec![],
- }
-}
-
-#[cfg(test)]
-pub mod test {
- use nu_plugin_test_support::PluginTest;
- use nu_protocol::{Span, Value};
- use uuid::Uuid;
-
- use crate::PolarsPlugin;
-
- pub fn test_sink(extension: &str) -> Result<(), Box<dyn std::error::Error>> {
- let tmp_dir = tempfile::tempdir()?;
- let mut tmp_file = tmp_dir.path().to_owned();
- tmp_file.push(format!("{}.{}", Uuid::new_v4(), extension));
- let tmp_file_str = tmp_file.to_str().expect("should be able to get file path");
-
- let cmd = format!(
- "[[a b]; [1 2] [3 4]] | polars into-lazy | polars sink {}",
- tmp_file_str
- );
- let mut plugin_test = PluginTest::new("polars", PolarsPlugin::default().into())?;
- plugin_test.engine_state_mut().add_env_var(
- "PWD".to_string(),
- Value::string(
- tmp_dir
- .path()
- .to_str()
- .expect("should be able to get path")
- .to_owned(),
- Span::test_data(),
- ),
- );
- let pipeline_data = plugin_test.eval(&cmd)?;
-
- assert!(tmp_file.exists());
-
- let value = pipeline_data.into_value(Span::test_data())?;
- let list = value.as_list()?;
- assert_eq!(list.len(), 1);
- let msg = list.first().expect("should have a value").as_str()?;
- assert!(msg.contains("saved"));
-
- Ok(())
- }
-
- #[test]
- pub fn test_to_parquet() -> Result<(), Box<dyn std::error::Error>> {
- test_sink("parquet")
- }
-
- #[test]
- pub fn test_to_ipc() -> Result<(), Box<dyn std::error::Error>> {
- test_sink("ipc")
- }
-
- #[test]
- pub fn test_to_csv() -> Result<(), Box<dyn std::error::Error>> {
- test_sink("csv")
- }
-
- #[test]
- pub fn test_to_json() -> Result<(), Box<dyn std::error::Error>> {
- test_sink("ndjson")
- }
-}
diff --git a/crates/nu_plugin_polars/src/dataframe/values/file_type.rs b/crates/nu_plugin_polars/src/dataframe/values/file_type.rs
new file mode 100644
index 000000000..c46fcd711
--- /dev/null
+++ b/crates/nu_plugin_polars/src/dataframe/values/file_type.rs
@@ -0,0 +1,59 @@
+use nu_protocol::{ShellError, Span};
+
+pub enum PolarsFileType {
+ Csv,
+ Tsv,
+ Parquet,
+ Arrow,
+ Json,
+ Avro,
+ NdJson,
+ Unknown,
+}
+
+impl PolarsFileType {
+ pub fn build_unsupported_error(
+ extension: &str,
+ supported_types: &[PolarsFileType],
+ span: Span,
+ ) -> ShellError {
+ let type_string = supported_types
+ .iter()
+ .map(|ft| ft.to_str())
+ .collect::<Vec<&'static str>>()
+ .join(", ");
+
+ ShellError::FileNotFoundCustom {
+            msg: format!("Unsupported type {extension}; expected one of: {type_string}"),
+ span,
+ }
+ }
+
+ pub fn to_str(&self) -> &'static str {
+ match self {
+ PolarsFileType::Csv => "csv",
+ PolarsFileType::Tsv => "tsv",
+ PolarsFileType::Parquet => "parquet",
+ PolarsFileType::Arrow => "arrow",
+ PolarsFileType::Json => "json",
+ PolarsFileType::Avro => "avro",
+ PolarsFileType::NdJson => "ndjson",
+ PolarsFileType::Unknown => "unknown",
+ }
+ }
+}
+
+impl From<&str> for PolarsFileType {
+ fn from(file_type: &str) -> Self {
+ match file_type {
+ "csv" => PolarsFileType::Csv,
+ "tsv" => PolarsFileType::Tsv,
+ "parquet" | "parq" => PolarsFileType::Parquet,
+ "ipc" | "arrow" => PolarsFileType::Arrow,
+ "json" => PolarsFileType::Json,
+ "avro" => PolarsFileType::Avro,
+ "jsonl" | "ndjson" => PolarsFileType::NdJson,
+ _ => PolarsFileType::Unknown,
+ }
+ }
+}
diff --git a/crates/nu_plugin_polars/src/dataframe/values/mod.rs b/crates/nu_plugin_polars/src/dataframe/values/mod.rs
index ec59304ea..41a119cdd 100644
--- a/crates/nu_plugin_polars/src/dataframe/values/mod.rs
+++ b/crates/nu_plugin_polars/src/dataframe/values/mod.rs
@@ -1,3 +1,4 @@
+mod file_type;
mod nu_dataframe;
mod nu_expression;
mod nu_lazyframe;
@@ -8,6 +9,7 @@ pub mod utils;
use std::{cmp::Ordering, fmt};
+pub use file_type::PolarsFileType;
pub use nu_dataframe::{Axis, Column, NuDataFrame, NuDataFrameCustomValue};
pub use nu_expression::{NuExpression, NuExpressionCustomValue};
pub use nu_lazyframe::{NuLazyFrame, NuLazyFrameCustomValue};
@@ -125,6 +127,20 @@ impl PolarsPluginObject {
}
}
}
+
+ pub fn dataframe(&self) -> Option<&NuDataFrame> {
+ match self {
+ PolarsPluginObject::NuDataFrame(df) => Some(df),
+ _ => None,
+ }
+ }
+
+ pub fn lazyframe(&self) -> Option<&NuLazyFrame> {
+ match self {
+ PolarsPluginObject::NuLazyFrame(lf) => Some(lf),
+ _ => None,
+ }
+ }
}
#[derive(Debug, Clone)]