diff --git a/Cargo.lock b/Cargo.lock index d6adc0beb8..a9002c54ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2328,6 +2328,29 @@ dependencies = [ "uuid", ] +[[package]] +name = "bson" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3f109694c4f45353972af96bf97d8a057f82e2d6e496457f4d135b9867a518c" +dependencies = [ + "ahash 0.8.12", + "base64", + "bitvec", + "getrandom 0.3.4", + "hex", + "indexmap 2.14.0", + "js-sys", + "rand 0.9.4", + "serde", + "serde_bytes", + "serde_json", + "simdutf8", + "thiserror 2.0.18", + "time", + "uuid", +] + [[package]] name = "bstr" version = "1.12.1" @@ -7015,6 +7038,7 @@ dependencies = [ "apache-avro", "async-trait", "base64", + "bson 3.1.0", "dashmap", "flatbuffers", "http 1.4.1", @@ -8356,7 +8380,7 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8da0cd419a51a5fb44819e290fbdb0665a54f21dead8923446a799c7f4d26ad9" dependencies = [ - "bson", + "bson 2.15.0", "mongocrypt-sys", "once_cell", "serde", @@ -8376,7 +8400,7 @@ checksum = "276ba0cd571553d1f6936c6f180964776ece6ab7507dc8765f8a9c9c49d8cd00" dependencies = [ "base64", "bitflags 2.11.1", - "bson", + "bson 2.15.0", "derive-where", "derive_more", "futures-core", diff --git a/Cargo.toml b/Cargo.toml index 0f9ffd242a..766bce4851 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -98,6 +98,7 @@ bench-runner = { path = "core/bench/runner" } bit-set = "0.10.0" blake3 = "1.8.5" bon = "3.9.1" +bson = { version = "3.1.0", features = ["serde", "serde_json-1"] } byte-unit = { version = "5.2.0", default-features = false, features = ["serde", "byte", "std"] } bytemuck = { version = "1.25", features = ["derive", "min_const_generics"] } bytes = "1.11.1" diff --git a/core/connectors/sdk/Cargo.toml b/core/connectors/sdk/Cargo.toml index 94cfbfc8eb..5c2d68b1ec 100644 --- a/core/connectors/sdk/Cargo.toml +++ b/core/connectors/sdk/Cargo.toml @@ -41,6 +41,7 @@ anyhow = { workspace = true } apache-avro = { 
workspace = true } async-trait = { workspace = true } base64 = { workspace = true } +bson = { workspace = true } dashmap = { workspace = true } flatbuffers = { workspace = true } http = { workspace = true } diff --git a/core/connectors/sdk/src/decoders/bson.rs b/core/connectors/sdk/src/decoders/bson.rs new file mode 100644 index 0000000000..24b07e6c71 --- /dev/null +++ b/core/connectors/sdk/src/decoders/bson.rs @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::{Error, Payload, Schema, StreamDecoder}; + +/// Decodes raw BSON document bytes into [`Payload::Bson`]. +pub struct BsonStreamDecoder; + +impl StreamDecoder for BsonStreamDecoder { + fn schema(&self) -> Schema { + Schema::Bson + } + + /// Returns [`Error::InvalidBsonPayload`] when the bytes are not a complete BSON document. + fn decode(&self, payload: Vec) -> Result { + bson::deserialize_from_slice(&payload) + .map(Payload::Bson) + .map_err(|_| Error::InvalidBsonPayload) + } +} diff --git a/core/connectors/sdk/src/decoders/mod.rs b/core/connectors/sdk/src/decoders/mod.rs index d10299e830..1e819ed397 100644 --- a/core/connectors/sdk/src/decoders/mod.rs +++ b/core/connectors/sdk/src/decoders/mod.rs @@ -16,6 +16,7 @@ // under the License. 
pub mod avro; +pub mod bson; pub mod flatbuffer; pub mod json; pub mod proto; diff --git a/core/connectors/sdk/src/encoders/avro.rs b/core/connectors/sdk/src/encoders/avro.rs index 103a3fb499..b04423af6d 100644 --- a/core/connectors/sdk/src/encoders/avro.rs +++ b/core/connectors/sdk/src/encoders/avro.rs @@ -164,6 +164,246 @@ impl AvroStreamEncoder { }) } + fn encode_bson_to_avro(&self, bson_value: bson::Bson) -> Result, Error> { + let schema = self.schema.as_ref().ok_or_else(|| { + error!("Cannot encode BSON to Avro without a schema"); + Error::InvalidConfigValue("Avro schema is required for encoding".to_string()) + })?; + let avro_value = Self::bson_to_avro_value(bson_value, schema)?; + + apache_avro::to_avro_datum(schema, avro_value).map_err(|e| { + error!("Failed to encode Avro datum: {}", e); + Error::Serialization(format!("Avro encoding failed: {e}")) + }) + } + + fn bson_to_avro_value( + value: bson::Bson, + schema: &apache_avro::Schema, + ) -> Result { + use apache_avro::Schema as AvroSchema; + use apache_avro::types::Value as AvroValue; + + match (value, schema) { + (bson::Bson::Document(document), AvroSchema::Record(record_schema)) => { + let mut record = Vec::with_capacity(record_schema.fields.len()); + for field in &record_schema.fields { + let field_value = document + .get(&field.name) + .cloned() + .unwrap_or(bson::Bson::Null); + record.push(( + field.name.clone(), + Self::bson_to_avro_value(field_value, &field.schema)?, + )); + } + Ok(AvroValue::Record(record)) + } + (bson::Bson::Document(document), AvroSchema::Map(map_schema)) => Ok(AvroValue::Map( + document + .into_iter() + .map(|(key, value)| { + Self::bson_to_avro_value(value, &map_schema.types).map(|avro| (key, avro)) + }) + .collect::, _>>()?, + )), + (bson::Bson::Array(values), AvroSchema::Array(array_schema)) => Ok(AvroValue::Array( + values + .into_iter() + .map(|value| Self::bson_to_avro_value(value, &array_schema.items)) + .collect::, _>>()?, + )), + (value, 
AvroSchema::Union(union_schema)) => { + for (idx, variant_schema) in union_schema.variants().iter().enumerate() { + match Self::bson_to_avro_value(value.clone(), variant_schema) { + Ok(avro_value) if avro_value.validate(variant_schema) => { + return Ok(AvroValue::Union(idx as u32, Box::new(avro_value))); + } + _ => continue, + } + } + Err(Error::Serialization(format!( + "BSON value does not match Avro union schema: {union_schema:?}" + ))) + } + (bson::Bson::Null | bson::Bson::Undefined, AvroSchema::Null) => Ok(AvroValue::Null), + (bson::Bson::Boolean(value), AvroSchema::Boolean) => Ok(AvroValue::Boolean(value)), + (bson::Bson::Int32(value), AvroSchema::Int) => Ok(AvroValue::Int(value)), + (bson::Bson::Int32(value), AvroSchema::Long) => Ok(AvroValue::Long(i64::from(value))), + (bson::Bson::Int32(value), AvroSchema::Float) => Ok(AvroValue::Float(value as f32)), + (bson::Bson::Int32(value), AvroSchema::Double) => Ok(AvroValue::Double(value as f64)), + (bson::Bson::Int64(value), AvroSchema::Int) => { + let value = i32::try_from(value).map_err(|_| { + Error::Serialization(format!("BSON Int64 {value} out of range for Avro Int")) + })?; + Ok(AvroValue::Int(value)) + } + (bson::Bson::Int64(value), AvroSchema::Long) => Ok(AvroValue::Long(value)), + (bson::Bson::Int64(value), AvroSchema::Float) => Ok(AvroValue::Float(value as f32)), + (bson::Bson::Int64(value), AvroSchema::Double) => Ok(AvroValue::Double(value as f64)), + (bson::Bson::Double(value), AvroSchema::Float) => Ok(AvroValue::Float(value as f32)), + (bson::Bson::Double(value), AvroSchema::Double) => Ok(AvroValue::Double(value)), + (bson::Bson::String(value), AvroSchema::String) => Ok(AvroValue::String(value)), + (bson::Bson::Symbol(value), AvroSchema::String) => Ok(AvroValue::String(value)), + (bson::Bson::ObjectId(value), AvroSchema::String) => { + Ok(AvroValue::String(value.to_hex())) + } + (bson::Bson::Decimal128(value), AvroSchema::String) => { + Ok(AvroValue::String(value.to_string())) + } + 
(bson::Bson::Timestamp(value), AvroSchema::Long) => { + let packed = (u64::from(value.time) << 32) | u64::from(value.increment); + let packed = i64::try_from(packed).map_err(|_| { + Error::Serialization(format!( + "BSON timestamp {value} cannot be represented as Avro Long" + )) + })?; + Ok(AvroValue::Long(packed)) + } + (bson::Bson::Timestamp(value), AvroSchema::Record(record_schema)) => { + let mut document = bson::Document::new(); + document.insert("time", bson::Bson::Int64(i64::from(value.time))); + document.insert("increment", bson::Bson::Int64(i64::from(value.increment))); + Self::bson_to_avro_value( + bson::Bson::Document(document), + &AvroSchema::Record(record_schema.clone()), + ) + } + (bson::Bson::RegularExpression(value), AvroSchema::String) => { + Ok(AvroValue::String(value.to_string())) + } + (bson::Bson::JavaScriptCode(value), AvroSchema::String) => Ok(AvroValue::String(value)), + (bson::Bson::DateTime(value), AvroSchema::Long) => { + Ok(AvroValue::Long(value.timestamp_millis())) + } + (bson::Bson::DateTime(value), AvroSchema::TimestampMillis) => { + Ok(AvroValue::TimestampMillis(value.timestamp_millis())) + } + (bson::Bson::DateTime(value), AvroSchema::TimestampMicros) => { + let micros = value.timestamp_millis().checked_mul(1_000).ok_or_else(|| { + Error::Serialization(format!( + "BSON DateTime {value} cannot be represented as Avro timestamp-micros" + )) + })?; + Ok(AvroValue::TimestampMicros(micros)) + } + (bson::Bson::DateTime(value), AvroSchema::TimestampNanos) => { + let nanos = value + .timestamp_millis() + .checked_mul(1_000_000) + .ok_or_else(|| { + Error::Serialization(format!( + "BSON DateTime {value} cannot be represented as Avro timestamp-nanos" + )) + })?; + Ok(AvroValue::TimestampNanos(nanos)) + } + (bson::Bson::DateTime(value), AvroSchema::LocalTimestampMillis) => { + Ok(AvroValue::LocalTimestampMillis(value.timestamp_millis())) + } + (bson::Bson::DateTime(value), AvroSchema::LocalTimestampMicros) => { + let micros = 
value.timestamp_millis().checked_mul(1_000).ok_or_else(|| { + Error::Serialization(format!( + "BSON DateTime {value} cannot be represented as Avro local-timestamp-micros" + )) + })?; + Ok(AvroValue::LocalTimestampMicros(micros)) + } + (bson::Bson::DateTime(value), AvroSchema::LocalTimestampNanos) => { + let nanos = value.timestamp_millis().checked_mul(1_000_000).ok_or_else(|| { + Error::Serialization(format!( + "BSON DateTime {value} cannot be represented as Avro local-timestamp-nanos" + )) + })?; + Ok(AvroValue::LocalTimestampNanos(nanos)) + } + (bson::Bson::Binary(value), AvroSchema::Bytes) => Ok(AvroValue::Bytes(value.bytes)), + (bson::Bson::String(value), AvroSchema::Bytes) => { + Ok(AvroValue::Bytes(value.into_bytes())) + } + (bson::Bson::ObjectId(value), AvroSchema::Bytes) => { + Ok(AvroValue::Bytes(value.bytes().to_vec())) + } + (bson::Bson::Decimal128(value), AvroSchema::Bytes) => { + Ok(AvroValue::Bytes(value.bytes().to_vec())) + } + (bson::Bson::Binary(value), AvroSchema::Fixed(fixed_schema)) => { + if value.bytes.len() != fixed_schema.size { + return Err(Error::Serialization(format!( + "BSON binary length {} does not match Avro fixed size {}", + value.bytes.len(), + fixed_schema.size + ))); + } + Ok(AvroValue::Fixed(fixed_schema.size, value.bytes)) + } + (bson::Bson::ObjectId(value), AvroSchema::Fixed(fixed_schema)) => { + let bytes = value.bytes().to_vec(); + if bytes.len() != fixed_schema.size { + return Err(Error::Serialization(format!( + "BSON ObjectId length {} does not match Avro fixed size {}", + bytes.len(), + fixed_schema.size + ))); + } + Ok(AvroValue::Fixed(fixed_schema.size, bytes)) + } + (bson::Bson::Decimal128(value), AvroSchema::Fixed(fixed_schema)) => { + let bytes = value.bytes().to_vec(); + if bytes.len() != fixed_schema.size { + return Err(Error::Serialization(format!( + "BSON Decimal128 length {} does not match Avro fixed size {}", + bytes.len(), + fixed_schema.size + ))); + } + Ok(AvroValue::Fixed(fixed_schema.size, bytes)) + } + 
(bson::Bson::String(value), AvroSchema::Enum(enum_schema)) => { + let index = enum_schema + .symbols + .iter() + .position(|symbol| symbol == &value) + .ok_or_else(|| { + Error::Serialization(format!( + "BSON string '{value}' is not a symbol in Avro enum {}", + enum_schema.name.name + )) + })?; + Ok(AvroValue::Enum(index as u32, value)) + } + (bson::Bson::String(value), AvroSchema::Uuid) => { + let uuid = uuid::Uuid::parse_str(&value).map_err(|error| { + Error::Serialization(format!("Cannot parse BSON string as Avro UUID: {error}")) + })?; + Ok(AvroValue::Uuid(uuid)) + } + (bson::Bson::Int32(value), AvroSchema::Date) => Ok(AvroValue::Date(value)), + (bson::Bson::DateTime(value), AvroSchema::Date) => { + let days = value.timestamp_millis().div_euclid(86_400_000); // floor, so pre-epoch times map to the prior day + let days = i32::try_from(days).map_err(|_| { + Error::Serialization(format!( + "BSON DateTime {value} cannot be represented as Avro Date" + )) + })?; + Ok(AvroValue::Date(days)) + } + (bson::Bson::Int32(value), AvroSchema::TimeMillis) => Ok(AvroValue::TimeMillis(value)), + (bson::Bson::Int64(value), AvroSchema::TimeMillis) => { + let value = i32::try_from(value).map_err(|_| { + Error::Serialization(format!( + "BSON Int64 {value} out of range for Avro TimeMillis" + )) + })?; + Ok(AvroValue::TimeMillis(value)) + } + (bson::Bson::Int64(value), AvroSchema::TimeMicros) => Ok(AvroValue::TimeMicros(value)), + (value, schema) => Err(Error::Serialization(format!( + "Cannot convert BSON value {value:?} to Avro schema {schema:?}" + ))), + } + } + fn serde_json_to_avro_value( value: serde_json::Value, schema: &apache_avro::Schema, @@ -350,6 +590,7 @@ impl StreamEncoder for AvroStreamEncoder { match transformed_payload { Payload::Json(json_value) => self.encode_json_to_avro(json_value), + Payload::Bson(bson_value) => self.encode_bson_to_avro(bson_value), Payload::Text(text) => self.encode_text_to_avro(text), Payload::Raw(data) => self.encode_raw_to_avro(data), Payload::Avro(data) => Ok(data), @@ -402,6 +643,375 @@ mod tests { 
assert!(!encoded_data.is_empty()); } + #[test] + fn encode_should_handle_bson_payload_with_schema() { + let schema_json = create_test_schema_json(); + let config = AvroEncoderConfig { + schema_json: Some(schema_json), + ..AvroEncoderConfig::default() + }; + let encoder = AvroStreamEncoder::new(config); + + let payload = bson::doc! { + "name": "Alice", + "age": 30_i32, + }; + + let result = encoder.encode(Payload::Bson(bson::Bson::Document(payload))); + + assert!(result.is_ok()); + assert!(!result.unwrap().is_empty()); + } + + #[test] + fn bson_to_avro_value_should_convert_record_array_map_and_union() { + let schema = AvroSchema::parse_str( + r#"{ + "type": "record", + "name": "UserProfile", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "age", "type": "int"}, + {"name": "tags", "type": {"type": "array", "items": "string"}}, + {"name": "scores", "type": {"type": "map", "values": "long"}}, + {"name": "nickname", "type": ["null", "string"]} + ] + }"#, + ) + .unwrap(); + let value = bson::Bson::Document(bson::doc! 
{ + "name": "Alice", + "age": 30_i32, + "tags": ["rust", "bson"], + "scores": { + "math": 90_i64, + "science": 95_i64, + }, + "nickname": bson::Bson::Null, + }); + + let result = AvroStreamEncoder::bson_to_avro_value(value, &schema).unwrap(); + + match result { + apache_avro::types::Value::Record(fields) => { + assert_eq!( + fields[0].1, + apache_avro::types::Value::String("Alice".into()) + ); + assert_eq!(fields[1].1, apache_avro::types::Value::Int(30)); + assert!(matches!(fields[2].1, apache_avro::types::Value::Array(_))); + assert!(matches!(fields[3].1, apache_avro::types::Value::Map(_))); + assert_eq!( + fields[4].1, + apache_avro::types::Value::Union(0, Box::new(apache_avro::types::Value::Null)) + ); + } + other => panic!("Expected Avro record, got {other:?}"), + } + } + + #[test] + fn bson_to_avro_value_should_convert_object_id_to_fixed() { + let schema = AvroSchema::parse_str( + r#"{ + "type": "fixed", + "name": "ObjectIdBytes", + "size": 12 + }"#, + ) + .unwrap(); + let object_id = bson::oid::ObjectId::new(); + let expected = object_id.bytes().to_vec(); + + let result = + AvroStreamEncoder::bson_to_avro_value(bson::Bson::ObjectId(object_id), &schema) + .unwrap(); + + assert_eq!(result, apache_avro::types::Value::Fixed(12, expected)); + } + + #[test] + fn bson_to_avro_value_should_convert_datetime_to_timestamp_millis() { + let schema = AvroSchema::parse_str( + r#"{ + "type": "long", + "logicalType": "timestamp-millis" + }"#, + ) + .unwrap(); + let datetime = bson::DateTime::from_millis(1_234); + + let result = + AvroStreamEncoder::bson_to_avro_value(bson::Bson::DateTime(datetime), &schema).unwrap(); + + assert_eq!(result, apache_avro::types::Value::TimestampMillis(1_234)); + } + + #[test] + fn bson_to_avro_value_should_cover_scalar_type_mappings() { + use apache_avro::types::Value as AvroValue; + + let cases = vec![ + (bson::Bson::Null, r#""null""#, AvroValue::Null), + (bson::Bson::Undefined, r#""null""#, AvroValue::Null), + ( + bson::Bson::Boolean(true), + 
r#""boolean""#, + AvroValue::Boolean(true), + ), + (bson::Bson::Int32(7), r#""int""#, AvroValue::Int(7)), + (bson::Bson::Int32(7), r#""long""#, AvroValue::Long(7)), + (bson::Bson::Int32(7), r#""float""#, AvroValue::Float(7.0)), + (bson::Bson::Int32(7), r#""double""#, AvroValue::Double(7.0)), + (bson::Bson::Int64(8), r#""int""#, AvroValue::Int(8)), + (bson::Bson::Int64(8), r#""long""#, AvroValue::Long(8)), + (bson::Bson::Int64(8), r#""float""#, AvroValue::Float(8.0)), + (bson::Bson::Int64(8), r#""double""#, AvroValue::Double(8.0)), + (bson::Bson::Double(1.5), r#""float""#, AvroValue::Float(1.5)), + ( + bson::Bson::Double(1.5), + r#""double""#, + AvroValue::Double(1.5), + ), + ( + bson::Bson::String("hello".to_string()), + r#""string""#, + AvroValue::String("hello".to_string()), + ), + ( + bson::Bson::Symbol("symbol".to_string()), + r#""string""#, + AvroValue::String("symbol".to_string()), + ), + ( + bson::Bson::String("abc".to_string()), + r#""bytes""#, + AvroValue::Bytes(b"abc".to_vec()), + ), + ( + bson::Bson::Int32(3), + r#"{ "type": "int", "logicalType": "date" }"#, + AvroValue::Date(3), + ), + ( + bson::Bson::Int32(4), + r#"{ "type": "int", "logicalType": "time-millis" }"#, + AvroValue::TimeMillis(4), + ), + ( + bson::Bson::Int64(5), + r#"{ "type": "int", "logicalType": "time-millis" }"#, + AvroValue::TimeMillis(5), + ), + ( + bson::Bson::Int64(6), + r#"{ "type": "long", "logicalType": "time-micros" }"#, + AvroValue::TimeMicros(6), + ), + ]; + + for (bson_value, schema_json, expected) in cases { + let schema = AvroSchema::parse_str(schema_json).unwrap(); + let result = AvroStreamEncoder::bson_to_avro_value(bson_value, &schema).unwrap(); + assert_eq!(result, expected); + } + } + + #[test] + fn bson_to_avro_value_should_cover_binary_fixed_and_special_string_mappings() { + use apache_avro::types::Value as AvroValue; + + let bytes_schema = AvroSchema::parse_str(r#""bytes""#).unwrap(); + let fixed_schema = AvroSchema::parse_str( + r#"{ + "type": "fixed", + "name": 
"Fixed4", + "size": 4 + }"#, + ) + .unwrap(); + let string_schema = AvroSchema::parse_str(r#""string""#).unwrap(); + + let binary = bson::Binary { + subtype: bson::spec::BinarySubtype::Generic, + bytes: vec![1, 2, 3, 4], + }; + assert_eq!( + AvroStreamEncoder::bson_to_avro_value( + bson::Bson::Binary(binary.clone()), + &bytes_schema + ) + .unwrap(), + AvroValue::Bytes(vec![1, 2, 3, 4]) + ); + assert_eq!( + AvroStreamEncoder::bson_to_avro_value(bson::Bson::Binary(binary), &fixed_schema) + .unwrap(), + AvroValue::Fixed(4, vec![1, 2, 3, 4]) + ); + + let decimal = bson::Decimal128::from_bytes([7; 16]); + assert_eq!( + AvroStreamEncoder::bson_to_avro_value( + bson::Bson::Decimal128(decimal), + &AvroSchema::parse_str( + r#"{ + "type": "fixed", + "name": "DecimalBytes", + "size": 16 + }"# + ) + .unwrap() + ) + .unwrap(), + AvroValue::Fixed(16, vec![7; 16]) + ); + + let object_id = bson::oid::ObjectId::new(); + assert_eq!( + AvroStreamEncoder::bson_to_avro_value(bson::Bson::ObjectId(object_id), &bytes_schema) + .unwrap(), + AvroValue::Bytes(object_id.bytes().to_vec()) + ); + assert_eq!( + AvroStreamEncoder::bson_to_avro_value(bson::Bson::ObjectId(object_id), &string_schema) + .unwrap(), + AvroValue::String(object_id.to_hex()) + ); + + let regex = bson::Regex { + pattern: bson::raw::CString::try_from("a.*").unwrap(), + options: bson::raw::CString::try_from("i").unwrap(), + }; + assert_eq!( + AvroStreamEncoder::bson_to_avro_value( + bson::Bson::RegularExpression(regex), + &string_schema + ) + .unwrap(), + AvroValue::String("/a.*/i".to_string()) + ); + assert_eq!( + AvroStreamEncoder::bson_to_avro_value( + bson::Bson::JavaScriptCode("return x;".to_string()), + &string_schema + ) + .unwrap(), + AvroValue::String("return x;".to_string()) + ); + } + + #[test] + fn bson_to_avro_value_should_cover_enum_uuid_and_timestamp_mappings() { + use apache_avro::types::Value as AvroValue; + + let enum_schema = AvroSchema::parse_str( + r#"{ + "type": "enum", + "name": "Status", + "symbols": 
["OPEN", "CLOSED"] + }"#, + ) + .unwrap(); + assert_eq!( + AvroStreamEncoder::bson_to_avro_value( + bson::Bson::String("CLOSED".to_string()), + &enum_schema + ) + .unwrap(), + AvroValue::Enum(1, "CLOSED".to_string()) + ); + + let uuid_schema = + AvroSchema::parse_str(r#"{ "type": "string", "logicalType": "uuid" }"#).unwrap(); + let uuid = uuid::Uuid::new_v4(); + assert_eq!( + AvroStreamEncoder::bson_to_avro_value( + bson::Bson::String(uuid.to_string()), + &uuid_schema + ) + .unwrap(), + AvroValue::Uuid(uuid) + ); + + let timestamp = bson::Timestamp { + time: 7, + increment: 9, + }; + let long_schema = AvroSchema::parse_str(r#""long""#).unwrap(); + assert_eq!( + AvroStreamEncoder::bson_to_avro_value(bson::Bson::Timestamp(timestamp), &long_schema) + .unwrap(), + AvroValue::Long((7_i64 << 32) | 9_i64) + ); + + let record_schema = AvroSchema::parse_str( + r#"{ + "type": "record", + "name": "BsonTimestamp", + "fields": [ + {"name": "time", "type": "long"}, + {"name": "increment", "type": "long"} + ] + }"#, + ) + .unwrap(); + assert_eq!( + AvroStreamEncoder::bson_to_avro_value(bson::Bson::Timestamp(timestamp), &record_schema) + .unwrap(), + AvroValue::Record(vec![ + ("time".to_string(), AvroValue::Long(7)), + ("increment".to_string(), AvroValue::Long(9)), + ]) + ); + } + + #[test] + fn bson_to_avro_value_should_cover_datetime_logical_type_mappings() { + use apache_avro::types::Value as AvroValue; + + let datetime = bson::DateTime::from_millis(86_400_123); + let cases = vec![ + (r#""long""#, AvroValue::Long(86_400_123)), + ( + r#"{ "type": "long", "logicalType": "timestamp-millis" }"#, + AvroValue::TimestampMillis(86_400_123), + ), + ( + r#"{ "type": "long", "logicalType": "timestamp-micros" }"#, + AvroValue::TimestampMicros(86_400_123_000), + ), + ( + r#"{ "type": "long", "logicalType": "timestamp-nanos" }"#, + AvroValue::TimestampNanos(86_400_123_000_000), + ), + ( + r#"{ "type": "long", "logicalType": "local-timestamp-millis" }"#, + 
AvroValue::LocalTimestampMillis(86_400_123), + ), + ( + r#"{ "type": "long", "logicalType": "local-timestamp-micros" }"#, + AvroValue::LocalTimestampMicros(86_400_123_000), + ), + ( + r#"{ "type": "long", "logicalType": "local-timestamp-nanos" }"#, + AvroValue::LocalTimestampNanos(86_400_123_000_000), + ), + ( + r#"{ "type": "int", "logicalType": "date" }"#, + AvroValue::Date(1), + ), + ]; + + for (schema_json, expected) in cases { + let schema = AvroSchema::parse_str(schema_json).unwrap(); + let result = + AvroStreamEncoder::bson_to_avro_value(bson::Bson::DateTime(datetime), &schema) + .unwrap(); + assert_eq!(result, expected); + } + } + #[test] fn encode_should_pass_through_avro_payload() { let encoder = AvroStreamEncoder::default(); diff --git a/core/connectors/sdk/src/encoders/bson.rs b/core/connectors/sdk/src/encoders/bson.rs new file mode 100644 index 0000000000..c489c40c1b --- /dev/null +++ b/core/connectors/sdk/src/encoders/bson.rs @@ -0,0 +1,37 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::{Error, Payload, Schema, StreamEncoder}; + +/// Encodes [`Payload::Bson`] values as BSON document bytes. +pub struct BsonStreamEncoder; + +impl StreamEncoder for BsonStreamEncoder { + fn schema(&self) -> Schema { + Schema::Bson + } + + fn encode(&self, payload: Payload) -> Result, Error> { + match payload { + Payload::Bson(value) => { + bson::serialize_to_vec(&value).map_err(|_| Error::InvalidBsonPayload) + } + // Other payload kinds have no BSON mapping yet; report an error instead of panicking. + _ => Err(Error::InvalidPayloadType), + } + } +} diff --git a/core/connectors/sdk/src/encoders/flatbuffer.rs b/core/connectors/sdk/src/encoders/flatbuffer.rs index 32c60daaf2..cedf2274eb 100644 --- a/core/connectors/sdk/src/encoders/flatbuffer.rs +++ b/core/connectors/sdk/src/encoders/flatbuffer.rs @@ -95,6 +95,12 @@ impl FlatBufferStreamEncoder { } } + /// Serializes the BSON value and embeds the bytes through the raw-payload path. + fn encode_bson_to_flatbuffer(&self, bson_value: bson::Bson) -> Result, Error> { + let data = bson::serialize_to_vec(&bson_value).map_err(|_| Error::InvalidBsonPayload)?; + self.encode_raw_to_flatbuffer(data) + } + fn encode_json_to_flatbuffer( &self, json_value: simd_json::OwnedValue, @@ -238,6 +244,7 @@ impl StreamEncoder for FlatBufferStreamEncoder { Payload::FlatBuffer(data) => Ok(data), Payload::Proto(text) => self.encode_text_to_flatbuffer(text), Payload::Avro(data) => self.encode_raw_to_flatbuffer(data), + Payload::Bson(bson_value) => self.encode_bson_to_flatbuffer(bson_value), } } } diff --git a/core/connectors/sdk/src/encoders/mod.rs b/core/connectors/sdk/src/encoders/mod.rs index d10299e830..1e819ed397 100644 --- a/core/connectors/sdk/src/encoders/mod.rs +++ b/core/connectors/sdk/src/encoders/mod.rs @@ -16,6 +16,7 @@ // under the License. 
pub mod avro; +pub mod bson; pub mod flatbuffer; pub mod json; pub mod proto; diff --git a/core/connectors/sdk/src/encoders/proto.rs b/core/connectors/sdk/src/encoders/proto.rs index 8a2590b9aa..c01825c913 100644 --- a/core/connectors/sdk/src/encoders/proto.rs +++ b/core/connectors/sdk/src/encoders/proto.rs @@ -21,6 +21,7 @@ use iggy_common::IggyTimestamp; use prost::Message; use prost_types::Any; use serde::{Deserialize, Serialize}; +use simd_json::OwnedValue; use std::collections::HashMap; use std::path::PathBuf; use tracing::{error, info}; @@ -374,6 +375,11 @@ impl ProtoStreamEncoder { "avro_size": data.len(), "data": general_purpose::STANDARD.encode(&data) }), + Payload::Bson(bson_value) => { + let json_value = + serde_json::to_value(bson_value).map_err(|_| Error::InvalidBsonPayload)?; + OwnedValue::try_from(json_value).map_err(|_| Error::InvalidBsonPayload)? + } }; if let simd_json::OwnedValue::Object(json_map) = json_value { @@ -635,6 +641,17 @@ impl ProtoStreamEncoder { ), data, ), + Payload::Bson(bson_value) => { + let bson_string = + simd_json::to_string(&bson_value).map_err(|_| Error::InvalidBsonPayload)?; + ( + format!( + "{}/google.protobuf.StringValue", + self.config.format_options.type_url_prefix + ), + bson_string.into_bytes(), + ) + } }; let any = Any { @@ -655,6 +672,9 @@ impl ProtoStreamEncoder { Payload::Proto(text) => Ok(text.into_bytes()), Payload::FlatBuffer(data) => Ok(data), Payload::Avro(data) => Ok(data), + Payload::Bson(bson_value) => { + serde_json::to_vec(&bson_value).map_err(|_| Error::InvalidBsonPayload) + } } } diff --git a/core/connectors/sdk/src/lib.rs b/core/connectors/sdk/src/lib.rs index 2e16d2a5e2..d0c4aa9edf 100644 --- a/core/connectors/sdk/src/lib.rs +++ b/core/connectors/sdk/src/lib.rs @@ -49,6 +49,8 @@ pub use convert::owned_value_to_serde_json; pub use log::LogCallback; pub use transforms::Transform; +use crate::{decoders::bson::BsonStreamDecoder, encoders::bson::BsonStreamEncoder}; + #[doc(hidden)] pub mod 
connector_macro_support { pub use dashmap::DashMap; @@ -143,6 +145,7 @@ pub enum Payload { Proto(String), FlatBuffer(Vec), Avro(Vec), + Bson(bson::Bson), } impl Payload { @@ -157,6 +160,9 @@ impl Payload { Payload::Proto(text) => Ok(text.into_bytes()), Payload::FlatBuffer(value) => Ok(value), Payload::Avro(value) => Ok(value), + Payload::Bson(value) => { + Ok(bson::serialize_to_vec(&value).map_err(|_| Error::InvalidBsonPayload)?) + } } } @@ -186,6 +192,9 @@ impl Payload { Payload::Proto(text) => Ok(text.as_bytes().to_vec()), Payload::FlatBuffer(value) => Ok(value.clone()), Payload::Avro(value) => Ok(value.clone()), + Payload::Bson(value) => { + bson::serialize_to_vec(value).map_err(|_| Error::InvalidBsonPayload) + } } } } @@ -203,6 +212,11 @@ impl std::fmt::Display for Payload { Payload::Proto(text) => write!(f, "Proto({text})"), Payload::FlatBuffer(value) => write!(f, "FlatBuffer({} bytes)", value.len()), Payload::Avro(value) => write!(f, "Avro({} bytes)", value.len()), + Payload::Bson(value) => write!( + f, + "Bson({})", + serde_json::to_string_pretty(value).unwrap_or_default() + ), } } } @@ -226,6 +240,8 @@ pub enum Schema { FlatBuffer, #[strum(to_string = "avro")] Avro, + #[strum(to_string = "bson")] + Bson, } impl Schema { @@ -250,6 +266,9 @@ impl Schema { }, Schema::FlatBuffer => Ok(Payload::FlatBuffer(value)), Schema::Avro => Ok(Payload::Avro(value)), + Schema::Bson => Ok(Payload::Bson( + bson::deserialize_from_slice(&value).map_err(|_| Error::InvalidBsonPayload)?, + )), // parsed from the wire bytes, not serde-encoded from the byte vector } } @@ -261,6 +280,7 @@ impl Schema { Schema::Proto => Arc::new(ProtoStreamDecoder::default()), Schema::FlatBuffer => Arc::new(FlatBufferStreamDecoder::default()), Schema::Avro => Arc::new(AvroStreamDecoder::default()), + Schema::Bson => Arc::new(BsonStreamDecoder), } } @@ -272,6 +292,7 @@ impl Schema { Schema::Proto => Arc::new(ProtoStreamEncoder::default()), Schema::FlatBuffer => Arc::new(FlatBufferStreamEncoder::default()), Schema::Avro => Arc::new(AvroStreamEncoder::default()), + Schema::Bson 
=> Arc::new(BsonStreamEncoder), } } } @@ -395,6 +416,8 @@ pub enum Error { InvalidPayloadType, #[error("Invalid JSON payload.")] InvalidJsonPayload, + #[error("Invalid BSON payload.")] + InvalidBsonPayload, #[error("Invalid text payload.")] InvalidTextPayload, #[error("Cannot decode schema {0}")] diff --git a/core/connectors/sdk/src/transforms/proto_convert.rs b/core/connectors/sdk/src/transforms/proto_convert.rs index a8874ea1d7..28d23bc860 100644 --- a/core/connectors/sdk/src/transforms/proto_convert.rs +++ b/core/connectors/sdk/src/transforms/proto_convert.rs @@ -479,6 +479,7 @@ impl ProtoConvert { Err(Error::InvalidPayloadType) } } + Schema::Bson => self.protobuf_to_bson(payload), } } @@ -504,9 +505,14 @@ impl ProtoConvert { Err(Error::InvalidPayloadType) } } + Schema::Bson => self.bson_to_protobuf(payload), } } + fn protobuf_to_bson(&self, _payload: Payload) -> Result { + Err(Error::InvalidPayloadType) // Protobuf -> BSON is not implemented yet; fail without panicking + } + fn protobuf_to_json(&self, payload: Payload) -> Result { match payload { Payload::Proto(proto_text) => { @@ -566,6 +572,10 @@ impl ProtoConvert { } } + fn bson_to_protobuf(&self, _payload: Payload) -> Result { + Err(Error::InvalidPayloadType) // BSON -> Protobuf is not implemented yet; fail without panicking + } + fn json_to_protobuf(&self, payload: Payload) -> Result { match payload { Payload::Json(json_value) => { diff --git a/core/connectors/sinks/http_sink/src/lib.rs b/core/connectors/sinks/http_sink/src/lib.rs index da896b0db4..6db4a2f077 100644 --- a/core/connectors/sinks/http_sink/src/lib.rs +++ b/core/connectors/sinks/http_sink/src/lib.rs @@ -408,6 +408,8 @@ impl HttpSink { serde_json::to_value(encoded) .map_err(|e| Error::Serialization(format!("EncodedPayload: {}", e))) } + Payload::Bson(bson_value) => serde_json::to_value(bson_value) + .map_err(|e| Error::Serialization(format!("Bson: {}", e))), } }