Skip to content

Commit a69f827

Browse files
committed
wip
gotta deal with the fucking makefile shit holy fuck
1 parent cd86663 commit a69f827

File tree

18 files changed

+415
-163
lines changed

18 files changed

+415
-163
lines changed

cmd/gostatsd/config.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,26 @@ import (
44
"github.com/spf13/viper"
55

66
"github.com/atlassian/gostatsd/pkg/statsd"
7+
"github.com/atlassian/gostatsd/pkg/transport"
8+
vtransport "github.com/atlassian/gostatsd/pkg/viper/transport"
79
)
810

911
type Config struct {
1012
ProfileAddr string
1113

12-
Server *statsd.Config
14+
Server *statsd.Config
15+
TransportPool *transport.PoolConfig
1316
}
1417

1518
func ConfigFromViper(v *viper.Viper) (*Config, error) {
19+
transportPool, err := vtransport.TransportPoolConfigFromViper(v.Sub("transport"))
20+
if err != nil {
21+
return nil, err
22+
}
23+
1624
return &Config{
17-
ProfileAddr: v.GetString(ParamProfile),
18-
Server: &statsd.Config{},
25+
ProfileAddr: v.GetString(ParamProfile),
26+
Server: &statsd.Config{},
27+
TransportPool: transportPool,
1928
}, nil
2029
}

cmd/gostatsd/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func constructServer(v *viper.Viper, cfg *Config) (*statsd.Server, error) {
9595
logger := logrus.StandardLogger()
9696

9797
// HTTP client pool
98-
pool := transport.NewTransportPool(logger, v)
98+
pool := transport.NewTransportPool(logger, cfg.TransportPool)
9999

100100
// Cached instances
101101
var cachedInstances gostatsd.CachedInstances

cmd/lambda-extension/config.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package main
2+
3+
import (
4+
"github.com/spf13/viper"
5+
6+
"github.com/atlassian/gostatsd/pkg/statsd"
7+
"github.com/atlassian/gostatsd/pkg/transport"
8+
vtransport "github.com/atlassian/gostatsd/pkg/viper/transport"
9+
)
10+
11+
type Config struct {
12+
Server *statsd.Config
13+
TransportPool *transport.PoolConfig
14+
}
15+
16+
func ConfigFromViper(v *viper.Viper) (*Config, error) {
17+
v.SetDefault("transport", map[string]any{})
18+
transportPool, err := vtransport.TransportPoolConfigFromViper(v.Sub("transport"))
19+
if err != nil {
20+
return nil, err
21+
}
22+
23+
return &Config{
24+
Server: &statsd.Config{},
25+
TransportPool: transportPool,
26+
}, nil
27+
}

cmd/lambda-extension/main.go

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ const (
3535
GostatsdForwarderMode = "forwarder"
3636
)
3737

38-
func GetConfiguration() (*viper.Viper, error) {
38+
func GetViper() (*viper.Viper, error) {
3939
v := viper.New()
4040
util.InitViper(v, "")
4141

@@ -70,7 +70,7 @@ func GetConfiguration() (*viper.Viper, error) {
7070
return v, nil
7171
}
7272

73-
func NewServer(v *viper.Viper, logger logrus.FieldLogger) *statsd.Server {
73+
func NewServer(v *viper.Viper, cfg *Config, logger logrus.FieldLogger) *statsd.Server {
7474
// create server in forwarder mode
7575
s := &statsd.Server{
7676
InternalTags: v.GetStringSlice(gostatsd.ParamInternalTags),
@@ -96,7 +96,7 @@ func NewServer(v *viper.Viper, logger logrus.FieldLogger) *statsd.Server {
9696
},
9797
BadLineRateLimitPerSecond: rate.Limit(v.GetFloat64(gostatsd.ParamBadLinesPerMinute) / 60.0),
9898
Viper: v,
99-
TransportPool: transport.NewTransportPool(logger, v),
99+
TransportPool: transport.NewTransportPool(logger, cfg.TransportPool),
100100
}
101101

102102
if v.GetBool(gostatsd.ParamLambdaExtensionManualFlush) {
@@ -109,7 +109,12 @@ func NewServer(v *viper.Viper, logger logrus.FieldLogger) *statsd.Server {
109109
}
110110

111111
func main() {
112-
conf, err := GetConfiguration()
112+
v, err := GetViper()
113+
if err != nil {
114+
panic(err)
115+
}
116+
117+
cfg, err := ConfigFromViper(v)
113118
if err != nil {
114119
panic(err)
115120
}
@@ -126,7 +131,7 @@ func main() {
126131

127132
log.Logger.SetFormatter(&logrus.JSONFormatter{})
128133

129-
if conf.GetBool(ParamVerbose) {
134+
if v.GetBool(ParamVerbose) {
130135
log.Logger.SetLevel(logrus.DebugLevel)
131136
}
132137

@@ -135,18 +140,18 @@ func main() {
135140
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
136141
defer cancel()
137142

138-
server := NewServer(conf, log)
143+
server := NewServer(v, cfg, log)
139144

140145
var opts = make([]extension.ManagerOpt, 0)
141-
telemetryServerAddr := conf.GetString(gostatsd.ParamLambdaExtensionTelemetryAddress)
142-
if conf.GetBool(gostatsd.ParamLambdaExtensionManualFlush) {
146+
telemetryServerAddr := v.GetString(gostatsd.ParamLambdaExtensionTelemetryAddress)
147+
if v.GetBool(gostatsd.ParamLambdaExtensionManualFlush) {
143148
log.Info("Starting extension with manual flush")
144149
opts = append(opts, extension.WithManualFlushEnabled(server.ForwarderFlushCoordinator, telemetryServerAddr))
145150
}
146151

147152
manager := extension.NewManager(
148153
os.Getenv(api.EnvLambdaAPIHostname),
149-
conf.GetString(ParamLambdaFileName),
154+
v.GetString(ParamLambdaFileName),
150155
log,
151156
server,
152157
opts...,

internal/awslambda/extension/manager_integration_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ func createStatsDServer(apiAddr string, fc flush.Coordinator, log logrus.FieldLo
131131
MetricsAddr: l.LocalAddr().String(),
132132
ServerMode: "forwarder",
133133
Viper: v,
134-
TransportPool: transport.NewTransportPool(log, v),
134+
TransportPool: transport.NewTransportPool(log, transport.DefaultPoolConfig()),
135135
}
136136
return
137137
}

pkg/backends/cloudwatch/cloudwatch_test.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
99
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
1010
"github.com/sirupsen/logrus"
11-
"github.com/spf13/viper"
1211
"github.com/stretchr/testify/assert"
1312
"github.com/stretchr/testify/require"
1413

@@ -29,7 +28,7 @@ func (m *mockedCloudwatch) PutMetricData(_ context.Context, input *cloudwatch.Pu
2928
func TestSendMetrics(t *testing.T) {
3029
t.Parallel()
3130

32-
p := transport.NewTransportPool(logrus.New(), viper.New())
31+
p := transport.NewTransportPool(logrus.New(), transport.DefaultPoolConfig())
3332
cli, err := NewClient("ns", "default", gostatsd.TimerSubtypes{}, logrus.New(), p)
3433
require.NoError(t, err)
3534

@@ -86,7 +85,7 @@ func TestSendMetrics(t *testing.T) {
8685
func TestSendMetricDimensions(t *testing.T) {
8786
t.Parallel()
8887

89-
p := transport.NewTransportPool(logrus.New(), viper.New())
88+
p := transport.NewTransportPool(logrus.New(), transport.DefaultPoolConfig())
9089
cli, err := NewClient("ns", "default", gostatsd.TimerSubtypes{}, logrus.New(), p)
9190
require.NoError(t, err)
9291

@@ -192,7 +191,7 @@ func metricsOneOfEach() *gostatsd.MetricMap {
192191
func TestSendHistogram(t *testing.T) {
193192
t.Parallel()
194193

195-
p := transport.NewTransportPool(logrus.New(), viper.New())
194+
p := transport.NewTransportPool(logrus.New(), transport.DefaultPoolConfig())
196195
cli, err := NewClient("ns", "default", gostatsd.TimerSubtypes{}, logrus.New(), p)
197196
require.NoError(t, err)
198197

pkg/backends/datadog/datadog_test.go

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414
"time"
1515

1616
"github.com/sirupsen/logrus"
17-
"github.com/spf13/viper"
1817
"github.com/stretchr/testify/assert"
1918
"github.com/stretchr/testify/require"
2019
"github.com/tilinna/clock"
@@ -34,6 +33,12 @@ func advanceTime(c *clock.Mock, ch <-chan struct{}) {
3433
}
3534
}
3635

36+
func defaultTransportPoolConfig() *transport.PoolConfig {
37+
cfg := transport.DefaultPoolConfig()
38+
cfg.DefaultTransport.ClientTimeout = 1 * time.Second
39+
return cfg
40+
}
41+
3742
func TestRetries(t *testing.T) {
3843
t.Parallel()
3944
var requestNum uint32
@@ -54,9 +59,7 @@ func TestRetries(t *testing.T) {
5459
ts := httptest.NewServer(mux)
5560
defer ts.Close()
5661

57-
v := viper.New()
58-
v.Set("transport.default.client-timeout", 1*time.Second)
59-
p := transport.NewTransportPool(logrus.New(), v)
62+
p := transport.NewTransportPool(logrus.New(), defaultTransportPoolConfig())
6063
client, err := NewClient(ts.URL, "apiKey123", "agent", "default", defaultMetricsPerBatch, defaultMaxRequests, true, 2*time.Second, 1*time.Second, gostatsd.TimerSubtypes{}, logrus.New(), p)
6164
require.NoError(t, err)
6265
res := make(chan []error, 1)
@@ -91,9 +94,7 @@ func TestSendMetricsInMultipleBatches(t *testing.T) {
9194
ts := httptest.NewServer(mux)
9295
defer ts.Close()
9396

94-
v := viper.New()
95-
v.Set("transport.default.client-timeout", 1*time.Second)
96-
p := transport.NewTransportPool(logrus.New(), v)
97+
p := transport.NewTransportPool(logrus.New(), defaultTransportPoolConfig())
9798
client, err := NewClient(ts.URL, "apiKey123", "agent", "default", 1, defaultMaxRequests, true, 2*time.Second, 1*time.Second, gostatsd.TimerSubtypes{}, logrus.New(), p)
9899
require.NoError(t, err)
99100
res := make(chan []error, 1)
@@ -144,9 +145,7 @@ func TestSendMetrics(t *testing.T) {
144145
ts := httptest.NewServer(mux)
145146
defer ts.Close()
146147

147-
v := viper.New()
148-
v.Set("transport.default.client-timeout", 1*time.Second)
149-
p := transport.NewTransportPool(logrus.New(), v)
148+
p := transport.NewTransportPool(logrus.New(), defaultTransportPoolConfig())
150149
cli, err := NewClient(ts.URL, "apiKey123", "agent", "default", 1000, defaultMaxRequests, true, 2*time.Second, 1100*time.Millisecond, gostatsd.TimerSubtypes{}, logrus.New(), p)
151150
require.NoError(t, err)
152151

@@ -262,9 +261,7 @@ func TestSendHistogram(t *testing.T) {
262261
ts := httptest.NewServer(mux)
263262
defer ts.Close()
264263

265-
v := viper.New()
266-
v.Set("transport.default.client-timeout", 1*time.Second)
267-
p := transport.NewTransportPool(logrus.New(), v)
264+
p := transport.NewTransportPool(logrus.New(), defaultTransportPoolConfig())
268265
client, err := NewClient(ts.URL, "apiKey123", "agent", "default", 1000, defaultMaxRequests, true, 2*time.Second, 1100*time.Millisecond, gostatsd.TimerSubtypes{}, logrus.New(), p)
269266
require.NoError(t, err)
270267
ctx := clock.Context(context.Background(), clock.NewMock(time.Unix(100, 0)))

pkg/backends/influxdb/influxdb_test.go

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,12 @@ import (
2424
"github.com/atlassian/gostatsd/pkg/transport"
2525
)
2626

27+
func defaultTransportPoolConfig() *transport.PoolConfig {
28+
cfg := transport.DefaultPoolConfig()
29+
cfg.DefaultTransport.ClientTimeout = 1 * time.Second
30+
return cfg
31+
}
32+
2733
func TestNewClientInvalid(t *testing.T) {
2834
t.Parallel()
2935

@@ -49,7 +55,7 @@ func TestNewClientInvalid(t *testing.T) {
4955
v := viper.New()
5056
for idx, step := range steps {
5157
v.Set("influxdb."+step.key, step.value)
52-
client, err := NewClientFromViper(v, logrus.New(), transport.NewTransportPool(logrus.New(), viper.New()))
58+
client, err := NewClientFromViper(v, logrus.New(), transport.NewTransportPool(logrus.New(), defaultTransportPoolConfig()))
5359
if step.expectedFailure == "" {
5460
require.NoErrorf(t, err, "step %d, key %s", idx, step.key)
5561
require.NotNilf(t, client, "step %d, key %s", idx, step.key)
@@ -67,7 +73,7 @@ func TestNewClientValid(t *testing.T) {
6773
v.Set("influxdb."+paramApiVersion, 1)
6874
v.Set("influxdb."+paramDatabase, "test-db")
6975
v.Set("influxdb."+paramApiEndpoint, "http://localhost")
70-
client, err := NewClientFromViper(v, logrus.New(), transport.NewTransportPool(logrus.New(), viper.New()))
76+
client, err := NewClientFromViper(v, logrus.New(), transport.NewTransportPool(logrus.New(), defaultTransportPoolConfig()))
7177
require.NoError(t, err)
7278
require.NotNil(t, client)
7379
}
@@ -92,9 +98,7 @@ func TestRetries(t *testing.T) {
9298
ts := httptest.NewServer(mux)
9399
defer ts.Close()
94100

95-
v := viper.New()
96-
v.Set("transport.default.client-timeout", 1*time.Second)
97-
p := transport.NewTransportPool(logrus.New(), v)
101+
p := transport.NewTransportPool(logrus.New(), defaultTransportPoolConfig())
98102
cli, err := NewClient(
99103
ts.URL,
100104
true,
@@ -144,9 +148,7 @@ func TestSendMetricsInMultipleBatches(t *testing.T) {
144148
ts := httptest.NewServer(mux)
145149
defer ts.Close()
146150

147-
v := viper.New()
148-
v.Set("transport.default.client-timeout", 1*time.Second)
149-
p := transport.NewTransportPool(logrus.New(), v)
151+
p := transport.NewTransportPool(logrus.New(), defaultTransportPoolConfig())
150152
client, err := NewClient(
151153
ts.URL,
152154
false,
@@ -211,9 +213,7 @@ func TestSendMetrics(t *testing.T) {
211213
ts := httptest.NewServer(mux)
212214
defer ts.Close()
213215

214-
v := viper.New()
215-
v.Set("transport.default.client-timeout", 1*time.Second)
216-
p := transport.NewTransportPool(logrus.New(), v)
216+
p := transport.NewTransportPool(logrus.New(), defaultTransportPoolConfig())
217217
cli, err := NewClient(
218218
ts.URL,
219219
true,
@@ -349,9 +349,7 @@ func TestSendHistogram(t *testing.T) {
349349
ts := httptest.NewServer(mux)
350350
defer ts.Close()
351351

352-
v := viper.New()
353-
v.Set("transport.default.client-timeout", 1*time.Second)
354-
p := transport.NewTransportPool(logrus.New(), v)
352+
p := transport.NewTransportPool(logrus.New(), defaultTransportPoolConfig())
355353
cli, err := NewClient(
356354
ts.URL,
357355
true,
@@ -448,9 +446,7 @@ func TestSendEvent(t *testing.T) {
448446
ts := httptest.NewServer(mux)
449447
defer ts.Close()
450448

451-
v := viper.New()
452-
v.Set("transport.default.client-timeout", 1*time.Second)
453-
p := transport.NewTransportPool(logrus.New(), v)
449+
p := transport.NewTransportPool(logrus.New(), defaultTransportPoolConfig())
454450
cli, err := NewClient(
455451
ts.URL,
456452
true,

0 commit comments

Comments
 (0)