Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion quickwit/quickwit-datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ datafusion-substrait = "53"
datafusion-datasource = "53"
datafusion-physical-plan = "53"
datafusion-datasource-parquet = "53"
datafusion-distributed = "1.0"
datafusion-distributed = { git = "https://github.com/datafusion-contrib/datafusion-distributed.git", rev = "d7de9c3fb99fd7fa99da925b460dc2d4529c04a2" }
object_store = "0.13"

[dev-dependencies]
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-df-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ async-trait = { workspace = true }
futures = { workspace = true }
prost = { workspace = true } # substrait Plan::decode (runtime only)
serde_json = { workspace = true }
siphasher = { workspace = true }
tracing = { workspace = true }
url = "2"

arrow = { workspace = true }
datafusion = "53"
datafusion-substrait = "53"
datafusion-distributed = "1.0"
datafusion-distributed = { git = "https://github.com/datafusion-contrib/datafusion-distributed.git", rev = "d7de9c3fb99fd7fa99da925b460dc2d4529c04a2" }
object_store = "0.13"

# gRPC surface is opt-in. The extension traits + session builder do not need
Expand Down
5 changes: 5 additions & 0 deletions quickwit/quickwit-df-core/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ message ExecuteSqlRequest {

// Optional per-request session overrides.
map<string, string> properties = 2;

// When true, the server returns the service-level EXPLAIN output for the
// final SQL statement instead of executing it. DDL statements before the
// final query are still executed for side effects.
bool explain = 3;
}

message ExecuteSqlResponse {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 10 additions & 4 deletions quickwit/quickwit-df-core/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,17 @@ impl data_fusion_service_server::DataFusionService for DataFusionServiceGrpcImpl
let req = request.into_inner();
let service = Arc::clone(&self.service);

let output = if req.explain {
DataFusionOutput::Explain
} else {
DataFusionOutput::Records
};
let execution = service
.execute(DataFusionRequest::records(
DataFusionInput::Sql(&req.sql),
&req.properties,
))
.execute(DataFusionRequest {
input: DataFusionInput::Sql(&req.sql),
output,
properties: &req.properties,
})
.await
.map_err(|err| {
warn!(error = %err, "DataFusion SQL execution error");
Expand Down
37 changes: 20 additions & 17 deletions quickwit/quickwit-df-core/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,18 +225,18 @@ pub struct DataFusionRatioStatistics {
}

impl DataFusionExecutionMetadata {
pub fn physical_plan_display(&self) -> String {
physical_plan_display(&self.physical_plan)
pub async fn physical_plan_display(&self) -> String {
physical_plan_display(&self.physical_plan).await
}

pub fn physical_plan_metadata(&self) -> DataFusionPhysicalPlanMetadata {
physical_plan_metadata(&self.physical_plan)
pub async fn physical_plan_metadata(&self) -> DataFusionPhysicalPlanMetadata {
physical_plan_metadata(&self.physical_plan).await
}
}

impl SubstraitExecutionMetadata {
pub fn physical_plan_display(&self) -> String {
physical_plan_display(&self.physical_plan)
pub async fn physical_plan_display(&self) -> String {
physical_plan_display(&self.physical_plan).await
}
}

Expand Down Expand Up @@ -461,20 +461,21 @@ impl DataFusionService {
execute_stream(Arc::clone(&physical_plan), ctx.task_ctx())?
}
DataFusionOutput::Explain => {
explain_stream(&logical_plan_display, &physical_plan, None)?
explain_stream(&logical_plan_display, &physical_plan, None).await?
}
DataFusionOutput::ExplainAnalyze => {
let analyze_execution_start = Instant::now();
let num_rows =
execute_plan_for_metrics(Arc::clone(&physical_plan), ctx.task_ctx()).await?;
analyze_execution_duration = Some(analyze_execution_start.elapsed());
analyze_output_rows = Some(num_rows);
let physical_plan_metadata = physical_plan_metadata(&physical_plan);
let physical_plan_metadata = physical_plan_metadata(&physical_plan).await;
explain_stream(
&logical_plan_display,
&physical_plan,
Some(&physical_plan_metadata),
)?
)
.await?
}
};
let stream_creation_duration = stream_creation_start.elapsed();
Expand Down Expand Up @@ -813,7 +814,7 @@ async fn execute_plan_for_metrics(
Ok(num_rows)
}

fn explain_stream(
async fn explain_stream(
logical_plan: &str,
physical_plan: &Arc<dyn ExecutionPlan>,
physical_plan_metadata: Option<&DataFusionPhysicalPlanMetadata>,
Expand All @@ -827,7 +828,7 @@ fn explain_stream(
plan_types.push("execution_statistics_json".to_string());
plans.push(physical_plan_metadata.statistics.to_json_string());
} else {
physical_plan_display_text = physical_plan_display(physical_plan);
physical_plan_display_text = physical_plan_display(physical_plan).await;
plans.push(physical_plan_display_text);
}
let plan_type: ArrayRef = Arc::new(StringArray::from(plan_types));
Expand All @@ -850,10 +851,10 @@ fn explain_schema() -> SchemaRef {
]))
}

fn physical_plan_metadata(
async fn physical_plan_metadata(
physical_plan: &Arc<dyn ExecutionPlan>,
) -> DataFusionPhysicalPlanMetadata {
let (physical_plan, show_metrics) = physical_plan_with_metrics(physical_plan);
let (physical_plan, show_metrics) = physical_plan_with_metrics(physical_plan).await;
let display = if physical_plan.as_any().is::<DistributedExec>() {
display_plan_ascii(physical_plan.as_ref(), show_metrics)
} else {
Expand All @@ -868,18 +869,20 @@ fn physical_plan_metadata(
}
}

fn physical_plan_display(physical_plan: &Arc<dyn ExecutionPlan>) -> String {
physical_plan_metadata(physical_plan).display
async fn physical_plan_display(physical_plan: &Arc<dyn ExecutionPlan>) -> String {
physical_plan_metadata(physical_plan).await.display
}

fn physical_plan_with_metrics(
async fn physical_plan_with_metrics(
physical_plan: &Arc<dyn ExecutionPlan>,
) -> (Arc<dyn ExecutionPlan>, bool) {
if physical_plan.as_any().is::<DistributedExec>() {
return match rewrite_distributed_plan_with_metrics(
Arc::clone(physical_plan),
DistributedMetricsFormat::PerTask,
) {
)
.await
{
Ok(physical_plan) => (physical_plan, true),
Err(_) => (Arc::clone(physical_plan), false),
};
Expand Down
8 changes: 6 additions & 2 deletions quickwit/quickwit-df-core/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use datafusion::execution::object_store::ObjectStoreRegistry;
use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_distributed::{
DistributedExt, DistributedPhysicalOptimizerRule, TaskEstimator, WorkerResolver,
DistributedExt, SessionStateBuilderExt, TaskEstimator, TaskRoutingContext, WorkerResolver,
};

use crate::data_source::{
Expand Down Expand Up @@ -346,7 +346,7 @@ impl DataFusionSessionBuilder {
builder = builder
.with_distributed_worker_resolver(ArcWorkerResolver(Arc::clone(resolver)))
.with_distributed_task_estimator(ArcTaskEstimator(Arc::clone(&self.task_estimator)))
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule));
.with_distributed_planner();
}

Ok(SessionContext::new_with_state(builder.build()))
Expand Down Expand Up @@ -386,6 +386,10 @@ impl TaskEstimator for ArcTaskEstimator {
) -> Option<Arc<dyn datafusion::physical_plan::ExecutionPlan>> {
self.0.scale_up_leaf_node(plan, task_count, cfg)
}

fn route_tasks(&self, routing_ctx: &TaskRoutingContext<'_>) -> DFResult<Option<Vec<url::Url>>> {
self.0.route_tasks(routing_ctx)
}
}

#[cfg(test)]
Expand Down
Loading
Loading