Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions datafusion-cli/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,15 @@ impl DynamicObjectStoreCatalogProvider {
}

impl CatalogProvider for DynamicObjectStoreCatalogProvider {
fn schema_names(&self) -> Vec<String> {
fn schema_names(&self) -> Result<Vec<String>> {
self.inner.schema_names()
}

fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
fn schema(&self, name: &str) -> Result<Option<Arc<dyn SchemaProvider>>> {
let state = self.state.clone();
self.inner.schema(name).map(|schema| {
Ok(self.inner.schema(name)?.map(|schema| {
Arc::new(DynamicObjectStoreSchemaProvider::new(schema, state)) as _
})
}))
}

fn register_schema(
Expand Down Expand Up @@ -125,7 +125,7 @@ impl DynamicObjectStoreSchemaProvider {

#[async_trait]
impl SchemaProvider for DynamicObjectStoreSchemaProvider {
fn table_names(&self) -> Vec<String> {
fn table_names(&self) -> Result<Vec<String>> {
self.inner.table_names()
}

Expand Down Expand Up @@ -200,7 +200,7 @@ impl SchemaProvider for DynamicObjectStoreSchemaProvider {
self.inner.deregister_table(name)
}

fn table_exist(&self, name: &str) -> bool {
fn table_exist(&self, name: &str) -> Result<bool> {
self.inner.table_exist(name)
}
}
Expand Down Expand Up @@ -235,12 +235,22 @@ mod tests {
ctx.state().catalog_list().clone(),
ctx.state_weak_ref(),
) as &dyn CatalogProviderList;
let catalog_name = provider
.catalog_names()
.first()
.expect("default catalog should exist")
.clone();
let catalog = provider
.catalog(provider.catalog_names().first().unwrap())
.unwrap();
.catalog(&catalog_name)
.expect("default catalog should be retrievable");
let schema_names = catalog
.schema_names()
.expect("schema names lookup should succeed");
let schema_name = schema_names.first().expect("default schema should exist");
let schema = catalog
.schema(catalog.schema_names().first().unwrap())
.unwrap();
.schema(schema_name)
.expect("schema lookup should succeed")
.expect("default schema should be retrievable");
(ctx, schema)
}

Expand Down
18 changes: 9 additions & 9 deletions datafusion-examples/examples/data_io/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,19 +178,19 @@ impl DirSchema {

#[async_trait]
impl SchemaProvider for DirSchema {
fn table_names(&self) -> Vec<String> {
fn table_names(&self) -> Result<Vec<String>> {
let tables = self.tables.read().unwrap();
tables.keys().cloned().collect::<Vec<_>>()
Ok(tables.keys().cloned().collect::<Vec<_>>())
}

async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
let tables = self.tables.read().unwrap();
Ok(tables.get(name).cloned())
}

fn table_exist(&self, name: &str) -> bool {
fn table_exist(&self, name: &str) -> Result<bool> {
let tables = self.tables.read().unwrap();
tables.contains_key(name)
Ok(tables.contains_key(name))
}

fn register_table(
Expand Down Expand Up @@ -237,20 +237,20 @@ impl CatalogProvider for DirCatalog {
Ok(Some(schema))
}

fn schema_names(&self) -> Vec<String> {
fn schema_names(&self) -> Result<Vec<String>> {
let schemas = self.schemas.read().unwrap();
schemas.keys().cloned().collect()
Ok(schemas.keys().cloned().collect())
}

fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
fn schema(&self, name: &str) -> Result<Option<Arc<dyn SchemaProvider>>> {
let schemas = self.schemas.read().unwrap();
let maybe_schema = schemas.get(name);
if let Some(schema) = maybe_schema {
Ok(if let Some(schema) = maybe_schema {
let schema = schema.clone() as Arc<dyn SchemaProvider>;
Some(schema)
} else {
None
}
})
}
}

Expand Down
28 changes: 21 additions & 7 deletions datafusion-examples/examples/flight/sql_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,12 +176,26 @@ impl FlightSqlServiceImpl {
let mut names = vec![];
let mut types = vec![];
for catalog in ctx.catalog_names() {
let catalog_provider = ctx.catalog(&catalog).unwrap();
for schema in catalog_provider.schema_names() {
let schema_provider = catalog_provider.schema(&schema).unwrap();
for table in schema_provider.table_names() {
let table_provider =
schema_provider.table(&table).await.unwrap().unwrap();
let catalog_provider = ctx
.catalog(&catalog)
.expect("catalog listed by context should be retrievable");
for schema in catalog_provider
.schema_names()
.expect("schema names lookup should succeed")
{
let schema_provider = catalog_provider
.schema(&schema)
.expect("schema lookup should succeed")
.expect("listed schema should be retrievable");
for table in schema_provider
.table_names()
.expect("table names lookup should succeed")
{
let table_provider = schema_provider
.table(&table)
.await
.expect("table lookup should succeed")
.expect("listed table should be retrievable");
catalogs.push(catalog.clone());
schemas.push(schema.clone());
names.push(table.clone());
Expand All @@ -197,7 +211,7 @@ impl FlightSqlServiceImpl {
.map(|i| Arc::new(StringArray::from(i)) as ArrayRef)
.collect::<Vec<_>>(),
)
.unwrap()
.expect("record batch construction should succeed")
}

fn remove_plan(&self, handle: &str) -> Result<(), Status> {
Expand Down
6 changes: 3 additions & 3 deletions datafusion-examples/examples/udf/table_list_udtf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,11 @@ impl TableFunctionImpl for TableListUdtf {
let Some(catalog) = catalog_list.catalog(&catalog_name) else {
continue;
};
for schema_name in catalog.schema_names() {
let Some(schema) = catalog.schema(&schema_name) else {
for schema_name in catalog.schema_names()? {
let Some(schema) = catalog.schema(&schema_name)? else {
continue;
};
for table_name in schema.table_names() {
for table_name in schema.table_names()? {
let Some(provider) = block_in_place(|| {
Handle::current().block_on(schema.table(&table_name))
})?
Expand Down
30 changes: 19 additions & 11 deletions datafusion/catalog/src/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ impl SchemaProvider for ResolvedSchemaProvider {
self.owner_name.as_deref()
}

fn table_names(&self) -> Vec<String> {
self.cached_tables.keys().cloned().collect()
fn table_names(&self) -> Result<Vec<String>> {
Ok(self.cached_tables.keys().cloned().collect())
}

async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
Expand All @@ -61,8 +61,8 @@ impl SchemaProvider for ResolvedSchemaProvider {
)
}

fn table_exist(&self, name: &str) -> bool {
self.cached_tables.contains_key(name)
fn table_exist(&self, name: &str) -> Result<bool> {
Ok(self.cached_tables.contains_key(name))
}
}

Expand Down Expand Up @@ -111,12 +111,12 @@ struct ResolvedCatalogProvider {
cached_schemas: HashMap<String, Arc<dyn SchemaProvider>>,
}
impl CatalogProvider for ResolvedCatalogProvider {
fn schema_names(&self) -> Vec<String> {
self.cached_schemas.keys().cloned().collect()
fn schema_names(&self) -> Result<Vec<String>> {
Ok(self.cached_schemas.keys().cloned().collect())
}

fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
self.cached_schemas.get(name).cloned()
fn schema(&self, name: &str) -> Result<Option<Arc<dyn SchemaProvider>>> {
Ok(self.cached_schemas.get(name).cloned())
}
}

Expand Down Expand Up @@ -592,12 +592,19 @@ mod tests {
);

for schema_ref in found_schemas {
let schema = cached_provider.schema(schema_ref);
let schema = cached_provider
.schema(schema_ref)
.expect("schema lookup should succeed");
assert!(schema.is_some());
}

for schema_ref in not_found_schemas {
assert!(cached_provider.schema(schema_ref).is_none());
assert!(
cached_provider
.schema(schema_ref)
.expect("schema lookup should succeed")
.is_none()
);
}
}

Expand Down Expand Up @@ -729,7 +736,8 @@ mod tests {
.unwrap();
let schema = catalog
.schema(table_ref.schema().unwrap_or(MOCK_SCHEMA))
.unwrap();
.expect("schema lookup should succeed")
.expect("default schema should exist");
assert!(schema.table(table_ref.table()).await.unwrap().is_some());
}
}
Expand Down
4 changes: 2 additions & 2 deletions datafusion/catalog/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,10 @@ use datafusion_common::not_impl_err;
/// [`TableProvider`]: crate::TableProvider
pub trait CatalogProvider: Any + Debug + Sync + Send {
/// Retrieves the list of available schema names in this catalog.
fn schema_names(&self) -> Vec<String>;
fn schema_names(&self) -> Result<Vec<String>>;

/// Retrieves a specific schema from the catalog by name, provided it exists.
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>>;
fn schema(&self, name: &str) -> Result<Option<Arc<dyn SchemaProvider>>>;

/// Adds a new schema to this catalog.
///
Expand Down
15 changes: 9 additions & 6 deletions datafusion/catalog/src/dynamic_file/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,20 @@ impl DynamicFileCatalogProvider {
}

impl CatalogProvider for DynamicFileCatalogProvider {
fn schema_names(&self) -> Vec<String> {
fn schema_names(&self) -> datafusion_common::Result<Vec<String>> {
self.inner.schema_names()
}

fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
self.inner.schema(name).map(|schema| {
fn schema(
&self,
name: &str,
) -> datafusion_common::Result<Option<Arc<dyn SchemaProvider>>> {
Ok(self.inner.schema(name)?.map(|schema| {
Arc::new(DynamicFileSchemaProvider::new(
schema,
Arc::clone(&self.factory),
)) as _
})
}))
}

fn register_schema(
Expand Down Expand Up @@ -128,7 +131,7 @@ impl DynamicFileSchemaProvider {

#[async_trait]
impl SchemaProvider for DynamicFileSchemaProvider {
fn table_names(&self) -> Vec<String> {
fn table_names(&self) -> datafusion_common::Result<Vec<String>> {
self.inner.table_names()
}

Expand Down Expand Up @@ -158,7 +161,7 @@ impl SchemaProvider for DynamicFileSchemaProvider {
self.inner.deregister_table(name)
}

fn table_exist(&self, name: &str) -> bool {
fn table_exist(&self, name: &str) -> datafusion_common::Result<bool> {
self.inner.table_exist(name)
}
}
Expand Down
Loading