diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..451cb26
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,7 @@
+.idea
+target
+.project
+.settings
+.classpath
+.java-version
+*.iml
\ No newline at end of file
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..35802c4
--- /dev/null
+++ b/README.md
@@ -0,0 +1 @@
+# Apache Kafka Client Metrics Reporter Plugin
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..5e0757a
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,103 @@
+
+
+ 4.0.0
+
+ com.kafka.instaclustr
+ apache-kafka-client-metrics-reporter-plugin
+ 1.0
+ jar
+
+
+ 11
+ 11
+ UTF-8
+
+
+
+
+
+ org.slf4j
+ slf4j-api
+ 2.0.13
+
+
+
+ org.apache.kafka
+ kafka-clients
+ 3.8.0
+
+
+
+ org.yaml
+ snakeyaml
+ 2.5
+
+
+
+ io.opentelemetry.proto
+ opentelemetry-proto
+ 1.8.0-alpha
+
+
+
+ io.grpc
+ grpc-netty-shaded
+ 1.75.0
+
+
+ io.grpc
+ grpc-protobuf
+ 1.75.0
+
+
+ io.grpc
+ grpc-stub
+ 1.75.0
+
+
+
+ com.google.protobuf
+ protobuf-java
+ 4.32.0
+
+
+
+
+
+
+
+ src/main/resources
+
+
+
+
+ maven-assembly-plugin
+ 3.7.1
+
+
+ jar-with-dependencies
+
+
+
+ com.instaclustr.kafka.KafkaClientMetricsReporter
+ true
+ true
+
+
+
+
+
+ make-assembly
+ package
+
+ single
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/main/java/com/instaclustr/kafka/KafkaClientMetricsReporter.java b/src/main/java/com/instaclustr/kafka/KafkaClientMetricsReporter.java
new file mode 100644
index 0000000..943d43b
--- /dev/null
+++ b/src/main/java/com/instaclustr/kafka/KafkaClientMetricsReporter.java
@@ -0,0 +1,46 @@
+package com.instaclustr.kafka;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.server.telemetry.ClientTelemetry;
+import org.apache.kafka.server.telemetry.ClientTelemetryReceiver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaClientMetricsReporter implements MetricsReporter, ClientTelemetry {
+
+ private static final Logger logger = LoggerFactory.getLogger(KafkaClientMetricsReporter.class);
+
+
+ @Override
+ public void init(List metrics) {
+ logger.debug("Initializing the client metric reporter: {}", metrics);
+ }
+
+ @Override
+ public void configure(final Map configs) {
+ logger.debug("Configuration of the reporter");
+ }
+
+ @Override
+ public void metricChange(final KafkaMetric metric) {
+ logger.debug("Changing the metric {}", metric.metricName());
+ }
+
+ @Override
+ public void metricRemoval(final KafkaMetric metric) {
+ logger.debug("Removing the metric {}", metric.metricName());
+ }
+
+ @Override
+ public void close() {
+ logger.debug("Closing the reporter");
+ }
+
+ @Override
+ public ClientTelemetryReceiver clientReceiver() {
+ return new KafkaClientMetricsReporterReceiver();
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/instaclustr/kafka/KafkaClientMetricsReporterConfig.java b/src/main/java/com/instaclustr/kafka/KafkaClientMetricsReporterConfig.java
new file mode 100644
index 0000000..d8ea061
--- /dev/null
+++ b/src/main/java/com/instaclustr/kafka/KafkaClientMetricsReporterConfig.java
@@ -0,0 +1,28 @@
+package com.instaclustr.kafka;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.FileInputStream;
+import java.util.Map;
+
+public class KafkaClientMetricsReporterConfig {
+
+ public Map configurations;
+ private static final Logger logger = LoggerFactory.getLogger(KafkaClientMetricsReporterConfig.class);
+
+ public KafkaClientMetricsReporterConfig() {
+ try {
+ Yaml yaml = new Yaml();
+ final String kafkaClientMetricsConfigFilePath = System.getenv("KAFKA_CLIENT_METRICS_CONFIG_PATH");
+ logger.debug("Loading telemetry config from: {}", kafkaClientMetricsConfigFilePath);
+ final FileInputStream fis = new FileInputStream(kafkaClientMetricsConfigFilePath);
+ this.configurations = yaml.load(fis);
+
+ } catch (final Exception ex) {
+ logger.debug("Failed to load telemetry config", ex);
+ throw new RuntimeException("Failed to load telemetry config", ex);
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/instaclustr/kafka/KafkaClientMetricsReporterReceiver.java b/src/main/java/com/instaclustr/kafka/KafkaClientMetricsReporterReceiver.java
new file mode 100644
index 0000000..f9bd245
--- /dev/null
+++ b/src/main/java/com/instaclustr/kafka/KafkaClientMetricsReporterReceiver.java
@@ -0,0 +1,28 @@
+package com.instaclustr.kafka;
+
+import com.instaclustr.kafka.exporters.MetricsExporter;
+import com.instaclustr.kafka.exporters.MetricsExporterFactory;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.telemetry.ClientTelemetryPayload;
+import org.apache.kafka.server.telemetry.ClientTelemetryReceiver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaClientMetricsReporterReceiver implements ClientTelemetryReceiver {
+
+ private final MetricsExporter metricsExporter;
+ private static final Logger logger = LoggerFactory.getLogger(KafkaClientMetricsReporterReceiver.class);
+
+ public KafkaClientMetricsReporterReceiver() {
+ logger.info("Initializing the Kafka Client Metrics Reporter Receiver");
+ final KafkaClientMetricsReporterConfig kafkaClientMetricsReporterConfig = new KafkaClientMetricsReporterConfig();
+ this.metricsExporter = MetricsExporterFactory.create(kafkaClientMetricsReporterConfig.configurations);
+ logger.info("Initialized the Kafka Client Metrics Reporter Receiver");
+ }
+
+ @Override
+ public void exportMetrics(AuthorizableRequestContext requestContext, ClientTelemetryPayload telemetryPayload) {
+ metricsExporter.export(telemetryPayload);
+ }
+}
+
diff --git a/src/main/java/com/instaclustr/kafka/exporters/GrpcMetricsExporter.java b/src/main/java/com/instaclustr/kafka/exporters/GrpcMetricsExporter.java
new file mode 100644
index 0000000..340fb25
--- /dev/null
+++ b/src/main/java/com/instaclustr/kafka/exporters/GrpcMetricsExporter.java
@@ -0,0 +1,62 @@
+package com.instaclustr.kafka.exporters;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
+import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc;
+import org.apache.kafka.server.telemetry.ClientTelemetryPayload;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+public class GrpcMetricsExporter implements MetricsExporter {
+ private final String endpoint;
+ private final int timeout;
+ private final ManagedChannel channel;
+ private final MetricsServiceGrpc.MetricsServiceBlockingStub stub;
+ private final Logger logger = LoggerFactory.getLogger(GrpcMetricsExporter.class);
+
+ public GrpcMetricsExporter(String endpoint, int timeout) {
+ this.endpoint = endpoint;
+ this.timeout = timeout;
+ this.channel = ManagedChannelBuilder
+ .forTarget(endpoint)
+ .usePlaintext()
+ .build();
+ this.stub = MetricsServiceGrpc
+ .newBlockingStub(channel)
+ .withDeadlineAfter(timeout, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void export(ClientTelemetryPayload payload) {
+ try {
+ ByteBuffer byteBuffer = payload.data();
+ byte[] bytes;
+
+ if (byteBuffer.hasArray()) {
+ bytes = byteBuffer.array();
+ } else {
+ bytes = new byte[byteBuffer.remaining()];
+ byteBuffer.get(bytes);
+ }
+
+ ExportMetricsServiceRequest req = ExportMetricsServiceRequest.parseFrom(bytes);
+
+ stub.export(req);
+ logger.info("Successfully exported {} ResourceMetrics to {}",
+ req.getResourceMetricsCount(), endpoint);
+ } catch (Exception e) {
+ logger.error("Failed to export OTLP metrics to {}: {}", endpoint, e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Call this on shutdown to cleanly close the gRPC channel.
+ */
+ public void shutdown() throws InterruptedException {
+ channel.shutdown().awaitTermination(this.timeout, TimeUnit.MILLISECONDS);
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/instaclustr/kafka/exporters/HttpMetricsExporter.java b/src/main/java/com/instaclustr/kafka/exporters/HttpMetricsExporter.java
new file mode 100644
index 0000000..79d49e5
--- /dev/null
+++ b/src/main/java/com/instaclustr/kafka/exporters/HttpMetricsExporter.java
@@ -0,0 +1,125 @@
+package com.instaclustr.kafka.exporters;
+
+import org.apache.kafka.server.telemetry.ClientTelemetryPayload;
+import org.apache.kafka.shaded.io.opentelemetry.proto.common.v1.AnyValue;
+import org.apache.kafka.shaded.io.opentelemetry.proto.common.v1.KeyValue;
+import org.apache.kafka.shaded.io.opentelemetry.proto.metrics.v1.MetricsData;
+import org.apache.kafka.shaded.io.opentelemetry.proto.resource.v1.Resource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Map;
+
+public class HttpMetricsExporter implements MetricsExporter {
+ private final String endpoint;
+ private final HttpClient httpClient;
+ final Map metadata;
+ private final Logger logger = LoggerFactory.getLogger(HttpMetricsExporter.class);
+
+ public HttpMetricsExporter(final String endpoint, final int timeoutMillis, final Map metadata) {
+ this.endpoint = endpoint;
+ this.metadata = metadata;
+ this.httpClient = HttpClient.newBuilder()
+ .version(HttpClient.Version.HTTP_2)
+ .connectTimeout(Duration.ofMillis(timeoutMillis))
+ .followRedirects(HttpClient.Redirect.NORMAL)
+ .build();
+ }
+
+ @Override
+ public void export(final ClientTelemetryPayload payload) {
+ try {
+ final byte[] rawBytes = bufferToBytes(payload.data());
+ MetricsData metricsData = MetricsData.parseFrom(rawBytes);
+
+ final byte[] finalBytes = shouldEnrich(metricsData)
+ ? enrichMetricsData(metricsData)
+ : rawBytes;
+ HttpRequest request = buildRequest(finalBytes);
+ sendAsync(request);
+ } catch (Exception e) {
+ logger.error("Error exporting OTLP metrics to {}: {}", endpoint, e.getMessage(), e);
+ }
+ }
+
+ private byte[] bufferToBytes(ByteBuffer buffer) {
+ if (buffer.hasArray()) {
+ return buffer.array();
+ } else {
+ byte[] bytes = new byte[buffer.remaining()];
+ buffer.get(bytes);
+ return bytes;
+ }
+ }
+
+ private boolean shouldEnrich(final MetricsData metricsData) {
+ return !this.metadata.isEmpty() && metricsData.getResourceMetricsCount() > 0;
+ }
+
+ private byte[] enrichMetricsData(MetricsData metricsData) {
+ MetricsData.Builder dataBuilder = metricsData.toBuilder();
+
+ for (int i = 0; i < dataBuilder.getResourceMetricsCount(); i++) {
+ Resource.Builder resourceBuilder =
+ dataBuilder.getResourceMetricsBuilder(i).getResourceBuilder();
+
+ this.metadata.forEach((key, value) ->
+ resourceBuilder.addAttributes(toKeyValue(key, value))
+ );
+ }
+
+ return dataBuilder.build().toByteArray();
+ }
+
+ private KeyValue toKeyValue(final String key, final Object value) {
+ return KeyValue.newBuilder()
+ .setKey(key)
+ .setValue(toAnyValue(value))
+ .build();
+ }
+
+ private AnyValue toAnyValue(final Object value) {
+ AnyValue.Builder b = AnyValue.newBuilder();
+ if (value instanceof String) {
+ b.setStringValue((String) value);
+ } else if (value instanceof Boolean) {
+ b.setBoolValue((Boolean) value);
+ } else if (value instanceof Long) {
+ b.setIntValue((Long) value);
+ } else if (value instanceof Integer) {
+ b.setIntValue(((Integer) value).longValue());
+ } else if (value instanceof Double) {
+ b.setDoubleValue((Double) value);
+ } else {
+ // fallback
+ b.setStringValue(value.toString());
+ }
+ return b.build();
+ }
+
+ private HttpRequest buildRequest(final byte[] payload) {
+ return HttpRequest.newBuilder()
+ .uri(URI.create(endpoint))
+ .header("Content-Type", "application/x-protobuf")
+ .POST(HttpRequest.BodyPublishers.ofByteArray(payload))
+ .build();
+ }
+
+ private void sendAsync(final HttpRequest request) {
+ httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString())
+ .thenAccept(response -> {
+ logger.info("OTLP metrics endpoint status: {}", response.statusCode());
+ logger.debug("Response body: {}", response.body());
+ })
+ .exceptionally(ex -> {
+ logger.error("Error invoking the OTLP metrics endpoint", ex);
+ return null;
+ });
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/instaclustr/kafka/exporters/LogMetricsExporter.java b/src/main/java/com/instaclustr/kafka/exporters/LogMetricsExporter.java
new file mode 100644
index 0000000..337b9b1
--- /dev/null
+++ b/src/main/java/com/instaclustr/kafka/exporters/LogMetricsExporter.java
@@ -0,0 +1,19 @@
+package com.instaclustr.kafka.exporters;
+
+import org.apache.kafka.server.telemetry.ClientTelemetryPayload;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LogMetricsExporter implements MetricsExporter {
+ private final String logPath;
+ private final Logger logger = LoggerFactory.getLogger(LogMetricsExporter.class);
+
+ public LogMetricsExporter(String logPath) {
+ this.logPath = logPath != null ? logPath : "/lib/default-metrics.log";
+ }
+
+ @Override
+ public void export(ClientTelemetryPayload payload) {
+ logger.info("Logging metrics to {}: {}", logPath, payload);
+ }
+}
diff --git a/src/main/java/com/instaclustr/kafka/exporters/MetricsExporter.java b/src/main/java/com/instaclustr/kafka/exporters/MetricsExporter.java
new file mode 100644
index 0000000..fd91d8e
--- /dev/null
+++ b/src/main/java/com/instaclustr/kafka/exporters/MetricsExporter.java
@@ -0,0 +1,7 @@
+package com.instaclustr.kafka.exporters;
+
+import org.apache.kafka.server.telemetry.ClientTelemetryPayload;
+
+public interface MetricsExporter {
+ void export(ClientTelemetryPayload payload);
+}
diff --git a/src/main/java/com/instaclustr/kafka/exporters/MetricsExporterFactory.java b/src/main/java/com/instaclustr/kafka/exporters/MetricsExporterFactory.java
new file mode 100644
index 0000000..955183a
--- /dev/null
+++ b/src/main/java/com/instaclustr/kafka/exporters/MetricsExporterFactory.java
@@ -0,0 +1,42 @@
+package com.instaclustr.kafka.exporters;
+
+import java.util.Map;
+
+public class MetricsExporterFactory {
+
+ private MetricsExporterFactory() {}
+
+ @SuppressWarnings("unchecked")
+ public static MetricsExporter create(final Map configurations) {
+
+ final Object exporterObj = configurations.get("exporter");
+ if (!(exporterObj instanceof Map)) {
+ throw new IllegalArgumentException("Exporter configuration is not a valid map");
+ }
+ final Map exporterMap = (Map) exporterObj;
+
+ Object metadataObj = configurations.get("metadata");
+ if (metadataObj != null && !(metadataObj instanceof Map)) {
+ throw new IllegalArgumentException("Metadata configuration is not a valid map");
+ }
+ final Map metadata = (Map) metadataObj;
+
+ switch (((String) exporterMap.get("mode")).toUpperCase()) {
+ case "HTTP":
+ return new HttpMetricsExporter(
+ (String) exporterMap.get("endpoint"),
+ (int) exporterMap.get("timeout"),
+ metadata
+ );
+ case "GRPC":
+ return new GrpcMetricsExporter(
+ (String) exporterMap.get("endpoint"),
+ (int) exporterMap.get("timeout")
+ );
+ case "LOG":
+ return new LogMetricsExporter((String) exporterMap.get("logPath"));
+ default:
+ throw new IllegalArgumentException("Unknown mode: " + exporterMap.get("mode"));
+ }
+ }
+}