@@ -5,6 +5,7 @@ use dashmap::DashMap;
55use deltalake_derive:: DeltaConfig ;
66use object_store:: path:: Path ;
77use object_store:: { DynObjectStore , ObjectStore } ;
8+ use tracing:: log:: * ;
89use url:: Url ;
910
1011use crate :: { DeltaResult , DeltaTableError } ;
@@ -40,7 +41,7 @@ pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static {
4041#[ derive( Clone ) ]
4142pub struct DefaultObjectStoreRegistry {
4243 /// A map from scheme to object store that serve list / read operations for the store
43- object_stores : DashMap < String , Arc < dyn ObjectStore > > ,
44+ object_stores : DashMap < Url , Arc < dyn ObjectStore > > ,
4445}
4546
4647impl Default for DefaultObjectStoreRegistry {
@@ -51,7 +52,7 @@ impl Default for DefaultObjectStoreRegistry {
5152
5253impl DefaultObjectStoreRegistry {
5354 pub fn new ( ) -> Self {
54- let object_stores: DashMap < String , Arc < dyn ObjectStore > > = DashMap :: new ( ) ;
55+ let object_stores: DashMap < Url , Arc < dyn ObjectStore > > = DashMap :: new ( ) ;
5556 Self { object_stores }
5657 }
5758}
@@ -77,19 +78,48 @@ impl ObjectStoreRegistry for DefaultObjectStoreRegistry {
7778 url : & Url ,
7879 store : Arc < dyn ObjectStore > ,
7980 ) -> Option < Arc < dyn ObjectStore > > {
80- self . object_stores . insert ( url . to_string ( ) , store)
81+ self . object_stores . insert ( normalize_table_url ( url ) , store)
8182 }
8283
8384 fn get_store ( & self , url : & Url ) -> DeltaResult < Arc < dyn ObjectStore > > {
8485 self . object_stores
85- . get ( & url . to_string ( ) )
86+ . get ( & normalize_table_url ( url ) )
8687 . map ( |o| Arc :: clone ( o. value ( ) ) )
8788 . ok_or_else ( || {
8889 DeltaTableError :: generic ( format ! ( "No suitable object store found for '{url}'." ) )
8990 } )
9091 }
9192}
9293
94+ /// Normalize a given [Url] to **always** contain a trailing slash. This is critically important
95+ /// for assumptions about [Url] equivalency and more importantly for **joining** on a Url`.
96+ ///
97+ /// ```
98+ /// left.join("_delta_log"); // produces `s3://bucket/prefix/_delta_log`
99+ /// right.join("_delta_log"); // produces `s3://bucket/_delta_log`
100+ /// ```
101+ fn normalize_table_url ( url : & Url ) -> Url {
102+ let mut new_segments = vec ! [ ] ;
103+ for segment in url. path ( ) . split ( '/' ) {
104+ if !segment. is_empty ( ) {
105+ new_segments. push ( segment) ;
106+ }
107+ }
108+ // Add a trailing slash segment
109+ new_segments. push ( "" ) ;
110+
111+ let mut url = url. clone ( ) ;
112+ if let Ok ( mut path_segments) = url. path_segments_mut ( ) {
113+ path_segments. clear ( ) ;
114+ path_segments. extend ( new_segments) ;
115+ } else {
116+ error ! (
117+ "Was not able to normalize the table URL. This is non-fatal but may produce curious results!"
118+ ) ;
119+ }
120+ url
121+ }
122+
93123#[ derive( Debug , Clone , Default , DeltaConfig ) ]
94124pub struct LimitConfig {
95125 #[ delta( alias = "concurrency_limit" , env = "OBJECT_STORE_CONCURRENCY_LIMIT" ) ]
@@ -108,6 +138,25 @@ mod tests {
108138 use crate :: logstore:: config:: TryUpdateKey ;
109139 use crate :: test_utils:: with_env;
110140
141+ #[ test]
142+ fn test_normalize_table_url ( ) {
143+ for ( u, path) in [
144+ ( Url :: parse ( "s3://bucket/prefix/" ) . unwrap ( ) , "/prefix/" ) ,
145+ ( Url :: parse ( "s3://bucket/prefix" ) . unwrap ( ) , "/prefix/" ) ,
146+ (
147+ Url :: parse ( "s3://bucket/prefix/with/redundant/slashes//" ) . unwrap ( ) ,
148+ "/prefix/with/redundant/slashes/" ,
149+ ) ,
150+ ] {
151+ assert_eq ! (
152+ normalize_table_url( & u) . path( ) ,
153+ path,
154+ "Failed to normalize: {}" ,
155+ u. as_str( )
156+ ) ;
157+ }
158+ }
159+
111160 #[ test]
112161 fn test_limit_config ( ) {
113162 let mut config = LimitConfig :: default ( ) ;
0 commit comments