feat(connectors): add Meilisearch source connector#3404
Conversation
Meilisearch write operations are task-based, so the connector uses the official Rust SDK for health checks, index creation, document writes, and task completion while preserving Iggy connector lifecycle semantics. The sink injects a stable iggy_id primary key when payloads omit the configured primary key, keeping repeated deliveries idempotent across stream/topic partitions. Constraint: User requested the official Meilisearch SDK instead of direct REST calls Constraint: Connector runtime tests live under core/integration/tests/connectors, not connector-local tests Rejected: Reuse the Elasticsearch bulk client shape directly | Meilisearch writes complete through asynchronous tasks rather than bulk item responses Rejected: Keep connector-local live tests | inconsistent with existing connector integration layout Confidence: high Scope-risk: moderate Directive: Keep Meilisearch task waiting enabled by default unless delivery semantics are intentionally relaxed Tested: cargo test -p iggy_connector_meilisearch_sink Tested: cargo clippy -p iggy_connector_meilisearch_sink --all-targets -- -D warnings Tested: cargo build -p iggy_connector_meilisearch_sink Tested: cargo test -p integration --test mod --no-run Tested: env CARGO_BIN_EXE_iggy-server=... CARGO_BIN_EXE_iggy-connectors=... cargo test -p integration --test mod -- meilisearch_sink_indexes_json_messages --nocapture Not-tested: Full unfiltered integration test suite
Meilisearch already had a sink connector, so this adds the matching source connector and test fixture structure to support bidirectional connector coverage. The source uses the official Meilisearch SDK for health checks and search polling, preserving the connector runtime's state-based offset persistence pattern. Constraint: Use the existing connector SDK Source lifecycle and persisted ConnectorState format Constraint: Use the official meilisearch-sdk rather than hand-written search HTTP calls Rejected: Keep Meilisearch source fixture code inside sink.rs | source and sink test setup should mirror the Elasticsearch container/source/sink split Confidence: high Scope-risk: moderate Directive: Keep Meilisearch source polling semantics tied to stable search ordering when indexes mutate during runtime Tested: cargo check -p iggy_connector_meilisearch_source Tested: cargo test -p iggy_connector_meilisearch_source Tested: cargo check -p integration Not-tested: Full Docker-backed Meilisearch integration test runtime
The PR branch needed to absorb upstream connector and workspace updates so GitHub can merge it cleanly. The resolution keeps upstream dependency bumps and connector list entries while preserving the new Meilisearch source connector workspace membership and documentation. Constraint: PR targets apache/iggy master and must be mergeable there Rejected: Rebase the branch | a merge commit preserves the already-published PR branch without rewriting history Confidence: high Scope-risk: moderate Directive: Keep upstream connector additions and Meilisearch source additions together in shared connector listings Tested: cargo check -p iggy_connector_meilisearch_source Tested: cargo test -p iggy_connector_meilisearch_source Tested: cargo check -p integration Tested: git diff --check Not-tested: Full Docker-backed connector integration suite
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #3404 +/- ##
=============================================
- Coverage 74.67% 46.51% -28.16%
Complexity 943 943
=============================================
Files 1228 1245 +17
Lines 120529 106832 -13697
Branches 97266 83167 -14099
=============================================
- Hits 90002 49693 -40309
- Misses 27588 54436 +26848
+ Partials 2939 2703 -236
🚀 New features to boost your workflow:
|
The pre-merge TOML formatter expects long inline dependency feature lists to be expanded. Formatting the Meilisearch SDK dependency keeps the branch aligned with the existing CI taplo check after pulling the latest branch update. Constraint: Only the Meilisearch branch Cargo.toml formatting drift is in scope Confidence: high Scope-risk: narrow Tested: ./scripts/ci/taplo.sh --check Cargo.toml
Cargo refused to run locked checks because the workspace manifest resolution no longer matched Cargo.lock after the latest Meilisearch branch update. Regenerating the lockfile removes the stale duplicate untrusted 0.7.1 resolution and lets --locked checks run without mutating the lockfile. Constraint: Preserve the Meilisearch-only scope and update only the lockfile drift required by Cargo Confidence: high Scope-risk: narrow Tested: cargo check -p iggy_connector_meilisearch_source Tested: cargo check --locked -p iggy_connector_meilisearch_source
The integration harness starts the connectors runtime before the test body indexes Meilisearch documents. Treating a missing Meilisearch index as an empty poll keeps the source connector retryable during startup instead of surfacing a transient configuration error. The source fixture also disables metadata wrapping so the test receives the raw documents it asserts on. Constraint: Keep the fix scoped to Meilisearch source behavior and its test fixture Rejected: Sleeping in the test before starting the runtime | the harness owns runtime startup order Confidence: high Scope-risk: narrow Tested: cargo check --locked -p iggy_connector_meilisearch_source Tested: env CARGO_BIN_EXE_iggy-server=/Users/radudiaconu/Desktop/Code/Rust/iggy-meilisearch/target/debug/iggy-server CARGO_BIN_EXE_iggy-connectors=/Users/radudiaconu/Desktop/Code/Rust/iggy-meilisearch/target/debug/iggy-connectors cargo test -p integration --test mod connectors::meilisearch::meilisearch_source::meilisearch_source_produces_index_documents -- --nocapture
The previous Python BDD job failed before test execution because Docker Hub metadata requests timed out while resolving base images. This empty commit retriggers the PR checks without changing the Meilisearch connector diff. Constraint: No code changes are needed for an external Docker registry timeout Rejected: Modify BDD or Docker configuration | failure happened before tests and outside the branch scope Confidence: high Scope-risk: narrow Tested: Not run; empty commit for CI rerun only
|
/ready |
| pub fn new(id: u32, config: MeilisearchSinkConfig) -> Self { | ||
| let primary_key = config |
There was a problem hiding this comment.
this function makes a lot of assumptions about the config, some of which are encoded as consts and some of which do not have consts (create_index_if_not_exists, include_metadata, wait_for_tasks). I think it would be prudent to make these as consts as well, and document the default options as a rustdoc or in the readme for the user.
| fn map_sdk_error(error: MeilisearchSdkError) -> Error { | ||
| match error { | ||
| MeilisearchSdkError::Meilisearch(meilisearch_error) => { | ||
| if meilisearch_error.error_type == MeilisearchErrorType::Internal { | ||
| Error::HttpRequestFailed(meilisearch_error.to_string()) | ||
| } else { |
There was a problem hiding this comment.
A from trait can be used here to conform with idiomatic rust
| async fn wait_for_task( | ||
| &self, | ||
| client: &Client, | ||
| task: TaskInfo, | ||
| allow_index_already_exists: bool, |
There was a problem hiding this comment.
This function is supposed to be a generic helper function, but it is mixing feature-specific code with generic code by including the allow_index_already_exists flag. It would be better to let create_index handle the IndexAlreadyExists part.
| pub struct MeilisearchSink { | ||
| id: u32, | ||
| config: MeilisearchSinkConfig, | ||
| primary_key: String, | ||
| document_action: MeilisearchDocumentAction, | ||
| create_index_if_not_exists: bool, | ||
| include_metadata: bool, | ||
| batch_size: usize, | ||
| timeout: Duration, |
There was a problem hiding this comment.
This code carries both the raw config and a second normalized config. This creates two sources of truth inside the same type.
| fn normalize_host(raw: &str) -> Result<String, Error> { | ||
| let url = Url::parse(raw) | ||
| .map_err(|error| Error::Connection(format!("Invalid Meilisearch URL: {error}")))?; | ||
| let mut host = url.to_string(); | ||
| while host.ends_with('/') { | ||
| host.pop(); | ||
| } | ||
| Ok(host) | ||
| } |
There was a problem hiding this comment.
The normalize_host in the sink and the source are different. Any reason for that?
| async fn create_index(&self, client: &Client) -> Result<(), Error> { | ||
| info!( | ||
| "Creating Meilisearch index '{}' with primary key '{}'", | ||
| self.config.index, self.primary_key | ||
| ); | ||
|
|
||
| let task = self | ||
| .retry_sdk_operation("create index", || { | ||
| client.create_index(&self.config.index, Some(&self.primary_key)) | ||
| }) | ||
| .await?; | ||
| self.wait_for_task(client, task, true).await?; | ||
|
|
||
| info!("Created Meilisearch index '{}'", self.config.index); | ||
| Ok(()) | ||
| } |
There was a problem hiding this comment.
The readme says that the wait_for_tasks only controls whether consume() waits for meilisearch tasks, but this function uses wait_for_tasks and immediately returns if the flag is false (the false case configuration is broken). This lets open() succeed before the index exists, so the first consume() can race and fail with IndexNotFound
| _ => { | ||
| warn!("Unsupported payload format: {}", messages_metadata.schema); | ||
| return None; |
There was a problem hiding this comment.
The current approach logs a warning before simply filtering out non-json/text messages. If the batch becomes empty, consume returns Ok(()) which is a silent data-loss path.
| if self.include_metadata { | ||
| object | ||
| .entry(DEFAULT_PRIMARY_KEY.to_string()) | ||
| .or_insert_with(|| Value::String(generated_id)); | ||
| object.insert("iggy_message_id".to_string(), Value::String(id.to_string())); | ||
| object.insert("iggy_offset".to_string(), Value::from(offset)); | ||
| object.insert( | ||
| "iggy_stream".to_string(), |
There was a problem hiding this comment.
These inserts unconditionally overwrite user fields. If those fields already exist, the document gets mutated on ingestion. Using entry(...).or_insert(...) would be a more prudent approach. Logging a warning would also be good.
| fn generated_document_id( | ||
| topic_metadata: &TopicMetadata, | ||
| messages_metadata: &MessagesMetadata, | ||
| message: &ConsumedMessage, | ||
| ) -> String { | ||
| format!( | ||
| "{}_{}_{}_{}_{}", | ||
| sanitize_identifier_component(&topic_metadata.stream), | ||
| sanitize_identifier_component(&topic_metadata.topic), | ||
| messages_metadata.partition_id, | ||
| message.offset, | ||
| message.id | ||
| ) | ||
| } |
There was a problem hiding this comment.
sanitize_identifier_component would convert both orders.stream and orders/stream to orders_stream. If both also have the same topic after sanitization, same partition, same offset, and same message ID, they produce the same Meilisearch document ID, resulting in loss of data.
The sink carried duplicated raw and normalized config, treated index creation like an optional consume-time task wait, silently ignored unsupported payload schemas, overwrote user metadata fields, and generated IDs through lossy sanitization. Resolving config once, waiting for index creation independently, preserving user fields, and encoding exact ID components remove the review risks without changing unrelated connectors. Constraint: Keep the change scoped to Meilisearch sink review feedback Rejected: Add shared Meilisearch utility crate | source and sink only need URL normalization alignment for this PR Rejected: Implement From<MeilisearchSdkError> for iggy_connector_sdk::Error | Rust orphan rules forbid implementing a foreign trait for a foreign type Confidence: high Scope-risk: narrow Tested: cargo test -p iggy_connector_meilisearch_sink Tested: cargo check --locked -p iggy_connector_meilisearch_sink Tested: git diff --check Not-tested: Meilisearch sink integration test could not start locally because Docker client failed to connect while listing networks
Summary
meilisearch-sdkcontainer.rs,sink.rs, andsource.rs, then add source runtime coverageValidation
cargo check -p iggy_connector_meilisearch_sourcecargo test -p iggy_connector_meilisearch_sourcecargo check -p integrationcargo clippyhook passedNotes