Skip to content
Closed
6 changes: 6 additions & 0 deletions go/pools/smartconnpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package smartconnpool

import (
"context"
"fmt"
"slices"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -509,8 +510,12 @@ func (pool *ConnPool[C]) get(ctx context.Context) (*Pooled[C], error) {
// to other clients, wait until one of the connections is returned
if conn == nil {
start := time.Now()

oldPool := fmt.Sprintf("%+v", pool)
conn, err = pool.wait.waitForConn(ctx, nil)
if err != nil {
log.Errorf("===================== ERROR: waitForConn err: %s", err.Error())
log.Errorf("ctx:\n%+v\nOld pool: \n%s\n=================\nNew pool: \n%+v", ctx, oldPool, pool)
return nil, ErrTimeout
}
pool.recordWait(start)
Expand Down Expand Up @@ -568,6 +573,7 @@ func (pool *ConnPool[C]) getWithSetting(ctx context.Context, setting *Setting) (
start := time.Now()
conn, err = pool.wait.waitForConn(ctx, setting)
if err != nil {
log.Errorf("===================== getWithSetting ERROR: waitForConn err: %s", err.Error())
return nil, ErrTimeout
}
pool.recordWait(start)
Expand Down
9 changes: 8 additions & 1 deletion go/pools/smartconnpool/waitlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ package smartconnpool
import (
"context"
"sync"
"time"

"vitess.io/vitess/go/list"
"vitess.io/vitess/go/vt/log"
)

// waiter represents a client waiting for a connection in the waitlist
Expand Down Expand Up @@ -108,7 +110,12 @@ func (wl *waitlist[C]) expire(force bool) {
func (wl *waitlist[D]) tryReturnConn(conn *Pooled[D]) bool {
// fast path: if there's nobody waiting there's nothing to do
if wl.list.Len() == 0 {
return false
// HACK: we're gonna sleep for a bit and try again, to see if there are still no waiters
time.Sleep(10 * time.Millisecond)
if wl.list.Len() == 0 {
log.Error("======================= WE HIT THE FAST PATH, WE'RE RETURNING")
return false
}
}
// split the slow path into a separate function to enable inlining
return wl.tryReturnConnSlow(conn)
Expand Down
44 changes: 23 additions & 21 deletions go/vt/logutil/throttled.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,30 +55,32 @@ var (
)

func (tl *ThrottledLogger) log(logF logFunc, format string, v ...any) {
now := time.Now()
// now := time.Now()

tl.mu.Lock()
defer tl.mu.Unlock()
logWaitTime := tl.maxInterval - (now.Sub(tl.lastlogTime))
if logWaitTime < 0 {
tl.lastlogTime = now
logF(2, fmt.Sprintf(tl.name+": "+format, v...))
return
}
// If this is the first message to be skipped, start a goroutine
// to log and reset skippedCount
if tl.skippedCount == 0 {
go func(d time.Duration) {
<-time.After(d)
tl.mu.Lock()
defer tl.mu.Unlock()
// Because of the go func(), we lose the stack trace,
// so we just use the current line for this.
logF(0, fmt.Sprintf("%v: skipped %v log messages", tl.name, tl.skippedCount))
tl.skippedCount = 0
}(logWaitTime)
}
tl.skippedCount++
// logWaitTime := tl.maxInterval - (now.Sub(tl.lastlogTime))
// if logWaitTime < 0 {
// tl.lastlogTime = now
logF(2, fmt.Sprintf(tl.name+": "+format, v...))
// return
// }
// // If this is the first message to be skipped, start a goroutine
// // to log and reset skippedCount
//
// if tl.skippedCount == 0 {
// go func(d time.Duration) {
// <-time.After(d)
// tl.mu.Lock()
// defer tl.mu.Unlock()
// // Because of the go func(), we lose the stack trace,
// // so we just use the current line for this.
// logF(0, fmt.Sprintf("%v: skipped %v log messages", tl.name, tl.skippedCount))
// tl.skippedCount = 0
// }(logWaitTime)
// }
//
// tl.skippedCount++
}

// Infof logs an info if not throttled.
Expand Down
1 change: 1 addition & 0 deletions go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,7 @@ func (qre *QueryExecutor) execSelect() (*sqltypes.Result, error) {
return nil, err
}
defer conn.Recycle()

res, err := qre.execDBConn(conn.Conn, sql, true)
if err != nil {
return nil, err
Expand Down
Loading