Skip to content

Commit 11fb9ed

Browse files
authored
feat(adxexporter): Switch MetricsExporter to OTLP push mode (#1029)
* feat(adxexporter): Switch MetricsExporter to OTLP push mode Replace Prometheus scraping with direct OTLP push using PromToOtlpExporter. Changes: - Add ToWriteRequest() to transform MetricData to prompb.WriteRequest with pooling - Update MetricsExporterReconciler to use PromToOtlpExporter instead of OTel Meter - Update CLI: --otlp-endpoint now required, remove deprecated --metrics-* flags - Update tests to use mock OTLP server instead of metrics scraping - Update documentation for OTLP push mode Benefits: - Timestamp fidelity: preserves actual KQL query timestamps - Memory efficiency: uses pkg/prompb object pooling for high cardinality - Simpler deployment: no Collector scraping configuration needed - Reuses battle-tested PromToOtlpExporter from Collector * Add --add-resource-attributes flag to AdxExporter - Add --add-resource-attributes CLI flag for specifying OTLP resource attributes - Merge ClusterLabels (base) with AddResourceAttributes (can override or add new) - Add parseKeyValuePairs() helper function - Add tests for resource attribute merging behavior * fix(adxexporter): Add --add-resource-attributes flag to AdxExporter Switches MetricsExporter from Prometheus scraping to OTLP push mode. The metricNamePrefix field in the CRD controls the full metric name prefix. Users who previously had metrics with 'adxexporter_' prefix should update their CRDs to include it explicitly in metricNamePrefix, e.g.: metricNamePrefix: adxexporter_cluster_autoscaler * feat(adxexporter): Add --default-metric-name-prefix CLI flag Adds a new CLI flag that sets a default prefix for metric names when the MetricsExporter CRD doesn't specify a metricNamePrefix. This provides: 1. Cluster-wide default naming convention without modifying every CRD 2. CRD-level override when specific prefixes are needed 3. Better user experience - operators configure once, users don't need to remember to add prefixes Metric name precedence: 1. CRD's metricNamePrefix (if specified) 2. CLI's --default-metric-name-prefix (if CRD prefix is empty) 3. No prefix (if both are empty) Example deployment: adxexporter --default-metric-name-prefix=adxexporter ... This produces metrics like 'adxexporter_cluster_autoscaler_availability_*' without requiring each MetricsExporter CRD to specify the prefix. * feat(adxexporter): Change --metric-name-prefix to always prepend to CRD prefix BREAKING CHANGE: Renamed --default-metric-name-prefix to --metric-name-prefix The new behavior always prepends the CLI prefix to metric names, then appends the CRD's metricNamePrefix. This allows operators to enforce a global prefix (e.g., for allow-list compliance) while teams can still add sub-prefixes via CRD. Old behavior (--default-metric-name-prefix): - CLI: adxexporter, CRD: custom → Result: custom_* (CRD wins) New behavior (--metric-name-prefix): - CLI: adxexporter, CRD: custom → Result: adxexporter_custom_* - CLI: adxexporter, CRD: empty → Result: adxexporter_* - CLI: empty, CRD: custom → Result: custom_* This ensures all MetricsExporter metrics can be prefixed uniformly without requiring each CRD to manually include the global prefix.
1 parent 9da11d1 commit 11fb9ed

File tree

11 files changed

+1325
-260
lines changed

11 files changed

+1325
-260
lines changed

adxexporter/criteriaexpression_test.go

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ package adxexporter
33
import (
44
"context"
55
"fmt"
6+
"io"
7+
"net/http"
8+
"net/http/httptest"
69
"testing"
710
"time"
811

@@ -158,15 +161,25 @@ func TestMetricsExporter_CriteriaExpressionProcessing(t *testing.T) {
158161
if tt.expectExecuted {
159162
mockKusto.SetNextResult(t, [][]interface{}{{"test_metric", 1.0, time.Now()}})
160163
}
164+
165+
// Create mock OTLP server
166+
mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
167+
io.Copy(io.Discard, r.Body)
168+
r.Body.Close()
169+
w.WriteHeader(http.StatusOK)
170+
w.Write([]byte{})
171+
}))
172+
defer mockServer.Close()
173+
161174
reconciler := &MetricsExporterReconciler{
162-
Client: fakeClient,
163-
Scheme: s,
164-
ClusterLabels: labels,
165-
KustoClusters: map[string]string{"test-db": "https://test-cluster.kusto.windows.net"},
166-
QueryExecutors: map[string]*QueryExecutor{"test-db": NewQueryExecutor(mockKusto)},
167-
EnableMetricsEndpoint: false,
175+
Client: fakeClient,
176+
Scheme: s,
177+
ClusterLabels: labels,
178+
KustoClusters: map[string]string{"test-db": "https://test-cluster.kusto.windows.net"},
179+
QueryExecutors: map[string]*QueryExecutor{"test-db": NewQueryExecutor(mockKusto)},
180+
OTLPEndpoint: mockServer.URL,
168181
}
169-
require.NoError(t, reconciler.exposeMetricsServer())
182+
require.NoError(t, reconciler.initOtlpExporter())
170183
req := reconcile.Request{NamespacedName: types.NamespacedName{Name: me.Name, Namespace: me.Namespace}}
171184
res, err := reconciler.Reconcile(context.Background(), req)
172185
require.NoError(t, err)

adxexporter/metricsexporter.go

Lines changed: 69 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -3,25 +3,21 @@ package adxexporter
33
import (
44
"context"
55
"fmt"
6+
"strings"
67
"time"
78

89
adxmonv1 "github.com/Azure/adx-mon/api/v1"
10+
"github.com/Azure/adx-mon/collector/export"
911
"github.com/Azure/adx-mon/pkg/kustoutil"
1012
"github.com/Azure/adx-mon/pkg/logger"
13+
"github.com/Azure/adx-mon/pkg/prompb"
1114
"github.com/Azure/adx-mon/transform"
12-
"go.opentelemetry.io/otel"
13-
"go.opentelemetry.io/otel/attribute"
14-
"go.opentelemetry.io/otel/exporters/prometheus"
15-
"go.opentelemetry.io/otel/metric"
16-
"go.opentelemetry.io/otel/metric/noop"
17-
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
1815
"k8s.io/apimachinery/pkg/api/meta"
1916
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2017
"k8s.io/apimachinery/pkg/runtime"
2118
"k8s.io/utils/clock"
2219
ctrl "sigs.k8s.io/controller-runtime"
2320
"sigs.k8s.io/controller-runtime/pkg/client"
24-
crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
2521
)
2622

2723
// MetricsExporterReconciler reconciles MetricsExporter objects
@@ -33,16 +29,18 @@ type MetricsExporterReconciler struct {
3329
ClusterLabels map[string]string
3430
KustoClusters map[string]string // database name -> endpoint URL
3531
OTLPEndpoint string
36-
EnableMetricsEndpoint bool
37-
MetricsPort string // Used for controller-runtime metrics server configuration
38-
MetricsPath string // For documentation/consistency (controller-runtime uses /metrics)
32+
AddResourceAttributes map[string]string // Additional OTLP resource attributes (merged with ClusterLabels)
33+
MetricNamePrefix string // Global prefix prepended to all metric names (combined with CRD prefix)
34+
EnableMetricsEndpoint bool // Deprecated: kept for backward compatibility, not used with OTLP push
35+
MetricsPort string // Deprecated: kept for backward compatibility
36+
MetricsPath string // Deprecated: kept for backward compatibility
3937

4038
// Query execution components
4139
QueryExecutors map[string]*QueryExecutor // keyed by database name
4240
Clock clock.Clock
4341

44-
// Metrics components
45-
Meter metric.Meter
42+
// OTLP push client for metrics delivery
43+
OtlpExporter *export.PromToOtlpExporter
4644
}
4745

4846
// Reconcile handles MetricsExporter reconciliation
@@ -101,34 +99,41 @@ func (r *MetricsExporterReconciler) Reconcile(ctx context.Context, req ctrl.Requ
10199
return ctrl.Result{RequeueAfter: requeueAfter}, nil
102100
}
103101

104-
func (r *MetricsExporterReconciler) exposeMetricsServer() error {
105-
if !r.EnableMetricsEndpoint {
106-
r.Meter = noop.NewMeterProvider().Meter("noop")
107-
return nil
102+
func (r *MetricsExporterReconciler) initOtlpExporter() error {
103+
if r.OTLPEndpoint == "" {
104+
return fmt.Errorf("OTLP endpoint is required: specify --otlp-endpoint")
108105
}
109106

110-
// Register with controller-runtime's shared metrics registry, replacing the default registry
111-
exporter, err := prometheus.New(
112-
prometheus.WithRegisterer(crmetrics.Registry),
113-
// Adds a namespace prefix to all metrics
114-
prometheus.WithNamespace("adxexporter"),
115-
// Disables the long otel specific scope string since we're only exposing through metrics
116-
prometheus.WithoutScopeInfo(),
117-
)
118-
if err != nil {
119-
return fmt.Errorf("failed to create metrics exporter: %w", err)
107+
// Create resource attributes by merging cluster labels with explicit attributes.
108+
// ClusterLabels provide a base set of resource attributes for identification.
109+
// AddResourceAttributes allows explicit overrides or additional attributes.
110+
resourceAttrs := make(map[string]string, len(r.ClusterLabels)+len(r.AddResourceAttributes))
111+
for k, v := range r.ClusterLabels {
112+
resourceAttrs[k] = v
113+
}
114+
// Explicit attributes take precedence over cluster labels
115+
for k, v := range r.AddResourceAttributes {
116+
resourceAttrs[k] = v
117+
}
118+
119+
// Create a pass-through transformer (no filtering - KQL already selected the data)
120+
transformer := &transform.RequestTransformer{
121+
DefaultDropMetrics: false,
120122
}
121-
provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(exporter))
122-
otel.SetMeterProvider(provider)
123123

124-
r.Meter = otel.GetMeterProvider().Meter("adxexporter")
124+
opts := export.PromToOtlpExporterOpts{
125+
Transformer: transformer,
126+
Destination: r.OTLPEndpoint,
127+
AddResourceAttributes: resourceAttrs,
128+
}
125129

130+
r.OtlpExporter = export.NewPromToOtlpExporter(opts)
126131
return nil
127132
}
128133

129134
// SetupWithManager sets up the service with the Manager
130135
func (r *MetricsExporterReconciler) SetupWithManager(mgr ctrl.Manager) error {
131-
if err := r.exposeMetricsServer(); err != nil {
136+
if err := r.initOtlpExporter(); err != nil {
132137
return err
133138
}
134139

@@ -265,7 +270,7 @@ func (r *MetricsExporterReconciler) executeMetricsExporter(ctx context.Context,
265270
return true, nil
266271
}
267272

268-
// transformAndRegisterMetrics converts KQL query results to metrics and registers them
273+
// transformAndRegisterMetrics converts KQL query results to metrics and pushes them via OTLP
269274
func (r *MetricsExporterReconciler) transformAndRegisterMetrics(ctx context.Context, me *adxmonv1.MetricsExporter, rows []map[string]any) error {
270275
if len(rows) == 0 {
271276
if logger.IsDebug() {
@@ -274,17 +279,32 @@ func (r *MetricsExporterReconciler) transformAndRegisterMetrics(ctx context.Cont
274279
return nil
275280
}
276281

282+
// Build the effective metric name prefix by combining CLI and CRD prefixes:
283+
// - CLI prefix (--metric-name-prefix) is always prepended first if set
284+
// - CRD prefix (transform.metricNamePrefix) is appended after CLI prefix if set
285+
// This ensures operators can enforce a global prefix (e.g., for allow-lists)
286+
// while teams can still add their own sub-prefixes via CRD.
287+
var prefixParts []string
288+
if r.MetricNamePrefix != "" {
289+
prefixParts = append(prefixParts, r.MetricNamePrefix)
290+
}
291+
if me.Spec.Transform.MetricNamePrefix != "" {
292+
prefixParts = append(prefixParts, me.Spec.Transform.MetricNamePrefix)
293+
}
294+
effectivePrefix := strings.Join(prefixParts, "_")
295+
277296
// Create transformer with the MetricsExporter's transform configuration
297+
// Note: meter parameter is nil since we're using OTLP push instead of OTel SDK registration
278298
transformer := transform.NewKustoToMetricsTransformer(
279299
transform.TransformConfig{
280300
MetricNameColumn: me.Spec.Transform.MetricNameColumn,
281-
MetricNamePrefix: me.Spec.Transform.MetricNamePrefix,
301+
MetricNamePrefix: effectivePrefix,
282302
ValueColumns: me.Spec.Transform.ValueColumns,
283303
TimestampColumn: me.Spec.Transform.TimestampColumn,
284304
LabelColumns: me.Spec.Transform.LabelColumns,
285305
DefaultMetricName: me.Spec.Transform.DefaultMetricName,
286306
},
287-
r.Meter,
307+
nil, // meter not needed for OTLP push path
288308
)
289309

290310
// Validate the transform configuration against the query results
@@ -298,44 +318,32 @@ func (r *MetricsExporterReconciler) transformAndRegisterMetrics(ctx context.Cont
298318
return fmt.Errorf("failed to transform rows to metrics: %w", err)
299319
}
300320

301-
if err := r.registerMetrics(ctx, metrics); err != nil {
302-
return fmt.Errorf("failed to register metrics: %w", err)
321+
if err := r.pushMetrics(ctx, metrics); err != nil {
322+
return fmt.Errorf("failed to push metrics: %w", err)
303323
}
304324

305-
logger.Infof("Successfully transformed and registered %d metrics for MetricsExporter %s/%s",
325+
logger.Infof("Successfully transformed and pushed %d metrics for MetricsExporter %s/%s",
306326
len(metrics), me.Namespace, me.Name)
307327

308328
return nil
309329
}
310330

311-
func (r *MetricsExporterReconciler) registerMetrics(ctx context.Context, metrics []transform.MetricData) error {
312-
// Group metrics by name for efficient registration
313-
metricsByName := make(map[string][]transform.MetricData)
314-
for _, metric := range metrics {
315-
metricsByName[metric.Name] = append(metricsByName[metric.Name], metric)
331+
// pushMetrics converts MetricData to WriteRequest and sends via OTLP
332+
func (r *MetricsExporterReconciler) pushMetrics(ctx context.Context, metrics []transform.MetricData) error {
333+
if len(metrics) == 0 {
334+
return nil
316335
}
317336

318-
// Register each unique metric name as a gauge
319-
for metricName, metricData := range metricsByName {
320-
gauge, err := r.Meter.Float64Gauge(metricName)
321-
if err != nil {
322-
return fmt.Errorf("failed to create gauge for metric '%s': %w", metricName, err)
323-
}
324-
325-
// Record all data points for this metric
326-
for _, data := range metricData {
327-
// Convert labels to OpenTelemetry attributes
328-
attrs := make([]attribute.KeyValue, 0, len(data.Labels))
329-
for key, value := range data.Labels {
330-
attrs = append(attrs, attribute.String(key, value))
331-
}
337+
// Convert to WriteRequest using pooled objects for efficiency
338+
wr := transform.ToWriteRequest(metrics)
339+
defer func() {
340+
// Return pooled objects
341+
wr.Reset()
342+
prompb.WriteRequestPool.Put(wr)
343+
}()
332344

333-
// Record the metric value
334-
gauge.Record(ctx, data.Value, metric.WithAttributes(attrs...))
335-
}
336-
}
337-
338-
return nil
345+
// Push via OTLP exporter
346+
return r.OtlpExporter.Write(ctx, wr)
339347
}
340348

341349
// initializeQueryExecutors creates QueryExecutors for all configured databases

0 commit comments

Comments
 (0)