Skip to content

Commit e44dbf8

Browse files
committed
Merge pull request #21 from corwinjoy/move_kms
Move KMS class from file_format_options.rs to new file kms_encryption.rs Signed-off-by: Corwin Joy <[email protected]>
2 parents c5577a6 + 9a304ca commit e44dbf8

File tree

8 files changed

+180
-186
lines changed

8 files changed

+180
-186
lines changed

crates/core/src/operations/encryption.rs

Lines changed: 0 additions & 69 deletions
This file was deleted.

crates/core/src/operations/mod.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,6 @@ pub mod constraints;
5959
#[cfg(feature = "datafusion")]
6060
pub mod delete;
6161
#[cfg(feature = "datafusion")]
62-
pub mod encryption;
63-
#[cfg(feature = "datafusion")]
6462
mod load;
6563
#[cfg(feature = "datafusion")]
6664
pub mod load_cdf;

crates/core/src/table/file_format_options.rs

Lines changed: 1 addition & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,11 @@ use datafusion::catalog::Session;
44
pub use datafusion::config::{ConfigFileType, TableOptions, TableParquetOptions};
55
#[cfg(feature = "datafusion")]
66
use datafusion::execution::SessionState;
7-
use std::fmt::{Debug, Formatter};
7+
use std::fmt::Debug;
88

99
use crate::{crate_version, DeltaResult};
1010
use arrow_schema::Schema as ArrowSchema;
1111

12-
#[cfg(feature = "datafusion")]
13-
use crate::operations::encryption::TableEncryption;
1412
use async_trait::async_trait;
1513

1614
use object_store::path::Path;
@@ -19,7 +17,6 @@ use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
1917
use parquet::schema::types::ColumnPath;
2018
use std::sync::Arc;
2119
use tracing::debug;
22-
use uuid::Uuid;
2320

2421
// Top level trait for file format options used by a DeltaTable
2522
pub trait FileFormatOptions: Send + Sync + std::fmt::Debug + 'static {
@@ -172,104 +169,3 @@ impl WriterPropertiesFactory for SimpleWriterPropertiesFactory {
172169
Ok(self.writer_properties.clone())
173170
}
174171
}
175-
176-
// More advanced factory with KMS support
177-
#[cfg(feature = "datafusion")]
178-
#[derive(Clone, Debug)]
179-
pub struct KMSWriterPropertiesFactory {
180-
writer_properties: WriterProperties,
181-
encryption: Option<crate::operations::encryption::TableEncryption>,
182-
}
183-
184-
#[cfg(feature = "datafusion")]
185-
impl KMSWriterPropertiesFactory {
186-
pub fn with_encryption(table_encryption: TableEncryption) -> Self {
187-
let writer_properties = WriterProperties::builder()
188-
.set_compression(Compression::SNAPPY) // Code assumes Snappy by default
189-
.set_created_by(format!("delta-rs version {}", crate_version()))
190-
.build();
191-
Self {
192-
writer_properties,
193-
encryption: Some(table_encryption),
194-
}
195-
}
196-
}
197-
198-
#[cfg(feature = "datafusion")]
199-
#[async_trait]
200-
impl WriterPropertiesFactory for KMSWriterPropertiesFactory {
201-
fn compression(&self, column_path: &ColumnPath) -> Compression {
202-
self.writer_properties.compression(column_path)
203-
}
204-
205-
async fn create_writer_properties(
206-
&self,
207-
file_path: &Path,
208-
file_schema: &Arc<ArrowSchema>,
209-
) -> DeltaResult<WriterProperties> {
210-
let mut builder: WriterPropertiesBuilder = self.writer_properties.clone().into();
211-
if let Some(encryption) = self.encryption.as_ref() {
212-
builder = encryption
213-
.update_writer_properties(builder, file_path, file_schema)
214-
.await?;
215-
}
216-
Ok(builder.build())
217-
}
218-
}
219-
220-
// -------------------------------------------------------------------------------------------------
221-
// FileFormatOptions for KMS encryption based on settings in TableEncryption
222-
// -------------------------------------------------------------------------------------------------
223-
#[cfg(feature = "datafusion")]
224-
pub struct KmsFileFormatOptions {
225-
table_encryption: TableEncryption,
226-
writer_properties_factory: WriterPropertiesFactoryRef,
227-
encryption_factory_id: String,
228-
}
229-
230-
#[cfg(feature = "datafusion")]
231-
impl KmsFileFormatOptions {
232-
pub fn new(table_encryption: TableEncryption) -> Self {
233-
let encryption_factory_id = format!("delta-{}", Uuid::new_v4());
234-
let writer_properties_factory = Arc::new(KMSWriterPropertiesFactory::with_encryption(
235-
table_encryption.clone(),
236-
));
237-
Self {
238-
table_encryption,
239-
writer_properties_factory,
240-
encryption_factory_id,
241-
}
242-
}
243-
}
244-
245-
#[cfg(feature = "datafusion")]
246-
impl Debug for KmsFileFormatOptions {
247-
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
248-
f.debug_struct("KmsFileFormatOptions")
249-
.finish_non_exhaustive()
250-
}
251-
}
252-
253-
#[cfg(feature = "datafusion")]
254-
impl FileFormatOptions for KmsFileFormatOptions {
255-
fn table_options(&self) -> TableOptions {
256-
let mut table_options = TableOptions::default();
257-
table_options.parquet.crypto.factory_id = Some(self.encryption_factory_id.clone());
258-
table_options.parquet.crypto.factory_options =
259-
self.table_encryption.configuration().clone();
260-
table_options
261-
}
262-
263-
fn writer_properties_factory(&self) -> WriterPropertiesFactoryRef {
264-
Arc::clone(&self.writer_properties_factory)
265-
}
266-
267-
fn update_session(&self, session: &dyn Session) -> DeltaResult<()> {
268-
// Ensure DataFusion has the encryption factory registered
269-
session.runtime_env().register_parquet_encryption_factory(
270-
&self.encryption_factory_id,
271-
Arc::clone(self.table_encryption.encryption_factory()),
272-
);
273-
Ok(())
274-
}
275-
}
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
use crate::table::file_format_options::{
2+
FileFormatOptions, TableOptions, WriterPropertiesFactory, WriterPropertiesFactoryRef,
3+
};
4+
use crate::{crate_version, DeltaResult};
5+
use arrow_schema::Schema as ArrowSchema;
6+
use async_trait::async_trait;
7+
use datafusion::catalog::Session;
8+
use datafusion::config::{ConfigField, EncryptionFactoryOptions, ExtensionOptions};
9+
use datafusion::execution::parquet_encryption::EncryptionFactory;
10+
use object_store::path::Path;
11+
use parquet::basic::Compression;
12+
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
13+
use parquet::schema::types::ColumnPath;
14+
use std::fmt::{Debug, Formatter};
15+
use std::sync::Arc;
16+
use uuid::Uuid;
17+
18+
pub type SchemaRef = Arc<ArrowSchema>;
19+
20+
#[derive(Clone, Debug)]
21+
pub struct TableEncryption {
22+
encryption_factory: Arc<dyn EncryptionFactory>,
23+
configuration: EncryptionFactoryOptions,
24+
}
25+
26+
impl TableEncryption {
27+
pub fn new(
28+
encryption_factory: Arc<dyn EncryptionFactory>,
29+
configuration: EncryptionFactoryOptions,
30+
) -> Self {
31+
Self {
32+
encryption_factory,
33+
configuration,
34+
}
35+
}
36+
37+
pub fn new_with_extension_options<T: ExtensionOptions>(
38+
encryption_factory: Arc<dyn EncryptionFactory>,
39+
options: &T,
40+
) -> DeltaResult<Self> {
41+
let mut configuration = EncryptionFactoryOptions::default();
42+
for entry in options.entries() {
43+
if let Some(value) = &entry.value {
44+
configuration.set(&entry.key, value)?;
45+
}
46+
}
47+
Ok(Self {
48+
encryption_factory,
49+
configuration,
50+
})
51+
}
52+
53+
pub fn encryption_factory(&self) -> &Arc<dyn EncryptionFactory> {
54+
&self.encryption_factory
55+
}
56+
57+
pub fn configuration(&self) -> &EncryptionFactoryOptions {
58+
&self.configuration
59+
}
60+
61+
pub async fn update_writer_properties(
62+
&self,
63+
mut builder: WriterPropertiesBuilder,
64+
file_path: &Path,
65+
file_schema: &SchemaRef,
66+
) -> DeltaResult<WriterPropertiesBuilder> {
67+
let encryption_properties = self
68+
.encryption_factory
69+
.get_file_encryption_properties(&self.configuration, file_schema, file_path)
70+
.await?;
71+
if let Some(encryption_properties) = encryption_properties {
72+
builder = builder.with_file_encryption_properties(encryption_properties);
73+
}
74+
Ok(builder)
75+
}
76+
}
77+
78+
// More advanced factory with KMS support
79+
#[derive(Clone, Debug)]
80+
pub struct KMSWriterPropertiesFactory {
81+
writer_properties: WriterProperties,
82+
encryption: Option<TableEncryption>,
83+
}
84+
85+
impl KMSWriterPropertiesFactory {
86+
pub fn with_encryption(table_encryption: TableEncryption) -> Self {
87+
let writer_properties = WriterProperties::builder()
88+
.set_compression(Compression::SNAPPY) // Code assumes Snappy by default
89+
.set_created_by(format!("delta-rs version {}", crate_version()))
90+
.build();
91+
Self {
92+
writer_properties,
93+
encryption: Some(table_encryption),
94+
}
95+
}
96+
}
97+
98+
#[async_trait]
99+
impl WriterPropertiesFactory for KMSWriterPropertiesFactory {
100+
fn compression(&self, column_path: &ColumnPath) -> Compression {
101+
self.writer_properties.compression(column_path)
102+
}
103+
104+
async fn create_writer_properties(
105+
&self,
106+
file_path: &Path,
107+
file_schema: &Arc<ArrowSchema>,
108+
) -> DeltaResult<WriterProperties> {
109+
let mut builder: WriterPropertiesBuilder = self.writer_properties.clone().into();
110+
if let Some(encryption) = self.encryption.as_ref() {
111+
builder = encryption
112+
.update_writer_properties(builder, file_path, file_schema)
113+
.await?;
114+
}
115+
Ok(builder.build())
116+
}
117+
}
118+
119+
// -------------------------------------------------------------------------------------------------
120+
// FileFormatOptions for KMS encryption based on settings in TableEncryption
121+
// -------------------------------------------------------------------------------------------------
122+
pub struct KmsFileFormatOptions {
123+
table_encryption: TableEncryption,
124+
writer_properties_factory: WriterPropertiesFactoryRef,
125+
encryption_factory_id: String,
126+
}
127+
128+
impl KmsFileFormatOptions {
129+
pub fn new(table_encryption: TableEncryption) -> Self {
130+
let encryption_factory_id = format!("delta-{}", Uuid::new_v4());
131+
let writer_properties_factory = Arc::new(KMSWriterPropertiesFactory::with_encryption(
132+
table_encryption.clone(),
133+
));
134+
Self {
135+
table_encryption,
136+
writer_properties_factory,
137+
encryption_factory_id,
138+
}
139+
}
140+
}
141+
142+
impl Debug for KmsFileFormatOptions {
143+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
144+
f.debug_struct("KmsFileFormatOptions")
145+
.finish_non_exhaustive()
146+
}
147+
}
148+
149+
impl FileFormatOptions for KmsFileFormatOptions {
150+
fn table_options(&self) -> TableOptions {
151+
let mut table_options = TableOptions::default();
152+
table_options.parquet.crypto.factory_id = Some(self.encryption_factory_id.clone());
153+
table_options.parquet.crypto.factory_options =
154+
self.table_encryption.configuration().clone();
155+
table_options
156+
}
157+
158+
fn writer_properties_factory(&self) -> WriterPropertiesFactoryRef {
159+
Arc::clone(&self.writer_properties_factory)
160+
}
161+
162+
fn update_session(&self, session: &dyn Session) -> DeltaResult<()> {
163+
// Ensure DataFusion has the encryption factory registered
164+
session.runtime_env().register_parquet_encryption_factory(
165+
&self.encryption_factory_id,
166+
Arc::clone(self.table_encryption.encryption_factory()),
167+
);
168+
Ok(())
169+
}
170+
}

crates/core/src/test_utils/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
mod factories;
2+
#[cfg(feature = "datafusion")]
3+
pub mod kms_encryption;
24

35
use std::{collections::HashMap, path::PathBuf, process::Command};
46

crates/core/tests/commands_with_encryption.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,9 @@ use datafusion::{
1212
};
1313
use deltalake_core::kernel::{DataType, PrimitiveType, StructField};
1414
use deltalake_core::operations::collect_sendable_stream;
15-
use deltalake_core::operations::encryption::TableEncryption;
1615
use deltalake_core::parquet::encryption::decrypt::FileDecryptionProperties;
17-
use deltalake_core::table::file_format_options::{
18-
FileFormatRef, KmsFileFormatOptions, SimpleFileFormatOptions,
19-
};
16+
use deltalake_core::table::file_format_options::{FileFormatRef, SimpleFileFormatOptions};
17+
use deltalake_core::test_utils::kms_encryption::{KmsFileFormatOptions, TableEncryption};
2018
use deltalake_core::{arrow, parquet, DeltaOps};
2119
use deltalake_core::{operations::optimize::OptimizeType, DeltaTable, DeltaTableError};
2220
use parquet_key_management::{

0 commit comments

Comments
 (0)