Skip to content

Commit 32502f7

Browse files
committed
chore: converting more "uri" to actual Url 🔥
In this commit I am intentionally introducing the following convention in our Rust APIs:
* anything named `uri` is expected to take an `AsRef<str>` type which is expected to turn into a `Url`
* anything named `url` is expected to take a `Url` type.
As such I have renamed a number of our APIs which previously had been converted to use `Url` but were still referring to them as `uri`.
Signed-off-by: R. Tyler Croy <[email protected]>
1 parent f436b61 commit 32502f7

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+365
-285
lines changed

crates/aws/src/logstore/dynamodb_logstore.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -183,10 +183,6 @@ impl LogStore for S3DynamoDbLogStore {
183183
"S3DynamoDbLogStore".into()
184184
}
185185

186-
fn root_uri(&self) -> String {
187-
self.table_path.clone()
188-
}
189-
190186
async fn refresh(&self) -> DeltaResult<()> {
191187
let entry = self
192188
.lock_client

crates/benchmarks/src/merge.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -443,7 +443,7 @@ pub async fn prepare_source_and_table(
443443

444444
let batches = parquet_df.collect().await?;
445445
let fields: Vec<StructField> = delta_schema.fields().cloned().collect();
446-
let table = DeltaOps::try_from_uri(temp_table_url)
446+
let table = DeltaOps::try_from_url(temp_table_url)
447447
.await?
448448
.create()
449449
.with_columns(fields)

crates/benchmarks/src/smoke.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ pub async fn run_smoke_once(table_url: &Url, params: &SmokeParams) -> DeltaResul
3535
],
3636
)?;
3737

38-
let table = DeltaOps::try_from_uri(table_url.clone())
38+
let table = DeltaOps::try_from_url(table_url.clone())
3939
.await?
4040
.write(vec![batch])
4141
.with_save_mode(SaveMode::Overwrite)

crates/benchmarks/src/tpcds_queries.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ pub async fn register_tpcds_tables(
7171

7272
let batches = parquet_df.collect().await?;
7373
let fields: Vec<StructField> = delta_schema.fields().cloned().collect();
74-
let table = DeltaOps::try_from_uri(temp_table_url)
74+
let table = DeltaOps::try_from_url(temp_table_url)
7575
.await?
7676
.create()
7777
.with_columns(fields)

crates/catalog-unity/src/datafusion.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ impl SchemaProvider for UnitySchemaProvider {
237237
})?;
238238
let table_url = ensure_table_uri(&table.storage_location)
239239
.map_err(|e| DataFusionError::External(Box::new(e)))?;
240-
let table = DeltaTableBuilder::from_uri(table_url)
240+
let table = DeltaTableBuilder::from_url(table_url)
241241
.map_err(|e| DataFusionError::External(Box::new(e)))?
242242
.with_storage_options(new_storage_opts)
243243
.load()

crates/catalog-unity/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -860,7 +860,7 @@ impl ObjectStoreFactory for UnityCatalogFactory {
860860
// TODO(roeap): we should not have to go through the table here.
861861
// ideally we just create the right storage ...
862862
let table_url = ensure_table_uri(&table_path)?;
863-
let mut builder = DeltaTableBuilder::from_uri(table_url)?;
863+
let mut builder = DeltaTableBuilder::from_url(table_url)?;
864864

865865
if let Some(runtime) = &config.runtime {
866866
builder = builder.with_io_runtime(runtime.clone());

crates/core/PERF.adoc

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
2+
= Build performance
3+
4+
== Nightly
5+
6+
=== threads=8
7+
8+
9+
[source]
10+
----
11+
RUSTFLAGS="-Z threads=8" cargo +nightly build 54.90s user 12.40s system 90% cpu 1:14.54 total
12+
----
13+
14+
=== basic
15+
16+
[source]
17+
----
18+
cargo +nightly build 45.63s user 9.93s system 70% cpu 1:18.59 total
19+
----
20+
21+
22+
=== threads=8 with datafusion
23+
24+
[source]
25+
----
26+
RUSTFLAGS="-Z threads=8" cargo +nightly build --features datafusion 109.25s user 17.27s system 72% cpu 2:53.74 total
27+
----
28+
29+
30+
=== no threads
31+
32+
[source]
33+
----
34+
cargo +nightly build --features datafusion 80.02s user 13.46s system 41% cpu 3:47.94 total
35+
----
36+
37+
38+
[source]
39+
----
40+
RUSTFLAGS="-Zcodegen-backend=cranelift" cargo +nightly build --features 58.19s user 12.43s system 38% cpu 3:03.59 total
41+
----
42+
43+
[source]
44+
----
45+
RUSTFLAGS="-Zthreads=8 -Zcodegen-backend=cranelift" cargo +nightly build 78.85s user 14.81s system 51% cpu 3:02.71 total
46+
47+
----

crates/core/src/delta_datafusion/mod.rs

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -692,13 +692,12 @@ impl PhysicalExtensionCodec for DeltaPhysicalCodec {
692692
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
693693
let wire: DeltaScanWire = serde_json::from_reader(buf)
694694
.map_err(|_| DataFusionError::Internal("Unable to decode DeltaScan".to_string()))?;
695-
let delta_scan = DeltaScan {
696-
table_uri: wire.table_uri,
697-
parquet_scan: (*inputs)[0].clone(),
698-
config: wire.config,
699-
logical_schema: wire.logical_schema,
700-
metrics: ExecutionPlanMetricsSet::new(),
701-
};
695+
let delta_scan = DeltaScan::new(
696+
&wire.table_url,
697+
wire.config,
698+
(*inputs)[0].clone(),
699+
wire.logical_schema,
700+
);
702701
Ok(Arc::new(delta_scan))
703702
}
704703

@@ -712,11 +711,7 @@ impl PhysicalExtensionCodec for DeltaPhysicalCodec {
712711
.downcast_ref::<DeltaScan>()
713712
.ok_or_else(|| DataFusionError::Internal("Not a delta scan!".to_string()))?;
714713

715-
let wire = DeltaScanWire {
716-
table_uri: delta_scan.table_uri.to_owned(),
717-
config: delta_scan.config.clone(),
718-
logical_schema: delta_scan.logical_schema.clone(),
719-
};
714+
let wire = DeltaScanWire::from(delta_scan);
720715
serde_json::to_writer(buf, &wire)
721716
.map_err(|_| DataFusionError::Internal("Unable to encode delta scan!".to_string()))?;
722717
Ok(())
@@ -867,7 +862,7 @@ mod tests {
867862
};
868863
use serde_json::json;
869864
use std::fmt::{self, Debug, Display, Formatter};
870-
use std::ops::{Deref, Range};
865+
use std::ops::Range;
871866
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
872867

873868
use super::*;
@@ -1140,13 +1135,12 @@ mod tests {
11401135
Field::new("a", ArrowDataType::Utf8, false),
11411136
Field::new("b", ArrowDataType::Int32, false),
11421137
]));
1143-
let exec_plan = Arc::from(DeltaScan {
1144-
table_uri: "s3://my_bucket/this/is/some/path".to_string(),
1145-
parquet_scan: Arc::from(EmptyExec::new(schema.clone())),
1146-
config: DeltaScanConfig::default(),
1147-
logical_schema: schema.clone(),
1148-
metrics: ExecutionPlanMetricsSet::new(),
1149-
});
1138+
let exec_plan = Arc::from(DeltaScan::new(
1139+
&Url::parse("s3://my_bucket/this/is/some/path").unwrap(),
1140+
DeltaScanConfig::default(),
1141+
Arc::from(EmptyExec::new(schema.clone())),
1142+
schema.clone(),
1143+
));
11501144
let proto: protobuf::PhysicalPlanNode =
11511145
protobuf::PhysicalPlanNode::try_from_physical_plan(exec_plan.clone(), &codec)
11521146
.expect("to proto");

crates/core/src/delta_datafusion/table_provider.rs

Lines changed: 44 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ use futures::StreamExt as _;
4848
use itertools::Itertools;
4949
use object_store::ObjectMeta;
5050
use serde::{Deserialize, Serialize};
51+
use url::Url;
5152

5253
use crate::delta_datafusion::schema_adapter::DeltaSchemaAdapterFactory;
5354
use crate::delta_datafusion::{
@@ -60,8 +61,8 @@ use crate::kernel::{Action, Add, EagerSnapshot, Remove};
6061
use crate::operations::write::WriterStatsConfig;
6162
use crate::operations::write::writer::{DeltaWriter, WriterConfig};
6263
use crate::protocol::{DeltaOperation, SaveMode};
63-
use crate::{DeltaResult, DeltaTableError, logstore::LogStoreRef};
64-
use crate::{DeltaTable, ensure_table_uri};
64+
use crate::table::normalize_table_url;
65+
use crate::{DeltaResult, DeltaTable, DeltaTableError, logstore::LogStoreRef};
6566

6667
const PATH_COLUMN: &str = "__delta_rs_path";
6768

@@ -690,7 +691,7 @@ impl<'a> DeltaScanBuilder<'a> {
690691
.add(files_pruned);
691692

692693
Ok(DeltaScan {
693-
table_uri: ensure_table_uri(self.log_store.root_uri())?.as_str().into(),
694+
table_url: self.log_store.root_url().clone(),
694695
parquet_scan: DataSourceExec::from_data_source(file_scan_config),
695696
config,
696697
logical_schema,
@@ -883,23 +884,52 @@ impl TableProvider for DeltaTableProvider {
883884
/// A wrapper for parquet scans
884885
#[derive(Debug)]
885886
pub struct DeltaScan {
886-
/// The URL of the ObjectStore root
887-
pub table_uri: String,
887+
/// The normalized [Url] of the ObjectStore root
888+
table_url: Url,
888889
/// Column that contains an index that maps to the original metadata Add
889-
pub config: DeltaScanConfig,
890+
pub(crate) config: DeltaScanConfig,
890891
/// The parquet scan to wrap
891-
pub parquet_scan: Arc<dyn ExecutionPlan>,
892+
pub(crate) parquet_scan: Arc<dyn ExecutionPlan>,
892893
/// The schema of the table to be used when evaluating expressions
893-
pub logical_schema: Arc<Schema>,
894+
pub(crate) logical_schema: Arc<Schema>,
894895
/// Metrics for scan reported via DataFusion
895-
pub(super) metrics: ExecutionPlanMetricsSet,
896+
metrics: ExecutionPlanMetricsSet,
897+
}
898+
899+
impl DeltaScan {
900+
pub(crate) fn new(
901+
table_url: &Url,
902+
config: DeltaScanConfig,
903+
parquet_scan: Arc<dyn ExecutionPlan>,
904+
logical_schema: Arc<Schema>,
905+
) -> Self {
906+
Self {
907+
table_url: normalize_table_url(table_url),
908+
metrics: ExecutionPlanMetricsSet::new(),
909+
config,
910+
parquet_scan,
911+
logical_schema,
912+
}
913+
}
896914
}
897915

916+
#[non_exhaustive]
898917
#[derive(Debug, Serialize, Deserialize)]
899918
pub(super) struct DeltaScanWire {
900-
pub table_uri: String,
901-
pub config: DeltaScanConfig,
902-
pub logical_schema: Arc<Schema>,
919+
/// This [Url] should have already been passed through [normalize_table_url]
920+
pub(crate) table_url: Url,
921+
pub(crate) config: DeltaScanConfig,
922+
pub(crate) logical_schema: Arc<Schema>,
923+
}
924+
925+
impl From<&DeltaScan> for DeltaScanWire {
926+
fn from(scan: &DeltaScan) -> Self {
927+
Self {
928+
table_url: scan.table_url.clone(),
929+
config: scan.config.clone(),
930+
logical_schema: scan.logical_schema.clone(),
931+
}
932+
}
903933
}
904934

905935
impl DisplayAs for DeltaScan {
@@ -940,7 +970,7 @@ impl ExecutionPlan for DeltaScan {
940970
)));
941971
}
942972
Ok(Arc::new(DeltaScan {
943-
table_uri: self.table_uri.clone(),
973+
table_url: self.table_url.clone(),
944974
config: self.config.clone(),
945975
parquet_scan: children[0].clone(),
946976
logical_schema: self.logical_schema.clone(),
@@ -955,7 +985,7 @@ impl ExecutionPlan for DeltaScan {
955985
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
956986
if let Some(parquet_scan) = self.parquet_scan.repartitioned(target_partitions, config)? {
957987
Ok(Some(Arc::new(DeltaScan {
958-
table_uri: self.table_uri.clone(),
988+
table_url: self.table_url.clone(),
959989
config: self.config.clone(),
960990
parquet_scan,
961991
logical_schema: self.logical_schema.clone(),

crates/core/src/kernel/transaction/application.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ mod tests {
1919
let tmp_path = std::fs::canonicalize(tmp_dir.path()).unwrap();
2020

2121
let batch = get_record_batch(None, false);
22-
let table = DeltaOps::try_from_uri(ensure_table_uri(tmp_path.to_str().unwrap()).unwrap())
22+
let table = DeltaOps::try_from_url(ensure_table_uri(tmp_path.to_str().unwrap()).unwrap())
2323
.await
2424
.unwrap()
2525
.write(vec![batch.clone()])
@@ -55,7 +55,7 @@ mod tests {
5555
// Test Txn Id can be read from existing table
5656

5757
let mut table2 =
58-
DeltaTableBuilder::from_uri(ensure_table_uri(tmp_path.to_str().unwrap()).unwrap())
58+
DeltaTableBuilder::from_url(ensure_table_uri(tmp_path.to_str().unwrap()).unwrap())
5959
.unwrap()
6060
.load()
6161
.await
@@ -123,7 +123,7 @@ mod tests {
123123
// Create a checkpoint and then load
124124
checkpoints::create_checkpoint(&table, None).await.unwrap();
125125
let table3 =
126-
DeltaTableBuilder::from_uri(ensure_table_uri(tmp_path.to_str().unwrap()).unwrap())
126+
DeltaTableBuilder::from_url(ensure_table_uri(tmp_path.to_str().unwrap()).unwrap())
127127
.unwrap()
128128
.load()
129129
.await

0 commit comments

Comments
 (0)