diff --git a/foreign/go/client/iggy_client.go b/foreign/go/client/iggy_client.go index dbc8294b51..452eb58bdb 100644 --- a/foreign/go/client/iggy_client.go +++ b/foreign/go/client/iggy_client.go @@ -20,7 +20,7 @@ package client import ( "context" "fmt" - "log" + "log/slog" "sync" "sync/atomic" "time" @@ -32,6 +32,7 @@ import ( type Options struct { protocol iggcon.Protocol tcpOptions []tcp.Option + logger *slog.Logger heartbeatInterval time.Duration } @@ -40,6 +41,7 @@ func GetDefaultOptions() Options { return Options{ protocol: iggcon.Tcp, tcpOptions: nil, + logger: slog.New(slog.DiscardHandler), heartbeatInterval: 5 * time.Second, } } @@ -54,8 +56,21 @@ func WithTcp(tcpOpts ...tcp.Option) Option { } } +// WithLogger sets the logger for the Iggy client and its underlying transport. +// This logger is used by the heartbeat and forwarded to the transport. +// When no logger is provided, all internal log output is silently discarded. +func WithLogger(logger *slog.Logger) Option { + return func(opts *Options) { + if logger == nil { + return + } + opts.logger = logger + } +} + type IggyClient struct { iggcon.Client + logger *slog.Logger cancel context.CancelFunc wg sync.WaitGroup heartbeatInterval time.Duration @@ -75,12 +90,13 @@ func NewIggyClient(options ...Option) (iggcon.Client, error) { var cli iggcon.Client switch opts.protocol { case iggcon.Tcp: - cli = tcp.NewIggyTcpClient(opts.tcpOptions...) + cli = tcp.NewIggyTcpClient(opts.logger, opts.tcpOptions...) default: return nil, fmt.Errorf("unknown protocol type: %v", opts.protocol) } ic := &IggyClient{ Client: cli, + logger: opts.logger, cancel: func() {}, heartbeatInterval: opts.heartbeatInterval, } @@ -110,7 +126,7 @@ func (ic *IggyClient) Connect(ctx context.Context) error { case <-ticker.C: pingCtx, pingCancel := context.WithTimeout(lifetimeCtx, ic.heartbeatInterval/2) if err := ic.Ping(pingCtx); err != nil { - log.Printf("[WARN] heartbeat failed: %v", err) + ic.logger.Warn("heartbeat failed", "error", err) } pingCancel() } diff --git a/foreign/go/client/iggy_client_test.go b/foreign/go/client/iggy_client_test.go new file mode 100644 index 0000000000..cd640b8823 --- /dev/null +++ b/foreign/go/client/iggy_client_test.go @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package client + +import ( + "bytes" + "log/slog" + "strings" + "testing" +) + +func TestWithLogger_Nil(t *testing.T) { + cli, err := NewIggyClient(WithLogger(nil)) + if err != nil { + t.Fatalf("unexpected error creating client: %v", err) + } + ic := cli.(*IggyClient) + if ic.logger.Handler() != slog.DiscardHandler { + t.Errorf("expected slog.DiscardHandler, got %v", ic.logger.Handler()) + } +} + +func TestWithLogger_SetsClientLogger(t *testing.T) { + var buf bytes.Buffer + logger := slog.New(slog.NewTextHandler(&buf, &slog.HandlerOptions{ + Level: slog.LevelDebug, + })) + + cli, err := NewIggyClient(WithLogger(logger)) + if err != nil { + t.Fatalf("unexpected error creating client: %v", err) + } + + ic := cli.(*IggyClient) + + ic.logger.Info("probe message", "key", "value") + + output := buf.String() + if !strings.Contains(output, "probe message") { + t.Errorf("expected logger output to contain 'probe message', got: %q", output) + } + if !strings.Contains(output, "key=value") { + t.Errorf("expected logger output to contain 'key=value', got: %q", output) + } +} diff --git a/foreign/go/client/tcp/tcp_core.go b/foreign/go/client/tcp/tcp_core.go index 0d9d922fe2..dcfc540a8e 100644 --- a/foreign/go/client/tcp/tcp_core.go +++ b/foreign/go/client/tcp/tcp_core.go @@ -23,6 +23,7 @@ import ( "crypto/x509" "encoding/binary" "fmt" + "log/slog" "net" "os" "strings" @@ -51,6 +52,7 @@ type IggyTcpClient struct { conn net.Conn mtx sync.Mutex config config + logger *slog.Logger MessageCompression iggcon.IggyMessageCompression leaderRedirectionState iggcon.LeaderRedirectionState clientAddress string @@ -195,7 +197,10 @@ func WithTLSValidateCertificate(validate bool) TLSOption { // NewIggyTcpClient creates a new Iggy TCP client with the given options. // warning: don't use this function directly, use iggycli.NewIggyClient with iggycli.WithTcp instead. -func NewIggyTcpClient(options ...Option) *IggyTcpClient { +func NewIggyTcpClient(logger *slog.Logger, options ...Option) *IggyTcpClient { + if logger == nil { + logger = slog.New(slog.DiscardHandler) + } opts := GetDefaultOptions() for _, opt := range options { if opt != nil { @@ -205,6 +210,7 @@ func NewIggyTcpClient(options ...Option) *IggyTcpClient { return &IggyTcpClient{ config: opts.config, + logger: logger, clientAddress: "", conn: nil, state: iggcon.StateDisconnected, diff --git a/foreign/go/client/tcp/tcp_core_test.go b/foreign/go/client/tcp/tcp_core_test.go index 71256cb4b9..19ac7d992d 100644 --- a/foreign/go/client/tcp/tcp_core_test.go +++ b/foreign/go/client/tcp/tcp_core_test.go @@ -18,10 +18,13 @@ package tcp import ( + "bytes" "context" "encoding/binary" "errors" + "log/slog" "net" + "strings" "testing" "time" @@ -36,8 +39,9 @@ func newTestClient(t *testing.T) (*IggyTcpClient, net.Conn) { t.Helper() serverConn, clientConn := net.Pipe() c := &IggyTcpClient{ - conn: clientConn, - state: iggcon.StateConnected, + conn: clientConn, + state: iggcon.StateConnected, + logger: slog.New(slog.DiscardHandler), } t.Cleanup(func() { err := clientConn.Close() @@ -236,3 +240,22 @@ func TestSendAndFetchResponse_SuccessWithBody(t *testing.T) { t.Errorf("expected state %v, got %v", iggcon.StateConnected, c.state) } } + +func TestNewIggyTcpClient_StoresProvidedLogger(t *testing.T) { + var buf bytes.Buffer + logger := slog.New(slog.NewTextHandler(&buf, &slog.HandlerOptions{ + Level: slog.LevelDebug, + })) + + c := NewIggyTcpClient(logger) + + c.logger.Info("transport probe", "source", "tcp") + + output := buf.String() + if !strings.Contains(output, "transport probe") { + t.Errorf("expected logger output to contain 'transport probe', got: %q", output) + } + if !strings.Contains(output, "source=tcp") { + t.Errorf("expected logger output to contain 'source=tcp', got: %q", output) + } +} diff --git a/foreign/go/client/tcp/tcp_session_management.go b/foreign/go/client/tcp/tcp_session_management.go index 36bada6882..80047ef2a3 100644 --- a/foreign/go/client/tcp/tcp_session_management.go +++ b/foreign/go/client/tcp/tcp_session_management.go @@ -89,6 +89,7 @@ func (c *IggyTcpClient) HandleLeaderRedirection(ctx context.Context) (bool, erro c, currentAddress, iggcon.Tcp, + c.logger, ) if err != nil { return false, err diff --git a/foreign/go/internal/util/leader_aware.go b/foreign/go/internal/util/leader_aware.go index c9e7cae6c7..a7e58c3175 100644 --- a/foreign/go/internal/util/leader_aware.go +++ b/foreign/go/internal/util/leader_aware.go @@ -21,7 +21,7 @@ import ( "context" "errors" "fmt" - "log" + "log/slog" "net" "strconv" "strings" @@ -31,25 +31,36 @@ import ( // CheckAndRedirectToLeader queries the client for cluster metadata and returns // an address to redirect to (empty string means no redirection needed). -func CheckAndRedirectToLeader(ctx context.Context, c iggcon.Client, currentAddress string, transport iggcon.Protocol) (string, error) { - log.Println("Checking cluster metadata for leader detection") +func CheckAndRedirectToLeader(ctx context.Context, c iggcon.Client, currentAddress string, transport iggcon.Protocol, logger *slog.Logger) (string, error) { + logger.Debug("Checking cluster metadata for leader detection") meta, err := c.GetClusterMetadata(ctx) if err != nil { if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { return "", err } - log.Printf("Failed to get cluster metadata: %v, connection will continue on server node %s\n", err, currentAddress) + logger.Warn( + "Failed to get cluster metadata, connection will continue on server node", + "error", err, + "current_address", currentAddress, + ) return "", nil } - log.Printf("Got cluster metadata: %d nodes, cluster: %s\n", len(meta.Nodes), meta.Name) - return processClusterMetadata(meta, currentAddress, transport) + logger.Debug( + "Got cluster metadata", + "nodes", len(meta.Nodes), + "cluster", meta.Name, + ) + return processClusterMetadata(meta, currentAddress, transport, logger) } -func processClusterMetadata(metadata *iggcon.ClusterMetadata, currentAddress string, transport iggcon.Protocol) (string, error) { +func processClusterMetadata(metadata *iggcon.ClusterMetadata, currentAddress string, transport iggcon.Protocol, logger *slog.Logger) (string, error) { if len(metadata.Nodes) == 1 { - log.Printf("Single-node cluster detected (%s), no leader redirection needed\n", metadata.Nodes[0].Name) + logger.Debug( + "Single-node cluster detected, no leader redirection needed", + "node", metadata.Nodes[0].Name, + ) return "", nil } @@ -63,7 +74,10 @@ func processClusterMetadata(metadata *iggcon.ClusterMetadata, currentAddress str } if leader == nil { - log.Printf("No active leader found in cluster metadata, connection will continue on server node %s\n", currentAddress) + logger.Warn( + "No active leader found in cluster metadata, connection will continue on server node", + "current_address", currentAddress, + ) return "", nil } @@ -82,14 +96,23 @@ func processClusterMetadata(metadata *iggcon.ClusterMetadata, currentAddress str } leaderAddress := net.JoinHostPort(leader.IP, strconv.Itoa(int(leaderPort))) - log.Printf("Found leader node: %s at %s (using %s transport)\n", leader.Name, leaderAddress, transport) + logger.Debug( + "Found leader node", + "leader", leader.Name, + "leader_address", leaderAddress, + "transport", transport, + ) if !isSameAddress(currentAddress, leaderAddress) { - log.Printf("Current connection to %s is not the leader, will redirect to %s\n", currentAddress, leaderAddress) + logger.Info( + "Current connection is not the leader, redirecting", + "current_address", currentAddress, + "leader_address", leaderAddress, + ) return leaderAddress, nil } - log.Printf("Already connected to leader at %s\n", currentAddress) + logger.Debug("Already connected to leader", "current_address", currentAddress) return "", nil }