Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
[workspace]
members = ["crates/*", "delta-inspect", "python"]
members = ["crates/*", "python"]
exclude = []
resolver = "2"
resolver = "3"

[workspace.package]
authors = ["Qingping Hou <[email protected]>"]
rust-version = "1.86"
authors = ["Qingping Hou <[email protected]>", "R Tyler Croy <[email protected]>"]
rust-version = "1.88"
keywords = ["deltalake", "delta", "datalake"]
readme = "README.md"
edition = "2021"
edition = "2024"
description = "Native Delta Lake implementation in Rust"
homepage = "https://github.com/delta-io/delta.rs"
license = "Apache-2.0"
Expand Down Expand Up @@ -144,4 +144,4 @@ incremental = false
inherits = "release"
opt-level = 3
codegen-units = 1
lto = "fat"
lto = "fat"
3 changes: 1 addition & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ coverage: setup-dat ## Run Rust tests with code-coverage
.PHONY: check
check: ## Run basic cargo formatting and other checks (no tests)
cargo fmt -- --check


cargo clippy --features azure,datafusion,s3,gcs,glue,hdfs --tests

.PHONY: clean
clean: ## Remove temporary and downloaded artifacts
Expand Down
4 changes: 2 additions & 2 deletions crates/aws/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-aws"
version = "0.12.1"
version = "0.13.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
Expand All @@ -13,7 +13,7 @@ rust-version.workspace = true

[dependencies]
# path dependencies
deltalake-core = { version = "0.29.0", path = "../core" , features = ["cloud"]}
deltalake-core = { version = "0.30.0", path = "../core" , features = ["cloud"]}

# workspace dependencies
async-trait = { workspace = true }
Expand Down
4 changes: 2 additions & 2 deletions crates/aws/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ pub static CONDITION_EXPR_CREATE: LazyLock<String> = LazyLock::new(|| {

pub static CONDITION_DELETE_INCOMPLETE: LazyLock<String> = LazyLock::new(|| {
format!(
"(complete = :f) or (attribute_not_exists({ATTR_TABLE_PATH}) and attribute_not_exists({ATTR_FILE_NAME}))"
)
"(complete = :f) or (attribute_not_exists({ATTR_TABLE_PATH}) and attribute_not_exists({ATTR_FILE_NAME}))"
)
});

pub const CONDITION_UPDATE_INCOMPLETE: &str = "complete = :f";
Expand Down
18 changes: 12 additions & 6 deletions crates/aws/src/credentials.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@ use std::str::FromStr;
use std::sync::Arc;
use std::time::SystemTime;

use aws_config::SdkConfig;
use aws_config::default_provider::credentials::DefaultCredentialsChain;
use aws_config::meta::credentials::CredentialsProviderChain;
use aws_config::sts::AssumeRoleProvider;
use aws_config::SdkConfig;
use aws_credential_types::provider::error::CredentialsError;
use aws_credential_types::provider::{future, ProvideCredentials};
use aws_credential_types::Credentials;
use aws_credential_types::provider::error::CredentialsError;
use aws_credential_types::provider::{ProvideCredentials, future};

use deltalake_core::DeltaResult;
use deltalake_core::logstore::object_store::aws::{AmazonS3ConfigKey, AwsCredential};
use deltalake_core::logstore::object_store::{
CredentialProvider, Error as ObjectStoreError, Result as ObjectStoreResult,
};
use deltalake_core::DeltaResult;
use tokio::sync::Mutex;
use tracing::log::*;

Expand Down Expand Up @@ -186,7 +186,10 @@ mod options_tests {
]);
let provider = OptionsCredentialsProvider { options };
let result = provider.credentials();
assert!(result.is_ok(), "StorageOptions with at least AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY should resolve");
assert!(
result.is_ok(),
"StorageOptions with at least AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY should resolve"
);
let result = result.unwrap();
assert_eq!(result.access_key_id(), "key");
assert_eq!(result.secret_access_key(), "secret");
Expand All @@ -200,7 +203,10 @@ mod options_tests {
]);
let provider = OptionsCredentialsProvider { options };
let result = provider.credentials();
assert!(result.is_ok(), "StorageOptions with at least AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY should resolve");
assert!(
result.is_ok(),
"StorageOptions with at least AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY should resolve"
);
let result = result.unwrap();
assert_eq!(result.access_key_id(), "key");
assert_eq!(result.secret_access_key(), "secret");
Expand Down
18 changes: 12 additions & 6 deletions crates/aws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use aws_config::SdkConfig;
pub use aws_credential_types::provider::SharedCredentialsProvider;
use aws_sdk_dynamodb::error::SdkError;
use aws_sdk_dynamodb::{
Client,
operation::{
create_table::CreateTableError, delete_item::DeleteItemError, get_item::GetItemError,
put_item::PutItemError, query::QueryError, update_item::UpdateItemError,
Expand All @@ -24,12 +25,11 @@ use aws_sdk_dynamodb::{
AttributeDefinition, AttributeValue, BillingMode, KeySchemaElement, KeyType,
ScalarAttributeType,
},
Client,
};
use deltalake_core::logstore::object_store::aws::AmazonS3ConfigKey;
use deltalake_core::logstore::{
default_logstore, logstore_factories, object_store_factories, LogStore, LogStoreFactory,
ObjectStoreRef, StorageConfig,
LogStore, LogStoreFactory, ObjectStoreRef, StorageConfig, default_logstore, logstore_factories,
object_store_factories,
};
use deltalake_core::{DeltaResult, Path};
use errors::{DynamoDbConfigError, LockClientError};
Expand Down Expand Up @@ -69,8 +69,12 @@ impl LogStoreFactory for S3LogStoreFactory {
]
.contains(&key.as_str())
}) {
debug!("S3LogStoreFactory has been asked to create a LogStore where the underlying store has copy-if-not-exists enabled - no locking provider required");
warn!("Most S3 object store support conditional put, remove copy_if_not_exists parameter to use a more performant conditional put.");
debug!(
"S3LogStoreFactory has been asked to create a LogStore where the underlying store has copy-if-not-exists enabled - no locking provider required"
);
warn!(
"Most S3 object store support conditional put, remove copy_if_not_exists parameter to use a more performant conditional put."
);
return Ok(logstore::default_s3_logstore(
prefixed_store,
root_store,
Expand All @@ -81,7 +85,9 @@ impl LogStoreFactory for S3LogStoreFactory {

let s3_options = S3StorageOptions::from_map(&s3_options)?;
if s3_options.locking_provider.as_deref() == Some("dynamodb") {
debug!("S3LogStoreFactory has been asked to create a LogStore with the dynamodb locking provider");
debug!(
"S3LogStoreFactory has been asked to create a LogStore with the dynamodb locking provider"
);
return Ok(Arc::new(logstore::S3DynamoDbLogStore::try_new(
location.clone(),
options,
Expand Down
7 changes: 2 additions & 5 deletions crates/aws/src/logstore/default_logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::Arc;
use bytes::Bytes;
use deltalake_core::logstore::*;
use deltalake_core::{
kernel::transaction::TransactionError, logstore::ObjectStoreRef, DeltaResult,
DeltaResult, kernel::transaction::TransactionError, logstore::ObjectStoreRef,
};
use object_store::{Error as ObjectStoreError, ObjectStore};
use url::Url;
Expand All @@ -21,10 +21,7 @@ pub fn default_s3_logstore(
Arc::new(S3LogStore::new(
store,
root_store,
LogStoreConfig {
location: location.clone(),
options: options.clone(),
},
LogStoreConfig::new(location.clone(), options.clone()),
))
}

Expand Down
38 changes: 18 additions & 20 deletions crates/aws/src/logstore/dynamodb_logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@

use crate::errors::LockClientError;
use crate::storage::S3StorageOptions;
use crate::{constants, CommitEntry, DynamoDbLockClient, UpdateLogEntryResult};
use crate::{CommitEntry, DynamoDbLockClient, UpdateLogEntryResult, constants};

use bytes::Bytes;
use deltalake_core::{ObjectStoreError, Path};

Check warning on line 11 in crates/aws/src/logstore/dynamodb_logstore.rs

View workflow job for this annotation

GitHub Actions / cloud (aws)

unused import: `Path`

Check warning on line 11 in crates/aws/src/logstore/dynamodb_logstore.rs

View workflow job for this annotation

GitHub Actions / aws-native-tls

unused import: `Path`
use tracing::{debug, error, warn};
use typed_builder::TypedBuilder;
use url::Url;

use deltalake_core::logstore::*;
use deltalake_core::{
kernel::transaction::TransactionError, logstore::ObjectStoreRef, DeltaResult, DeltaTableError,
DeltaResult, DeltaTableError, kernel::transaction::TransactionError, logstore::ObjectStoreRef,
};
use uuid::Uuid;

Expand All @@ -36,7 +36,7 @@
config: LogStoreConfig,
/// Table path URI
#[builder(setter(into))]
table_path: String,
table_path: Url,
}

impl std::fmt::Debug for S3DynamoDbLogStore {
Expand Down Expand Up @@ -80,16 +80,12 @@
source: Box::new(err),
},
})?;
let table_path = to_uri(&location, &Path::from(""));
Ok(Self::builder()
.prefixed_store(prefixed_store)
.root_store(root_store)
.lock_client(lock_client)
.config(LogStoreConfig {
location,
options: options.clone(),
})
.table_path(table_path)
.config(LogStoreConfig::new(location.clone(), options.clone()))
.table_path(location)
.build())
}

Expand Down Expand Up @@ -119,7 +115,10 @@
Err(TransactionError::ObjectStore {
source: ObjectStoreError::NotFound { .. },
}) => {
warn!("It looks like the {}.json has already been moved, we got 404 from ObjectStorage.", entry.version);
warn!(
"It looks like the {}.json has already been moved, we got 404 from ObjectStorage.",
entry.version
);
return self.try_complete_entry(entry, false).await;
}
Err(err) if retry == MAX_REPAIR_RETRIES => return Err(err),
Expand All @@ -141,7 +140,7 @@
for retry in 0..=MAX_REPAIR_RETRIES {
match self
.lock_client
.update_commit_entry(entry.version, &self.table_path)
.update_commit_entry(entry.version, self.table_path.as_str())
.await
.map_err(|err| TransactionError::LogStoreError {
msg: format!(
Expand Down Expand Up @@ -183,14 +182,10 @@
"S3DynamoDbLogStore".into()
}

fn root_uri(&self) -> String {
self.table_path.clone()
}

async fn refresh(&self) -> DeltaResult<()> {
let entry = self
.lock_client
.get_latest_entry(&self.table_path)
.get_latest_entry(self.table_path.as_str())
.await
.map_err(|err| DeltaTableError::GenericError {
source: Box::new(err),
Expand All @@ -204,7 +199,7 @@
async fn read_commit_entry(&self, version: i64) -> DeltaResult<Option<Bytes>> {
let entry = self
.lock_client
.get_commit_entry(&self.table_path, version)
.get_commit_entry(&self.table_path.to_string(), version)
.await;
if let Ok(Some(entry)) = entry {
self.repair_entry(&entry).await?;
Expand Down Expand Up @@ -234,7 +229,7 @@
debug!("Writing commit entry for {self:?}: {entry:?}");
// create log entry in dynamo db: complete = false, no expireTime
self.lock_client
.put_commit_entry(&self.table_path, &entry)
.put_commit_entry(self.table_path.as_str(), &entry)
.await
.map_err(|err| match err {
LockClientError::VersionAlreadyExists { version, .. } => {
Expand Down Expand Up @@ -281,7 +276,7 @@
_ => unreachable!(), // S3DynamoDBLogstore should never get Bytes
};
self.lock_client
.delete_commit_entry(version, &self.table_path)
.delete_commit_entry(version, self.table_path.as_str())
.await
.map_err(|err| match err {
LockClientError::ProvisionedThroughputExceeded => todo!(
Expand All @@ -308,7 +303,7 @@
debug!("Retrieving latest version of {self:?} at v{current_version}");
let entry = self
.lock_client
.get_latest_entry(&self.table_path)
.get_latest_entry(self.table_path.as_str())
.await
.map_err(|err| DeltaTableError::GenericError {
source: Box::new(err),
Expand Down Expand Up @@ -347,3 +342,6 @@
/// Both parts of the repair process were already carried out.
AlreadyCompleted,
}

#[cfg(test)]
mod tests {}
2 changes: 1 addition & 1 deletion crates/aws/src/logstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
mod default_logstore;
mod dynamodb_logstore;

pub use default_logstore::default_s3_logstore;
pub use default_logstore::S3LogStore;
pub use default_logstore::default_s3_logstore;
pub use dynamodb_logstore::RepairLogEntryResult;
pub use dynamodb_logstore::S3DynamoDbLogStore;
Loading
Loading