Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ public void methodAdvice(MethodTransformer transformer) {
.and(named("update"))
.and(takesArgument(1, named("org.apache.kafka.common.requests.MetadataResponse"))),
MetadataInstrumentation.class.getName() + "$MetadataUpdate22AndAfterAdvice");
transformer.applyAdvice(
isMethod().and(named("failedUpdate")),
MetadataInstrumentation.class.getName() + "$FailedUpdateAdvice");
}

public static class MetadataUpdateBefore22Advice {
Expand All @@ -103,6 +106,21 @@ public static void muzzleCheck(ConsumerRecord record) {
}
}

public static class FailedUpdateAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(@Advice.This final Metadata metadata) {
MetadataState state =
InstrumentationContext.get(Metadata.class, MetadataState.class).get(metadata);
if (state != null) {
KafkaConfigHelper.reportPendingConfigAsFailed(state);
}
}

public static void muzzleCheck(ConsumerRecord record) {
record.headers();
}
}

public static class MetadataUpdate22AndAfterAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,7 @@ public void methodAdvice(MethodTransformer transformer) {
.and(named("update"))
.and(takesArgument(1, named("org.apache.kafka.common.requests.MetadataResponse"))),
packageName + ".MetadataUpdate22AndAfterAdvice");
transformer.applyAdvice(
isMethod().and(named("failedUpdate")), packageName + ".MetadataFailedUpdateAdvice");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package datadog.trace.instrumentation.kafka_clients38;

import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.instrumentation.kafka_common.KafkaConfigHelper;
import datadog.trace.instrumentation.kafka_common.MetadataState;
import net.bytebuddy.asm.Advice;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class MetadataFailedUpdateAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(@Advice.This final Metadata metadata) {
MetadataState state =
InstrumentationContext.get(Metadata.class, MetadataState.class).get(metadata);
if (state != null) {
KafkaConfigHelper.reportPendingConfigAsFailed(state);
}
}

public static void muzzleCheck(ConsumerRecord record) {
record.headers();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ public class KafkaConfigHelper {
"socket.connection.setup.timeout.ms",
"socket.connection.setup.timeout.max.ms",
"security.protocol",
"sasl.mechanism",
"sasl.kerberos.service.name",
"sasl.login.callback.handler.class",
"ssl.protocol",
"ssl.enabled.protocols",
"ssl.endpoint.identification.algorithm",
"ssl.truststore.type",
"ssl.keystore.type",
"ssl.cipher.suites",
"metrics.sample.window.ms",
"metrics.num.samples",
"metrics.recording.level",
Expand Down Expand Up @@ -107,14 +116,38 @@ public static void storePendingConsumerConfig(

/** Called from metadata update advice when the cluster ID becomes available. */
public static void reportPendingConfig(MetadataState state, String clusterId) {
reportPendingConfig(state, clusterId, PendingConfig.STATUS_CONNECTED);
}

/**
* Called from failure advice when the client cannot reach / authenticate to the cluster. Peeks
* (does not consume) the pending config, so a later successful update can still emit "connected".
*/
public static void reportPendingConfigAsFailed(MetadataState state) {
PendingConfig pending = state.peekPendingConfig();
if (pending != null) {
emitKafkaConfig(pending, state.clusterId, PendingConfig.STATUS_FAILED);
}
}

private static void reportPendingConfig(MetadataState state, String clusterId, String status) {
PendingConfig pending = state.takePendingConfig();
if (pending != null) {
log.debug("Received cluster ID, reporting {} config", pending.type);
if (Config.get().isDataStreamsEnabled()) {
AgentTracer.get()
.getDataStreamsMonitoring()
.reportKafkaConfig(pending.type, clusterId, pending.consumerGroup, pending.config);
}
emitKafkaConfig(pending, clusterId, status);
}
}

private static void emitKafkaConfig(PendingConfig pending, String clusterId, String status) {
log.debug("Reporting {} config with status={}", pending.type, status);
if (Config.get().isDataStreamsEnabled()) {
AgentTracer.get()
.getDataStreamsMonitoring()
.reportKafkaConfig(
pending.type,
clusterId != null ? clusterId : "",
pending.consumerGroup,
pending.config,
status);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,9 @@ public void setPendingConfig(PendingConfig config) {
public PendingConfig takePendingConfig() {
return pendingConfig.getAndSet(null);
}

/** Snapshot the pending config without clearing it. */
public PendingConfig peekPendingConfig() {
return pendingConfig.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@

import java.util.Map;

/** Holds pending Kafka config info until the cluster ID becomes available from metadata. */
/** Holds pending Kafka config info until the client's connection lifecycle resolves. */
public class PendingConfig {
public static final String STATUS_CONNECTED = "connected";
public static final String STATUS_FAILED = "failed";

public final String type;
public final String consumerGroup;
public final Map<String, String> config;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package datadog.trace.instrumentation.kafka_common;

import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;

import java.util.HashMap;
import org.junit.jupiter.api.Test;

class MetadataStateTest {

private static PendingConfig newPending() {
return new PendingConfig("kafka_producer", "", new HashMap<>());
}

@Test
void peekDoesNotConsumePendingConfig() {
MetadataState state = new MetadataState();
PendingConfig pending = newPending();
state.setPendingConfig(pending);

// Reviewer's concern: a transient failedUpdate must leave the pending config in place
// so a later successful update can still take it and emit "connected".
assertSame(pending, state.peekPendingConfig());
assertSame(pending, state.peekPendingConfig());

assertSame(pending, state.takePendingConfig());
assertNull(state.peekPendingConfig());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -293,13 +293,18 @@ public void reportSchemaRegistryUsage(

@Override
public void reportKafkaConfig(
String type, String kafkaClusterId, String consumerGroup, Map<String, String> config) {
String type,
String kafkaClusterId,
String consumerGroup,
Map<String, String> config,
String connectionStatus) {
inbox.offer(
new KafkaConfigReport(
type,
kafkaClusterId,
consumerGroup,
config,
connectionStatus,
timeSource.getCurrentTimeNanos(),
getThreadServiceName()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class MsgPackDatastreamsPayloadWriter implements DatastreamsPayloadWriter
private static final byte[] CONFIG_KAFKA_CLUSTER_ID = "KafkaClusterId".getBytes(ISO_8859_1);
private static final byte[] CONFIG_CONSUMER_GROUP = "ConsumerGroup".getBytes(ISO_8859_1);
private static final byte[] CONFIG_ENTRIES = "Config".getBytes(ISO_8859_1);
private static final byte[] CONFIG_CONNECTION_STATUS = "ConnectionStatus".getBytes(ISO_8859_1);

private static final int INITIAL_CAPACITY = 512 * 1024;

Expand Down Expand Up @@ -290,7 +291,7 @@ private void writeKafkaConfigs(List<KafkaConfigReport> configs, Writable packer)
packer.writeUTF8(CONFIGS);
packer.startArray(configs.size());
for (KafkaConfigReport config : configs) {
packer.startMap(4); // Type, KafkaClusterId, ConsumerGroup, Config
packer.startMap(5); // Type, KafkaClusterId, ConsumerGroup, ConnectionStatus, Config

packer.writeUTF8(CONFIG_TYPE);
packer.writeString(config.getType(), null);
Expand All @@ -301,6 +302,9 @@ private void writeKafkaConfigs(List<KafkaConfigReport> configs, Writable packer)
packer.writeUTF8(CONFIG_CONSUMER_GROUP);
packer.writeString(config.getConsumerGroup(), null);

packer.writeUTF8(CONFIG_CONNECTION_STATUS);
packer.writeString(config.getConnectionStatus(), null);

packer.writeUTF8(CONFIG_ENTRIES);
Map<String, String> entries = config.getConfig();
packer.startMap(entries.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,13 +296,13 @@ void writeKafkaConfigsToMockServer() throws InterruptedException, IOException {
producerConfig.put("bootstrap.servers", "localhost:9092");
producerConfig.put("acks", "all");
producerConfig.put("linger.ms", "5");
dataStreams.reportKafkaConfig("kafka_producer", "", "", producerConfig);
dataStreams.reportKafkaConfig("kafka_producer", "", "", producerConfig, "connected");

Map<String, String> consumerConfig = new HashMap<>();
consumerConfig.put("bootstrap.servers", "localhost:9092");
consumerConfig.put("group.id", "test-group");
consumerConfig.put("auto.offset.reset", "earliest");
dataStreams.reportKafkaConfig("kafka_consumer", "", "test-group", consumerConfig);
dataStreams.reportKafkaConfig("kafka_consumer", "", "test-group", consumerConfig, "connected");

// Also add a stats point so the bucket is not empty of stats
dataStreams.add(
Expand Down Expand Up @@ -355,8 +355,8 @@ void duplicateKafkaConfigsAreEachSerializedInPayload() throws InterruptedExcepti
Map<String, String> producerConfig = new HashMap<>();
producerConfig.put("bootstrap.servers", "localhost:9092");
producerConfig.put("acks", "all");
dataStreams.reportKafkaConfig("kafka_producer", "", "", producerConfig);
dataStreams.reportKafkaConfig("kafka_producer", "", "", producerConfig);
dataStreams.reportKafkaConfig("kafka_producer", "", "", producerConfig, "connected");
dataStreams.reportKafkaConfig("kafka_producer", "", "", producerConfig, "connected");

// Also add a stats point so the bucket has content
dataStreams.add(
Expand Down Expand Up @@ -407,13 +407,15 @@ private void validateKafkaConfigMessage(byte[] message) throws IOException {
// Collect configs in a map keyed by type
Map<String, Map<String, String>> configsByType = new HashMap<>();
for (int n = 0; n < numConfigs; n++) {
assertEquals(4, unpacker.unpackMapHeader());
assertEquals(5, unpacker.unpackMapHeader());
assertEquals("Type", unpacker.unpackString());
String type = unpacker.unpackString();
assertEquals("KafkaClusterId", unpacker.unpackString());
unpacker.unpackString(); // skip cluster id value
assertEquals("ConsumerGroup", unpacker.unpackString());
unpacker.unpackString(); // skip consumer group value
assertEquals("ConnectionStatus", unpacker.unpackString());
assertEquals("connected", unpacker.unpackString());
assertEquals("Config", unpacker.unpackString());
int configSize = unpacker.unpackMapHeader();
Map<String, String> configEntries = new HashMap<>();
Expand Down Expand Up @@ -481,13 +483,15 @@ private void validateDuplicateKafkaConfigMessage(byte[] message) throws IOExcept
assertEquals(2, numConfigs);

for (int n = 0; n < numConfigs; n++) {
assertEquals(4, unpacker.unpackMapHeader());
assertEquals(5, unpacker.unpackMapHeader());
assertEquals("Type", unpacker.unpackString());
assertEquals("kafka_producer", unpacker.unpackString());
assertEquals("KafkaClusterId", unpacker.unpackString());
unpacker.unpackString(); // skip cluster id value
assertEquals("ConsumerGroup", unpacker.unpackString());
unpacker.unpackString(); // skip consumer group value
assertEquals("ConnectionStatus", unpacker.unpackString());
assertEquals("connected", unpacker.unpackString());
assertEquals("Config", unpacker.unpackString());
int configSize = unpacker.unpackMapHeader();
Map<String, String> configEntries = new HashMap<>();
Expand Down
Loading