Skip to content

Commit 137c099

Browse files
mattlordmhamza15
authored andcommitted
VReplication: Estimate lag when workflow fully throttled (vitessio#16577)
Signed-off-by: Matt Lord <[email protected]>
1 parent a0d4776 commit 137c099

File tree

3 files changed

+85
-26
lines changed

3 files changed

+85
-26
lines changed

go/test/endtoend/vreplication/vreplication_test.go

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"io"
2323
"net/http"
2424
"runtime"
25+
"strconv"
2526
"strings"
2627
"sync"
2728
"testing"
@@ -57,7 +58,7 @@ var (
5758
targetKsOpts = make(map[string]string)
5859
httpClient = throttlebase.SetupHTTPClient(time.Second)
5960
sourceThrottlerAppName = throttlerapp.VStreamerName
60-
targetThrottlerAppName = throttlerapp.VReplicationName
61+
targetThrottlerAppName = throttlerapp.VPlayerName
6162
)
6263

6364
const (
@@ -1196,6 +1197,8 @@ func materializeProduct(t *testing.T, useVtctldClient bool) {
11961197
// we expect the additional rows to **not appear** in the materialized view
11971198
for _, tab := range customerTablets {
11981199
waitForRowCountInTablet(t, tab, keyspace, workflow, 5)
1200+
// Confirm that we updated the stats on the target tablets as expected.
1201+
confirmVReplicationThrottling(t, tab, sourceKs, workflow, sourceThrottlerAppName)
11991202
}
12001203
})
12011204
t.Run("unthrottle-app-product", func(t *testing.T) {
@@ -1229,6 +1232,8 @@ func materializeProduct(t *testing.T, useVtctldClient bool) {
12291232
// rows to **not appear** in the materialized view.
12301233
for _, tab := range customerTablets {
12311234
waitForRowCountInTablet(t, tab, keyspace, workflow, 8)
1235+
// Confirm that we updated the stats on the target tablets as expected.
1236+
confirmVReplicationThrottling(t, tab, sourceKs, workflow, targetThrottlerAppName)
12321237
}
12331238
})
12341239
t.Run("unthrottle-app-customer", func(t *testing.T) {
@@ -1784,3 +1789,52 @@ func waitForInnoDBHistoryLength(t *testing.T, tablet *cluster.VttabletProcess, e
17841789
func releaseInnoDBRowHistory(t *testing.T, dbConn *mysql.Conn) {
17851790
execQuery(t, dbConn, "rollback")
17861791
}
1792+
1793+
// confirmVReplicationThrottling confirms that the throttling related metrics reflect that
1794+
// the workflow is being throttled as expected, via the expected app name, and that this
1795+
// is impacting the lag as expected.
1796+
// The tablet passed should be a target tablet for the given workflow while the keyspace
1797+
// name provided should be the source keyspace as the target tablet stats note the stream's
1798+
// source keyspace and shard.
1799+
func confirmVReplicationThrottling(t *testing.T, tab *cluster.VttabletProcess, keyspace, workflow string, appname throttlerapp.Name) {
1800+
const (
1801+
sleepTime = 5 * time.Second
1802+
zv = int64(0)
1803+
)
1804+
time.Sleep(sleepTime) // To be sure that we accrue some lag
1805+
1806+
jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"})
1807+
require.NoError(t, err)
1808+
require.NotEqual(t, "{}", jsVal)
1809+
// The JSON value looks like this: {"cproduct.4.tablet.vstreamer": 2, "cproduct.4.tablet.vplayer": 4}
1810+
throttledCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.%s`, workflow, appname)).Int()
1811+
require.Greater(t, throttledCount, zv, "JSON value: %s", jsVal)
1812+
1813+
val, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCountTotal"})
1814+
require.NoError(t, err)
1815+
require.NotEqual(t, "", val)
1816+
throttledCountTotal, err := strconv.ParseInt(val, 10, 64)
1817+
require.NoError(t, err)
1818+
require.GreaterOrEqual(t, throttledCountTotal, throttledCount, "Value: %s", val)
1819+
1820+
// We do not calculate replication lag for the vcopier as it's not replicating
1821+
// events.
1822+
if appname != throttlerapp.VCopierName {
1823+
jsVal, err = getDebugVar(t, tab.Port, []string{"VReplicationLagSeconds"})
1824+
require.NoError(t, err)
1825+
require.NotEqual(t, "{}", jsVal)
1826+
// The JSON value looks like this: {"product.0.cproduct.4": 6}
1827+
vreplLagSeconds := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.%s\.*`, keyspace, workflow)).Int()
1828+
require.NoError(t, err)
1829+
// Take off 1 second to deal with timing issues in the test.
1830+
minLagSecs := int64(int64(sleepTime.Seconds()) - 1)
1831+
require.GreaterOrEqual(t, vreplLagSeconds, minLagSecs, "JSON value: %s", jsVal)
1832+
1833+
val, err = getDebugVar(t, tab.Port, []string{"VReplicationLagSecondsMax"})
1834+
require.NoError(t, err)
1835+
require.NotEqual(t, "", val)
1836+
vreplLagSecondsMax, err := strconv.ParseInt(val, 10, 64)
1837+
require.NoError(t, err)
1838+
require.GreaterOrEqual(t, vreplLagSecondsMax, vreplLagSeconds, "Value: %s", val)
1839+
}
1840+
}

go/vt/vttablet/tabletmanager/vreplication/vplayer.go

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -453,33 +453,34 @@ func (vp *vplayer) recordHeartbeat() error {
453453
func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
454454
defer vp.vr.dbClient.Rollback()
455455

456+
estimateLag := func() {
457+
behind := time.Now().UnixNano() - vp.lastTimestampNs - vp.timeOffsetNs
458+
vp.vr.stats.ReplicationLagSeconds.Store(behind / 1e9)
459+
vp.vr.stats.VReplicationLags.Add(strconv.Itoa(int(vp.vr.id)), time.Duration(behind/1e9)*time.Second)
460+
}
461+
456462
// If we're not running, set ReplicationLagSeconds to be very high.
457463
// TODO(sougou): if we also stored the time of the last event, we
458464
// can estimate this value more accurately.
459465
defer vp.vr.stats.ReplicationLagSeconds.Store(math.MaxInt64)
460466
defer vp.vr.stats.VReplicationLags.Add(strconv.Itoa(int(vp.vr.id)), math.MaxInt64)
461-
var sbm int64 = -1
467+
var lagSecs int64
462468
for {
463469
if ctx.Err() != nil {
464470
return ctx.Err()
465471
}
466472
// Check throttler.
467473
if !vp.vr.vre.throttlerClient.ThrottleCheckOKOrWaitAppName(ctx, throttlerapp.Name(vp.throttlerAppName)) {
468474
_ = vp.vr.updateTimeThrottled(throttlerapp.VPlayerName)
475+
estimateLag()
469476
continue
470477
}
471478

472479
items, err := relay.Fetch()
473480
if err != nil {
474481
return err
475482
}
476-
// No events were received. This likely means that there's a network partition.
477-
// So, we should assume we're falling behind.
478-
if len(items) == 0 {
479-
behind := time.Now().UnixNano() - vp.lastTimestampNs - vp.timeOffsetNs
480-
vp.vr.stats.ReplicationLagSeconds.Store(behind / 1e9)
481-
vp.vr.stats.VReplicationLags.Add(strconv.Itoa(int(vp.vr.id)), time.Duration(behind/1e9)*time.Second)
482-
}
483+
483484
// Empty transactions are saved at most once every idleTimeout.
484485
// This covers two situations:
485486
// 1. Fetch was idle for idleTimeout.
@@ -496,12 +497,21 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
496497
return nil
497498
}
498499
}
500+
501+
lagSecs = -1
499502
for i, events := range items {
500503
for j, event := range events {
501504
if event.Timestamp != 0 {
502-
vp.lastTimestampNs = event.Timestamp * 1e9
503-
vp.timeOffsetNs = time.Now().UnixNano() - event.CurrentTime
504-
sbm = event.CurrentTime/1e9 - event.Timestamp
505+
// If the event is a heartbeat sent while throttled then do not update
506+
// the lag based on it.
507+
// If the batch consists only of throttled heartbeat events then we cannot
508+
// determine the actual lag, as the vstreamer is fully throttled, and we
509+
// will estimate it after processing the batch.
510+
if !(event.Type == binlogdatapb.VEventType_HEARTBEAT && event.Throttled) {
511+
vp.lastTimestampNs = event.Timestamp * 1e9
512+
vp.timeOffsetNs = time.Now().UnixNano() - event.CurrentTime
513+
lagSecs = event.CurrentTime/1e9 - event.Timestamp
514+
}
505515
}
506516
mustSave := false
507517
switch event.Type {
@@ -532,11 +542,12 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
532542
}
533543
}
534544

535-
if sbm >= 0 {
536-
vp.vr.stats.ReplicationLagSeconds.Store(sbm)
537-
vp.vr.stats.VReplicationLags.Add(strconv.Itoa(int(vp.vr.id)), time.Duration(sbm)*time.Second)
545+
if lagSecs >= 0 {
546+
vp.vr.stats.ReplicationLagSeconds.Store(lagSecs)
547+
vp.vr.stats.VReplicationLags.Add(strconv.Itoa(int(vp.vr.id)), time.Duration(lagSecs)*time.Second)
548+
} else { // We couldn't determine the lag, so we need to estimate it
549+
estimateLag()
538550
}
539-
540551
}
541552
}
542553

go/vt/vttablet/tabletserver/vstreamer/vstreamer.go

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030
"vitess.io/vitess/go/mysql/collations"
3131
"vitess.io/vitess/go/mysql/replication"
3232
"vitess.io/vitess/go/sqltypes"
33-
"vitess.io/vitess/go/timer"
3433
"vitess.io/vitess/go/vt/binlog"
3534
"vitess.io/vitess/go/vt/dbconfigs"
3635
"vitess.io/vitess/go/vt/log"
@@ -283,11 +282,11 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
283282
defer hbTimer.Stop()
284283

285284
injectHeartbeat := func(throttled bool) error {
286-
now := time.Now().UnixNano()
287285
select {
288286
case <-ctx.Done():
289287
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired")
290288
default:
289+
now := time.Now().UnixNano()
291290
err := bufferAndTransmit(&binlogdatapb.VEvent{
292291
Type: binlogdatapb.VEventType_HEARTBEAT,
293292
Timestamp: now / 1e9,
@@ -299,8 +298,6 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
299298
}
300299

301300
throttleEvents := func(throttledEvents chan mysql.BinlogEvent) {
302-
throttledHeartbeatsRateLimiter := timer.NewRateLimiter(HeartbeatTime)
303-
defer throttledHeartbeatsRateLimiter.Stop()
304301
for {
305302
// check throttler.
306303
if !vs.vse.throttlerClient.ThrottleCheckOKOrWaitAppName(ctx, vs.throttlerApp) {
@@ -309,12 +306,8 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
309306
case <-ctx.Done():
310307
return
311308
default:
312-
// do nothing special
309+
// Do nothing special.
313310
}
314-
throttledHeartbeatsRateLimiter.Do(func() error {
315-
return injectHeartbeat(true)
316-
})
317-
// we won't process events, until we're no longer throttling
318311
continue
319312
}
320313
select {
@@ -386,7 +379,8 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
386379
case <-ctx.Done():
387380
return nil
388381
case <-hbTimer.C:
389-
if err := injectHeartbeat(false); err != nil {
382+
ok := vs.vse.throttlerClient.ThrottleCheckOK(ctx, vs.throttlerApp)
383+
if err := injectHeartbeat(!ok); err != nil {
390384
if err == io.EOF {
391385
return nil
392386
}

0 commit comments

Comments
 (0)