@@ -7,11 +7,9 @@ package fbreceiver
77import (
88 "bytes"
99 "context"
10- "encoding/base64"
1110 "encoding/json"
1211 "fmt"
1312 "io"
14- "math/rand/v2"
1513 "net"
1614 "net/http"
1715 "net/url"
@@ -23,9 +21,7 @@ import (
2321 "sync/atomic"
2422 "testing"
2523
26- "github.com/elastic/beats/v7/libbeat/otelbeat/oteltest"
27- "github.com/elastic/elastic-agent-libs/mapstr"
28-
24+ "github.com/gofrs/uuid/v5"
2925 "github.com/stretchr/testify/assert"
3026 "github.com/stretchr/testify/require"
3127 "go.opentelemetry.io/collector/component"
@@ -36,16 +32,14 @@ import (
3632 "go.uber.org/zap"
3733 "go.uber.org/zap/zapcore"
3834 "go.uber.org/zap/zaptest/observer"
35+
36+ "github.com/elastic/beats/v7/libbeat/otelbeat/oteltest"
37+ "github.com/elastic/elastic-agent-libs/mapstr"
3938)
4039
4140func TestNewReceiver (t * testing.T ) {
42- monitorSocket := genSocketPath ()
43- var monitorHost string
44- if runtime .GOOS == "windows" {
45- monitorHost = "npipe:///" + filepath .Base (monitorSocket )
46- } else {
47- monitorHost = "unix://" + monitorSocket
48- }
41+ monitorSocket := genSocketPath (t )
42+ monitorHost := hostFromSocket (monitorSocket )
4943 config := Config {
5044 Beatconfig : map [string ]any {
5145 "filebeat" : map [string ]any {
@@ -170,150 +164,137 @@ func benchmarkFactoryWithLogLevel(b *testing.B, level zapcore.Level) {
170164 }
171165}
172166
173- func TestMultipleReceivers (t * testing.T ) {
174- // This test verifies that multiple receivers can be instantiated
175- // in isolation, started, and can ingest logs without interfering
176- // with each other.
177-
178- // Receivers need distinct home directories so wrap the config in a function.
179- config := func (monitorSocket string , homePath string , ingestPath string ) * Config {
180- var monitorHost string
181- if runtime .GOOS == "windows" {
182- monitorHost = "npipe:///" + filepath .Base (monitorSocket )
183- } else {
184- monitorHost = "unix://" + monitorSocket
185- }
186- return & Config {
187- Beatconfig : map [string ]any {
188- "filebeat" : map [string ]any {
189- "inputs" : []map [string ]any {
190- {
191- "type" : "benchmark" ,
192- "enabled" : true ,
193- "message" : "test" ,
194- "count" : 1 ,
195- },
196- {
197- "type" : "filestream" ,
198- "enabled" : true ,
199- "id" : "must-be-unique" ,
200- "paths" : []string {ingestPath },
201- "file_identity.native" : nil ,
202- },
167+ // multiReceiverConfig creates a Config for testing multiple receivers.
168+ // Each receiver gets a unique home path.
169+ func multiReceiverConfig (helper multiReceiverHelper ) * Config {
170+ return & Config {
171+ Beatconfig : map [string ]any {
172+ "filebeat" : map [string ]any {
173+ "inputs" : []map [string ]any {
174+ {
175+ "type" : "benchmark" ,
176+ "enabled" : true ,
177+ "message" : "test" ,
178+ "count" : 1 ,
203179 },
204- },
205- "output" : map [string ]any {
206- "otelconsumer" : map [string ]any {},
207- },
208- "logging" : map [string ]any {
209- "level" : "info" ,
210- "selectors" : []string {
211- "*" ,
180+ {
181+ "type" : "filestream" ,
182+ "enabled" : true ,
183+ "id" : "must-be-unique" ,
184+ "paths" : []string {helper .ingest },
185+ "file_identity.native" : nil ,
212186 },
213187 },
214- "path.home" : homePath ,
215- "http.enabled" : true ,
216- "http.host" : monitorHost ,
217188 },
218- }
189+ "output" : map [string ]any {
190+ "otelconsumer" : map [string ]any {},
191+ },
192+ "logging" : map [string ]any {
193+ "level" : "info" ,
194+ "selectors" : []string {
195+ "*" ,
196+ },
197+ },
198+ "path.home" : helper .home ,
199+ "http.enabled" : true ,
200+ "http.host" : hostFromSocket (helper .monitorSocket ),
201+ },
219202 }
203+ }
204+
205+ type multiReceiverHelper struct {
206+ name string
207+ home string
208+ ingest string
209+ monitorSocket string
210+ }
211+
212+ func newMultiReceiverHelper (t * testing.T , number int ) multiReceiverHelper {
213+ return multiReceiverHelper {
214+ name : fmt .Sprintf ("r%d" , number ),
215+ home : t .TempDir (),
216+ ingest : filepath .Join (t .TempDir (), fmt .Sprintf ("test%d.log" , number )),
217+ monitorSocket : genSocketPath (t ),
218+ }
219+ }
220+
221+ // TestMultipleReceivers verifies that multiple receivers can be instantiated in isolation, started, and can ingest logs
222+ // without interfering with each other.
223+ func TestMultipleReceivers (t * testing.T ) {
224+ const nReceivers = 2
220225
221226 factory := NewFactory ()
222- monitorSocket1 := genSocketPath ()
223- monitorSocket2 := genSocketPath ()
224- dir1 := t .TempDir ()
225- dir2 := t .TempDir ()
226- ingest1 := filepath .Join (t .TempDir (), "test1.log" )
227- ingest2 := filepath .Join (t .TempDir (), "test2.log" )
227+
228+ helpers := make ([]multiReceiverHelper , nReceivers )
229+ configs := make ([]oteltest.ReceiverConfig , nReceivers )
230+ for i := range helpers {
231+ helper := newMultiReceiverHelper (t , i )
232+ helpers [i ] = helper
233+ configs [i ] = oteltest.ReceiverConfig {
234+ Name : helper .name ,
235+ Beat : "filebeat" ,
236+ Config : multiReceiverConfig (helper ),
237+ Factory : factory ,
238+ }
239+ }
240+
228241 oteltest .CheckReceivers (oteltest.CheckReceiversParams {
229242 T : t ,
230243 NumRestarts : 5 ,
231- Receivers : []oteltest.ReceiverConfig {
232- {
233- Name : "r1" ,
234- Beat : "filebeat" ,
235- Config : config (monitorSocket1 , dir1 , ingest1 ),
236- Factory : factory ,
237- },
238- {
239- Name : "r2" ,
240- Beat : "filebeat" ,
241- Config : config (monitorSocket2 , dir2 , ingest2 ),
242- Factory : factory ,
243- },
244- },
244+ Receivers : configs ,
245245 AssertFunc : func (c * assert.CollectT , logs map [string ][]mapstr.M , zapLogs * observer.ObservedLogs ) {
246- // Add data to be ingested with filestream
247- f1 , err := os .OpenFile (ingest1 , os .O_APPEND | os .O_CREATE | os .O_WRONLY , 0o644 )
248- require .NoError (c , err )
249- _ , err = f1 .WriteString ("A log line\n " )
250- require .NoError (c , err )
251- f1 .Close ()
252- f2 , err := os .OpenFile (ingest2 , os .O_APPEND | os .O_CREATE | os .O_WRONLY , 0o644 )
253- require .NoError (c , err )
254- _ , err = f2 .WriteString ("A log line\n " )
255- require .NoError (c , err )
256- f2 .Close ()
257-
258- require .Greater (c , len (logs ["r1" ]), 0 , "receiver r1 does not have any logs" )
259- require .Greater (c , len (logs ["r2" ]), 0 , "receiver r2 does not have any logs" )
260-
261- assert .Equal (c , "test" , logs ["r1" ][0 ].Flatten ()["message" ], "expected r1 message field to be 'test'" )
262- assert .Equal (c , "test" , logs ["r2" ][0 ].Flatten ()["message" ], "expected r2 message field to be 'test'" )
263-
264- // Make sure that each receiver has a separate logger
265- // instance and does not interfere with others. Previously, the
266- // logger in Beats was global, causing logger fields to be
267- // overwritten when multiple receivers started in the same process.
268- r1StartLogs := zapLogs .FilterMessageSnippet ("Beat ID" ).FilterField (zap .String ("otelcol.component.id" , "filebeatreceiver/r1" ))
269- assert .Equal (c , 1 , r1StartLogs .Len (), "r1 should have a single start log" )
270- r2StartLogs := zapLogs .FilterMessageSnippet ("Beat ID" ).FilterField (zap .String ("otelcol.component.id" , "filebeatreceiver/r2" ))
271- assert .Equal (c , 1 , r2StartLogs .Len (), "r2 should have a single start log" )
272-
273- meta1Path := filepath .Join (dir1 , "/data/meta.json" )
274- assert .FileExists (c , meta1Path , "dir1/data/meta.json should exist" )
275- meta1Data , err := os .ReadFile (meta1Path )
276- assert .NoError (c , err )
277-
278- meta2Path := filepath .Join (dir2 , "/data/meta.json" )
279- assert .FileExists (c , meta2Path , "dir2/data/meta.json should exist" )
280- meta2Data , err := os .ReadFile (meta2Path )
281- assert .NoError (c , err )
282-
283- assert .NotEqual (c , meta1Data , meta2Data , "meta data files should be different" )
246+ allMetaData := make ([]string , 0 , nReceivers )
247+ allRegData := make ([]string , 0 , nReceivers )
248+ for _ , helper := range helpers {
249+ writeFile (c , helper .ingest , "A log line" )
250+
251+ require .Greaterf (c , len (logs [helper .name ]), 0 , "receiver %v does not have any logs" , helper )
252+
253+ assert .Equalf (c , "test" , logs [helper .name ][0 ].Flatten ()["message" ], "expected %v message field to be 'test'" , helper )
254+
255+ // Make sure that each receiver has a separate logger
256+ // instance and does not interfere with others. Previously, the
257+ // logger in Beats was global, causing logger fields to be
258+ // overwritten when multiple receivers started in the same process.
259+ startLogs := zapLogs .FilterMessageSnippet ("Beat ID" ).FilterField (zap .String ("otelcol.component.id" , "filebeatreceiver/" + helper .name ))
260+ assert .Equalf (c , 1 , startLogs .Len (), "%v should have a single start log" , helper )
261+
262+ metaPath := filepath .Join (helper .home , "/data/meta.json" )
263+ assert .FileExistsf (c , metaPath , "%s of %v should exist" , metaPath , helper )
264+ metaData , err := os .ReadFile (metaPath )
265+ assert .NoError (c , err )
266+ allMetaData = append (allMetaData , string (metaData ))
267+
268+ var lastError strings.Builder
269+ assert .Conditionf (c , func () bool {
270+ return getFromSocket (t , & lastError , helper .monitorSocket , "stats" )
271+ }, "failed to connect to monitoring socket of %v, stats endpoint, last error was: %s" , helper , & lastError )
272+ assert .Conditionf (c , func () bool {
273+ return getFromSocket (t , & lastError , helper .monitorSocket , "inputs" )
274+ }, "failed to connect to monitoring socket of %v, inputs endpoint, last error was: %s" , helper , & lastError )
275+
276+ ingestJson , err := json .Marshal (helper .ingest )
277+ assert .NoError (c , err )
278+
279+ regPath := filepath .Join (helper .home , "/data/registry/filebeat/log.json" )
280+ assert .FileExistsf (c , regPath , "receiver %v filebeat registry should exist" , helper )
281+ regData , err := os .ReadFile (regPath )
282+ allRegData = append (allRegData , string (regData ))
283+ assert .NoError (c , err )
284+ assert .Containsf (c , string (regData ), string (ingestJson ), "receiver %v registry should contain '%s', but was: %s" , helper , string (ingestJson ), string (regData ))
285+ }
284286
285- var lastError strings.Builder
286- assert .Conditionf (c , func () bool {
287- return getFromSocket (t , & lastError , monitorSocket1 , "stats" )
288- }, "failed to connect to monitoring socket1, stats endpoint, last error was: %s" , & lastError )
289- assert .Conditionf (c , func () bool {
290- return getFromSocket (t , & lastError , monitorSocket1 , "inputs" )
291- }, "failed to connect to monitoring socket1, inputs endpoint, last error was: %s" , & lastError )
292- assert .Conditionf (c , func () bool {
293- return getFromSocket (t , & lastError , monitorSocket2 , "stats" )
294- }, "failed to connect to monitoring socket2, stats endpoint, last error was: %s" , & lastError )
295- assert .Conditionf (c , func () bool {
296- return getFromSocket (t , & lastError , monitorSocket2 , "inputs" )
297- }, "failed to connect to monitoring socket2, inputs endpoint, last error was: %s" , & lastError )
298-
299- ingest1Json , err := json .Marshal (ingest1 )
300- require .NoError (c , err )
301- ingest2Json , err := json .Marshal (ingest2 )
302- require .NoError (c , err )
303-
304- reg1Path := filepath .Join (dir1 , "/data/registry/filebeat/log.json" )
305- require .FileExists (c , reg1Path , "receiver 1 filebeat registry should exist" )
306- reg1Data , err := os .ReadFile (reg1Path )
307- require .NoError (c , err )
308- require .Containsf (c , string (reg1Data ), string (ingest1Json ), "receiver 1 registry should contain '%s', but was: %s" , string (ingest1Json ), string (reg1Data ))
309- require .NotContainsf (c , string (reg1Data ), string (ingest2Json ), "receiver 1 registry should not contain '%s', but was: %s" , string (ingest2Json ), string (reg1Data ))
310-
311- reg2Path := filepath .Join (dir2 , "/data/registry/filebeat/log.json" )
312- require .FileExists (c , reg2Path , "receiver 2 filebeat registry should exist" )
313- reg2Data , err := os .ReadFile (reg2Path )
314- require .NoError (c , err )
315- require .Containsf (c , string (reg2Data ), string (ingest2Json ), "receiver 2 registry should contain '%s', but was: %s" , string (ingest2Json ), string (reg2Data ))
316- require .NotContainsf (c , string (reg2Data ), string (ingest1Json ), "receiver 2 registry should not contain '%s', but was: %s" , string (ingest1Json ), string (reg2Data ))
287+ for i := range nReceivers {
288+ for j := range nReceivers {
289+ if i == j {
290+ continue
291+ }
292+ h1 := helpers [i ]
293+ h2 := helpers [j ]
294+ assert .NotEqualf (c , allMetaData [i ], allMetaData [j ], "meta data files between %v and %v should be different" , h1 , h2 )
295+ assert .NotContainsf (c , allRegData [i ], allRegData [j ], "receiver %v registry should not contain data from %v registry" , h1 , h2 )
296+ }
297+ }
317298 },
318299 })
319300}
@@ -392,14 +373,14 @@ func TestReceiverDegraded(t *testing.T) {
392373 }
393374}
394375
395- func genSocketPath () string {
396- randData := make ([] byte , 16 )
397- for i := range len ( randData ) {
398- randData [ i ] = uint8 ( rand . UintN ( 255 )) //nolint:gosec // 0-255 fits in a uint8
399- }
400- socketName := base64 . URLEncoding . EncodeToString ( randData ) + ".sock"
401- socketDir : = os .TempDir ( )
402- return filepath . Join ( socketDir , socketName )
376+ func genSocketPath (t * testing. T ) string {
377+ t . Helper ( )
378+ socketName , err := uuid . NewV4 ()
379+ require . NoError ( t , err )
380+ // Use os.TempDir() for short Unix socket paths
381+ sockPath := filepath . Join ( os . TempDir (), socketName . String () + ".sock" )
382+ t . Cleanup ( func () { _ = os .Remove ( sockPath ) } )
383+ return sockPath
403384}
404385
405386func getFromSocket (t * testing.T , sb * strings.Builder , socketPath string , endpoint string ) bool {
@@ -409,8 +390,8 @@ func getFromSocket(t *testing.T, sb *strings.Builder, socketPath string, endpoin
409390 }
410391 client := http.Client {
411392 Transport : & http.Transport {
412- DialContext : func (_ context.Context , _ , _ string ) (net.Conn , error ) {
413- return net .Dial ( "unix" , socketPath )
393+ DialContext : func (ctx context.Context , _ , _ string ) (net.Conn , error ) {
394+ return ( & net.Dialer {}). DialContext ( ctx , "unix" , socketPath )
414395 },
415396 },
416397 }
@@ -630,3 +611,17 @@ func TestReceiverHook(t *testing.T) {
630611 // one for beat metrics, one for input metrics and one for getting the registry.
631612 oteltest .TestReceiverHook (t , & cfg , NewFactory (), receiverSettings , 3 )
632613}
614+
615+ func hostFromSocket (socket string ) string {
616+ if runtime .GOOS == "windows" {
617+ return "npipe:///" + filepath .Base (socket )
618+ }
619+ return "unix://" + socket
620+ }
621+
622+ func writeFile (t require.TestingT , path string , data string ) {
623+ f , err := os .OpenFile (path , os .O_APPEND | os .O_CREATE | os .O_WRONLY , 0o644 )
624+ require .NoErrorf (t , err , "Could not open file %s" , path )
625+ _ , err = f .WriteString (data + "\n " )
626+ require .NoErrorf (t , err , "Could not write %s to file %s" , data , path )
627+ }
0 commit comments