Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 109 additions & 1 deletion datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! Runtime configuration, via [`ConfigOptions`]

use arrow::array::timezone::Tz;
use arrow_ipc::CompressionType;

#[cfg(feature = "parquet_encryption")]
Expand Down Expand Up @@ -720,6 +721,42 @@ impl From<SpillCompression> for Option<CompressionType> {
}
}

/// A validated timezone configuration value.
#[derive(Debug, Copy, Clone)]
pub struct ConfigTimeZone(Tz);

impl ConfigTimeZone {
/// Returns the parsed timezone value.
pub fn tz(&self) -> Tz {
self.0
}
}

impl FromStr for ConfigTimeZone {
type Err = DataFusionError;

fn from_str(value: &str) -> Result<Self> {
value
.parse::<Tz>()
.map(Self)
.map_err(|e| DataFusionError::Configuration(e.to_string()))
}
}

impl Display for ConfigTimeZone {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}

impl PartialEq for ConfigTimeZone {
fn eq(&self, other: &Self) -> bool {
self.to_string() == other.to_string()
}
}

impl Eq for ConfigTimeZone {}

config_namespace! {
/// Options related to query execution
///
Expand Down Expand Up @@ -770,7 +807,7 @@ config_namespace! {
/// The default time zone
///
/// Some functions, e.g. `now` return timestamps in this time zone
pub time_zone: Option<String>, default = None
pub time_zone: Option<ConfigTimeZone>, default = None

/// Parquet options
pub parquet: ParquetOptions, default = Default::default()
Expand Down Expand Up @@ -2238,6 +2275,38 @@ impl<F: ConfigField + Default> ConfigField for Option<F> {
}
}

impl ConfigField for Option<ConfigTimeZone> {
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
match self {
Some(tz) => v.some(key, tz, description),
None => v.none(key, description),
}
}

fn set(&mut self, key: &str, value: &str) -> Result<()> {
if !key.is_empty() {
return _config_err!(
"Config field is an optional timezone and does not have nested field \"{}\"",
key
);
}
*self = Some(value.parse()?);
Ok(())
}

fn reset(&mut self, key: &str) -> Result<()> {
if key.is_empty() {
*self = None;
Ok(())
} else {
_config_err!(
"Config field is an optional timezone and does not have nested field \"{}\"",
key
)
}
}
}

/// Default transformation to parse a [`ConfigField`] for a string.
///
/// This uses [`FromStr`] to parse the data.
Expand Down Expand Up @@ -4324,6 +4393,45 @@ mod tests {
);
}

#[test]
fn test_execution_time_zone_validation() {
use crate::config::{ConfigOptions, ConfigTimeZone};

let mut config = ConfigOptions::default();

for valid in ["+08:00", "-08:00", "+0800", "+08", "Asia/Taipei"] {
config.set("datafusion.execution.time_zone", valid).unwrap();
assert_eq!(
config.execution.time_zone.as_ref().map(ToString::to_string),
Some(valid.parse::<ConfigTimeZone>().unwrap().to_string())
);
}

let previous = config.execution.time_zone;
for invalid in ["+08:00:00", "08:00", "08", "Asia/Taipei2"] {
let err = config
.set("datafusion.execution.time_zone", invalid)
.unwrap_err();
assert_eq!(
err.to_string(),
format!(
"Invalid or Unsupported Configuration: Parser error: Invalid timezone {invalid:?}: failed to parse timezone"
)
);
assert_eq!(config.execution.time_zone, previous);
}

let err = config
.set("datafusion.execution.time_zone.name", "UTC")
.unwrap_err();
assert!(
err.strip_backtrace()
.contains("does not have nested field \"name\""),
"Unexpected error {err:?}"
);
assert_eq!(config.execution.time_zone, previous);
}

#[cfg(feature = "parquet")]
#[test]
fn set_cdc_enabled_flag() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1970,7 +1970,12 @@ async fn test_config_options_work_for_scalar_func() -> Result<()> {
}

fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
let tz = args.config_options.execution.time_zone.clone();
let tz = args
.config_options
.execution
.time_zone
.as_ref()
.map(ToString::to_string);
Ok(ColumnarValue::Scalar(ScalarValue::from(tz)))
}
}
Expand All @@ -1980,7 +1985,7 @@ async fn test_config_options_work_for_scalar_func() -> Result<()> {
});

let mut config = SessionConfig::new();
config.options_mut().execution.time_zone = Some("AEST".into());
config.options_mut().execution.time_zone = Some("Australia/Sydney".parse().unwrap());

let ctx = SessionContext::new_with_config(config);

Expand All @@ -1993,7 +1998,7 @@ async fn test_config_options_work_for_scalar_func() -> Result<()> {
let expected_schema = Schema::new(vec![Field::new("a", DataType::Utf8, false)]);
let expected = RecordBatch::try_new(
SchemaRef::from(expected_schema),
vec![create_array!(Utf8, ["AEST"])],
vec![create_array!(Utf8, ["Australia/Sydney"])],
)?;

assert_eq!(expected, actual[0]);
Expand Down
7 changes: 6 additions & 1 deletion datafusion/ffi/src/tests/udf_udaf_udwf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,12 @@ impl ScalarUDFImpl for TimeZoneUDF {
&self,
args: ScalarFunctionArgs,
) -> datafusion_common::Result<ColumnarValue> {
let tz = args.config_options.execution.time_zone.clone();
let tz = args
.config_options
.execution
.time_zone
.as_ref()
.map(ToString::to_string);
Ok(ColumnarValue::Scalar(ScalarValue::from(tz)))
}
}
Expand Down
5 changes: 3 additions & 2 deletions datafusion/ffi/tests/ffi_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ mod tests {
assert!(result[0].column(0).as_string::<i32>().is_null(0));

let mut config = SessionConfig::new();
config.options_mut().execution.time_zone = Some("AEST".into());
config.options_mut().execution.time_zone =
Some("Australia/Sydney".parse().unwrap());

let ctx = SessionContext::new_with_config(config);

Expand All @@ -148,7 +149,7 @@ mod tests {
assert!(result.len() == 1);
assert!(!result[0].column(0).as_string::<i32>().is_null(0));
let result = result[0].column(0).as_string::<i32>().value(0);
assert_eq!(result, "AEST");
assert_eq!(result, "Australia/Sydney");

Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/functions/benches/to_timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ fn criterion_benchmark(c: &mut Criterion) {
let arg_field = Field::new("a", DataType::Utf8, false).into();
let arg_fields = vec![arg_field];
let mut options = ConfigOptions::default();
options.execution.time_zone = Some("UTC".into());
options.execution.time_zone = Some("UTC".parse().unwrap());
let config_options = Arc::new(options);

let to_timestamp_udf = to_timestamp(config_options.as_ref());
Expand Down
3 changes: 1 addition & 2 deletions datafusion/functions/src/datetime/current_date.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.

use arrow::array::timezone::Tz;
use arrow::datatypes::DataType;
use arrow::datatypes::DataType::Date32;
use chrono::{Datelike, NaiveDate, TimeZone};
Expand Down Expand Up @@ -121,7 +120,7 @@ impl ScalarUDFImpl for CurrentDateFunc {
.execution
.time_zone
.as_ref()
.and_then(|tz| tz.parse::<Tz>().ok())
.map(|tz| tz.tz())
.map_or_else(
|| datetime_to_days(&now_ts),
|tz| {
Expand Down
5 changes: 2 additions & 3 deletions datafusion/functions/src/datetime/current_time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.

use arrow::array::timezone::Tz;
use arrow::datatypes::DataType;
use arrow::datatypes::DataType::Time64;
use arrow::datatypes::TimeUnit::Nanosecond;
Expand Down Expand Up @@ -118,7 +117,7 @@ impl ScalarUDFImpl for CurrentTimeFunc {
.execution
.time_zone
.as_ref()
.and_then(|tz| tz.parse::<Tz>().ok())
.map(|tz| tz.tz())
.map_or_else(
|| datetime_to_time_nanos(&now_ts),
|tz| {
Expand Down Expand Up @@ -160,7 +159,7 @@ mod tests {
config.execution.time_zone = if tz.is_empty() {
None
} else {
Some(tz.to_string())
Some(tz.parse().unwrap())
};
let schema = Arc::new(DFSchema::empty());
SimplifyContext::builder()
Expand Down
2 changes: 1 addition & 1 deletion datafusion/functions/src/datetime/now.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl NowFunc {
.execution
.time_zone
.as_ref()
.map(|tz| Arc::from(tz.as_str())),
.map(|tz| Arc::from(tz.to_string())),
}
}
}
Expand Down
Loading