Skip to content

Commit 36792b0

Browse files
feat: Add support for server supported endpoints in config endpoint
1 parent bc86d10 commit 36792b0

File tree

2 files changed

+459
-5
lines changed

2 files changed

+459
-5
lines changed

crates/catalog/rest/src/catalog.rs

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@
1818
//! This module contains the iceberg REST catalog implementation.
1919
2020
use std::any::Any;
21-
use std::collections::HashMap;
21+
use std::collections::{HashMap, HashSet};
2222
use std::future::Future;
2323
use std::str::FromStr;
24+
use std::sync::OnceLock;
2425

2526
use async_trait::async_trait;
2627
use iceberg::io::{self, FileIO};
@@ -37,6 +38,7 @@ use reqwest::{Client, Method, StatusCode, Url};
3738
use tokio::sync::OnceCell;
3839
use typed_builder::TypedBuilder;
3940

41+
use crate::Endpoint;
4042
use crate::client::{
4143
HttpClient, deserialize_catalog_response, deserialize_unexpected_catalog_error,
4244
};
@@ -55,6 +57,33 @@ const ICEBERG_REST_SPEC_VERSION: &str = "0.14.1";
5557
const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION");
5658
const PATH_V1: &str = "v1";
5759

60+
static DEFAULT_ENDPOINTS: OnceLock<HashSet<Endpoint>> = OnceLock::new();
61+
62+
fn default_endpoints() -> &'static HashSet<Endpoint> {
63+
DEFAULT_ENDPOINTS.get_or_init(|| {
64+
[
65+
Endpoint::v1_config(),
66+
Endpoint::v1_list_namespaces(),
67+
Endpoint::v1_create_namespace(),
68+
Endpoint::v1_load_namespace(),
69+
Endpoint::v1_update_namespace(),
70+
Endpoint::v1_delete_namespace(),
71+
Endpoint::v1_list_tables(),
72+
Endpoint::v1_create_table(),
73+
Endpoint::v1_load_table(),
74+
Endpoint::v1_update_table(),
75+
Endpoint::v1_delete_table(),
76+
Endpoint::v1_rename_table(),
77+
Endpoint::v1_register_table(),
78+
Endpoint::v1_report_metrics(),
79+
Endpoint::v1_commit_transaction(),
80+
]
81+
.into_iter()
82+
.cloned()
83+
.collect()
84+
})
85+
}
86+
5887
/// Builder for [`RestCatalog`].
5988
#[derive(Debug)]
6089
pub struct RestCatalogBuilder(RestCatalogConfig);
@@ -67,6 +96,7 @@ impl Default for RestCatalogBuilder {
6796
warehouse: None,
6897
props: HashMap::new(),
6998
client: None,
99+
endpoints: default_endpoints().clone(),
70100
})
71101
}
72102
}
@@ -142,6 +172,9 @@ pub(crate) struct RestCatalogConfig {
142172

143173
#[builder(default)]
144174
client: Option<Client>,
175+
176+
#[builder(default)]
177+
endpoints: HashSet<Endpoint>,
145178
}
146179

147180
impl RestCatalogConfig {
@@ -304,6 +337,12 @@ impl RestCatalogConfig {
304337
props.extend(config.overrides);
305338

306339
self.props = props;
340+
self.endpoints = if config.endpoints.is_empty() {
341+
default_endpoints().clone()
342+
} else {
343+
config.endpoints
344+
};
345+
307346
self
308347
}
309348
}
@@ -442,6 +481,7 @@ impl Catalog for RestCatalog {
442481
parent: Option<&NamespaceIdent>,
443482
) -> Result<Vec<NamespaceIdent>> {
444483
let context = self.context().await?;
484+
Endpoint::check_supported(&context.config.endpoints, Endpoint::v1_list_namespaces())?;
445485
let endpoint = context.config.namespaces_endpoint();
446486
let mut namespaces = Vec::new();
447487
let mut next_token = None;
@@ -492,6 +532,7 @@ impl Catalog for RestCatalog {
492532
properties: HashMap<String, String>,
493533
) -> Result<Namespace> {
494534
let context = self.context().await?;
535+
Endpoint::check_supported(&context.config.endpoints, Endpoint::v1_create_namespace())?;
495536

496537
let request = context
497538
.client
@@ -520,6 +561,7 @@ impl Catalog for RestCatalog {
520561

521562
async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
522563
let context = self.context().await?;
564+
Endpoint::check_supported(&context.config.endpoints, Endpoint::v1_load_namespace())?;
523565

524566
let request = context
525567
.client
@@ -544,6 +586,7 @@ impl Catalog for RestCatalog {
544586

545587
async fn namespace_exists(&self, ns: &NamespaceIdent) -> Result<bool> {
546588
let context = self.context().await?;
589+
Endpoint::check_supported(&context.config.endpoints, Endpoint::v1_namespace_exists())?;
547590

548591
let request = context
549592
.client
@@ -572,6 +615,7 @@ impl Catalog for RestCatalog {
572615

573616
async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
574617
let context = self.context().await?;
618+
Endpoint::check_supported(&context.config.endpoints, Endpoint::v1_delete_namespace())?;
575619

576620
let request = context
577621
.client
@@ -592,6 +636,7 @@ impl Catalog for RestCatalog {
592636

593637
async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
594638
let context = self.context().await?;
639+
Endpoint::check_supported(&context.config.endpoints, Endpoint::v1_list_tables())?;
595640
let endpoint = context.config.tables_endpoint(namespace);
596641
let mut identifiers = Vec::new();
597642
let mut next_token = None;
@@ -642,6 +687,7 @@ impl Catalog for RestCatalog {
642687
creation: TableCreation,
643688
) -> Result<Table> {
644689
let context = self.context().await?;
690+
Endpoint::check_supported(&context.config.endpoints, Endpoint::v1_create_table())?;
645691

646692
let table_ident = TableIdent::new(namespace.clone(), creation.name.clone());
647693

@@ -714,6 +760,7 @@ impl Catalog for RestCatalog {
714760
/// provided locally to the `RestCatalog` will take precedence.
715761
async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
716762
let context = self.context().await?;
763+
Endpoint::check_supported(&context.config.endpoints, Endpoint::v1_load_table())?;
717764

718765
let request = context
719766
.client
@@ -760,6 +807,7 @@ impl Catalog for RestCatalog {
760807
/// Drop a table from the catalog.
761808
async fn drop_table(&self, table: &TableIdent) -> Result<()> {
762809
let context = self.context().await?;
810+
Endpoint::check_supported(&context.config.endpoints, Endpoint::v1_delete_table())?;
763811

764812
let request = context
765813
.client
@@ -781,6 +829,7 @@ impl Catalog for RestCatalog {
781829
/// Check if a table exists in the catalog.
782830
async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
783831
let context = self.context().await?;
832+
Endpoint::check_supported(&context.config.endpoints, Endpoint::v1_table_exists())?;
784833

785834
let request = context
786835
.client
@@ -799,6 +848,7 @@ impl Catalog for RestCatalog {
799848
/// Rename a table in the catalog.
800849
async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
801850
let context = self.context().await?;
851+
Endpoint::check_supported(&context.config.endpoints, Endpoint::v1_rename_table())?;
802852

803853
let request = context
804854
.client
@@ -831,6 +881,7 @@ impl Catalog for RestCatalog {
831881
metadata_location: String,
832882
) -> Result<Table> {
833883
let context = self.context().await?;
884+
Endpoint::check_supported(&context.config.endpoints, Endpoint::v1_register_table())?;
834885

835886
let request = context
836887
.client
@@ -885,6 +936,7 @@ impl Catalog for RestCatalog {
885936

886937
async fn update_table(&self, mut commit: TableCommit) -> Result<Table> {
887938
let context = self.context().await?;
939+
Endpoint::check_supported(&context.config.endpoints, Endpoint::v1_update_table())?;
888940

889941
let request = context
890942
.client

0 commit comments

Comments
 (0)