Skip to content
Open
23 changes: 20 additions & 3 deletions foreign/go/client/iggy_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package client
import (
"context"
"fmt"
"log"
"log/slog"
"sync"
"sync/atomic"
"time"
Expand All @@ -32,6 +32,7 @@ import (
type Options struct {
protocol iggcon.Protocol
tcpOptions []tcp.Option
logger *slog.Logger

heartbeatInterval time.Duration
}
Expand All @@ -40,6 +41,7 @@ func GetDefaultOptions() Options {
return Options{
protocol: iggcon.Tcp,
tcpOptions: nil,
logger: slog.New(slog.DiscardHandler),
heartbeatInterval: 5 * time.Second,
}
}
Expand All @@ -54,8 +56,22 @@ func WithTcp(tcpOpts ...tcp.Option) Option {
}
}

// WithLogger sets the logger for the Iggy client and its underlying transport.
// This logger is used by the heartbeat and forwarded to the transport as a
// default. When no logger is provided, all internal log output is silently
// discarded.
func WithLogger(logger *slog.Logger) Option {
return func(opts *Options) {
if logger == nil {
return
}
opts.logger = logger
}
}

Comment thread
slash-init marked this conversation as resolved.
type IggyClient struct {
iggcon.Client
logger *slog.Logger
cancel context.CancelFunc
wg sync.WaitGroup
heartbeatInterval time.Duration
Expand All @@ -75,12 +91,13 @@ func NewIggyClient(options ...Option) (iggcon.Client, error) {
var cli iggcon.Client
switch opts.protocol {
case iggcon.Tcp:
cli = tcp.NewIggyTcpClient(opts.tcpOptions...)
cli = tcp.NewIggyTcpClient(opts.logger, opts.tcpOptions...)
default:
return nil, fmt.Errorf("unknown protocol type: %v", opts.protocol)
}
ic := &IggyClient{
Client: cli,
logger: opts.logger,
cancel: func() {},
heartbeatInterval: opts.heartbeatInterval,
}
Expand Down Expand Up @@ -110,7 +127,7 @@ func (ic *IggyClient) Connect(ctx context.Context) error {
case <-ticker.C:
pingCtx, pingCancel := context.WithTimeout(lifetimeCtx, ic.heartbeatInterval/2)
if err := ic.Ping(pingCtx); err != nil {
log.Printf("[WARN] heartbeat failed: %v", err)
ic.logger.Warn("heartbeat failed", "error", err)
}
pingCancel()
}
Expand Down
60 changes: 60 additions & 0 deletions foreign/go/client/iggy_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package client

import (
"bytes"
"log/slog"
"strings"
"testing"
)

func TestWithLogger_Nil(t *testing.T) {
cli, err := NewIggyClient(WithLogger(nil))
if err != nil {
t.Fatalf("unexpected error creating client: %v", err)
}
ic := cli.(*IggyClient)
if ic.logger.Handler() != slog.DiscardHandler {
t.Errorf("expected slog.DiscardHandler, got %v", ic.logger.Handler())
}
}

Comment thread
slash-init marked this conversation as resolved.
func TestWithLogger_SetsClientLogger(t *testing.T) {
var buf bytes.Buffer
logger := slog.New(slog.NewTextHandler(&buf, &slog.HandlerOptions{
Level: slog.LevelDebug,
}))

cli, err := NewIggyClient(WithLogger(logger))
if err != nil {
t.Fatalf("unexpected error creating client: %v", err)
}

ic := cli.(*IggyClient)

ic.logger.Info("probe message", "key", "value")

output := buf.String()
if !strings.Contains(output, "probe message") {
t.Errorf("expected logger output to contain 'probe message', got: %q", output)
}
if !strings.Contains(output, "key=value") {
t.Errorf("expected logger output to contain 'key=value', got: %q", output)
}
}
5 changes: 4 additions & 1 deletion foreign/go/client/tcp/tcp_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"crypto/x509"
"encoding/binary"
"fmt"
"log/slog"
"net"
"os"
"strings"
Expand Down Expand Up @@ -51,6 +52,7 @@ type IggyTcpClient struct {
conn net.Conn
mtx sync.Mutex
config config
logger *slog.Logger
MessageCompression iggcon.IggyMessageCompression
leaderRedirectionState iggcon.LeaderRedirectionState
clientAddress string
Expand Down Expand Up @@ -195,7 +197,7 @@ func WithTLSValidateCertificate(validate bool) TLSOption {

// NewIggyTcpClient creates a new Iggy TCP client with the given options.
// warning: don't use this function directly, use iggycli.NewIggyClient with iggycli.WithTcp instead.
func NewIggyTcpClient(options ...Option) *IggyTcpClient {
func NewIggyTcpClient(logger *slog.Logger, options ...Option) *IggyTcpClient {
opts := GetDefaultOptions()
for _, opt := range options {
if opt != nil {
Expand All @@ -205,6 +207,7 @@ func NewIggyTcpClient(options ...Option) *IggyTcpClient {

return &IggyTcpClient{
config: opts.config,
logger: logger,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
logger: logger,
logger: opts.logger,

clientAddress: "",
conn: nil,
state: iggcon.StateDisconnected,
Expand Down
27 changes: 25 additions & 2 deletions foreign/go/client/tcp/tcp_core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
package tcp

import (
"bytes"
"context"
"encoding/binary"
"errors"
"log/slog"
"net"
"strings"
"testing"
"time"

Expand All @@ -36,8 +39,9 @@ func newTestClient(t *testing.T) (*IggyTcpClient, net.Conn) {
t.Helper()
serverConn, clientConn := net.Pipe()
c := &IggyTcpClient{
conn: clientConn,
state: iggcon.StateConnected,
conn: clientConn,
state: iggcon.StateConnected,
logger: slog.New(slog.DiscardHandler),
}
t.Cleanup(func() {
err := clientConn.Close()
Expand Down Expand Up @@ -236,3 +240,22 @@ func TestSendAndFetchResponse_SuccessWithBody(t *testing.T) {
t.Errorf("expected state %v, got %v", iggcon.StateConnected, c.state)
}
}

func TestNewIggyTcpClient_StoresProvidedLogger(t *testing.T) {
var buf bytes.Buffer
logger := slog.New(slog.NewTextHandler(&buf, &slog.HandlerOptions{
Level: slog.LevelDebug,
}))

c := NewIggyTcpClient(logger)

c.logger.Info("transport probe", "source", "tcp")

output := buf.String()
if !strings.Contains(output, "transport probe") {
t.Errorf("expected logger output to contain 'transport probe', got: %q", output)
}
if !strings.Contains(output, "source=tcp") {
t.Errorf("expected logger output to contain 'source=tcp', got: %q", output)
}
}
1 change: 1 addition & 0 deletions foreign/go/client/tcp/tcp_session_management.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func (c *IggyTcpClient) HandleLeaderRedirection(ctx context.Context) (bool, erro
c,
currentAddress,
iggcon.Tcp,
c.logger,
)
if err != nil {
return false, err
Expand Down
47 changes: 35 additions & 12 deletions foreign/go/internal/util/leader_aware.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"context"
"errors"
"fmt"
"log"
"log/slog"
"net"
"strconv"
"strings"
Expand All @@ -31,25 +31,36 @@ import (

// CheckAndRedirectToLeader queries the client for cluster metadata and returns
// an address to redirect to (empty string means no redirection needed).
func CheckAndRedirectToLeader(ctx context.Context, c iggcon.Client, currentAddress string, transport iggcon.Protocol) (string, error) {
log.Println("Checking cluster metadata for leader detection")
func CheckAndRedirectToLeader(ctx context.Context, c iggcon.Client, currentAddress string, transport iggcon.Protocol, logger *slog.Logger) (string, error) {
logger.Debug("Checking cluster metadata for leader detection")
Comment thread
slash-init marked this conversation as resolved.

meta, err := c.GetClusterMetadata(ctx)
if err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return "", err
}
log.Printf("Failed to get cluster metadata: %v, connection will continue on server node %s\n", err, currentAddress)
logger.Warn(
"Failed to get cluster metadata, connection will continue on server node",
"error", err,
"current_address", currentAddress,
)
return "", nil
}

log.Printf("Got cluster metadata: %d nodes, cluster: %s\n", len(meta.Nodes), meta.Name)
return processClusterMetadata(meta, currentAddress, transport)
logger.Debug(
"Got cluster metadata",
"nodes", len(meta.Nodes),
"cluster", meta.Name,
)
return processClusterMetadata(meta, currentAddress, transport, logger)
}

func processClusterMetadata(metadata *iggcon.ClusterMetadata, currentAddress string, transport iggcon.Protocol) (string, error) {
func processClusterMetadata(metadata *iggcon.ClusterMetadata, currentAddress string, transport iggcon.Protocol, logger *slog.Logger) (string, error) {
if len(metadata.Nodes) == 1 {
log.Printf("Single-node cluster detected (%s), no leader redirection needed\n", metadata.Nodes[0].Name)
logger.Debug(
"Single-node cluster detected, no leader redirection needed",
"node", metadata.Nodes[0].Name,
)
return "", nil
}

Expand All @@ -63,7 +74,10 @@ func processClusterMetadata(metadata *iggcon.ClusterMetadata, currentAddress str
}

if leader == nil {
log.Printf("No active leader found in cluster metadata, connection will continue on server node %s\n", currentAddress)
logger.Warn(
"No active leader found in cluster metadata, connection will continue on server node",
"current_address", currentAddress,
)
return "", nil
}

Expand All @@ -82,14 +96,23 @@ func processClusterMetadata(metadata *iggcon.ClusterMetadata, currentAddress str
}

leaderAddress := net.JoinHostPort(leader.IP, strconv.Itoa(int(leaderPort)))
log.Printf("Found leader node: %s at %s (using %s transport)\n", leader.Name, leaderAddress, transport)
logger.Info(
"Found leader node",
"leader", leader.Name,
"address", leaderAddress,
"transport", transport,
)

if !isSameAddress(currentAddress, leaderAddress) {
log.Printf("Current connection to %s is not the leader, will redirect to %s\n", currentAddress, leaderAddress)
logger.Info(
"Current connection is not the leader, redirecting",
"current_address", currentAddress,
"leader_address", leaderAddress,
)
return leaderAddress, nil
}

log.Printf("Already connected to leader at %s\n", currentAddress)
logger.Debug("Already connected to leader at", "current_address", currentAddress)
return "", nil
}

Expand Down