Skip to content

Commit 18c81c4

Browse files
authored
[8.19](backport #47265) Add support for inputs.[input_type].pipelines in beatreceivers (#47475)
* Add support for `inputs.[input_type].pipelines` in beatreceivers (#47265)
1 parent 48652dc commit 18c81c4

File tree

5 files changed

+62
-2
lines changed

5 files changed

+62
-2
lines changed

libbeat/otelbeat/beatconverter/beatconverter_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ exporters:
4747
max_retries: 3
4848
user: elastic
4949
max_conns_per_host: 1
50+
logs_dynamic_pipeline:
51+
enabled: true
5052
sending_queue:
5153
batch:
5254
flush_timeout: 10s
@@ -208,6 +210,8 @@ exporters:
208210
max_interval: 1m0s
209211
max_retries: 3
210212
user: elastic-cloud
213+
logs_dynamic_pipeline:
214+
enabled: true
211215
max_conns_per_host: 1
212216
sending_queue:
213217
batch:
@@ -431,6 +435,8 @@ exporters:
431435
elasticsearch:
432436
endpoints:
433437
- http://localhost:9200
438+
logs_dynamic_pipeline:
439+
enabled: true
434440
retry:
435441
enabled: true
436442
initial_interval: 1s

libbeat/otelbeat/oteltranslate/outputs/elasticsearch/config_otel.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func ToOTelConfig(output *config.C, logger *logp.Logger) (map[string]any, error)
5959
escfg := defaultOptions
6060

6161
// check for unsupported config
62-
err := checkUnsupportedConfig(output, logger)
62+
err := checkUnsupportedConfig(output)
6363
if err != nil {
6464
return nil, err
6565
}
@@ -134,6 +134,9 @@ func ToOTelConfig(output *config.C, logger *logp.Logger) (map[string]any, error)
134134
"mapping": map[string]any{
135135
"mode": "bodymap",
136136
},
137+
"logs_dynamic_pipeline": map[string]any{
138+
"enabled": true,
139+
},
137140
}
138141

139142
// Compression
@@ -163,7 +166,7 @@ func ToOTelConfig(output *config.C, logger *logp.Logger) (map[string]any, error)
163166
}
164167

165168
// log warning for unsupported config
166-
func checkUnsupportedConfig(cfg *config.C, logger *logp.Logger) error {
169+
func checkUnsupportedConfig(cfg *config.C) error {
167170
if cfg.HasField("indices") {
168171
return fmt.Errorf("indices is currently not supported: %w", errors.ErrUnsupported)
169172
} else if cfg.HasField("pipelines") {

libbeat/otelbeat/oteltranslate/outputs/elasticsearch/config_otel_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ logs_index: some-index
6363
max_conns_per_host: 1
6464
password: changeme
6565
pipeline: some-ingest-pipeline
66+
logs_dynamic_pipeline:
67+
enabled: true
6668
retry:
6769
enabled: true
6870
initial_interval: 42s
@@ -109,6 +111,8 @@ api_key: "TiNAGG4BaaMdaH1tRfuU:KnR6yE41RrSowb0kQ0HWoA"
109111
endpoints:
110112
- http://localhost:9200
111113
logs_index: some-index
114+
logs_dynamic_pipeline:
115+
enabled: true
112116
retry:
113117
enabled: true
114118
initial_interval: 1s
@@ -155,6 +159,8 @@ preset: %s
155159
`
156160

157161
commonOTelCfg := `
162+
logs_dynamic_pipeline:
163+
enabled: true
158164
endpoints:
159165
- http://localhost:9200
160166
retry:
@@ -221,6 +227,8 @@ retry:
221227
max_interval: 5m0s
222228
max_retries: 3
223229
logs_index: some-index
230+
logs_dynamic_pipeline:
231+
enabled: true
224232
password: changeme
225233
user: elastic
226234
max_conns_per_host: 1
@@ -315,6 +323,8 @@ retry:
315323
initial_interval: 1s
316324
max_interval: 1m0s
317325
max_retries: 3
326+
logs_dynamic_pipeline:
327+
enabled: true
318328
max_conns_per_host: 1
319329
user: elastic
320330
sending_queue:

x-pack/libbeat/outputs/otelconsumer/otelconsumer.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,13 @@ func (out *otelConsumer) logsPublish(ctx context.Context, batch publisher.Batch)
115115
}
116116
}
117117

118+
// if pipeline field is set on event metadata
119+
if pipeline, err := event.Content.Meta.GetValue("pipeline"); err == nil {
120+
if s, ok := pipeline.(string); ok {
121+
logRecord.Attributes().PutStr("elasticsearch.ingest_pipeline", s)
122+
}
123+
}
124+
118125
beatEvent := event.Content.Fields.Clone()
119126
if beatEvent == nil {
120127
beatEvent = mapstr.M{}

x-pack/libbeat/outputs/otelconsumer/otelconsumer_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,40 @@ func TestPublish(t *testing.T) {
108108
}
109109
})
110110

111+
t.Run("elasticsearch.ingest_pipeline fields are set on logrecord.Attribute", func(t *testing.T) {
112+
event1.Meta = mapstr.M{}
113+
event1.Meta["pipeline"] = "error_pipeline"
114+
115+
batch := outest.NewBatch(event1)
116+
117+
var countLogs int
118+
var attributes pcommon.Map
119+
otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error {
120+
countLogs = countLogs + ld.LogRecordCount()
121+
for i := 0; i < ld.ResourceLogs().Len(); i++ {
122+
resourceLog := ld.ResourceLogs().At(i)
123+
for j := 0; j < resourceLog.ScopeLogs().Len(); j++ {
124+
scopeLog := resourceLog.ScopeLogs().At(j)
125+
for k := 0; k < scopeLog.LogRecords().Len(); k++ {
126+
LogRecord := scopeLog.LogRecords().At(k)
127+
attributes = LogRecord.Attributes()
128+
}
129+
}
130+
}
131+
return nil
132+
})
133+
134+
err := otelConsumer.Publish(ctx, batch)
135+
assert.NoError(t, err)
136+
assert.Len(t, batch.Signals, 1)
137+
assert.Equal(t, outest.BatchACK, batch.Signals[0].Tag)
138+
139+
dynamicAttributeKey := "elasticsearch.ingest_pipeline"
140+
gotValue, ok := attributes.Get(dynamicAttributeKey)
141+
require.True(t, ok, "dynamic pipeline attribute was not set")
142+
assert.EqualValues(t, "error_pipeline", gotValue.AsString())
143+
})
144+
111145
t.Run("retries the batch on non-permanent consumer error", func(t *testing.T) {
112146
batch := outest.NewBatch(event1, event2, event3)
113147

0 commit comments

Comments
 (0)