diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 8de3224c9138b..197be2631a055 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1353,6 +1353,7 @@ message RepartitionExecNode{ // uint64 unknown = 4; // } Partitioning partitioning = 5; + bool preserve_order = 6; } message Partitioning { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 2b8089f5632d2..7a4964c4c49f1 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -20929,6 +20929,9 @@ impl serde::Serialize for RepartitionExecNode { if self.partitioning.is_some() { len += 1; } + if self.preserve_order { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion.RepartitionExecNode", len)?; if let Some(v) = self.input.as_ref() { struct_ser.serialize_field("input", v)?; @@ -20936,6 +20939,9 @@ impl serde::Serialize for RepartitionExecNode { if let Some(v) = self.partitioning.as_ref() { struct_ser.serialize_field("partitioning", v)?; } + if self.preserve_order { + struct_ser.serialize_field("preserveOrder", &self.preserve_order)?; + } struct_ser.end() } } @@ -20948,12 +20954,15 @@ impl<'de> serde::Deserialize<'de> for RepartitionExecNode { const FIELDS: &[&str] = &[ "input", "partitioning", + "preserve_order", + "preserveOrder", ]; #[allow(clippy::enum_variant_names)] enum GeneratedField { Input, Partitioning, + PreserveOrder, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -20977,6 +20986,7 @@ impl<'de> serde::Deserialize<'de> for RepartitionExecNode { match value { "input" => Ok(GeneratedField::Input), "partitioning" => Ok(GeneratedField::Partitioning), + "preserveOrder" | "preserve_order" => Ok(GeneratedField::PreserveOrder), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -20998,6 +21008,7 @@ impl<'de> serde::Deserialize<'de> for RepartitionExecNode { { let mut input__ = None; let mut partitioning__ = None; + let mut preserve_order__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::Input => { @@ -21012,11 +21023,18 @@ impl<'de> serde::Deserialize<'de> for RepartitionExecNode { } partitioning__ = map_.next_value()?; } + GeneratedField::PreserveOrder => { + if preserve_order__.is_some() { + return Err(serde::de::Error::duplicate_field("preserveOrder")); + } + preserve_order__ = Some(map_.next_value()?); + } } } Ok(RepartitionExecNode { input: input__, partitioning: partitioning__, + preserve_order: preserve_order__.unwrap_or_default(), }) } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 6daf81bf1a52e..c708c61328c37 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -2001,6 +2001,8 @@ pub struct RepartitionExecNode { /// } #[prost(message, optional, tag = "5")] pub partitioning: ::core::option::Option, + #[prost(bool, tag = "6")] + pub preserve_order: bool, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct Partitioning { diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 85406e31da614..99cd39d9f2bef 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -1011,10 +1011,11 @@ impl protobuf::PhysicalPlanNode { codec, proto_converter, )?; - Ok(Arc::new(RepartitionExec::try_new( - input, - partitioning.unwrap(), - )?)) + let mut repart_exec = RepartitionExec::try_new(input, partitioning.unwrap())?; + if repart.preserve_order { + repart_exec = repart_exec.with_preserve_order(); + } + Ok(Arc::new(repart_exec)) } fn try_into_global_limit_physical_plan( @@ -3056,6 +3057,7 @@ impl protobuf::PhysicalPlanNode { protobuf::RepartitionExecNode { input: Some(Box::new(input)), partitioning: Some(pb_partitioning), + preserve_order: exec.preserve_order(), }, ))), }) diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 66ca903e4ec8a..31626ef9fdada 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -1781,6 +1781,35 @@ fn roundtrip_union() -> Result<()> { roundtrip_test(union) } +#[test] +fn roundtrip_repartition_preserve_order() -> Result<()> { + let field_a = Field::new("a", DataType::Int64, false); + let schema = Arc::new(Schema::new(vec![field_a])); + let sort_exprs: LexOrdering = [PhysicalSortExpr { + expr: col("a", &schema)?, + options: SortOptions::default(), + }] + .into(); + + // Create two sorted single-partition inputs, then union them to get + // a sorted input with 2 partitions. + let source1 = SortExec::new( + sort_exprs.clone(), + Arc::new(EmptyExec::new(Arc::clone(&schema))), + ); + let source2 = SortExec::new(sort_exprs, Arc::new(EmptyExec::new(schema))); + let union = UnionExec::try_new(vec![ + Arc::new(source1) as Arc, + Arc::new(source2) as Arc, + ])?; + + let repartition = RepartitionExec::try_new(union, Partitioning::RoundRobinBatch(10))? + .with_preserve_order(); + assert!(repartition.preserve_order()); + + roundtrip_test(Arc::new(repartition)) +} + #[test] fn roundtrip_interleave() -> Result<()> { let field_a = Field::new("col", DataType::Int64, false);