Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
51eb0ff
feat(contrib): add ContribOp proto envelope + Rust planner registry SPI
schenksj May 14, 2026
f448693
feat(contrib): wire OpStruct::ContribOp dispatcher in native planner
schenksj May 14, 2026
f23500d
feat(contrib): add Scala extension SPI under org.apache.comet.spi
schenksj May 14, 2026
42234b9
feat(contrib): wire CometScanRule + CometExecRule to SPI registry
schenksj May 14, 2026
8b69471
feat(contrib): call CometExtensionRegistry.load() at extension install
schenksj May 14, 2026
d1553b5
feat(contrib): SPI crate split + worked example contrib (PR1.7 part 1)
schenksj May 14, 2026
5cb7099
feat(contrib): JVM half of contrib/example reference module
schenksj May 14, 2026
8508ec5
docs(contrib): add contributor guide for authoring contribs
schenksj May 14, 2026
e018076
feat(contrib): SPI refinements from Delta-port confidence check
schenksj May 14, 2026
14e4944
feat(contrib): ContribPlannerContext + ParquetDatasourceParams (SPI g…
schenksj May 14, 2026
8930b69
feat(contrib): review-fix pass (B1-B6, I1-I10, nits, doc updates)
schenksj May 14, 2026
68fff43
feat(contrib): second-pass review fixes (R1-R7, N-NEW-1/2/7)
schenksj May 14, 2026
e4e6e6c
feat(contrib): third-pass review fixes (R-NEW-1/2, N1-N8)
schenksj May 14, 2026
6652963
feat(contrib): fourth-pass polish (F1-F6)
schenksj May 14, 2026
91c40e0
docs(contrib): comprehensive contributor-guide rewrite
schenksj May 14, 2026
2c46552
docs(contrib): second-pass review fixes for contributor guide
schenksj May 14, 2026
cf5253e
refactor(contrib): bundle JVM half into comet-spark, matching native …
schenksj May 15, 2026
c7656fc
refactor(contrib): deps-only pom per contrib + ArcSwap registry
schenksj May 15, 2026
a2ac715
docs(contrib): rewrite PR1-description.md to reflect current architec…
schenksj May 15, 2026
e0e4151
Merge branch 'main' into comet-contrib-spi
schenksj May 15, 2026
29f685c
chore: gitignore PR1-description.md and untrack from history-forward
schenksj May 15, 2026
525b980
feat(contrib): SPI surface additions for contribs that need core helpers
schenksj May 15, 2026
e417211
feat(contrib): partition-metadata SPI hook + matchOperator marker dis…
schenksj May 15, 2026
35f1b3b
revert: matchOperator dispatch for contrib markers
schenksj May 15, 2026
4ee7102
feat(contrib): restore matchOperator dispatch for contrib markers
schenksj May 15, 2026
04b48c2
feat(contrib): make PlanDataInjector contrib-registrable
schenksj May 16, 2026
f08ac86
fix(contrib): treat any leaf CometNativeExec as a foreachUntilCometIn…
schenksj May 16, 2026
272ada1
feat(contrib): PlanDataSource trait for contrib leaf scans
schenksj May 16, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
CLAUDE.md
# PR1-description.md is a working artifact used when manually opening PR1; not committed.
PR1-description.md
target
.idea
*.iml
Expand All @@ -10,6 +12,7 @@ metastore_db/
spark-warehouse/
dependency-reduced-pom.xml
native/proto/src/generated
contrib/example/native/src/generated
prebuild
.flattened-pom.xml
rat.txt
Expand Down
55 changes: 55 additions & 0 deletions contrib/example/native/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[package]
name = "comet-contrib-example"
description = "Worked reference implementation of a Comet contrib extension. Not published; bundled as a SPI dispatch test fixture."
# Contrib crates live OUTSIDE the workspace root directory (`native/`) but are listed as
# workspace members in `native/Cargo.toml`. Cargo's auto-discovery walks up the directory
# tree, so without the explicit pointer it can't find `native/Cargo.toml` from
# `contrib/example/native/`.
workspace = "../../../native"
version = { workspace = true }
homepage = { workspace = true }
repository = { workspace = true }
authors = { workspace = true }
license = { workspace = true }
edition = { workspace = true }

[lib]
# rlib (not cdylib): linked INTO core's cdylib via the `contrib-example` Cargo feature
# flag on the core crate. There is exactly one libcomet.{so,dylib,dll} at runtime; the
# contrib's #[ctor] runs during that single library's init.
crate-type = ["rlib"]

[dependencies]
# Depend on the thin SPI crate, NOT on core. This is what breaks the cycle: core
# depends on contribs (Cargo feature -> rlib link); both depend on contrib-spi; nothing
# depends back on core from a contrib.
comet-contrib-spi = { path = "../../../native/contrib-spi" }
datafusion = { workspace = true }
# Used only in unit tests to construct a TestCtx that implements ContribPlannerContext;
# kept in [dependencies] (not [dev-dependencies]) because the trait's typed methods take
# spark_expression / spark_operator proto refs and the impl module is not test-gated.
datafusion-comet-proto = { workspace = true }
prost = "0.14.3"
ctor = "0.4"
log = "0.4"

# Each contrib runs its own prost-build over its own .proto files (see build.rs).
[build-dependencies]
prost-build = "0.14.3"
39 changes: 39 additions & 0 deletions contrib/example/native/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Build script for the example contrib's proto. Mirrors `native/proto/build.rs`.
//!
//! Each contrib runs its own `prost-build` invocation against its own `.proto` files.
//! This keeps core's proto crate format-agnostic and lets contribs evolve their wire
//! format independently. The generated Rust types live under `src/generated/` and are
//! gitignored.

use std::{fs, io::Result, path::Path};

fn main() -> Result<()> {
println!("cargo:rerun-if-changed=src/proto/");

let out_dir = "src/generated";
if !Path::new(out_dir).is_dir() {
fs::create_dir(out_dir)?;
}

prost_build::Config::new()
.out_dir(out_dir)
.compile_protos(&["src/proto/example_op.proto"], &["src/proto"])?;
Ok(())
}
240 changes: 240 additions & 0 deletions contrib/example/native/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Worked reference implementation of a Comet contrib extension.
//!
//! Demonstrates two patterns future contribs will follow:
//!
//! 1. **Dispatch wiring** -- registers a `ContribOperatorPlanner` against a stable
//! `kind` string at lib-init time via `#[ctor::ctor]`. The planner is called from
//! core's `OpStruct::ContribOp` dispatcher with the contrib's payload bytes.
//!
//! 2. **Proto layer** -- the contrib has its own `proto/` directory with its own
//! `.proto` schema (`example_op.proto`). `build.rs` runs `prost-build` over it;
//! generated Rust types live under `src/generated/` (gitignored). The planner
//! decodes the payload via `prost::Message::decode` -- the same way real contribs
//! (Delta etc.) will.
//!
//! Two planner kinds are registered:
//!
//! * `example-no-op` -- returns a sentinel error. Tests use this to verify
//! the dispatch chain end-to-end.
//! * `example-constant-scan` -- decodes an `ExampleConstantScan` payload, returns
//! an `EmptyExec` sized by the payload's `row_count`.
//! Real contribs (Delta) follow the same pattern,
//! just with their own message and operator.
//!
//! The whole crate is gated by `native/core/Cargo.toml`'s `contrib-example` feature flag.
//! Build core without that feature (`cargo build --no-default-features`) and zero bytes
//! of this crate end up in `libcomet`.

use std::sync::Arc;

use comet_contrib_spi::{
register_contrib_planner, ContribError, ContribOperatorPlanner, ContribPlannerContext,
};
use datafusion::arrow::datatypes::Schema;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::ExecutionPlan;
use prost::Message;

/// Generated Rust types for the contrib's proto schema. `build.rs` writes the module
/// here at compile time; `src/generated/` is gitignored.
pub mod proto {
include!(concat!("generated/", "comet.contrib.example.rs"));
}

/// Sentinel kind used by tests to verify dispatch reaches this contrib at all.
pub const EXAMPLE_NO_OP_KIND: &str = "example-no-op";

/// Kind for the proto-decoding constant-scan planner. Demonstrates the
/// proto-decode-and-build path real contribs will use.
pub const EXAMPLE_CONSTANT_SCAN_KIND: &str = "example-constant-scan";

/// A planner that intentionally does no plan-building work. Returns a sentinel error so
/// dispatch tests can assert the message reaches this code path. The payload is ignored;
/// children are ignored.
struct NoOpPlanner;

impl ContribOperatorPlanner for NoOpPlanner {
fn plan(
&self,
_ctx: &dyn ContribPlannerContext,
_payload: &[u8],
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>, ContribError> {
Err(ContribError::Plan(format!(
"comet-contrib-example: NoOpPlanner reached for kind={EXAMPLE_NO_OP_KIND:?}; \
this is the expected sentinel for SPI dispatch tests"
)))
}
}

/// Decodes the payload as an `ExampleConstantScan` proto and returns an `EmptyExec`
/// with a schema-less output. Real contribs use the same decode-then-build pattern --
/// they just decode richer messages and return richer execs.
struct ConstantScanPlanner;

impl ContribOperatorPlanner for ConstantScanPlanner {
fn plan(
&self,
_ctx: &dyn ContribPlannerContext,
payload: &[u8],
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>, ContribError> {
let msg = proto::ExampleConstantScan::decode(payload).map_err(|e| {
ContribError::BadPayload(format!(
"ExampleConstantScan: decode failed: {e}"
))
})?;
log::debug!(
"comet-contrib-example: ConstantScanPlanner produces {} synthetic rows",
msg.row_count
);
// For the worked example we don't actually populate rows -- EmptyExec is fine to
// demonstrate the build path. Real contribs return their domain-specific exec
// (Delta returns the file scan + DV filter wrap).
Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))))
}
}

/// Registers all of the example contrib's planners against the contrib registry at
/// library-init time. `#[ctor::ctor]` runs this constructor before `main`/`JNI_OnLoad`.
/// Comet's `libcomet` cdylib is the single library the JVM loads; this constructor runs
/// during that one library's init.
///
/// # Panic safety
///
/// The body is wrapped in `catch_unwind` and writes to stderr on failure. A panic inside
/// `#[ctor]` aborts the entire JVM process before `JNI_OnLoad` runs and produces no
/// diagnostic on macOS/Linux without this wrapper. Every contrib's `#[ctor]` should
/// follow the same pattern; see `docs/source/contributor-guide/contrib-extensions.md`.
///
/// # Logging
///
/// `log::*!` macros inside `#[ctor]` are no-ops because Comet's logger is initialised
/// later, in `Java_org_apache_comet_NativeBase_init`. Use `eprintln!` (or nothing) for
/// any ctor diagnostics that must be visible.
#[ctor::ctor]
fn register() {
let _ = std::panic::catch_unwind(|| {
register_contrib_planner(EXAMPLE_NO_OP_KIND, Arc::new(NoOpPlanner));
register_contrib_planner(EXAMPLE_CONSTANT_SCAN_KIND, Arc::new(ConstantScanPlanner));
})
.map_err(|panic| {
eprintln!(
"comet-contrib-example: #[ctor] panicked during planner registration; \
contrib will not be available. panic={panic:?}"
);
});
}

#[cfg(test)]
mod tests {
use super::*;
use comet_contrib_spi::{lookup_contrib_planner_by_kind, ParquetDatasourceParams};
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::execution::context::SessionContext;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::physical_expr::PhysicalExpr;
use datafusion_comet_proto::{spark_expression, spark_operator};
use std::collections::HashMap;

/// Minimal `ContribPlannerContext` for unit-testing contrib planners that don't
/// actually need to build a parquet exec. All methods that the tests don't exercise
/// panic if invoked.
struct TestCtx {
ctx: Arc<SessionContext>,
}
impl ContribPlannerContext for TestCtx {
fn session_ctx(&self) -> &Arc<SessionContext> {
&self.ctx
}
fn build_physical_expr(
&self,
_expr: &spark_expression::Expr,
_input_schema: SchemaRef,
) -> Result<Arc<dyn PhysicalExpr>, ContribError> {
unimplemented!("TestCtx: build_physical_expr not used by this test")
}
fn convert_spark_schema(
&self,
_fields: &[spark_operator::SparkStructField],
) -> SchemaRef {
unimplemented!("TestCtx: convert_spark_schema not used by this test")
}
fn prepare_object_store(
&self,
_url: String,
_configs: &HashMap<String, String>,
) -> Result<(ObjectStoreUrl, datafusion::object_store::path::Path), ContribError> {
unimplemented!("TestCtx: prepare_object_store not used by this test")
}
fn build_parquet_datasource_exec(
&self,
_params: ParquetDatasourceParams,
) -> Result<Arc<dyn ExecutionPlan>, ContribError> {
unimplemented!("TestCtx: build_parquet_datasource_exec not used by this test")
}
}

fn test_ctx() -> TestCtx {
TestCtx {
ctx: Arc::new(SessionContext::new()),
}
}

#[test]
fn ctor_registers_both_planners() {
// The #[ctor] above runs at process-init time for test binaries too.
assert!(lookup_contrib_planner_by_kind(EXAMPLE_NO_OP_KIND).is_some());
assert!(lookup_contrib_planner_by_kind(EXAMPLE_CONSTANT_SCAN_KIND).is_some());
}

#[test]
fn constant_scan_decodes_payload_and_builds() {
let payload = proto::ExampleConstantScan { row_count: 42 }.encode_to_vec();
let planner = ConstantScanPlanner;
let ctx = test_ctx();
let plan = planner.plan(&ctx, &payload, vec![]).expect("decode + build");
assert!(plan.schema().fields().is_empty());
}

#[test]
fn constant_scan_handles_zero_rows() {
// Worked-example coverage: row_count = 0 must not be a special case.
let payload = proto::ExampleConstantScan { row_count: 0 }.encode_to_vec();
let planner = ConstantScanPlanner;
let ctx = test_ctx();
let plan = planner.plan(&ctx, &payload, vec![]).expect("decode + build");
assert!(plan.schema().fields().is_empty());
}

#[test]
fn constant_scan_surfaces_bad_payload() {
let planner = ConstantScanPlanner;
let ctx = test_ctx();
let bad = b"not a valid proto";
let err = planner
.plan(&ctx, bad, vec![])
.expect_err("garbage should fail decode");
match err {
ContribError::BadPayload(_) => {}
other => panic!("expected BadPayload, got {other:?}"),
}
}
}
35 changes: 35 additions & 0 deletions contrib/example/native/src/proto/example_op.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

syntax = "proto3";

// Contrib-private proto package. Each contrib's proto messages live under their own
// package so symbols never collide with core or with other contribs.
package comet.contrib.example;

// Trivial reference message used by the worked-example contrib. A real contrib's proto
// carries whatever fields its native operator needs (file paths, predicates, schemas,
// deletion vectors, etc.).
//
// The contrib's Scala side fills this message and serializes it into the
// `ContribOp.payload` bytes; the contrib's Rust side decodes the bytes back into this
// struct in its `ContribOperatorPlanner::plan`.
message ExampleConstantScan {
// Number of rows the synthetic constant scan should emit. Bounded by the contrib's
// planner -- this is a test reference, not a useful operator.
uint32 row_count = 1;
}
Loading