diff --git a/janusgraph-cdc/README.md b/janusgraph-cdc/README.md
new file mode 100644
index 0000000000..2d806d3f48
--- /dev/null
+++ b/janusgraph-cdc/README.md
@@ -0,0 +1,115 @@
+# JanusGraph CDC Module
+
+This module provides Change Data Capture (CDC) support for JanusGraph mixed index mutations, ensuring eventual consistency between the storage backend (e.g., Cassandra) and mixed indexes (e.g., Elasticsearch, Solr).
+
+## Overview
+
+When using JanusGraph with external index backends, there is a risk of inconsistency: graph data can be committed to the storage backend while the corresponding index update fails. CDC addresses this by:
+
+1. Publishing all mixed index mutations to a Kafka topic (see the example payload below)
+2. Running separate CDC workers that consume these events and apply them to the index
+3. Ensuring eventual consistency even in failure scenarios
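+
+Each event is serialized as JSON before being published. The sketch below is a representative payload: the field names follow the `CdcMutationEvent` accessors, but the store name, document id, and the exact Jackson rendering of the `IndexEntry` lists are illustrative assumptions:
+
+```json
+{
+  "storeName": "vertex",
+  "documentId": "4128",
+  "additions": [{"field": "name", "value": "John"}],
+  "deletions": [],
+  "new": true,
+  "deleted": false,
+  "timestamp": 1735689600000,
+  "mutationType": "ADDED"
+}
+```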
+
+## Architecture
+
+### Components
+
+1. **CdcProducer**: Publishes index mutation events to Kafka
+2. **CdcWorker**: Consumes events from Kafka and applies them to indexes
+3. **CdcIndexTransaction**: Wraps IndexTransaction to capture mutations
+4. **CdcConfiguration**: Manages CDC settings
+
+### CDC Modes
+
+- **dual** (default): Write to index during transaction AND publish to CDC topic for redundancy
+- **skip**: Skip index writes during transaction, rely entirely on CDC
+- **cdc-only**: Alias for skip mode (for backward compatibility)
+
+## Configuration
+
+Add the following to your JanusGraph configuration:
+
+```properties
+# Enable CDC for mixed indexes
+index.search.cdc.enabled=true
+
+# CDC mode (dual, skip, or cdc-only)
+index.search.cdc.mode=dual
+
+# Kafka bootstrap servers
+index.search.cdc.kafka-bootstrap-servers=localhost:9092
+
+# Kafka topic for CDC events
+index.search.cdc.kafka-topic=janusgraph-cdc-index-mutations
+```
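+
+The `cdc` namespace is a child of the per-index namespace, so `search` above is simply the index name. For an index registered under a different name, substitute that name (a hypothetical `myindex` shown here):
+
+```properties
+index.myindex.cdc.enabled=true
+index.myindex.cdc.mode=skip
+```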
+
+## Usage
+
+### Running CDC Workers
+
+CDC workers should be run as separate processes:
+
+```java
+// Create index provider and retriever
+IndexProvider indexProvider = ...;
+KeyInformation.IndexRetriever indexRetriever = ...;
+
+// Create and start CDC worker
+CdcWorker worker = new CdcWorker(
+    "localhost:9092",                  // Kafka bootstrap servers
+    "janusgraph-cdc-index-mutations",  // Topic name
+    "janusgraph-cdc-group",            // Consumer group ID
+    indexProvider,
+    indexRetriever
+);
+
+worker.start();
+
+// Keep worker running...
+
+// Shutdown gracefully
+worker.stop();
+worker.close();
+```
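+
+For non-default consumer settings, `CdcWorker` also accepts a `Properties` object in place of the bootstrap servers and group id. In a long-running deployment, a JVM shutdown hook is one way to guarantee the graceful stop/close sequence runs; a minimal sketch, reusing the `worker` variable from the example above:
+
+```java
+Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+    worker.stop();  // signals the poll loop to exit and joins the worker thread
+    worker.close(); // closes the underlying Kafka consumer
+}));
+```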
+
+### Integration with JanusGraph
+
+CDC can be integrated programmatically using the CdcIndexTransactionFactory:
+
+```java
+Configuration config = ...; // Your JanusGraph configuration
+CdcIndexTransactionFactory cdcFactory = new CdcIndexTransactionFactory(config);
+
+// Wrap index transactions with CDC support
+IndexTransaction indexTx = ...; // Original index transaction
+CdcIndexTransaction wrappedTx = cdcFactory.wrapIfEnabled(indexTx);
+
+if (wrappedTx != null) {
+    // Use wrappedTx in place of indexTx; mutations are captured and published to Kafka
+} else {
+    // CDC is disabled; continue using indexTx directly
+}
+```
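+
+On `commit()`, the wrapped transaction publishes the accumulated events to Kafka and flushes the producer before committing the delegate; `rollback()` discards pending events and rolls back the delegate. A minimal usage sketch:
+
+```java
+try {
+    wrappedTx.commit();   // publish CDC events, then commit the delegate index transaction
+} catch (BackendException e) {
+    wrappedTx.rollback(); // clear pending CDC events and roll back the delegate
+    throw e;
+}
+```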
+
+## Benefits
+
+1. **Eventual Consistency**: Guarantees that the index and the storage backend will eventually converge
+2. **Failure Recovery**: Automatic recovery from index update failures
+3. **Operational Flexibility**: CDC workers can be scaled independently
+4. **Minimal Performance Impact**: Asynchronous processing offloads index updates
+
+## Dependencies
+
+- Apache Kafka 3.6.1+
+- Debezium Core 2.5.0+
+- Jackson for JSON serialization
+
+## Limitations
+
+- Requires Kafka infrastructure
+- Eventual consistency means a delay between the storage-backend write and the corresponding index update
+- CDC workers must be managed separately from JanusGraph instances
+
+## Future Enhancements
+
+- Automatic integration with the JanusGraph `Backend`
+- Support for other message brokers (RabbitMQ, etc.)
+- Built-in CDC worker management
+- Metrics and monitoring integration
diff --git a/janusgraph-cdc/pom.xml b/janusgraph-cdc/pom.xml
new file mode 100644
index 0000000000..24d2ce91de
--- /dev/null
+++ b/janusgraph-cdc/pom.xml
@@ -0,0 +1,120 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.janusgraph</groupId>
+        <artifactId>janusgraph</artifactId>
+        <version>1.2.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>janusgraph-cdc</artifactId>
+    <name>JanusGraph-CDC: Change Data Capture Support</name>
+    <url>https://janusgraph.org</url>
+
+    <properties>
+        <top.level.basedir>${basedir}/..</top.level.basedir>
+        <kafka.version>3.6.1</kafka.version>
+        <debezium.version>2.5.0.Final</debezium.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.janusgraph</groupId>
+            <artifactId>janusgraph-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <!-- Kafka client -->
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${kafka.version}</version>
+        </dependency>
+
+        <!-- Debezium -->
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-core</artifactId>
+            <version>${debezium.version}</version>
+        </dependency>
+
+        <!-- JSON serialization -->
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+
+        <!-- Logging -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+
+        <!-- Test dependencies -->
+        <dependency>
+            <groupId>org.janusgraph</groupId>
+            <artifactId>janusgraph-backend-testutils</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.janusgraph</groupId>
+            <artifactId>janusgraph-cql</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.janusgraph</groupId>
+            <artifactId>janusgraph-es</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>kafka</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>cassandra</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>elasticsearch</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/janusgraph-cdc/src/main/java/org/janusgraph/diskstorage/cdc/CdcConfiguration.java b/janusgraph-cdc/src/main/java/org/janusgraph/diskstorage/cdc/CdcConfiguration.java
new file mode 100644
index 0000000000..2ae54cbe40
--- /dev/null
+++ b/janusgraph-cdc/src/main/java/org/janusgraph/diskstorage/cdc/CdcConfiguration.java
@@ -0,0 +1,98 @@
+// Copyright 2025 JanusGraph Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.janusgraph.diskstorage.cdc;
+
+import org.janusgraph.diskstorage.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_CDC_ENABLED;
+import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_CDC_KAFKA_BOOTSTRAP_SERVERS;
+import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_CDC_KAFKA_TOPIC;
+import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_CDC_MODE;
+
+/**
+ * Configuration holder for CDC settings.
+ */
+public class CdcConfiguration {
+
+ private static final Logger log = LoggerFactory.getLogger(CdcConfiguration.class);
+
+ private final boolean enabled;
+ private final CdcIndexTransaction.CdcMode mode;
+ private final String kafkaBootstrapServers;
+ private final String kafkaTopic;
+
+ public CdcConfiguration(Configuration config) {
+ this.enabled = config.get(INDEX_CDC_ENABLED);
+
+ if (enabled) {
+ this.mode = parseCdcMode(config.get(INDEX_CDC_MODE));
+ this.kafkaBootstrapServers = config.get(INDEX_CDC_KAFKA_BOOTSTRAP_SERVERS);
+ this.kafkaTopic = config.get(INDEX_CDC_KAFKA_TOPIC);
+ validate();
+ log.info("CDC enabled with mode: {}, topic: {}, bootstrap servers: {}",
+ mode, kafkaTopic, kafkaBootstrapServers);
+ } else {
+ // When disabled, these values are not used
+ this.mode = null;
+ this.kafkaBootstrapServers = null;
+ this.kafkaTopic = null;
+ }
+ }
+
+ private CdcIndexTransaction.CdcMode parseCdcMode(String modeStr) {
+ if (modeStr == null || modeStr.isEmpty()) {
+ return CdcIndexTransaction.CdcMode.DUAL;
+ }
+
+ switch (modeStr.toLowerCase()) {
+ case "skip":
+ return CdcIndexTransaction.CdcMode.SKIP;
+ case "dual":
+ return CdcIndexTransaction.CdcMode.DUAL;
+ case "cdc-only":
+ return CdcIndexTransaction.CdcMode.CDC_ONLY;
+ default:
+ log.warn("Unknown CDC mode: {}, defaulting to DUAL", modeStr);
+ return CdcIndexTransaction.CdcMode.DUAL;
+ }
+ }
+
+ private void validate() {
+ if (kafkaBootstrapServers == null || kafkaBootstrapServers.isEmpty()) {
+ throw new IllegalArgumentException("CDC is enabled but kafka bootstrap servers are not configured");
+ }
+ if (kafkaTopic == null || kafkaTopic.isEmpty()) {
+ throw new IllegalArgumentException("CDC is enabled but kafka topic is not configured");
+ }
+ }
+
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ public CdcIndexTransaction.CdcMode getMode() {
+ return mode;
+ }
+
+ public String getKafkaBootstrapServers() {
+ return kafkaBootstrapServers;
+ }
+
+ public String getKafkaTopic() {
+ return kafkaTopic;
+ }
+}
diff --git a/janusgraph-cdc/src/main/java/org/janusgraph/diskstorage/cdc/CdcIndexTransaction.java b/janusgraph-cdc/src/main/java/org/janusgraph/diskstorage/cdc/CdcIndexTransaction.java
new file mode 100644
index 0000000000..0c168a0740
--- /dev/null
+++ b/janusgraph-cdc/src/main/java/org/janusgraph/diskstorage/cdc/CdcIndexTransaction.java
@@ -0,0 +1,215 @@
+// Copyright 2025 JanusGraph Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.janusgraph.diskstorage.cdc;
+
+import org.janusgraph.diskstorage.BackendException;
+import org.janusgraph.diskstorage.indexing.IndexEntry;
+import org.janusgraph.diskstorage.indexing.IndexQuery;
+import org.janusgraph.diskstorage.indexing.IndexTransaction;
+import org.janusgraph.diskstorage.indexing.KeyInformation;
+import org.janusgraph.diskstorage.indexing.RawQuery;
+import org.janusgraph.graphdb.database.serialize.DataOutput;
+import org.janusgraph.graphdb.tinkerpop.optimize.step.Aggregation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+/**
+ * Wrapper around IndexTransaction that captures mutations and publishes them as CDC events.
+ * Supports different CDC modes: skip, dual, or cdc-only.
+ */
+public class CdcIndexTransaction {
+
+ private static final Logger log = LoggerFactory.getLogger(CdcIndexTransaction.class);
+
+ public enum CdcMode {
+ SKIP, // Skip mutations during transaction, rely entirely on CDC
+ DUAL, // Write during transaction AND via CDC for consistency
+ CDC_ONLY // Only via CDC (deprecated, same as SKIP for now)
+ }
+
+ private final IndexTransaction delegate;
+ private final CdcProducer cdcProducer;
+ private final CdcMode cdcMode;
+ private final Map<String, Map<String, MutationAccumulator>> pendingMutations;
+
+ private static class MutationAccumulator {
+ final List<IndexEntry> additions = new ArrayList<>();
+ final List<IndexEntry> deletions = new ArrayList<>();
+ boolean isNew;
+ boolean isDeleted;
+ final long timestamp;
+
+ MutationAccumulator(boolean isNew, boolean isDeleted) {
+ this.isNew = isNew;
+ this.isDeleted = isDeleted;
+ this.timestamp = System.currentTimeMillis();
+ }
+ }
+
+ public CdcIndexTransaction(IndexTransaction delegate, CdcProducer cdcProducer, CdcMode cdcMode) {
+ this.delegate = delegate;
+ this.cdcProducer = cdcProducer;
+ this.cdcMode = cdcMode;
+ this.pendingMutations = new HashMap<>();
+ log.debug("Created CdcIndexTransaction with mode: {}", cdcMode);
+ }
+
+ public void add(String store, String documentId, IndexEntry entry, boolean isNew) {
+ // Accumulate mutations for CDC
+ accumulateAddition(store, documentId, entry, isNew);
+
+ // Forward to delegate based on mode
+ if (cdcMode != CdcMode.SKIP && cdcMode != CdcMode.CDC_ONLY) {
+ delegate.add(store, documentId, entry, isNew);
+ }
+ }
+
+ public void add(String store, String documentId, String key, Object value, boolean isNew) {
+ // Accumulate mutations for CDC
+ accumulateAddition(store, documentId, new IndexEntry(key, value), isNew);
+
+ // Forward to delegate based on mode
+ if (cdcMode != CdcMode.SKIP && cdcMode != CdcMode.CDC_ONLY) {
+ delegate.add(store, documentId, key, value, isNew);
+ }
+ }
+
+ public void delete(String store, String documentId, String key, Object value, boolean deleteAll) {
+ // Accumulate mutations for CDC
+ accumulateDeletion(store, documentId, new IndexEntry(key, value), deleteAll);
+
+ // Forward to delegate based on mode
+ if (cdcMode != CdcMode.SKIP && cdcMode != CdcMode.CDC_ONLY) {
+ delegate.delete(store, documentId, key, value, deleteAll);
+ }
+ }
+
+ private void accumulateAddition(String store, String documentId, IndexEntry entry, boolean isNew) {
+ MutationAccumulator accumulator = getOrCreateAccumulator(store, documentId, isNew, false);
+ accumulator.additions.add(entry);
+ }
+
+ private void accumulateDeletion(String store, String documentId, IndexEntry entry, boolean deleteAll) {
+ MutationAccumulator accumulator = getOrCreateAccumulator(store, documentId, false, deleteAll);
+ accumulator.deletions.add(entry);
+ }
+
+ private MutationAccumulator getOrCreateAccumulator(String store, String documentId,
+ boolean isNew, boolean isDeleted) {
+ Map<String, MutationAccumulator> storeAccumulator =
+ pendingMutations.computeIfAbsent(store, k -> new HashMap<>());
+
+ MutationAccumulator accumulator = storeAccumulator.get(documentId);
+ if (accumulator == null) {
+ accumulator = new MutationAccumulator(isNew, isDeleted);
+ storeAccumulator.put(documentId, accumulator);
+ } else {
+ // Update flags if needed
+ if (isNew) accumulator.isNew = true;
+ if (isDeleted) accumulator.isDeleted = true;
+ }
+ return accumulator;
+ }
+
+ public void commit() throws BackendException {
+ // Publish CDC events first
+ publishCdcEvents();
+
+ // Then commit the delegate transaction
+ delegate.commit();
+ }
+
+ private void publishCdcEvents() throws BackendException {
+ if (pendingMutations.isEmpty()) {
+ return;
+ }
+
+ for (Map.Entry<String, Map<String, MutationAccumulator>> storeEntry : pendingMutations.entrySet()) {
+ String storeName = storeEntry.getKey();
+ for (Map.Entry<String, MutationAccumulator> docEntry : storeEntry.getValue().entrySet()) {
+ String documentId = docEntry.getKey();
+ MutationAccumulator accumulator = docEntry.getValue();
+
+ CdcMutationEvent event = new CdcMutationEvent(
+ storeName,
+ documentId,
+ accumulator.additions,
+ accumulator.deletions,
+ accumulator.isNew,
+ accumulator.isDeleted,
+ accumulator.timestamp
+ );
+
+ cdcProducer.send(event);
+ }
+ }
+
+ cdcProducer.flush();
+ pendingMutations.clear();
+ }
+
+ public void rollback() throws BackendException {
+ pendingMutations.clear();
+ delegate.rollback();
+ }
+
+ // Delegate all other methods to the underlying transaction
+
+ public void clearStorage() throws BackendException {
+ delegate.clearStorage();
+ }
+
+ public void clearStore(String storeName) throws BackendException {
+ delegate.clearStore(storeName);
+ }
+
+ public void register(String store, String key, KeyInformation information) throws BackendException {
+ delegate.register(store, key, information);
+ }
+
+ public Stream<String> queryStream(IndexQuery query) throws BackendException {
+ return delegate.queryStream(query);
+ }
+
+ public Number queryAggregation(IndexQuery query, Aggregation aggregation) throws BackendException {
+ return delegate.queryAggregation(query, aggregation);
+ }
+
+ public Stream<RawQuery.Result<String>> queryStream(RawQuery query) throws BackendException {
+ return delegate.queryStream(query);
+ }
+
+ public Long totals(RawQuery query) throws BackendException {
+ return delegate.totals(query);
+ }
+
+ public void restore(Map<String, Map<String, List<IndexEntry>>> documents) throws BackendException {
+ delegate.restore(documents);
+ }
+
+ public void logMutations(DataOutput out) {
+ delegate.logMutations(out);
+ }
+
+ public void invalidate(String store) {
+ delegate.invalidate(store);
+ }
+}
diff --git a/janusgraph-cdc/src/main/java/org/janusgraph/diskstorage/cdc/CdcIndexTransactionFactory.java b/janusgraph-cdc/src/main/java/org/janusgraph/diskstorage/cdc/CdcIndexTransactionFactory.java
new file mode 100644
index 0000000000..339138c3ea
--- /dev/null
+++ b/janusgraph-cdc/src/main/java/org/janusgraph/diskstorage/cdc/CdcIndexTransactionFactory.java
@@ -0,0 +1,76 @@
+// Copyright 2025 JanusGraph Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.janusgraph.diskstorage.cdc;
+
+import org.janusgraph.diskstorage.configuration.Configuration;
+import org.janusgraph.diskstorage.indexing.IndexTransaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Factory for creating CDC-enabled index transactions.
+ * Manages the lifecycle of CDC producers and provides wrapped index transactions.
+ */
+public class CdcIndexTransactionFactory implements AutoCloseable {
+
+ private static final Logger log = LoggerFactory.getLogger(CdcIndexTransactionFactory.class);
+
+ private final CdcConfiguration cdcConfig;
+ private final CdcProducer cdcProducer;
+
+ public CdcIndexTransactionFactory(Configuration configuration) {
+ this.cdcConfig = new CdcConfiguration(configuration);
+
+ if (cdcConfig.isEnabled()) {
+ this.cdcProducer = new KafkaCdcProducer(
+ cdcConfig.getKafkaBootstrapServers(),
+ cdcConfig.getKafkaTopic()
+ );
+ log.info("CDC Index Transaction Factory initialized");
+ } else {
+ this.cdcProducer = null;
+ }
+ }
+
+ /**
+ * Wrap an IndexTransaction with CDC support if enabled.
+ *
+ * @param indexTransaction The base index transaction to wrap
+ * @return The wrapped transaction if CDC is enabled, otherwise null (caller should use original)
+ */
+ public CdcIndexTransaction wrapIfEnabled(IndexTransaction indexTransaction) {
+ if (cdcConfig.isEnabled() && cdcProducer != null) {
+ return new CdcIndexTransaction(indexTransaction, cdcProducer, cdcConfig.getMode());
+ }
+ return null;
+ }
+
+ /**
+ * Check if CDC is enabled.
+ *
+ * @return true if CDC is enabled
+ */
+ public boolean isEnabled() {
+ return cdcConfig.isEnabled();
+ }
+
+ @Override
+ public void close() {
+ if (cdcProducer != null) {
+ cdcProducer.close();
+ log.info("CDC Index Transaction Factory closed");
+ }
+ }
+}
diff --git a/janusgraph-cdc/src/main/java/org/janusgraph/diskstorage/cdc/CdcMutationEvent.java b/janusgraph-cdc/src/main/java/org/janusgraph/diskstorage/cdc/CdcMutationEvent.java
new file mode 100644
index 0000000000..60746b1ed2
--- /dev/null
+++ b/janusgraph-cdc/src/main/java/org/janusgraph/diskstorage/cdc/CdcMutationEvent.java
@@ -0,0 +1,136 @@
+// Copyright 2025 JanusGraph Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.janusgraph.diskstorage.cdc;
+
+import org.janusgraph.diskstorage.indexing.IndexEntry;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Represents a CDC event for a mixed index mutation.
+ * This event captures the changes made to a specific document in a mixed index store.
+ */
+public class CdcMutationEvent implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final String storeName;
+ private final String documentId;
+ private final List<IndexEntry> additions;
+ private final List<IndexEntry> deletions;
+ private final boolean isNew;
+ private final boolean isDeleted;
+ private final long timestamp;
+ private final MutationType mutationType;
+
+ public enum MutationType {
+ ADDED,
+ UPDATED,
+ DELETED
+ }
+
+ public CdcMutationEvent(String storeName, String documentId,
+ List<IndexEntry> additions, List<IndexEntry> deletions,
+ boolean isNew, boolean isDeleted,
+ long timestamp) {
+ this.storeName = storeName;
+ this.documentId = documentId;
+ this.additions = additions;
+ this.deletions = deletions;
+ this.isNew = isNew;
+ this.isDeleted = isDeleted;
+ this.timestamp = timestamp;
+ this.mutationType = determineMutationType(isNew, isDeleted, additions, deletions);
+ }
+
+ private static MutationType determineMutationType(boolean isNew, boolean isDeleted,
+ List<IndexEntry> additions, List<IndexEntry> deletions) {
+ if (isDeleted) {
+ return MutationType.DELETED;
+ } else if (isNew) {
+ return MutationType.ADDED;
+ } else {
+ return MutationType.UPDATED;
+ }
+ }
+
+ public String getStoreName() {
+ return storeName;
+ }
+
+ public String getDocumentId() {
+ return documentId;
+ }
+
+ public List<IndexEntry> getAdditions() {
+ return additions;
+ }
+
+ public List<IndexEntry> getDeletions() {
+ return deletions;
+ }
+
+ public boolean isNew() {
+ return isNew;
+ }
+
+ public boolean isDeleted() {
+ return isDeleted;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public MutationType getMutationType() {
+ return mutationType;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ CdcMutationEvent that = (CdcMutationEvent) o;
+ return isNew == that.isNew &&
+ isDeleted == that.isDeleted &&
+ timestamp == that.timestamp &&
+ Objects.equals(storeName, that.storeName) &&
+ Objects.equals(documentId, that.documentId) &&
+ Objects.equals(additions, that.additions) &&
+ Objects.equals(deletions, that.deletions) &&
+ mutationType == that.mutationType;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(storeName, documentId, additions, deletions, isNew, isDeleted, timestamp, mutationType);
+ }
+
+ @Override
+ public String toString() {
+ return "CdcMutationEvent{" +
+ "storeName='" + storeName + '\'' +
+ ", documentId='" + documentId + '\'' +
+ ", mutationType=" + mutationType +
+ ", additions=" + (additions != null ? additions.size() : 0) +
+ ", deletions=" + (deletions != null ? deletions.size() : 0) +
+ ", isNew=" + isNew +
+ ", isDeleted=" + isDeleted +
+ ", timestamp=" + timestamp +
+ '}';
+ }
+}
diff --git a/janusgraph-cdc/src/main/java/org/janusgraph/diskstorage/cdc/CdcProducer.java b/janusgraph-cdc/src/main/java/org/janusgraph/diskstorage/cdc/CdcProducer.java
new file mode 100644
index 0000000000..79d3dd8a0b
--- /dev/null
+++ b/janusgraph-cdc/src/main/java/org/janusgraph/diskstorage/cdc/CdcProducer.java
@@ -0,0 +1,45 @@
+// Copyright 2025 JanusGraph Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.janusgraph.diskstorage.cdc;
+
+import org.janusgraph.diskstorage.BackendException;
+
+/**
+ * Interface for producing CDC events to Kafka topics.
+ * Implementations should handle serialization and delivery of CDC events.
+ */
+public interface CdcProducer extends AutoCloseable {
+
+ /**
+ * Send a CDC mutation event to the configured Kafka topic.
+ *
+ * @param event The CDC mutation event to send
+ * @throws BackendException if the event cannot be sent
+ */
+ void send(CdcMutationEvent event) throws BackendException;
+
+ /**
+ * Flush any pending events to ensure they are sent.
+ *
+ * @throws BackendException if flush fails
+ */
+ void flush() throws BackendException;
+
+ /**
+ * Close the producer and release resources.
+ */
+ @Override
+ void close();
+}
diff --git a/janusgraph-cdc/src/main/java/org/janusgraph/diskstorage/cdc/CdcWorker.java b/janusgraph-cdc/src/main/java/org/janusgraph/diskstorage/cdc/CdcWorker.java
new file mode 100644
index 0000000000..744b584134
--- /dev/null
+++ b/janusgraph-cdc/src/main/java/org/janusgraph/diskstorage/cdc/CdcWorker.java
@@ -0,0 +1,218 @@
+// Copyright 2025 JanusGraph Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.janusgraph.diskstorage.cdc;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.janusgraph.diskstorage.BackendException;
+import org.janusgraph.diskstorage.BaseTransaction;
+import org.janusgraph.diskstorage.indexing.IndexMutation;
+import org.janusgraph.diskstorage.indexing.IndexProvider;
+import org.janusgraph.diskstorage.indexing.KeyInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * CDC worker that consumes CDC events from Kafka and applies mutations to mixed indexes.
+ * This ensures eventual consistency between the storage backend and mixed indexes.
+ */
+public class CdcWorker implements AutoCloseable {
+
+ private static final Logger log = LoggerFactory.getLogger(CdcWorker.class);
+ private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1);
+
+ private final KafkaConsumer<String, String> consumer;
+ private final IndexProvider indexProvider;
+ private final KeyInformation.IndexRetriever indexRetriever;
+ private final ObjectMapper objectMapper;
+ private final AtomicBoolean running;
+ private final Thread workerThread;
+
+ public CdcWorker(String bootstrapServers, String topicName, String groupId,
+ IndexProvider indexProvider, KeyInformation.IndexRetriever indexRetriever) {
+ this(createDefaultProperties(bootstrapServers, groupId), topicName, indexProvider, indexRetriever);
+ }
+
+ public CdcWorker(Properties consumerProperties, String topicName,
+ IndexProvider indexProvider, KeyInformation.IndexRetriever indexRetriever) {
+ this.consumer = new KafkaConsumer<>(consumerProperties);
+ this.consumer.subscribe(Collections.singletonList(topicName));
+ this.indexProvider = indexProvider;
+ this.indexRetriever = indexRetriever;
+ this.objectMapper = new ObjectMapper();
+ this.running = new AtomicBoolean(false);
+ this.workerThread = new Thread(this::run, "CDC-Worker-" + topicName);
+ log.info("Initialized CdcWorker for topic: {}", topicName);
+ }
+
+ private static Properties createDefaultProperties(String bootstrapServers, String groupId) {
+ Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
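+ // Offsets are committed manually (commitSync) only after a batch has been
+ // applied, so a worker crash mid-batch leads to redelivery rather than loss.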
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+ props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
+ return props;
+ }
+
+ /**
+ * Start the CDC worker in a separate thread.
+ */
+ public void start() {
+ if (running.compareAndSet(false, true)) {
+ workerThread.start();
+ log.info("Started CDC worker thread");
+ }
+ }
+
+ /**
+ * Main processing loop for CDC events.
+ */
+ private void run() {
+ log.info("CDC worker thread started");
+ while (running.get()) {
+ try {
+ ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT);
+ if (!records.isEmpty()) {
+ processBatch(records);
+ consumer.commitSync();
+ }
+ } catch (Exception e) {
+ log.error("Error processing CDC events", e);
+ // Continue processing on error
+ }
+ }
+ log.info("CDC worker thread stopped");
+ }
+
+ /**
+ * Process a batch of CDC events.
+ */
+ private void processBatch(ConsumerRecords<String, String> records) {
+ Map<String, Map<String, IndexMutation>> mutations = new HashMap<>();
+
+ for (ConsumerRecord<String, String> record : records) {
+ try {
+ CdcMutationEvent event = objectMapper.readValue(record.value(), CdcMutationEvent.class);
+ processEvent(event, mutations);
+ } catch (Exception e) {
+ log.error("Failed to deserialize or process CDC event from offset {}: {}",
+ record.offset(), record.value(), e);
+ }
+ }
+
+ // Apply all mutations to the index
+ if (!mutations.isEmpty()) {
+ applyMutations(mutations);
+ }
+ }
+
+ /**
+ * Process a single CDC event and accumulate mutations.
+ */
+ private void processEvent(CdcMutationEvent event, Map<String, Map<String, IndexMutation>> mutations) {
+ String storeName = event.getStoreName();
+ String documentId = event.getDocumentId();
+
+ // Get or create the store mutations map
+ Map<String, IndexMutation> storeMutations = mutations.computeIfAbsent(storeName, k -> new HashMap<>());
+
+ // Get or create the document mutation
+ IndexMutation mutation = storeMutations.get(documentId);
+ if (mutation == null) {
+ mutation = new IndexMutation(
+ indexRetriever.get(storeName),
+ event.getAdditions(),
+ event.getDeletions(),
+ event.isNew(),
+ event.isDeleted()
+ );
+ storeMutations.put(documentId, mutation);
+ } else {
+ // Merge with existing mutation
+ IndexMutation newMutation = new IndexMutation(
+ indexRetriever.get(storeName),
+ event.getAdditions(),
+ event.getDeletions(),
+ event.isNew(),
+ event.isDeleted()
+ );
+ mutation.merge(newMutation);
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("Processed CDC event: {}", event);
+ }
+ }
+
+ /**
+ * Apply accumulated mutations to the index provider.
+ */
+ private void applyMutations(Map<String, Map<String, IndexMutation>> mutations) {
+ try {
+ BaseTransaction tx = indexProvider.beginTransaction(null);
+ try {
+ indexProvider.mutate(mutations, indexRetriever, tx);
+ tx.commit();
+ log.info("Successfully applied {} store mutations from CDC", mutations.size());
+ } catch (Exception e) {
+ tx.rollback();
+ throw e;
+ }
+ } catch (BackendException e) {
+ log.error("Failed to apply CDC mutations to index", e);
+ throw new RuntimeException("Failed to apply CDC mutations", e);
+ }
+ }
+
+ /**
+ * Stop the CDC worker gracefully.
+ */
+ public void stop() {
+ if (running.compareAndSet(true, false)) {
+ try {
+ workerThread.join(5000);
+ log.info("Stopped CDC worker thread");
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.warn("Interrupted while stopping CDC worker");
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ stop();
+ try {
+ consumer.close();
+ log.info("Closed CDC worker");
+ } catch (Exception e) {
+ log.error("Error closing Kafka consumer", e);
+ }
+ }
+}
diff --git a/janusgraph-cdc/src/main/java/org/janusgraph/diskstorage/cdc/KafkaCdcProducer.java b/janusgraph-cdc/src/main/java/org/janusgraph/diskstorage/cdc/KafkaCdcProducer.java
new file mode 100644
index 0000000000..dd2871d229
--- /dev/null
+++ b/janusgraph-cdc/src/main/java/org/janusgraph/diskstorage/cdc/KafkaCdcProducer.java
@@ -0,0 +1,118 @@
+// Copyright 2025 JanusGraph Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.janusgraph.diskstorage.cdc;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.janusgraph.diskstorage.BackendException;
+import org.janusgraph.diskstorage.PermanentBackendException;
+import org.janusgraph.diskstorage.TemporaryBackendException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Kafka-based implementation of CDC producer.
+ * Serializes CDC events as JSON and sends them to a Kafka topic.
+ */
+public class KafkaCdcProducer implements CdcProducer {
+
+ private static final Logger log = LoggerFactory.getLogger(KafkaCdcProducer.class);
+ private static final long SEND_TIMEOUT_MS = 30000; // 30 seconds
+
+ private final KafkaProducer<String, String> producer;
+ private final String topicName;
+ private final ObjectMapper objectMapper;
+
+ public KafkaCdcProducer(String bootstrapServers, String topicName) {
+ this(createDefaultProperties(bootstrapServers), topicName);
+ }
+
+ public KafkaCdcProducer(Properties kafkaProperties, String topicName) {
+ this.topicName = topicName;
+ this.objectMapper = new ObjectMapper();
+ this.producer = new KafkaProducer<>(kafkaProperties);
+ log.info("Initialized KafkaCdcProducer for topic: {}", topicName);
+ }
+
+ private static Properties createDefaultProperties(String bootstrapServers) {
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ props.put(ProducerConfig.ACKS_CONFIG, "all");
+ props.put(ProducerConfig.RETRIES_CONFIG, 3);
+ props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+ props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
+ return props;
+ }
+
+ @Override
+ public void send(CdcMutationEvent event) throws BackendException {
+ try {
+ String key = event.getStoreName() + ":" + event.getDocumentId();
+ String value = objectMapper.writeValueAsString(event);
+
+ ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);
+
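+ // Block on the returned future so that delivery failures surface to the
+ // caller as a BackendException instead of only being logged by the callback.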
+ producer.send(record, (metadata, exception) -> {
+ if (exception != null) {
+ log.error("Failed to send CDC event to Kafka topic {}: {}", topicName, event, exception);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("Successfully sent CDC event to topic {} partition {} offset {}",
+ topicName, metadata.partition(), metadata.offset());
+ }
+ }
+ }).get(SEND_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new TemporaryBackendException("Interrupted while sending CDC event", e);
+ } catch (ExecutionException e) {
+ throw new PermanentBackendException("Failed to send CDC event to Kafka", e.getCause());
+ } catch (TimeoutException e) {
+ throw new TemporaryBackendException("Timeout while sending CDC event to Kafka", e);
+ } catch (Exception e) {
+ throw new PermanentBackendException("Error serializing or sending CDC event", e);
+ }
+ }
+
+ @Override
+ public void flush() throws BackendException {
+ try {
+ producer.flush();
+ } catch (Exception e) {
+ throw new TemporaryBackendException("Failed to flush Kafka producer", e);
+ }
+ }
+
+ @Override
+ public void close() {
+ try {
+ producer.close();
+ log.info("Closed KafkaCdcProducer for topic: {}", topicName);
+ } catch (Exception e) {
+ log.error("Error closing Kafka producer", e);
+ }
+ }
+}
diff --git a/janusgraph-cdc/src/test/java/org/janusgraph/diskstorage/cdc/CdcConfigurationTest.java b/janusgraph-cdc/src/test/java/org/janusgraph/diskstorage/cdc/CdcConfigurationTest.java
new file mode 100644
index 0000000000..1ff9995863
--- /dev/null
+++ b/janusgraph-cdc/src/test/java/org/janusgraph/diskstorage/cdc/CdcConfigurationTest.java
@@ -0,0 +1,84 @@
+// Copyright 2025 JanusGraph Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.janusgraph.diskstorage.cdc;
+
+import org.janusgraph.diskstorage.configuration.BasicConfiguration;
+import org.janusgraph.diskstorage.configuration.WriteConfiguration;
+import org.janusgraph.diskstorage.configuration.backend.CommonsConfiguration;
+import org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for CDC configuration and transaction wrapping.
+ */
+public class CdcConfigurationTest {
+
+ @Test
+ public void testCdcConfigurationDisabled() {
+ WriteConfiguration config = new CommonsConfiguration();
+ config.set("index.search.backend", "elasticsearch");
+
+ BasicConfiguration basicConfig = new BasicConfiguration(
+ GraphDatabaseConfiguration.ROOT_NS,
+ config,
+ BasicConfiguration.Restriction.NONE
+ );
+
+ CdcConfiguration cdcConfig = new CdcConfiguration(basicConfig.restrictTo("search"));
+ assertFalse(cdcConfig.isEnabled());
+ }
+
+ @Test
+ public void testCdcConfigurationEnabled() {
+ WriteConfiguration config = new CommonsConfiguration();
+ config.set("index.search.cdc.enabled", true);
+ config.set("index.search.cdc.kafka-bootstrap-servers", "localhost:9092");
+ config.set("index.search.cdc.kafka-topic", "test-topic");
+ config.set("index.search.cdc.mode", "dual");
+
+ BasicConfiguration basicConfig = new BasicConfiguration(
+ GraphDatabaseConfiguration.ROOT_NS,
+ config,
+ BasicConfiguration.Restriction.NONE
+ );
+
+ CdcConfiguration cdcConfig = new CdcConfiguration(basicConfig.restrictTo("search"));
+ assertTrue(cdcConfig.isEnabled());
+ assertEquals("localhost:9092", cdcConfig.getKafkaBootstrapServers());
+ assertEquals("test-topic", cdcConfig.getKafkaTopic());
+ assertEquals(CdcIndexTransaction.CdcMode.DUAL, cdcConfig.getMode());
+ }
+
+ @Test
+ public void testCdcModeDefaults() {
+ WriteConfiguration config = new CommonsConfiguration();
+ config.set("index.search.cdc.enabled", true);
+ config.set("index.search.cdc.kafka-bootstrap-servers", "localhost:9092");
+ config.set("index.search.cdc.kafka-topic", "test-topic");
+
+ BasicConfiguration basicConfig = new BasicConfiguration(
+ GraphDatabaseConfiguration.ROOT_NS,
+ config,
+ BasicConfiguration.Restriction.NONE
+ );
+
+ CdcConfiguration cdcConfig = new CdcConfiguration(basicConfig.restrictTo("search"));
+ assertEquals(CdcIndexTransaction.CdcMode.DUAL, cdcConfig.getMode());
+ }
+}
diff --git a/janusgraph-cdc/src/test/java/org/janusgraph/diskstorage/cdc/CdcKafkaIntegrationTest.java b/janusgraph-cdc/src/test/java/org/janusgraph/diskstorage/cdc/CdcKafkaIntegrationTest.java
new file mode 100644
index 0000000000..ffac559863
--- /dev/null
+++ b/janusgraph-cdc/src/test/java/org/janusgraph/diskstorage/cdc/CdcKafkaIntegrationTest.java
@@ -0,0 +1,155 @@
+// Copyright 2025 JanusGraph Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.janusgraph.diskstorage.cdc;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.janusgraph.diskstorage.indexing.IndexEntry;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Integration test for CDC producer using Kafka testcontainer.
+ */
+@Testcontainers
+public class CdcKafkaIntegrationTest {
+
+ private static final String TOPIC_NAME = "test-cdc-topic";
+
+ @Container
+ private static final KafkaContainer kafka = new KafkaContainer(
+ DockerImageName.parse("confluentinc/cp-kafka:7.5.0")
+ );
+
+ private KafkaCdcProducer producer;
+ private KafkaConsumer<String, String> consumer;
+ private ObjectMapper objectMapper;
+
+ @BeforeEach
+ public void setup() {
+ String bootstrapServers = kafka.getBootstrapServers();
+
+ // Create producer
+ producer = new KafkaCdcProducer(bootstrapServers, TOPIC_NAME);
+
+ // Create consumer for verification
+ Properties consumerProps = new Properties();
+ consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
+ consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+ consumer = new KafkaConsumer<>(consumerProps);
+ consumer.subscribe(Collections.singletonList(TOPIC_NAME));
+
+ objectMapper = new ObjectMapper();
+ }
+
+ @AfterEach
+ public void teardown() {
+ if (producer != null) {
+ producer.close();
+ }
+ if (consumer != null) {
+ consumer.close();
+ }
+ }
+
+ @Test
+ public void testProducerSendsEventToKafka() throws Exception {
+ // Create a CDC event
+ List<IndexEntry> additions = Arrays.asList(
+ new IndexEntry("name", "John"),
+ new IndexEntry("age", 30)
+ );
+
+ CdcMutationEvent event = new CdcMutationEvent(
+ "testStore",
+ "doc123",
+ additions,
+ null,
+ true,
+ false,
+ System.currentTimeMillis()
+ );
+
+ // Send event
+ producer.send(event);
+ producer.flush();
+
+ // Consume and verify
+ ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
+ assertEquals(1, records.count(), "Should receive one message");
+
+ ConsumerRecord<String, String> record = records.iterator().next();
+ assertEquals("testStore:doc123", record.key());
+
+ // Deserialize and verify
+ CdcMutationEvent receivedEvent = objectMapper.readValue(record.value(), CdcMutationEvent.class);
+ assertNotNull(receivedEvent);
+ assertEquals("testStore", receivedEvent.getStoreName());
+ assertEquals("doc123", receivedEvent.getDocumentId());
+ assertEquals(CdcMutationEvent.MutationType.ADDED, receivedEvent.getMutationType());
+ }
+
+ @Test
+ public void testMultipleEventsPreserveOrder() throws Exception {
+ // Send multiple events
+ for (int i = 0; i < 5; i++) {
+ CdcMutationEvent event = new CdcMutationEvent(
+ "store" + i,
+ "doc" + i,
+ Arrays.asList(new IndexEntry("field", "value" + i)),
+ null,
+ false,
+ false,
+ System.currentTimeMillis()
+ );
+ producer.send(event);
+ }
+ producer.flush();
+
+ // Verify all events received
+ ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
+ assertEquals(5, records.count(), "Should receive five messages");
+
+ int count = 0;
+ for (ConsumerRecord<String, String> record : records) {
+ CdcMutationEvent event = objectMapper.readValue(record.value(), CdcMutationEvent.class);
+ assertNotNull(event);
+ count++;
+ }
+ assertEquals(5, count);
+ }
+}
diff --git a/janusgraph-cdc/src/test/java/org/janusgraph/diskstorage/cdc/CdcMutationEventTest.java b/janusgraph-cdc/src/test/java/org/janusgraph/diskstorage/cdc/CdcMutationEventTest.java
new file mode 100644
index 0000000000..6b267af558
--- /dev/null
+++ b/janusgraph-cdc/src/test/java/org/janusgraph/diskstorage/cdc/CdcMutationEventTest.java
@@ -0,0 +1,97 @@
+// Copyright 2025 JanusGraph Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.janusgraph.diskstorage.cdc;
+
+import org.janusgraph.diskstorage.indexing.IndexEntry;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests for CDC classes.
+ */
+public class CdcMutationEventTest {
+
+ @Test
+ public void testCdcMutationEventCreation() {
+ List<IndexEntry> additions = Arrays.asList(
+ new IndexEntry("field1", "value1"),
+ new IndexEntry("field2", "value2")
+ );
+ List<IndexEntry> deletions = Arrays.asList(
+ new IndexEntry("field3", "value3")
+ );
+
+ CdcMutationEvent event = new CdcMutationEvent(
+ "testStore",
+ "doc123",
+ additions,
+ deletions,
+ true,
+ false,
+ System.currentTimeMillis()
+ );
+
+ assertEquals("testStore", event.getStoreName());
+ assertEquals("doc123", event.getDocumentId());
+ assertEquals(2, event.getAdditions().size());
+ assertEquals(1, event.getDeletions().size());
+ assertTrue(event.isNew());
+ assertFalse(event.isDeleted());
+ assertEquals(CdcMutationEvent.MutationType.ADDED, event.getMutationType());
+ }
+
+ @Test
+ public void testMutationTypeDetection() {
+ // Test ADDED type
+ CdcMutationEvent addedEvent = new CdcMutationEvent(
+ "store", "doc1", null, null, true, false, System.currentTimeMillis()
+ );
+ assertEquals(CdcMutationEvent.MutationType.ADDED, addedEvent.getMutationType());
+
+ // Test UPDATED type
+ CdcMutationEvent updatedEvent = new CdcMutationEvent(
+ "store", "doc2", null, null, false, false, System.currentTimeMillis()
+ );
+ assertEquals(CdcMutationEvent.MutationType.UPDATED, updatedEvent.getMutationType());
+
+ // Test DELETED type
+ CdcMutationEvent deletedEvent = new CdcMutationEvent(
+ "store", "doc3", null, null, false, true, System.currentTimeMillis()
+ );
+ assertEquals(CdcMutationEvent.MutationType.DELETED, deletedEvent.getMutationType());
+ }
+
+ @Test
+ public void testEqualsAndHashCode() {
+ long timestamp = System.currentTimeMillis();
+ List<IndexEntry> additions = Arrays.asList(new IndexEntry("field1", "value1"));
+
+ CdcMutationEvent event1 = new CdcMutationEvent(
+ "store", "doc1", additions, null, true, false, timestamp
+ );
+ CdcMutationEvent event2 = new CdcMutationEvent(
+ "store", "doc1", additions, null, true, false, timestamp
+ );
+
+ assertEquals(event1, event2);
+ assertEquals(event1.hashCode(), event2.hashCode());
+ }
+}
diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/configuration/GraphDatabaseConfiguration.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/configuration/GraphDatabaseConfiguration.java
index 7c51ad087a..06fe224c8d 100644
--- a/janusgraph-core/src/main/java/org/janusgraph/graphdb/configuration/GraphDatabaseConfiguration.java
+++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/configuration/GraphDatabaseConfiguration.java
@@ -1118,6 +1118,29 @@ public boolean apply(@Nullable String s) {
"and its the developers responsibility to avoid field collisions.",
ConfigOption.Type.GLOBAL, true);
+ // ############## CDC System ######################
+ // ################################################
+
+ public static final ConfigNamespace CDC_NS = new ConfigNamespace(INDEX_NS,"cdc","Configuration options for Change Data Capture for mixed indexes");
+
+ public static final ConfigOption<Boolean> INDEX_CDC_ENABLED = new ConfigOption<>(CDC_NS,"enabled",
+ "Enable Change Data Capture (CDC) for mixed index mutations. When enabled, mixed index mutations are " +
+ "published to a Kafka topic for eventual consistency processing by CDC workers.",
+ ConfigOption.Type.GLOBAL, false);
+
+ public static final ConfigOption<String> INDEX_CDC_MODE = new ConfigOption<>(CDC_NS,"mode",
+ "CDC mode for mixed index mutations. Options: 'skip' (skip mutations during transaction, rely on CDC), " +
+ "'dual' (write during transaction AND via CDC for consistency), 'cdc-only' (only via CDC).",
+ ConfigOption.Type.GLOBAL, "dual");
+
+ public static final ConfigOption<String> INDEX_CDC_KAFKA_BOOTSTRAP_SERVERS = new ConfigOption<>(CDC_NS,"kafka-bootstrap-servers",
+ "Comma-separated list of Kafka bootstrap servers for CDC event publishing.",
+ ConfigOption.Type.GLOBAL, String.class);
+
+ public static final ConfigOption<String> INDEX_CDC_KAFKA_TOPIC = new ConfigOption<>(CDC_NS,"kafka-topic",
+ "Kafka topic name for publishing CDC events for mixed index mutations.",
+ ConfigOption.Type.GLOBAL, "janusgraph-cdc-index-mutations");
+
// ############## Logging System ######################
// ################################################
diff --git a/pom.xml b/pom.xml
index 31f39b2f91..61dfe9552e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -131,6 +131,7 @@
        <module>cassandra-hadoop-util</module>
        <module>scylla-hadoop-util</module>
        <module>janusgraph-cql</module>
+        <module>janusgraph-cdc</module>
        <module>janusgraph-hadoop</module>
        <module>janusgraph-hbase</module>
        <module>janusgraph-bigtable</module>
@@ -1156,6 +1157,11 @@
                <artifactId>solr</artifactId>
                <version>${testcontainers.version}</version>
            </dependency>
+            <dependency>
+                <groupId>org.testcontainers</groupId>
+                <artifactId>kafka</artifactId>
+                <version>${testcontainers.version}</version>
+            </dependency>
            <dependency>
                <groupId>org.testcontainers</groupId>
                <artifactId>junit-jupiter</artifactId>