115 changes: 115 additions & 0 deletions janusgraph-cdc/README.md
@@ -0,0 +1,115 @@
# JanusGraph CDC Module

This module provides Change Data Capture (CDC) support for JanusGraph mixed index mutations, ensuring eventual consistency between the storage backend (e.g., Cassandra) and mixed index backends (Elasticsearch, Solr, etc.).

## Overview

When JanusGraph is used with external index backends, an index update can fail after the graph data has already been written to the storage backend, leaving the two out of sync. CDC addresses this by:

1. Publishing all mixed index mutations to a Kafka topic
2. Running separate CDC workers that consume these events and apply them to the index
3. Ensuring eventual consistency even in failure scenarios

## Architecture

### Components

1. **CdcProducer**: Publishes index mutation events to Kafka
2. **CdcWorker**: Consumes events from Kafka and applies them to indexes
3. **CdcIndexTransaction**: Wraps IndexTransaction to capture mutations
4. **CdcConfiguration**: Manages CDC settings
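
The `CdcProducer` API itself is not shown in this README. As a rough mental model only, publishing a serialized mutation event amounts to a plain kafka-clients producer call; the record key and JSON payload below are illustrative assumptions, not the module's actual wire format.

```java
// Illustrative sketch only: publishing a serialized mutation event with kafka-clients.
// The key and JSON payload shown here are assumptions, not this module's actual format.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    String event = "{\"store\":\"vertex\",\"docId\":\"v123\",\"adds\":{\"name\":\"alice\"}}";
    producer.send(new ProducerRecord<>("janusgraph-cdc-index-mutations", "vertex", event));
    producer.flush();
}
```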

### CDC Modes

- **dual** (default): Write to the index during the transaction AND publish to the CDC topic, for redundancy
- **skip**: Skip index writes during the transaction and rely entirely on CDC
- **cdc-only**: Alias for skip mode, kept for backward compatibility
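
Internally these strings map onto the `CdcIndexTransaction.CdcMode` enum that `CdcConfiguration` parses. A simplified sketch of that mapping (the real enum is nested inside `CdcIndexTransaction`):

```java
// Simplified sketch of CdcIndexTransaction.CdcMode as parsed by CdcConfiguration.
public enum CdcMode {
    DUAL,     // "dual": write to the index in the transaction AND publish to the CDC topic
    SKIP,     // "skip": skip index writes in the transaction and rely on CDC
    CDC_ONLY  // "cdc-only": accepted for backward compatibility; behaves like skip
}
```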

## Configuration

Add the following to your JanusGraph configuration:

```properties
# Enable CDC for mixed indexes
index.search.cdc.enabled=true

# CDC mode (dual, skip, or cdc-only)
index.search.cdc.mode=dual

# Kafka bootstrap servers
index.search.cdc.kafka-bootstrap-servers=localhost:9092

# Kafka topic for CDC events
index.search.cdc.kafka-topic=janusgraph-cdc-index-mutations
```
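
The same settings can also be supplied programmatically via `JanusGraphFactory`. The sketch below assumes a CQL storage backend and a mixed index named `search`; adjust the backend and index name to match your deployment.

```java
// Programmatic equivalent of the properties above (assumes a mixed index named "search").
JanusGraph graph = JanusGraphFactory.build()
    .set("storage.backend", "cql")
    .set("index.search.backend", "elasticsearch")
    .set("index.search.cdc.enabled", true)
    .set("index.search.cdc.mode", "dual")
    .set("index.search.cdc.kafka-bootstrap-servers", "localhost:9092")
    .set("index.search.cdc.kafka-topic", "janusgraph-cdc-index-mutations")
    .open();
```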

## Usage

### Running CDC Workers

CDC workers should be run as separate processes:

```java
// Create index provider and retriever
IndexProvider indexProvider = ...;
KeyInformation.IndexRetriever indexRetriever = ...;

// Create and start CDC worker
CdcWorker worker = new CdcWorker(
    "localhost:9092",                  // Kafka bootstrap servers
    "janusgraph-cdc-index-mutations",  // Topic name
    "janusgraph-cdc-group",            // Consumer group ID
    indexProvider,
    indexRetriever
);

worker.start();

// Keep worker running...

// Shutdown gracefully
worker.stop();
worker.close();
```
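
One simple way to make the "keep running" and graceful-shutdown steps concrete is a JVM shutdown hook; the latch below is just one way to block the main thread and is not part of this module.

```java
// Block until the JVM is asked to exit, then stop the worker cleanly.
CountDownLatch shutdown = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    worker.stop();
    worker.close();
    shutdown.countDown();
}));
shutdown.await();
```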

### Integration with JanusGraph

CDC can be integrated programmatically using the CdcIndexTransactionFactory:

```java
Configuration config = ...; // Your JanusGraph configuration
CdcIndexTransactionFactory cdcFactory = new CdcIndexTransactionFactory(config);

// Wrap index transactions with CDC support
IndexTransaction indexTx = ...; // Original index transaction
IndexTransaction wrappedTx = cdcFactory.wrapIfEnabled(indexTx);

// Use wrappedTx as normal
// Mutations will be captured and published to Kafka
```

## Benefits

1. **Eventual Consistency**: Guarantees that the index and the storage backend will eventually be consistent
2. **Failure Recovery**: Automatic recovery from index update failures
3. **Operational Flexibility**: CDC workers can be scaled independently
4. **Minimal Performance Impact**: Asynchronous processing offloads index updates

## Dependencies

- Apache Kafka 3.6.1+
- Debezium Core 2.5.0+
- Jackson for JSON serialization

## Limitations

- Requires Kafka infrastructure
- Eventual consistency means a slight delay before index updates become visible
- CDC workers must be managed separately from JanusGraph instances

## Future Enhancements

- Automatic integration with Backend
- Support for other message brokers (RabbitMQ, etc.)
- Built-in CDC worker management
- Metrics and monitoring integration
120 changes: 120 additions & 0 deletions janusgraph-cdc/pom.xml
@@ -0,0 +1,120 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.janusgraph</groupId>
        <artifactId>janusgraph</artifactId>
        <version>1.2.0-SNAPSHOT</version>
    </parent>

    <artifactId>janusgraph-cdc</artifactId>
    <name>JanusGraph-CDC: Change Data Capture Support</name>
    <url>https://janusgraph.org</url>

    <properties>
        <top.level.basedir>${basedir}/..</top.level.basedir>
        <kafka.version>3.6.1</kafka.version>
        <debezium.version>2.5.0.Final</debezium.version>
    </properties>

    <dependencies>
        <!-- JanusGraph Core -->
        <dependency>
            <groupId>org.janusgraph</groupId>
            <artifactId>janusgraph-core</artifactId>
            <version>${project.version}</version>
        </dependency>

        <!-- Kafka Dependencies -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>

        <!-- Debezium Dependencies -->
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-core</artifactId>
            <version>${debezium.version}</version>
        </dependency>

        <!-- Jackson for JSON serialization -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

        <!-- SLF4J for logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </dependency>

        <!-- Test Dependencies -->
        <dependency>
            <groupId>org.janusgraph</groupId>
            <artifactId>janusgraph-backend-testutils</artifactId>
            <version>${project.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.janusgraph</groupId>
            <artifactId>janusgraph-cql</artifactId>
            <version>${project.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.janusgraph</groupId>
            <artifactId>janusgraph-es</artifactId>
            <version>${project.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>testcontainers</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>junit-jupiter</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>kafka</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>cassandra</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>elasticsearch</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
@@ -0,0 +1,98 @@
// Copyright 2025 JanusGraph Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.janusgraph.diskstorage.cdc;

import org.janusgraph.diskstorage.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_CDC_ENABLED;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_CDC_KAFKA_BOOTSTRAP_SERVERS;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_CDC_KAFKA_TOPIC;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_CDC_MODE;

/**
 * Configuration holder for CDC settings.
 */
public class CdcConfiguration {

    private static final Logger log = LoggerFactory.getLogger(CdcConfiguration.class);

    private final boolean enabled;
    private final CdcIndexTransaction.CdcMode mode;
    private final String kafkaBootstrapServers;
    private final String kafkaTopic;

    public CdcConfiguration(Configuration config) {
        this.enabled = config.get(INDEX_CDC_ENABLED);

        if (enabled) {
            this.mode = parseCdcMode(config.get(INDEX_CDC_MODE));
            this.kafkaBootstrapServers = config.get(INDEX_CDC_KAFKA_BOOTSTRAP_SERVERS);
            this.kafkaTopic = config.get(INDEX_CDC_KAFKA_TOPIC);
            validate();
            log.info("CDC enabled with mode: {}, topic: {}, bootstrap servers: {}",
                mode, kafkaTopic, kafkaBootstrapServers);
        } else {
            // When disabled, these values are not used
            this.mode = null;
            this.kafkaBootstrapServers = null;
            this.kafkaTopic = null;
        }
    }

    private CdcIndexTransaction.CdcMode parseCdcMode(String modeStr) {
        if (modeStr == null || modeStr.isEmpty()) {
            return CdcIndexTransaction.CdcMode.DUAL;
        }

        switch (modeStr.toLowerCase()) {
            case "skip":
                return CdcIndexTransaction.CdcMode.SKIP;
            case "dual":
                return CdcIndexTransaction.CdcMode.DUAL;
            case "cdc-only":
                return CdcIndexTransaction.CdcMode.CDC_ONLY;
            default:
                log.warn("Unknown CDC mode: {}, defaulting to DUAL", modeStr);
                return CdcIndexTransaction.CdcMode.DUAL;
        }
    }

    private void validate() {
        if (kafkaBootstrapServers == null || kafkaBootstrapServers.isEmpty()) {
            throw new IllegalArgumentException("CDC is enabled but kafka bootstrap servers are not configured");
        }
        if (kafkaTopic == null || kafkaTopic.isEmpty()) {
            throw new IllegalArgumentException("CDC is enabled but kafka topic is not configured");
        }
    }

    public boolean isEnabled() {
        return enabled;
    }

    public CdcIndexTransaction.CdcMode getMode() {
        return mode;
    }

    public String getKafkaBootstrapServers() {
        return kafkaBootstrapServers;
    }

    public String getKafkaTopic() {
        return kafkaTopic;
    }
}