Skip to content

Commit 6740f2f

Browse files
committed
fix: normalize Urls being added to the DefaultObjectStoreRegistry
Trailing slashes cause all sorts of subtle equivalency confusion when we do things with a Url. It's better for us to normalize everything to always have a trailing slash, which makes it easier to join with and do other things. Signed-off-by: R. Tyler Croy <[email protected]>
1 parent a959d1f commit 6740f2f

File tree

2 files changed

+58
-13
lines changed

2 files changed

+58
-13
lines changed

crates/core/src/logstore/storage/mod.rs

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use dashmap::DashMap;
55
use deltalake_derive::DeltaConfig;
66
use object_store::path::Path;
77
use object_store::{DynObjectStore, ObjectStore};
8+
use tracing::log::*;
89
use url::Url;
910

1011
use crate::{DeltaResult, DeltaTableError};
@@ -40,7 +41,7 @@ pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static {
4041
#[derive(Clone)]
4142
pub struct DefaultObjectStoreRegistry {
4243
/// A map from (normalized) URL to the object store that serves list / read operations for that URL
43-
object_stores: DashMap<String, Arc<dyn ObjectStore>>,
44+
object_stores: DashMap<Url, Arc<dyn ObjectStore>>,
4445
}
4546

4647
impl Default for DefaultObjectStoreRegistry {
@@ -51,7 +52,7 @@ impl Default for DefaultObjectStoreRegistry {
5152

5253
impl DefaultObjectStoreRegistry {
5354
pub fn new() -> Self {
54-
let object_stores: DashMap<String, Arc<dyn ObjectStore>> = DashMap::new();
55+
let object_stores: DashMap<Url, Arc<dyn ObjectStore>> = DashMap::new();
5556
Self { object_stores }
5657
}
5758
}
@@ -77,19 +78,48 @@ impl ObjectStoreRegistry for DefaultObjectStoreRegistry {
7778
url: &Url,
7879
store: Arc<dyn ObjectStore>,
7980
) -> Option<Arc<dyn ObjectStore>> {
80-
self.object_stores.insert(url.to_string(), store)
81+
self.object_stores.insert(normalize_table_url(url), store)
8182
}
8283

8384
fn get_store(&self, url: &Url) -> DeltaResult<Arc<dyn ObjectStore>> {
8485
self.object_stores
85-
.get(&url.to_string())
86+
.get(&normalize_table_url(url))
8687
.map(|o| Arc::clone(o.value()))
8788
.ok_or_else(|| {
8889
DeltaTableError::generic(format!("No suitable object store found for '{url}'."))
8990
})
9091
}
9192
}
9293

94+
/// Normalize a given [Url] to **always** contain a trailing slash. This is critically important
95+
/// for assumptions about [Url] equivalency and, more importantly, for **joining** on a [Url].
96+
///
97+
/// ```ignore
98+
/// left.join("_delta_log"); // produces `s3://bucket/prefix/_delta_log`
99+
/// right.join("_delta_log"); // produces `s3://bucket/_delta_log`
100+
/// ```
101+
fn normalize_table_url(url: &Url) -> Url {
102+
let mut new_segments = vec![];
103+
for segment in url.path().split('/') {
104+
if !segment.is_empty() {
105+
new_segments.push(segment);
106+
}
107+
}
108+
// Add a trailing slash segment
109+
new_segments.push("");
110+
111+
let mut url = url.clone();
112+
if let Ok(mut path_segments) = url.path_segments_mut() {
113+
path_segments.clear();
114+
path_segments.extend(new_segments);
115+
} else {
116+
error!(
117+
"Was not able to normalize the table URL. This is non-fatal but may produce curious results!"
118+
);
119+
}
120+
url
121+
}
122+
93123
#[derive(Debug, Clone, Default, DeltaConfig)]
94124
pub struct LimitConfig {
95125
#[delta(alias = "concurrency_limit", env = "OBJECT_STORE_CONCURRENCY_LIMIT")]
@@ -108,6 +138,25 @@ mod tests {
108138
use crate::logstore::config::TryUpdateKey;
109139
use crate::test_utils::with_env;
110140

141+
#[test]
142+
fn test_normalize_table_url() {
143+
for (u, path) in [
144+
(Url::parse("s3://bucket/prefix/").unwrap(), "/prefix/"),
145+
(Url::parse("s3://bucket/prefix").unwrap(), "/prefix/"),
146+
(
147+
Url::parse("s3://bucket/prefix/with/redundant/slashes//").unwrap(),
148+
"/prefix/with/redundant/slashes/",
149+
),
150+
] {
151+
assert_eq!(
152+
normalize_table_url(&u).path(),
153+
path,
154+
"Failed to normalize: {}",
155+
u.as_str()
156+
);
157+
}
158+
}
159+
111160
#[test]
112161
fn test_limit_config() {
113162
let mut config = LimitConfig::default();

crates/lakefs/src/execute.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -145,16 +145,12 @@ mod tests {
145145
.as_any()
146146
.downcast_ref::<LakeFSLogStore>()
147147
{
148+
let table_url =
149+
Url::parse(format!("lakefs://repo/delta-tx-{operation_id}/table").as_str())
150+
.unwrap();
148151
assert!(
149-
lakefs_store
150-
.prefixed_registry
151-
.get_store(
152-
&Url::parse(
153-
format!("lakefs://repo/delta-tx-{operation_id}/table").as_str()
154-
)
155-
.unwrap()
156-
)
157-
.is_ok()
152+
lakefs_store.prefixed_registry.get_store(&table_url).is_ok(),
153+
"The LakeFSLogStore did not have the URL {table_url:?} we expected: {lakefs_store:?}"
158154
);
159155

160156
assert!(lakefs_store.client.get_transaction(operation_id).is_ok())

0 commit comments

Comments
 (0)