diff --git a/binlog_streamer.go b/binlog_streamer.go index 611188d79..5351e474e 100644 --- a/binlog_streamer.go +++ b/binlog_streamer.go @@ -15,21 +15,66 @@ import ( const caughtUpThreshold = 10 * time.Second -type BinlogStreamer struct { - DB *sql.DB - DBConfig *DatabaseConfig - MyServerId uint32 - ErrorHandler ErrorHandler - Filter CopyFilter +type BinlogPosition struct { + // A binlog position emitted by the binlog-streamer consists of two parts: + // First, the last emitted event position, which refers to the event that + // we received from the MySQL master and that we hand to clients. Second, + // a position from which we can resume a binlog-streamer. + // Ideally, these two values would be the same, but in reality they are + // not, because some events are streamed in a series (e.g. DML events + // require a table-map events to be seen before). + // As a result, we always stream event positions as a pair - if a binlog + // streamer is resumed from an event that is not safe to resume from, we + // resume from the most recent (earlier) event from which we can safely + // resume and simply suppress emitting these events up to the point of the + // last event returned. + // + // the actual binlog position of an event emitted by the streamer + EventPosition mysql.Position + // the position from which one needs to point the streamer if we want to + // resume from after this event + ResumePosition mysql.Position +} + +func NewResumableBinlogPosition(pos mysql.Position) BinlogPosition { + return BinlogPosition{pos, pos} +} - TableSchema TableSchemaCache +func (p BinlogPosition) Compare(o BinlogPosition) int { + // comparison always happens on the actual event + return p.EventPosition.Compare(o.EventPosition) +} - binlogSyncer *replication.BinlogSyncer - binlogStreamer *replication.BinlogStreamer - lastStreamedBinlogPosition mysql.Position - targetBinlogPosition mysql.Position - lastProcessedEventTime time.Time - lastLagMetricEmittedTime time.Time +func (b BinlogPosition) String() string { + return fmt.Sprintf("Position(event %s, resume at %s)", b.EventPosition, b.ResumePosition) +} + +type BinlogStreamer struct { + DB *sql.DB + DBConfig *DatabaseConfig + MyServerId uint32 + ErrorHandler ErrorHandler + Filter CopyFilter + + TableSchema TableSchemaCache + + binlogSyncer *replication.BinlogSyncer + binlogStreamer *replication.BinlogStreamer + // what is the last event that we ever received from the streamer + lastStreamedBinlogPosition mysql.Position + // what is the last event that we received and from which it is possible + // to resume + lastResumeBinlogPosition mysql.Position + // if we have resumed from an earlier position than where we last streamed + // to (that is, if lastResumeBinlogPosition is before + // lastStreamedBinlogPosition when resuming), up to what event should we + // suppress emitting events + suppressEmitUpToBinlogPosition mysql.Position + // up to what position to we want to continue streaming (if a stop was + // requested) + targetBinlogPosition mysql.Position + lastProcessedEventTime time.Time + lastLagMetricEmittedTime time.Time stopRequested bool @@ -77,40 +122,49 @@ func (s *BinlogStreamer) createBinlogSyncer() error { return nil } -func (s *BinlogStreamer) ConnectBinlogStreamerToMysql() (mysql.Position, error) { +func (s *BinlogStreamer) ConnectBinlogStreamerToMysql() (BinlogPosition, error) { s.ensureLogger() currentPosition, err := ShowMasterStatusBinlogPosition(s.DB) if err != nil { s.logger.WithError(err).Error("failed to read current binlog position") - return mysql.Position{}, err + return BinlogPosition{}, err } - return s.ConnectBinlogStreamerToMysqlFrom(currentPosition) + return s.ConnectBinlogStreamerToMysqlFrom(NewResumableBinlogPosition(currentPosition)) } -func (s *BinlogStreamer) ConnectBinlogStreamerToMysqlFrom(startFromBinlogPosition mysql.Position) (mysql.Position, error) { +func (s *BinlogStreamer) ConnectBinlogStreamerToMysqlFrom(startFromBinlogPosition BinlogPosition) (BinlogPosition, error) { s.ensureLogger() err := s.createBinlogSyncer() if err != nil { - return mysql.Position{}, err + return BinlogPosition{}, err } - s.lastStreamedBinlogPosition = startFromBinlogPosition + if startFromBinlogPosition.EventPosition.Compare(startFromBinlogPosition.ResumePosition) < 0 { + err = fmt.Errorf("invalid resume position %s: last event must not be before resume position", startFromBinlogPosition) + return BinlogPosition{}, err + } + + s.lastStreamedBinlogPosition = startFromBinlogPosition.EventPosition + s.suppressEmitUpToBinlogPosition = startFromBinlogPosition.EventPosition + s.lastResumeBinlogPosition = startFromBinlogPosition.ResumePosition s.logger.WithFields(logrus.Fields{ - "file": s.lastStreamedBinlogPosition.Name, - "pos": s.lastStreamedBinlogPosition.Pos, + "stream.file": s.lastStreamedBinlogPosition.Name, + "stream.pos": s.lastStreamedBinlogPosition.Pos, + "resume.file": s.lastResumeBinlogPosition.Name, + "resume.pos": s.lastResumeBinlogPosition.Pos, }).Info("starting binlog streaming") - s.binlogStreamer, err = s.binlogSyncer.StartSync(s.lastStreamedBinlogPosition) + s.binlogStreamer, err = s.binlogSyncer.StartSync(s.lastResumeBinlogPosition) if err != nil { s.logger.WithError(err).Error("unable to start binlog streamer") - return mysql.Position{}, err + return BinlogPosition{}, err } - return s.lastStreamedBinlogPosition, err + return startFromBinlogPosition, err } func (s *BinlogStreamer) Run() { @@ -233,6 +287,10 @@ func (s *BinlogStreamer) updateLastStreamedPosAndTime(ev *replication.BinlogEven eventTime := time.Unix(int64(ev.Header.Timestamp), 0) s.lastProcessedEventTime = eventTime + if resumablePosition, evIsResumable := s.getResumePositionForEvent(ev); evIsResumable { + s.lastResumeBinlogPosition = resumablePosition + } + if time.Since(s.lastLagMetricEmittedTime) >= time.Second { lag := time.Since(eventTime) metrics.Gauge("BinlogStreamer.Lag", lag.Seconds(), nil, 1.0) @@ -240,6 +298,33 @@ func (s *BinlogStreamer) updateLastStreamedPosAndTime(ev *replication.BinlogEven } } +func (s *BinlogStreamer) getResumePositionForEvent(ev *replication.BinlogEvent) (resumablePosition mysql.Position, evIsResumable bool) { + // resuming from a RowsEvent is not possible, as it may be followed by + // another rows-event without a subsequent TableMapEvent. Thus, if we have + // a rows-event, we need to keep resuming from whatever last non-rows-event + // + // The same is true for TableMapEvents themselves, as we cannot resume right + // after the event: we need to re-stream the event itself to get ready for + // a RowsEvent + switch ev.Event.(type) { + case *replication.RowsEvent, *replication.TableMapEvent: + // it's not resumable - we need to return whatever was save to resume + // from before + resumablePosition = s.lastResumeBinlogPosition + default: + // it is safe to resume from here + evIsResumable = true + resumablePosition = mysql.Position{ + // The filename is only changed and visible during the RotateEvent, which + // is handled transparently in Run(). + Name: s.lastStreamedBinlogPosition.Name, + Pos: ev.Header.LogPos, + } + } + + return +} + func (s *BinlogStreamer) handleRowsEvent(ev *replication.BinlogEvent) error { eventTime := time.Unix(int64(ev.Header.Timestamp), 0) rowsEvent := ev.Event.(*replication.RowsEvent) @@ -256,12 +341,24 @@ func (s *BinlogStreamer) handleRowsEvent(ev *replication.BinlogEvent) error { Pos: ev.Header.LogPos, } + // we may still be searching for the first event to stream to listeners, if + // we resumed reading upstream events from an earlier event + if pos.Compare(s.suppressEmitUpToBinlogPosition) <= 0 { + return nil + } + + resumePosition, _ := s.getResumePositionForEvent(ev) + binlogPosition := BinlogPosition{ + EventPosition: pos, + ResumePosition: resumePosition, + } + table := s.TableSchema.Get(string(rowsEvent.Table.Schema), string(rowsEvent.Table.Table)) if table == nil { return nil } - dmlEvs, err := NewBinlogDMLEvents(table, ev, pos) + dmlEvs, err := NewBinlogDMLEvents(table, ev, binlogPosition) if err != nil { return err } diff --git a/dml_events.go b/dml_events.go index a260b3788..763d57141 100644 --- a/dml_events.go +++ b/dml_events.go @@ -8,7 +8,6 @@ import ( "github.com/shopspring/decimal" - "github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go-mysql/replication" "github.com/siddontang/go-mysql/schema" ) @@ -43,13 +42,13 @@ type DMLEvent interface { OldValues() RowData NewValues() RowData PaginationKey() (uint64, error) - BinlogPosition() mysql.Position + BinlogPosition() BinlogPosition } // The base of DMLEvent to provide the necessary methods. type DMLEventBase struct { table *TableSchema - pos mysql.Position + pos BinlogPosition } func (e *DMLEventBase) Database() string { @@ -64,7 +63,7 @@ func (e *DMLEventBase) TableSchema() *TableSchema { return e.table } -func (e *DMLEventBase) BinlogPosition() mysql.Position { +func (e *DMLEventBase) BinlogPosition() BinlogPosition { return e.pos } @@ -73,7 +72,7 @@ type BinlogInsertEvent struct { *DMLEventBase } -func NewBinlogInsertEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos mysql.Position) ([]DMLEvent, error) { +func NewBinlogInsertEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos BinlogPosition) ([]DMLEvent, error) { insertEvents := make([]DMLEvent, len(rowsEvent.Rows)) for i, row := range rowsEvent.Rows { @@ -117,7 +116,7 @@ type BinlogUpdateEvent struct { *DMLEventBase } -func NewBinlogUpdateEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos mysql.Position) ([]DMLEvent, error) { +func NewBinlogUpdateEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos BinlogPosition) ([]DMLEvent, error) { // UPDATE events have two rows in the RowsEvent. The first row is the // entries of the old record (for WHERE) and the second row is the // entries of the new record (for SET). @@ -177,7 +176,7 @@ func (e *BinlogDeleteEvent) NewValues() RowData { return nil } -func NewBinlogDeleteEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos mysql.Position) ([]DMLEvent, error) { +func NewBinlogDeleteEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos BinlogPosition) ([]DMLEvent, error) { deleteEvents := make([]DMLEvent, len(rowsEvent.Rows)) for i, row := range rowsEvent.Rows { @@ -205,7 +204,7 @@ func (e *BinlogDeleteEvent) PaginationKey() (uint64, error) { return paginationKeyFromEventData(e.table, e.oldValues) } -func NewBinlogDMLEvents(table *TableSchema, ev *replication.BinlogEvent, pos mysql.Position) ([]DMLEvent, error) { +func NewBinlogDMLEvents(table *TableSchema, ev *replication.BinlogEvent, pos BinlogPosition) ([]DMLEvent, error) { rowsEvent := ev.Event.(*replication.RowsEvent) for _, row := range rowsEvent.Rows { diff --git a/ferry.go b/ferry.go index 65750a74e..4288fb5f6 100644 --- a/ferry.go +++ b/ferry.go @@ -493,7 +493,7 @@ func (f *Ferry) Start() error { // miss some records that are inserted between the time the // DataIterator determines the range of IDs to copy and the time that // the starting binlog coordinates are determined. - var pos siddontangmysql.Position + var pos BinlogPosition var err error if f.StateToResumeFrom != nil { pos, err = f.BinlogStreamer.ConnectBinlogStreamerToMysqlFrom(f.StateToResumeFrom.MinBinlogPosition()) @@ -504,9 +504,9 @@ func (f *Ferry) Start() error { return err } - // If we don't set this now, there is a race condition where Ghostferry + // If we don't set this now, there is a race condition where ghostferry // is terminated with some rows copied but no binlog events are written. - // This guarentees that we are able to restart from a valid location. + // This guarantees that we are able to restart from a valid location. f.StateTracker.UpdateLastWrittenBinlogPosition(pos) if f.inlineVerifier != nil { f.StateTracker.UpdateLastStoredBinlogPositionForInlineVerifier(pos) @@ -749,7 +749,7 @@ func (f *Ferry) Progress() *Progress { s.Throttled = f.Throttler.Throttled() // Binlog Progress - s.LastSuccessfulBinlogPos = f.BinlogStreamer.lastStreamedBinlogPosition + s.LastSuccessfulBinlogPos = f.BinlogStreamer.GetLastStreamedBinlogPosition() s.BinlogStreamerLag = time.Now().Sub(f.BinlogStreamer.lastProcessedEventTime).Seconds() s.FinalBinlogPos = f.BinlogStreamer.targetBinlogPosition diff --git a/sharding/test/copy_filter_test.go b/sharding/test/copy_filter_test.go index d1b93e9c9..1585d8451 100644 --- a/sharding/test/copy_filter_test.go +++ b/sharding/test/copy_filter_test.go @@ -7,7 +7,6 @@ import ( "github.com/Shopify/ghostferry" "github.com/Shopify/ghostferry/sharding" - "github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go-mysql/replication" "github.com/siddontang/go-mysql/schema" "github.com/stretchr/testify/suite" @@ -142,7 +141,7 @@ func (t *CopyFilterTestSuite) TestShardingValueTypes() { } for _, tenantId := range tenantIds { - dmlEvents, _ := ghostferry.NewBinlogInsertEvents(t.normalTable, t.newRowsEvent([]interface{}{1001, tenantId, "data"}), mysql.Position{}) + dmlEvents, _ := ghostferry.NewBinlogInsertEvents(t.normalTable, t.newRowsEvent([]interface{}{1001, tenantId, "data"}), ghostferry.BinlogPosition{}) applicable, err := t.filter.ApplicableEvent(dmlEvents[0]) t.Require().Nil(err) t.Require().True(applicable, fmt.Sprintf("value %t wasn't applicable", tenantId)) @@ -150,7 +149,7 @@ func (t *CopyFilterTestSuite) TestShardingValueTypes() { } func (t *CopyFilterTestSuite) TestInvalidShardingValueTypesErrors() { - dmlEvents, err := ghostferry.NewBinlogInsertEvents(t.normalTable, t.newRowsEvent([]interface{}{1001, string("1"), "data"}), mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogInsertEvents(t.normalTable, t.newRowsEvent([]interface{}{1001, string("1"), "data"}), ghostferry.BinlogPosition{}) _, err = t.filter.ApplicableEvent(dmlEvents[0]) t.Require().Equal("parsing new sharding key: invalid type %!t(string=1)", err.Error()) } diff --git a/state_tracker.go b/state_tracker.go index c2eab7404..9746af222 100644 --- a/state_tracker.go +++ b/state_tracker.go @@ -5,8 +5,6 @@ import ( "math" "sync" "time" - - "github.com/siddontang/go-mysql/mysql" ) // StateTracker design @@ -36,13 +34,13 @@ type SerializableState struct { LastSuccessfulPaginationKeys map[string]uint64 CompletedTables map[string]bool - LastWrittenBinlogPosition mysql.Position - LastStoredBinlogPositionForInlineVerifier mysql.Position + LastWrittenBinlogPosition BinlogPosition + LastStoredBinlogPositionForInlineVerifier BinlogPosition BinlogVerifyStore BinlogVerifySerializedStore } -func (s *SerializableState) MinBinlogPosition() mysql.Position { - nilPosition := mysql.Position{} +func (s *SerializableState) MinBinlogPosition() BinlogPosition { + nilPosition := BinlogPosition{} if s.LastWrittenBinlogPosition == nilPosition { return s.LastStoredBinlogPositionForInlineVerifier } @@ -82,8 +80,8 @@ type StateTracker struct { BinlogRWMutex *sync.RWMutex CopyRWMutex *sync.RWMutex - lastWrittenBinlogPosition mysql.Position - lastStoredBinlogPositionForInlineVerifier mysql.Position + lastWrittenBinlogPosition BinlogPosition + lastStoredBinlogPositionForInlineVerifier BinlogPosition lastSuccessfulPaginationKeys map[string]uint64 completedTables map[string]bool @@ -113,14 +111,14 @@ func NewStateTrackerFromSerializedState(speedLogCount int, serializedState *Seri return s } -func (s *StateTracker) UpdateLastWrittenBinlogPosition(pos mysql.Position) { +func (s *StateTracker) UpdateLastWrittenBinlogPosition(pos BinlogPosition) { s.BinlogRWMutex.Lock() defer s.BinlogRWMutex.Unlock() s.lastWrittenBinlogPosition = pos } -func (s *StateTracker) UpdateLastStoredBinlogPositionForInlineVerifier(pos mysql.Position) { +func (s *StateTracker) UpdateLastStoredBinlogPositionForInlineVerifier(pos BinlogPosition) { s.BinlogRWMutex.Lock() defer s.BinlogRWMutex.Unlock() diff --git a/status_deprecated.go b/status_deprecated.go index 7e9bab288..412193f24 100644 --- a/status_deprecated.go +++ b/status_deprecated.go @@ -75,7 +75,7 @@ func FetchStatusDeprecated(f *Ferry, v Verifier) *StatusDeprecated { status.AutomaticCutover = f.Config.AutomaticCutover status.BinlogStreamerStopRequested = f.BinlogStreamer.stopRequested - status.LastSuccessfulBinlogPos = f.BinlogStreamer.lastStreamedBinlogPosition + status.LastSuccessfulBinlogPos = f.BinlogStreamer.GetLastStreamedBinlogPosition() status.TargetBinlogPos = f.BinlogStreamer.targetBinlogPosition status.Throttled = f.Throttler.Throttled() diff --git a/test/go/binlog_streamer_test.go b/test/go/binlog_streamer_test.go index cb2891bd7..c69e6872f 100644 --- a/test/go/binlog_streamer_test.go +++ b/test/go/binlog_streamer_test.go @@ -8,6 +8,7 @@ import ( "github.com/Shopify/ghostferry" "github.com/Shopify/ghostferry/testhelpers" + "github.com/siddontang/go-mysql/mysql" "github.com/stretchr/testify/suite" ) @@ -95,8 +96,8 @@ func (this *BinlogStreamerTestSuite) TestBinlogStreamerSetsBinlogPositionOnDMLEv this.binlogStreamer.AddEventListener(func(evs []ghostferry.DMLEvent) error { eventAsserted = true this.Require().Equal(1, len(evs)) - this.Require().True(strings.HasPrefix(evs[0].BinlogPosition().Name, "mysql-bin.")) - this.Require().True(evs[0].BinlogPosition().Pos > 0) + this.Require().True(strings.HasPrefix(evs[0].BinlogPosition().EventPosition.Name, "mysql-bin.")) + this.Require().True(evs[0].BinlogPosition().EventPosition.Pos > 0) this.binlogStreamer.FlushAndStop() return nil }) @@ -115,6 +116,16 @@ func (this *BinlogStreamerTestSuite) TestBinlogStreamerSetsBinlogPositionOnDMLEv this.Require().True(eventAsserted) } +func (this *BinlogStreamerTestSuite) TestResumingFromInvalidResumePositionAfterEventPosition() { + pos := ghostferry.BinlogPosition{ + EventPosition: mysql.Position{"mysql-bin.00002", 10}, + ResumePosition: mysql.Position{"mysql-bin.00002", 11}, + } + _, err := this.binlogStreamer.ConnectBinlogStreamerToMysqlFrom(pos) + this.Require().NotNil(err) + this.Require().Contains(err.Error(), "last event must not be before resume position") +} + func TestBinlogStreamerTestSuite(t *testing.T) { testhelpers.SetupTest() suite.Run(t, &BinlogStreamerTestSuite{GhostferryUnitTestSuite: &testhelpers.GhostferryUnitTestSuite{}}) diff --git a/test/go/dml_events_test.go b/test/go/dml_events_test.go index b3d4e4bc6..b64a60fdb 100644 --- a/test/go/dml_events_test.go +++ b/test/go/dml_events_test.go @@ -4,7 +4,6 @@ import ( "testing" "github.com/Shopify/ghostferry" - "github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go-mysql/replication" "github.com/siddontang/go-mysql/schema" "github.com/stretchr/testify/suite" @@ -61,7 +60,7 @@ func (this *DMLEventsTestSuite) TestBinlogInsertEventGeneratesInsertQuery() { }, } - dmlEvents, err := ghostferry.NewBinlogInsertEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogInsertEvents(this.sourceTable, rowsEvent, ghostferry.BinlogPosition{}) this.Require().Nil(err) this.Require().Equal(2, len(dmlEvents)) @@ -80,7 +79,7 @@ func (this *DMLEventsTestSuite) TestBinlogInsertEventWithWrongColumnsReturnsErro Rows: [][]interface{}{{1000}}, } - dmlEvents, err := ghostferry.NewBinlogInsertEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogInsertEvents(this.sourceTable, rowsEvent, ghostferry.BinlogPosition{}) this.Require().Nil(err) this.Require().Equal(1, len(dmlEvents)) @@ -95,7 +94,7 @@ func (this *DMLEventsTestSuite) TestBinlogInsertEventMetadata() { Rows: [][]interface{}{{1000}}, } - dmlEvents, err := ghostferry.NewBinlogInsertEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogInsertEvents(this.sourceTable, rowsEvent, ghostferry.BinlogPosition{}) this.Require().Nil(err) this.Require().Equal(1, len(dmlEvents)) this.Require().Equal("test_schema", dmlEvents[0].Database()) @@ -115,7 +114,7 @@ func (this *DMLEventsTestSuite) TestBinlogUpdateEventGeneratesUpdateQuery() { }, } - dmlEvents, err := ghostferry.NewBinlogUpdateEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogUpdateEvents(this.sourceTable, rowsEvent, ghostferry.BinlogPosition{}) this.Require().Nil(err) this.Require().Equal(2, len(dmlEvents)) @@ -134,7 +133,7 @@ func (this *DMLEventsTestSuite) TestBinlogUpdateEventWithWrongColumnsReturnsErro Rows: [][]interface{}{{1000}, {1000}}, } - dmlEvents, err := ghostferry.NewBinlogUpdateEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogUpdateEvents(this.sourceTable, rowsEvent, ghostferry.BinlogPosition{}) this.Require().Nil(err) this.Require().Equal(1, len(dmlEvents)) @@ -152,7 +151,7 @@ func (this *DMLEventsTestSuite) TestBinlogUpdateEventWithNull() { }, } - dmlEvents, err := ghostferry.NewBinlogUpdateEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogUpdateEvents(this.sourceTable, rowsEvent, ghostferry.BinlogPosition{}) this.Require().Nil(err) this.Require().Equal(1, len(dmlEvents)) @@ -167,7 +166,7 @@ func (this *DMLEventsTestSuite) TestBinlogUpdateEventMetadata() { Rows: [][]interface{}{{1000}, {1001}}, } - dmlEvents, err := ghostferry.NewBinlogUpdateEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogUpdateEvents(this.sourceTable, rowsEvent, ghostferry.BinlogPosition{}) this.Require().Nil(err) this.Require().Equal(1, len(dmlEvents)) this.Require().Equal("test_schema", dmlEvents[0].Database()) @@ -185,7 +184,7 @@ func (this *DMLEventsTestSuite) TestBinlogDeleteEventGeneratesDeleteQuery() { }, } - dmlEvents, err := ghostferry.NewBinlogDeleteEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogDeleteEvents(this.sourceTable, rowsEvent, ghostferry.BinlogPosition{}) this.Require().Nil(err) this.Require().Equal(2, len(dmlEvents)) @@ -206,7 +205,7 @@ func (this *DMLEventsTestSuite) TestBinlogDeleteEventWithNull() { }, } - dmlEvents, err := ghostferry.NewBinlogDeleteEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogDeleteEvents(this.sourceTable, rowsEvent, ghostferry.BinlogPosition{}) this.Require().Nil(err) this.Require().Equal(1, len(dmlEvents)) @@ -221,7 +220,7 @@ func (this *DMLEventsTestSuite) TestBinlogDeleteEventWithWrongColumnsReturnsErro Rows: [][]interface{}{{1000}}, } - dmlEvents, err := ghostferry.NewBinlogDeleteEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogDeleteEvents(this.sourceTable, rowsEvent, ghostferry.BinlogPosition{}) this.Require().Nil(err) this.Require().Equal(1, len(dmlEvents)) @@ -236,7 +235,7 @@ func (this *DMLEventsTestSuite) TestBinlogDeleteEventMetadata() { Rows: [][]interface{}{{1000}}, } - dmlEvents, err := ghostferry.NewBinlogDeleteEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogDeleteEvents(this.sourceTable, rowsEvent, ghostferry.BinlogPosition{}) this.Require().Nil(err) this.Require().Equal(1, len(dmlEvents)) this.Require().Equal("test_schema", dmlEvents[0].Database()) diff --git a/test/go/state_tracker_test.go b/test/go/state_tracker_test.go index a39192a14..c67e6baf2 100644 --- a/test/go/state_tracker_test.go +++ b/test/go/state_tracker_test.go @@ -14,56 +14,56 @@ type StateTrackerTestSuite struct { func (s *StateTrackerTestSuite) TestMinBinlogPosition() { serializedState := &ghostferry.SerializableState{ - LastWrittenBinlogPosition: mysql.Position{ + LastWrittenBinlogPosition: ghostferry.NewResumableBinlogPosition(mysql.Position{ Name: "mysql-bin.00003", Pos: 4, - }, + }), - LastStoredBinlogPositionForInlineVerifier: mysql.Position{ + LastStoredBinlogPositionForInlineVerifier: ghostferry.NewResumableBinlogPosition(mysql.Position{ Name: "mysql-bin.00003", Pos: 10, - }, + }), } - s.Require().Equal(serializedState.MinBinlogPosition(), mysql.Position{"mysql-bin.00003", 4}) + s.Require().Equal(serializedState.MinBinlogPosition().EventPosition, mysql.Position{"mysql-bin.00003", 4}) serializedState = &ghostferry.SerializableState{ - LastWrittenBinlogPosition: mysql.Position{ + LastWrittenBinlogPosition: ghostferry.NewResumableBinlogPosition(mysql.Position{ Name: "mysql-bin.00003", Pos: 4, - }, + }), - LastStoredBinlogPositionForInlineVerifier: mysql.Position{ + LastStoredBinlogPositionForInlineVerifier: ghostferry.NewResumableBinlogPosition(mysql.Position{ Name: "mysql-bin.00002", Pos: 10, - }, + }), } - s.Require().Equal(serializedState.MinBinlogPosition(), mysql.Position{"mysql-bin.00002", 10}) + s.Require().Equal(serializedState.MinBinlogPosition().EventPosition, mysql.Position{"mysql-bin.00002", 10}) serializedState = &ghostferry.SerializableState{ - LastWrittenBinlogPosition: mysql.Position{ + LastWrittenBinlogPosition: ghostferry.NewResumableBinlogPosition(mysql.Position{ Name: "", Pos: 0, - }, + }), - LastStoredBinlogPositionForInlineVerifier: mysql.Position{ + LastStoredBinlogPositionForInlineVerifier: ghostferry.NewResumableBinlogPosition(mysql.Position{ Name: "mysql-bin.00002", Pos: 10, - }, + }), } - s.Require().Equal(serializedState.MinBinlogPosition(), mysql.Position{"mysql-bin.00002", 10}) + s.Require().Equal(serializedState.MinBinlogPosition().EventPosition, mysql.Position{"mysql-bin.00002", 10}) serializedState = &ghostferry.SerializableState{ - LastStoredBinlogPositionForInlineVerifier: mysql.Position{ + LastStoredBinlogPositionForInlineVerifier: ghostferry.NewResumableBinlogPosition(mysql.Position{ Name: "", Pos: 0, - }, + }), - LastWrittenBinlogPosition: mysql.Position{ + LastWrittenBinlogPosition: ghostferry.NewResumableBinlogPosition(mysql.Position{ Name: "mysql-bin.00002", Pos: 10, - }, + }), } - s.Require().Equal(serializedState.MinBinlogPosition(), mysql.Position{"mysql-bin.00002", 10}) + s.Require().Equal(serializedState.MinBinlogPosition().EventPosition, mysql.Position{"mysql-bin.00002", 10}) } func TestStateTrackerTestSuite(t *testing.T) { diff --git a/test/integration/interrupt_resume_test.rb b/test/integration/interrupt_resume_test.rb index 6122d3cca..13ecb4413 100644 --- a/test/integration/interrupt_resume_test.rb +++ b/test/integration/interrupt_resume_test.rb @@ -121,8 +121,10 @@ def test_interrupt_resume_will_not_emit_binlog_position_for_inline_verifier_if_n dumped_state = ghostferry.run_expecting_interrupt assert_basic_fields_exist_in_dumped_state(dumped_state) - assert_equal "", dumped_state["LastStoredBinlogPositionForInlineVerifier"]["Name"] - assert_equal 0, dumped_state["LastStoredBinlogPositionForInlineVerifier"]["Pos"] + assert_equal "", dumped_state["LastStoredBinlogPositionForInlineVerifier"]["EventPosition"]["Name"] + assert_equal 0, dumped_state["LastStoredBinlogPositionForInlineVerifier"]["EventPosition"]["Pos"] + assert_equal "", dumped_state["LastStoredBinlogPositionForInlineVerifier"]["ResumePosition"]["Name"] + assert_equal 0, dumped_state["LastStoredBinlogPositionForInlineVerifier"]["ResumePosition"]["Pos"] end def test_interrupt_resume_inline_verifier_with_datawriter @@ -304,4 +306,56 @@ def test_interrupt_resume_inline_verifier_will_verify_additional_rows_changed_on error_line = ghostferry.error_lines.last assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: #{chosen_id} ] ", error_line["msg"] end + + def test_interrupt_resume_between_consecutive_rows_events + ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY, config: { verifier_type: "Inline" }) + + # create a series of rows-events that do not have interleaved table-map + # events. This is the case when multiple rows are affected in a single + # DML event. + # Since we are racing between applying rows and sending the shutdown event, + # we emit a whole bunch of them + num_batches = 20 + num_values_per_batch = 1000 + row_id = 0 + ghostferry.on_status(Ghostferry::Status::BINLOG_STREAMING_STARTED) do + for _batch_id in 0..num_batches do + insert_sql = "INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (data) VALUES " + for value_in_batch in 0..num_values_per_batch do + row_id += 1 + insert_sql += ", " if value_in_batch > 0 + insert_sql += "('data#{row_id}')" + end + source_db.query(insert_sql) + end + end + + ghostferry.on_status(Ghostferry::Status::AFTER_BINLOG_APPLY) do + # while we are emitting events in the loop above, try to inject a shutdown + # signal, hoping to interrupt between applying an INSERT and receiving the + # next table-map event + if row_id > 20 + ghostferry.term_and_wait_for_exit + end + end + + dumped_state = ghostferry.run_expecting_interrupt + + # We can verify if the race occurred (and we successfully worked around it) + # by looking at the dumped state (the LastWrittenBinlogPosition field should + # have different EventPosition and ResumePosition values). + # + # If this starts to make the test unreliable, we may want to remove this or + # further tweak the batch values. + resume_state = dumped_state["LastWrittenBinlogPosition"] + refute_equal resume_state["EventPosition"], resume_state["ResumePosition"] + + ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) + # if we did not resume at a proper state, this invocation of ghostferry + # will crash, complaining that a rows-event is referring to an unknown + # table + ghostferry.run(dumped_state) + + assert_test_table_is_identical + end end