Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,18 @@ private AuditServerConstants() {}
public static final String PROP_BUFFER_PARTITIONS = "kafka.topic.partitions.buffer";
public static final String PROP_REPLICATION_FACTOR = "kafka.replication.factor";

// Dynamic partition plan (Kafka compacted registry topic)
public static final String PROP_PARTITION_PLAN_TOPIC = "kafka.partition.plan.topic";
public static final String PROP_PARTITION_PLAN_REFRESH_INTERVAL_MS = "kafka.partition.plan.refresh.interval.ms";
public static final String PROP_PARTITION_PLAN_CONSUMER_POLL_TIMEOUT_MS = "kafka.partition.plan.consumer.poll.timeout.ms";
public static final String PROP_PARTITION_PLAN_DYNAMIC_ENABLED = "kafka.partition.plan.dynamic.enabled";
public static final String PROP_PARTITION_PLAN_ALLOWED_USERS = "kafka.partition.plan.allowed.users";
public static final String DEFAULT_PARTITION_PLAN_TOPIC = "ranger_audit_partition_plan";
public static final int DEFAULT_PARTITION_PLAN_REFRESH_INTERVAL_MS = 30000;
public static final int DEFAULT_PARTITION_PLAN_CONSUMER_POLL_TIMEOUT_MS = 500;
public static final int PARTITION_PLAN_TOPIC_PARTITION_COUNT = 1;
public static final String KAFKA_TOPIC_CLEANUP_POLICY_COMPACT = "compact";

// Kafka producer tuning (ranger.audit.ingestor.kafka.producer.*)
public static final String PROP_KAFKA_PRODUCER_PREFIX = "kafka.producer.";
public static final String PROP_PRODUCER_BATCH_SIZE = "batch.size";
Expand Down Expand Up @@ -108,9 +120,9 @@ private AuditServerConstants() {}
public static final long DEFAULT_OFFSET_COMMIT_INTERVAL_MS = 30000; // 30 seconds
public static final int DEFAULT_MAX_POLL_RECORDS = 500; // Kafka default batch size

// Default configured plugins: each gets allocated partitions from the topic
// Empty by default: operators opt in via XML (static) or REST (dynamic). See ranger-audit-ingestor-site.xml.
public static final String DEFAULT_PARTITIONER_CLASS = "org.apache.ranger.audit.producer.kafka.AuditPartitioner";
public static final String DEFAULT_CONFIGURED_PLUGINS = "hdfs,yarn,knox,hiveServer2,hiveMetastore,kafka,hbaseRegional,hbaseMaster,solr,trino,ozone,kudu,nifi";
public static final String DEFAULT_CONFIGURED_PLUGINS = "";
public static final short DEFAULT_REPLICATION_FACTOR = 3;
public static final int DEFAULT_TOPIC_PARTITIONS = 10;
public static final int DEFAULT_PARTITIONS_PER_CONFIGURED_PLUGIN = 3;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.ranger.audit.provider.MiscUtil;
import org.apache.ranger.audit.server.AuditServerConstants;
import org.slf4j.Logger;
Expand All @@ -39,6 +40,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;

public class AuditMessageQueueUtils {
private static final Logger LOG = LoggerFactory.getLogger(AuditMessageQueueUtils.class);
Expand All @@ -50,30 +52,13 @@ public static String createAuditsTopicIfNotExists(Properties props, String propP
LOG.info("==> AuditMessageQueueUtils:createAuditsTopicIfNotExists(propPrefix={})", propPrefix);

String ret = null;
String topicName = MiscUtil.getStringProperty(props, propPrefix + "." + AuditServerConstants.PROP_TOPIC_NAME, AuditServerConstants.DEFAULT_TOPIC);
String bootstrapServers = MiscUtil.getStringProperty(props, propPrefix + "." + AuditServerConstants.PROP_BOOTSTRAP_SERVERS);
String securityProtocol = MiscUtil.getStringProperty(props, propPrefix + "." + AuditServerConstants.PROP_SECURITY_PROTOCOL, AuditServerConstants.DEFAULT_SECURITY_PROTOCOL);
String saslMechanism = MiscUtil.getStringProperty(props, propPrefix + "." + AuditServerConstants.PROP_SASL_MECHANISM, AuditServerConstants.DEFAULT_SASL_MECHANISM);
int connMaxIdleTimeoutMS = MiscUtil.getIntProperty(props, propPrefix + "." + AuditServerConstants.PROP_CONN_MAX_IDEAL_MS, 10000);
int partitions = getPartitions(props, propPrefix);
short replicationFactor = (short) MiscUtil.getIntProperty(props, propPrefix + "." + AuditServerConstants.PROP_REPLICATION_FACTOR, AuditServerConstants.DEFAULT_REPLICATION_FACTOR);
int reqTimeoutMS = MiscUtil.getIntProperty(props, propPrefix + "." + AuditServerConstants.PROP_REQ_TIMEOUT_MS, 5000);
int maxAttempts = MiscUtil.getIntProperty(props, propPrefix + "." + AuditServerConstants.PROP_KAFKA_TOPIC_INIT_MAX_RETRIES, 10) + 1;
int retryDelayMs = MiscUtil.getIntProperty(props, propPrefix + "." + AuditServerConstants.PROP_KAFKA_TOPIC_INIT_RETRY_DELAY_MS, 3000);
String topicName = MiscUtil.getStringProperty(props, propPrefix + "." + AuditServerConstants.PROP_TOPIC_NAME, AuditServerConstants.DEFAULT_TOPIC);
int partitions = getPartitions(props, propPrefix);
short replicationFactor = (short) MiscUtil.getIntProperty(props, propPrefix + "." + AuditServerConstants.PROP_REPLICATION_FACTOR, AuditServerConstants.DEFAULT_REPLICATION_FACTOR);
int maxAttempts = MiscUtil.getIntProperty(props, propPrefix + "." + AuditServerConstants.PROP_KAFKA_TOPIC_INIT_MAX_RETRIES, 10) + 1;
int retryDelayMs = MiscUtil.getIntProperty(props, propPrefix + "." + AuditServerConstants.PROP_KAFKA_TOPIC_INIT_RETRY_DELAY_MS, 3000);

Map<String, Object> kafkaProp = new HashMap<>();

kafkaProp.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
kafkaProp.put("sasl.mechanism", saslMechanism);
kafkaProp.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, securityProtocol);

if (securityProtocol != null && securityProtocol.toUpperCase().contains("SASL")) {
kafkaProp.put(AuditServerConstants.PROP_SASL_JAAS_CONFIG, getJAASConfig(props, propPrefix));
kafkaProp.put(AuditServerConstants.PROP_SASL_KERBEROS_SERVICE_NAME, AuditServerConstants.DEFAULT_SERVICE_NAME);
}

kafkaProp.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, reqTimeoutMS);
kafkaProp.put(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, connMaxIdleTimeoutMS);
Map<String, Object> kafkaProp = buildAdminClientConfig(props, propPrefix);

for (int currentAttempt = 1; currentAttempt <= maxAttempts && ret == null; currentAttempt++) {
try (AdminClient admin = AdminClient.create(kafkaProp)) {
Expand All @@ -92,7 +77,7 @@ public static String createAuditsTopicIfNotExists(Properties props, String propP
LOG.info("Topic '{}' configs: {}", topicName, topicConfigs);
}

admin.createTopics(Collections.singletonList(topic)).all().get();
createTopicIgnoringAlreadyExists(admin, topic);

ret = topic.name();

Expand Down Expand Up @@ -143,6 +128,116 @@ public static String createAuditsTopicIfNotExists(Properties props, String propP
return ret;
}

/**
* Create the compacted partition-plan registry topic if missing.
* Always exactly one partition ({@link AuditServerConstants#PARTITION_PLAN_TOPIC_PARTITION_COUNT});
* partition count is not configurable. A single partition is required so every plan version compacts
* under one key on one log end. Safe when multiple ingestor pods start together (concurrent topic creation).
*/
public static String createPartitionPlanTopicIfNotExists(Properties props, String propPrefix) {
String planTopic = MiscUtil.getStringProperty(props, propPrefix + "." + AuditServerConstants.PROP_PARTITION_PLAN_TOPIC, AuditServerConstants.DEFAULT_PARTITION_PLAN_TOPIC);
short replicationFactor = (short) MiscUtil.getIntProperty(props, propPrefix + "." + AuditServerConstants.PROP_REPLICATION_FACTOR, AuditServerConstants.DEFAULT_REPLICATION_FACTOR);
int planTopicPartitions = AuditServerConstants.PARTITION_PLAN_TOPIC_PARTITION_COUNT;
int maxAttempts = MiscUtil.getIntProperty(props, propPrefix + "." + AuditServerConstants.PROP_KAFKA_TOPIC_INIT_MAX_RETRIES, 10) + 1;
int retryDelayMs = MiscUtil.getIntProperty(props, propPrefix + "." + AuditServerConstants.PROP_KAFKA_TOPIC_INIT_RETRY_DELAY_MS, 3000);
Map<String, Object> adminConfig = buildAdminClientConfig(props, propPrefix);

for (int attempt = 1; attempt <= maxAttempts; attempt++) {
try (AdminClient admin = AdminClient.create(adminConfig)) {
Set<String> topicNames = admin.listTopics().names().get();
if (!topicNames.contains(planTopic)) {
LOG.info("Creating partition plan topic '{}' with {} partition and replication factor {}", planTopic, planTopicPartitions, replicationFactor);
NewTopic topic = new NewTopic(planTopic, planTopicPartitions, replicationFactor);
topic.configs(Collections.singletonMap("cleanup.policy", AuditServerConstants.KAFKA_TOPIC_CLEANUP_POLICY_COMPACT));
createTopicIgnoringAlreadyExists(admin, topic);
AuditServerUtils.waitUntilTopicReady(admin, planTopic, Duration.ofSeconds(60));
int partitionCount = describeTopicPartitionCount(admin, planTopic);
LOG.info("Partition plan topic '{}' is ready with {} partition(s)", planTopic, partitionCount);
} else {
int partitionCount = describeTopicPartitionCount(admin, planTopic);
requirePlanTopicPartitionCount(planTopic, partitionCount, planTopicPartitions);
LOG.info("Partition plan topic '{}' already exists with {} partition(s)", planTopic, partitionCount);
}
return planTopic;
} catch (Exception ex) {
if (attempt < maxAttempts) {
LOG.warn("Failed to ensure partition plan topic on attempt {}/{}. Retrying in {} ms. Error: {}", attempt, maxAttempts, retryDelayMs, ex.getMessage());
try {
Thread.sleep(retryDelayMs);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
} else {
LOG.error("Failed to create partition plan topic '{}' after {} attempts", planTopic, attempt, ex);
throw new RuntimeException("Failed to create partition plan topic '" + planTopic + "' after " + attempt + " attempts", ex);
}
}
}
throw new RuntimeException("Failed to create partition plan topic '" + planTopic + "'");
}

/**
* Returns whether the compacted partition-plan registry topic already exists on the audit Kafka cluster.
* When the check fails (broker unreachable, ACL denied), returns {@code false} so callers can fall back
* to static XML {@code auth_to_local} rules until the plan topic is available.
*/
public static boolean partitionPlanTopicExists(Properties props, String propPrefix) {
String planTopic = MiscUtil.getStringProperty(props, propPrefix + "." + AuditServerConstants.PROP_PARTITION_PLAN_TOPIC, AuditServerConstants.DEFAULT_PARTITION_PLAN_TOPIC);
Map<String, Object> adminConfig = buildAdminClientConfig(props, propPrefix);
try (AdminClient admin = AdminClient.create(adminConfig)) {
Set<String> topicNames = admin.listTopics().names().get();
boolean exists = topicNames.contains(planTopic);
LOG.debug("Partition plan topic '{}' exists: {}", planTopic, exists);
return exists;
} catch (Exception ex) {
LOG.warn("Could not determine whether partition plan topic '{}' exists: {}. Assuming it does not.", planTopic, ex.getMessage());
return false;
}
}

public static Map<String, Object> buildAdminClientConfig(Properties props, String propPrefix) {
String bootstrapServers = MiscUtil.getStringProperty(props, propPrefix + "." + AuditServerConstants.PROP_BOOTSTRAP_SERVERS);
String securityProtocol = MiscUtil.getStringProperty(props, propPrefix + "." + AuditServerConstants.PROP_SECURITY_PROTOCOL, AuditServerConstants.DEFAULT_SECURITY_PROTOCOL);
String saslMechanism = MiscUtil.getStringProperty(props, propPrefix + "." + AuditServerConstants.PROP_SASL_MECHANISM, AuditServerConstants.DEFAULT_SASL_MECHANISM);
int connMaxIdleTimeoutMS = MiscUtil.getIntProperty(props, propPrefix + "." + AuditServerConstants.PROP_CONN_MAX_IDEAL_MS, 10000);
int reqTimeoutMS = MiscUtil.getIntProperty(props, propPrefix + "." + AuditServerConstants.PROP_REQ_TIMEOUT_MS, 5000);

Map<String, Object> kafkaProp = new HashMap<>();
kafkaProp.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
kafkaProp.put("sasl.mechanism", saslMechanism);
kafkaProp.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, securityProtocol);
if (securityProtocol != null && securityProtocol.toUpperCase().contains("SASL")) {
kafkaProp.put(AuditServerConstants.PROP_SASL_JAAS_CONFIG, getJAASConfig(props, propPrefix));
kafkaProp.put(AuditServerConstants.PROP_SASL_KERBEROS_SERVICE_NAME, AuditServerConstants.DEFAULT_SERVICE_NAME);
}
kafkaProp.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, reqTimeoutMS);
kafkaProp.put(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, connMaxIdleTimeoutMS);
return kafkaProp;
}

private static int describeTopicPartitionCount(AdminClient admin, String topicName) throws Exception {
TopicDescription topicDescription = admin.describeTopics(Collections.singletonList(topicName)).values().get(topicName).get();
return topicDescription.partitions().size();
}

private static void requirePlanTopicPartitionCount(String planTopic, int actualPartitions, int expectedPartitions) {
if (actualPartitions != expectedPartitions) {
throw new RuntimeException("Partition plan topic '" + planTopic + "' must have exactly " + expectedPartitions + " partition(s) for compacted registry semantics, but has " + actualPartitions);
}
}

private static void createTopicIgnoringAlreadyExists(AdminClient admin, NewTopic topic) throws Exception {
try {
admin.createTopics(Collections.singletonList(topic)).all().get();
} catch (ExecutionException ex) {
if (!(ex.getCause() instanceof TopicExistsException)) {
throw ex;
}
LOG.info("Topic '{}' already exists", topic.name());
}
}

public static String getJAASConfig(Properties props, String propPrefix) {
// Use ranger service principal and keytab for Kafka authentication
// This ensures consistent identity across all Ranger services and destination writes
Expand Down Expand Up @@ -215,6 +310,22 @@ public static String getJAASConfig(Properties props, String propPrefix) {
return jaasConfig;
}

/** Grows the audit topic partition count when the plan requires more partitions than exist today. */
public static void ensureTopicPartitionCount(Properties props, String propPrefix, String topicName, int requiredPartitions) {
LOG.info("Ensuring topic '{}' has at least {} partitions", topicName, requiredPartitions);
Map<String, Object> adminConfig = buildAdminClientConfig(props, propPrefix);
try (AdminClient admin = AdminClient.create(adminConfig)) {
updateExistingTopicPartitions(admin, topicName, requiredPartitions);
LOG.info("Topic '{}' partition count satisfied (required >= {})", topicName, requiredPartitions);
} catch (Exception e) {
LOG.error("Failed to ensure partition count for topic '{}' (required >= {})", topicName, requiredPartitions, e);
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
}
throw new RuntimeException("Failed to ensure partition count for topic '" + topicName + "'", e);
}
}

private static String updateExistingTopicPartitions(AdminClient admin, String topicName, int partitions) {
LOG.info("==> AuditMessageQueueUtils:updateExistingTopicPartitions() topic: {}, desired partitions: {}", topicName, partitions);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.ranger.audit.utils;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.ranger.audit.server.AuditServerConstants;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -65,4 +66,16 @@ public void testBuildTopicConfigsMapsAllSetProperties() {
assertEquals("lz4", configs.get("compression.type"));
assertEquals("2", configs.get("min.insync.replicas"));
}

@Test
public void testBuildAdminClientConfigUsesBootstrapServers() {
Properties props = new Properties();
props.setProperty(PROP_PREFIX + "." + AuditServerConstants.PROP_BOOTSTRAP_SERVERS, "kafka:9092");
props.setProperty(PROP_PREFIX + "." + AuditServerConstants.PROP_SECURITY_PROTOCOL, "PLAINTEXT");

Map<String, Object> adminConfig = AuditMessageQueueUtils.buildAdminClientConfig(props, PROP_PREFIX);

assertEquals("kafka:9092", adminConfig.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG));
assertEquals("PLAINTEXT", adminConfig.get(AdminClientConfig.SECURITY_PROTOCOL_CONFIG));
}
}
Loading