From f2b91fe2773889fcf2f9d0219428546f73c604ea Mon Sep 17 00:00:00 2001 From: Shaun Cooley Date: Sat, 22 Nov 2025 15:45:22 -0800 Subject: [PATCH 01/10] fix: harden Modbus Receive against truncated/extended frames - retry parsing with full buffered data when initial MBAP length is wrong - treat EOF as incomplete data and avoid dropping partial frames - keep discarding truly unparsable packets with diagnostic logging --- plc4go/internal/modbus/MessageCodec.go | 111 ++++++++++++++++++++----- 1 file changed, 92 insertions(+), 19 deletions(-) diff --git a/plc4go/internal/modbus/MessageCodec.go b/plc4go/internal/modbus/MessageCodec.go index 0e88064ebf..a6ee48b860 100644 --- a/plc4go/internal/modbus/MessageCodec.go +++ b/plc4go/internal/modbus/MessageCodec.go @@ -22,6 +22,7 @@ package modbus import ( "context" "encoding/base64" + "io" "github.com/pkg/errors" "github.com/rs/zerolog" @@ -97,43 +98,115 @@ func (m *MessageCodec) Receive(ctx context.Context) (spi.Message, error) { } // We need at least 6 bytes in order to know how big the packet is in total - if num, err := ti.GetNumBytesAvailableInBuffer(); (err == nil) && (num >= 6) { - m.log.Debug().Uint32("num", num).Msg("we got num readable bytes") - data, err := ti.PeekReadableBytes(ctx, 6) + num, err := ti.GetNumBytesAvailableInBuffer() + if err == nil { + if num < 6 { + // Don't have enough data yet... + return nil, nil + } + m.log.Trace().Uint32("num", num).Msgf("we got %d readable bytes", num) + + // Peek the MBAP header to extract the txcnid, protocol, and length + header, err := ti.PeekReadableBytes(ctx, 6) if err != nil { - m.log.Warn().Err(err).Msg("error peeking") - // TODO: Possibly clean up ... + m.log.Warn().Err(err).Msg("error peeking header") return nil, nil } - // Get the size of the entire packet - packetSize := (uint32(data[4]) << 8) + uint32(data[5]) + 6 + + // Interpret the length field (big endian) to determine the full packet size + packetSize := (uint32(header[4]) << 8) + uint32(header[5]) + 6 if num < packetSize { m.log.Debug(). Uint32("num", num). - Uint32("packetSize", packetSize).Msg("Not enough bytes. Got: num Need: packetSize") + Uint32("packetSize", packetSize).Msgf("Not enough bytes. Got: %d Need: %d. Waiting for more data...", num, packetSize) return nil, nil } - data, err = ti.Read(ctx, packetSize) + + // Peek read the entire frame + frameSlice, err := ti.PeekReadableBytes(ctx, packetSize) if err != nil { - // TODO: Possibly clean up ... + m.log.Warn().Err(err).Uint32("packetSize", packetSize).Msg("error peeking packet") return nil, nil } + + // Parse the frame ctxForModel := options.GetLoggerContextForModel(ctx, m.log, options.WithPassLoggerToModel(m.passLogToModel)) - tcpAdu, err := model.ModbusADUParse[model.ModbusTcpADU](ctxForModel, data, model.DriverType_MODBUS_TCP, true) + tcpAdu, err := model.ModbusADUParse[model.ModbusTcpADU](ctxForModel, frameSlice, model.DriverType_MODBUS_TCP, true) if err != nil { - dataStr := base64.StdEncoding.EncodeToString(data) + if errors.Is(err, io.EOF) { + // Incomplete frame, keep waiting for the remaining bytes. + m.log.Trace().Uint32("packetSize", packetSize).Msg("partial packet detected, awaiting more data") + return nil, nil + } + + // Did we perhaps only read part of a bigger frame? + if num > packetSize { + // Try reading everything + extendedSlice, extendedErr := ti.PeekReadableBytes(ctx, num) + if extendedErr == nil { + // Try parsing it all + extendedAdu, extendedParseErr := model.ModbusADUParse[model.ModbusTcpADU](ctxForModel, extendedSlice, model.DriverType_MODBUS_TCP, true) + if extendedParseErr == nil { + // Parse succeded... + // NOTE: MBAP length errors are rare in well-behaved devices, but they happen when + // firmware miscomputes the length field (or hard-codes to 6), mixes RTU framing + // with TCP transport, or when intermediate gateways/proxies truncate or merge + // frames. Duplicate reads and out-of-order slicing in buggy drivers can also leave + // stale data that makes the reported length disagree with the actual payload. + + // What is the actual ADU size parsed? + actualSize := uint32(extendedAdu.GetLengthInBytes(ctxForModel)) + if actualSize > num { + // We still need more data + m.log.Trace(). + Uint32("available", num). + Uint32("required", actualSize). + Msg("extended parsed frame requires more bytes, awaiting more data") + return nil, nil + } + + // Consume the actual size in bytes from the buffer (all peeks to here) + if _, consumeErr := ti.Read(ctx, actualSize); consumeErr != nil { + m.log.Debug().Err(consumeErr).Uint32("actualSize", actualSize).Msg("error consuming parsed frame") + return nil, nil + } + + // Success + m.log.Debug(). + Uint32("reportedSize", packetSize). + Uint32("actualSize", actualSize). + Msg("consumed extended frame with corrected size") + return extendedAdu, nil + } + if errors.Is(extendedParseErr, io.EOF) { + m.log.Trace().Uint32("buffered", num).Msg("extended parse incomplete, awaiting more data") + return nil, nil + } + } + } + + // Seems unparsable - log and discard + dataStr := base64.StdEncoding.EncodeToString(frameSlice) m.log.Warn().Err(err). - Str("data", dataStr). // Max PDU size is 253 bytes, and catching parse errors for inspection is important + Str("data", dataStr). Uint32("packetSize", packetSize). - Msg("error parsing") - // TODO: Possibly clean up ... + Msg("error parsing frame, discarding") + + // Discard the unparsable frame from the buffer + if _, discardErr := ti.Read(ctx, packetSize); discardErr != nil { + m.log.Debug().Err(discardErr).Uint32("packetSize", packetSize).Msg("error discarding unparsable frame") + } + return nil, nil + } + + // First parse attempt successful, consume the bytes from the buffer + if _, consumeErr := ti.Read(ctx, packetSize); consumeErr != nil { + m.log.Debug().Err(consumeErr).Uint32("packetSize", packetSize).Msg("error consuming parsed frame") return nil, nil } return tcpAdu, nil - } else if err != nil { - m.log.Warn().Err(err).Msg("Got error reading") - return nil, nil } - // TODO: maybe we return here a not enough error error + + m.log.Warn().Err(err).Msg("Got error reading") return nil, nil } From 77ef19dec9342e40c31d324bf69f83fc05fb3485 Mon Sep 17 00:00:00 2001 From: Shaun Cooley Date: Sat, 22 Nov 2025 22:38:35 -0800 Subject: [PATCH 02/10] fix: watch for and discard trailing CRCs from misbehaving gateways fix: attempt to resynchronize the read stream if desynchronized --- plc4go/internal/modbus/MessageCodec.go | 90 ++++++++++++++++++++++++-- plc4go/spi/utils/base64Stringer.go | 36 +++++++++++ 2 files changed, 121 insertions(+), 5 deletions(-) create mode 100644 plc4go/spi/utils/base64Stringer.go diff --git a/plc4go/internal/modbus/MessageCodec.go b/plc4go/internal/modbus/MessageCodec.go index a6ee48b860..1025d292d4 100644 --- a/plc4go/internal/modbus/MessageCodec.go +++ b/plc4go/internal/modbus/MessageCodec.go @@ -21,7 +21,6 @@ package modbus import ( "context" - "encoding/base64" "io" "github.com/pkg/errors" @@ -32,6 +31,7 @@ import ( _default "github.com/apache/plc4x/plc4go/spi/default" "github.com/apache/plc4x/plc4go/spi/options" "github.com/apache/plc4x/plc4go/spi/transports" + "github.com/apache/plc4x/plc4go/spi/utils" ) //go:generate go tool plc4xGenerator -type=MessageCodec @@ -113,10 +113,27 @@ func (m *MessageCodec) Receive(ctx context.Context) (spi.Message, error) { return nil, nil } + // Modbus TCP Protocol ID (bytes 2 and 3) must be 0x0000. + // If it is not, we are desynchronized. Discard 1 byte and retry to find alignment. + if header[2] != 0x00 || header[3] != 0x00 { + m.log.Warn(). + Hex("proto", header[2:4]). + Msg("Invalid Protocol ID (expected 0). Stream desynchronized. Discarding 1 byte to realign.") + + // Burn 1 byte to shift the window and try again next cycle + ti.Read(ctx, 1) + return nil, nil + } + // Interpret the length field (big endian) to determine the full packet size packetSize := (uint32(header[4]) << 8) + uint32(header[5]) + 6 if num < packetSize { + var peekedBytes []byte + if m.log.Debug().Enabled() { + peekedBytes, _ = ti.PeekReadableBytes(ctx, num) + } m.log.Debug(). + Stringer("currentData", utils.Base64Stringer(peekedBytes)). Uint32("num", num). Uint32("packetSize", packetSize).Msgf("Not enough bytes. Got: %d Need: %d. Waiting for more data...", num, packetSize) return nil, nil @@ -135,7 +152,7 @@ func (m *MessageCodec) Receive(ctx context.Context) (spi.Message, error) { if err != nil { if errors.Is(err, io.EOF) { // Incomplete frame, keep waiting for the remaining bytes. - m.log.Trace().Uint32("packetSize", packetSize).Msg("partial packet detected, awaiting more data") + m.log.Debug().Uint32("packetSize", packetSize).Stringer("data", utils.Base64Stringer(frameSlice)).Msg("partial packet detected, awaiting more data") return nil, nil } @@ -172,10 +189,16 @@ func (m *MessageCodec) Receive(ctx context.Context) (spi.Message, error) { } // Success - m.log.Debug(). + m.log.Warn(). Uint32("reportedSize", packetSize). Uint32("actualSize", actualSize). + Stringer("extendedData", utils.Base64Stringer(extendedSlice)). + Stringer("consumedData", utils.Base64Stringer(extendedSlice[:actualSize])). Msg("consumed extended frame with corrected size") + + // Check for trailing CRC garbage + m.trailingCRCGarbageCheck(ctx, ti, extendedSlice[:actualSize]) + return extendedAdu, nil } if errors.Is(extendedParseErr, io.EOF) { @@ -186,9 +209,8 @@ func (m *MessageCodec) Receive(ctx context.Context) (spi.Message, error) { } // Seems unparsable - log and discard - dataStr := base64.StdEncoding.EncodeToString(frameSlice) m.log.Warn().Err(err). - Str("data", dataStr). + Stringer("data", utils.Base64Stringer(frameSlice)). Uint32("packetSize", packetSize). Msg("error parsing frame, discarding") @@ -196,6 +218,10 @@ func (m *MessageCodec) Receive(ctx context.Context) (spi.Message, error) { if _, discardErr := ti.Read(ctx, packetSize); discardErr != nil { m.log.Debug().Err(discardErr).Uint32("packetSize", packetSize).Msg("error discarding unparsable frame") } + + // Check if the unparsable junk also had a CRC tail + m.trailingCRCGarbageCheck(ctx, ti, frameSlice) + return nil, nil } @@ -204,9 +230,63 @@ func (m *MessageCodec) Receive(ctx context.Context) (spi.Message, error) { m.log.Debug().Err(consumeErr).Uint32("packetSize", packetSize).Msg("error consuming parsed frame") return nil, nil } + + // Check for trailing CRC garbage + m.trailingCRCGarbageCheck(ctx, ti, frameSlice) + return tcpAdu, nil } m.log.Warn().Err(err).Msg("Got error reading") return nil, nil } + +// trailingCRCGarbageCheck checks for trailing Modbus RTU CRC bytes that may have leaked +// into the TCP stream, and discards them to maintain synchronization. +func (m *MessageCodec) trailingCRCGarbageCheck(ctx context.Context, ti transports.TransportInstance, consumedBytes []byte) { + // We only check if we have consumed at least 7 bytes (Header 6 + UnitID 1) + // and if there are at least 2 bytes waiting in the buffer. + if len(consumedBytes) > 6 { + bytesAvailable, err := ti.GetNumBytesAvailableInBuffer() + if err == nil && bytesAvailable >= 2 { + // Peek the next 2 bytes + nextTwoBytes, peekErr := ti.PeekReadableBytes(ctx, 2) + if peekErr == nil { + // Calculate what the CRC *would* be for the RTU portion of the frame we just read. + // RTU Frame = [UnitID] + [PDU] + // This corresponds to consumedBytes[6:] (Skipping 6-byte MBAP header) + rtuPayload := consumedBytes[6:] + expectedCRC := m.calculateModbusCRC(rtuPayload) + + // Compare: If the next 2 bytes in the buffer match the calculated CRC, it's garbage. + if nextTwoBytes[0] == expectedCRC[0] && nextTwoBytes[1] == expectedCRC[1] { + m.log.Warn(). + Stringer("data", utils.Base64Stringer(consumedBytes)). + Hex("crc", nextTwoBytes). + Msg("Detected leaked Modbus RTU CRC at end of TCP frame. Discarding to maintain sync.") + + // Discard the 2 CRC bytes + if _, discardErr := ti.Read(ctx, 2); discardErr != nil { + m.log.Warn().Err(discardErr).Msg("Error discarding CRC bytes") + } + } + } + } + } +} + +func (m *MessageCodec) calculateModbusCRC(data []byte) []byte { + crc := uint16(0xFFFF) + for _, b := range data { + crc ^= uint16(b) + for i := 0; i < 8; i++ { + if (crc & 0x0001) != 0 { + crc = (crc >> 1) ^ 0xA001 + } else { + crc >>= 1 + } + } + } + // Return as Little Endian (Low Byte, High Byte) as per Modbus RTU + return []byte{uint8(crc & 0xFF), uint8(crc >> 8)} +} diff --git a/plc4go/spi/utils/base64Stringer.go b/plc4go/spi/utils/base64Stringer.go new file mode 100644 index 0000000000..098421debb --- /dev/null +++ b/plc4go/spi/utils/base64Stringer.go @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package utils + +import ( + "encoding/base64" +) + +// Base64Stringer is a helper to print byte slices as base64 encoded strings +// and can be used in structured logging, so that String() is only called when +// the log level requires it. +type Base64Stringer []byte + +func (b Base64Stringer) String() string { + if b == nil { + return "" + } + return base64.StdEncoding.EncodeToString(b) +} From 9a256e440635fd3b76e5cbeb172f3e05bd0133d0 Mon Sep 17 00:00:00 2001 From: Shaun Cooley Date: Sun, 23 Nov 2025 01:29:08 -0800 Subject: [PATCH 03/10] fix: additional sanity checks on the MBAP fix: io.EOF is a trap, we checked fragmentation above --- plc4go/internal/modbus/MessageCodec.go | 51 ++++++++++++++++++++++---- 1 file changed, 44 insertions(+), 7 deletions(-) diff --git a/plc4go/internal/modbus/MessageCodec.go b/plc4go/internal/modbus/MessageCodec.go index 1025d292d4..aa9ef92a74 100644 --- a/plc4go/internal/modbus/MessageCodec.go +++ b/plc4go/internal/modbus/MessageCodec.go @@ -113,6 +113,7 @@ func (m *MessageCodec) Receive(ctx context.Context) (spi.Message, error) { return nil, nil } + // --- SANITY CHECK 1: Validate Protocol ID --- // Modbus TCP Protocol ID (bytes 2 and 3) must be 0x0000. // If it is not, we are desynchronized. Discard 1 byte and retry to find alignment. if header[2] != 0x00 || header[3] != 0x00 { @@ -126,8 +127,50 @@ func (m *MessageCodec) Receive(ctx context.Context) (spi.Message, error) { } // Interpret the length field (big endian) to determine the full packet size - packetSize := (uint32(header[4]) << 8) + uint32(header[5]) + 6 + payloadLength := (uint32(header[4]) << 8) + uint32(header[5]) + packetSize := payloadLength + 6 + + // --- SANITY CHECK 2: Minimum Length --- + // Payload must contain at least UnitID (1 byte) + FunctionCode (1 byte). + // Any length < 2 is invalid (e.g. 0 or 1). + if payloadLength < 2 { + m.log.Warn(). + Uint32("len", payloadLength). + Msg("Invalid packet length (<2). Stream desynchronized. Discarding 1 byte to realign.") + + // Burn 1 byte to shift the window and try again next cycle + ti.Read(ctx, 1) + return nil, nil + } + + // --- SANITY CHECK 3: High-Probablity Garbage Detection --- + // If the packet is "Huge" (larger than standard Modbus TCP frame), we verify the Function Code. + // Valid Modbus functions are 1-127 (requests) or 129-255 (exceptions). 0 is never valid. + // If we see a huge length AND Function Code 0, it is 100% garbage masquerading as a header. + // Similarly, exceptions are always 9 bytes, so we check that as well. + if payloadLength > 260 { + // Peek the Function Code (Byte 7 of the full frame, or Byte 7 of the header peek if we had enough) + // We already peeked 6 bytes. We need to peek the next 2 (UnitID + Func) to check validity. + if num >= 8 { + extendedPeek, _ := ti.PeekReadableBytes(ctx, 8) + functionCode := extendedPeek[7] + if functionCode == 0 || (functionCode&0x80 == 0x80 && payloadLength != 3) { + m.log.Warn(). + Stringer("data", utils.Base64Stringer(extendedPeek)). + Uint32("len", payloadLength). + Uint8("func", functionCode). + Msg("Huge packet with Invalid Function Code (0). Garbage detected. Discarding 1 byte.") + + // Burn 1 byte to shift the window and try again next cycle + ti.Read(ctx, 1) + return nil, nil + } + } + } + + // Check for TCP fragmentation if num < packetSize { + // Wait for more data (standard TCP fragmentation handling) var peekedBytes []byte if m.log.Debug().Enabled() { peekedBytes, _ = ti.PeekReadableBytes(ctx, num) @@ -150,12 +193,6 @@ func (m *MessageCodec) Receive(ctx context.Context) (spi.Message, error) { ctxForModel := options.GetLoggerContextForModel(ctx, m.log, options.WithPassLoggerToModel(m.passLogToModel)) tcpAdu, err := model.ModbusADUParse[model.ModbusTcpADU](ctxForModel, frameSlice, model.DriverType_MODBUS_TCP, true) if err != nil { - if errors.Is(err, io.EOF) { - // Incomplete frame, keep waiting for the remaining bytes. - m.log.Debug().Uint32("packetSize", packetSize).Stringer("data", utils.Base64Stringer(frameSlice)).Msg("partial packet detected, awaiting more data") - return nil, nil - } - // Did we perhaps only read part of a bigger frame? if num > packetSize { // Try reading everything From f8298862ea32c21952737e4e0d8667b911c655a0 Mon Sep 17 00:00:00 2001 From: Shaun Cooley Date: Sun, 23 Nov 2025 12:17:09 -0800 Subject: [PATCH 04/10] fix: deal with TCP keep-alive padding that leaks from the kernel --- plc4go/internal/modbus/MessageCodec.go | 132 +++++++++++-------------- 1 file changed, 58 insertions(+), 74 deletions(-) diff --git a/plc4go/internal/modbus/MessageCodec.go b/plc4go/internal/modbus/MessageCodec.go index aa9ef92a74..e041071d25 100644 --- a/plc4go/internal/modbus/MessageCodec.go +++ b/plc4go/internal/modbus/MessageCodec.go @@ -115,15 +115,9 @@ func (m *MessageCodec) Receive(ctx context.Context) (spi.Message, error) { // --- SANITY CHECK 1: Validate Protocol ID --- // Modbus TCP Protocol ID (bytes 2 and 3) must be 0x0000. - // If it is not, we are desynchronized. Discard 1 byte and retry to find alignment. if header[2] != 0x00 || header[3] != 0x00 { - m.log.Warn(). - Hex("proto", header[2:4]). - Msg("Invalid Protocol ID (expected 0). Stream desynchronized. Discarding 1 byte to realign.") - - // Burn 1 byte to shift the window and try again next cycle - ti.Read(ctx, 1) - return nil, nil + // Connection is desynchronized. Attempt to resync or discard the connection. + return m.attemptResync(ctx, ti, header, num) } // Interpret the length field (big endian) to determine the full packet size @@ -134,13 +128,8 @@ func (m *MessageCodec) Receive(ctx context.Context) (spi.Message, error) { // Payload must contain at least UnitID (1 byte) + FunctionCode (1 byte). // Any length < 2 is invalid (e.g. 0 or 1). if payloadLength < 2 { - m.log.Warn(). - Uint32("len", payloadLength). - Msg("Invalid packet length (<2). Stream desynchronized. Discarding 1 byte to realign.") - - // Burn 1 byte to shift the window and try again next cycle - ti.Read(ctx, 1) - return nil, nil + // Connection is desynchronized. Attempt to resync or discard the connection. + return m.attemptResync(ctx, ti, header, num) } // --- SANITY CHECK 3: High-Probablity Garbage Detection --- @@ -155,20 +144,13 @@ func (m *MessageCodec) Receive(ctx context.Context) (spi.Message, error) { extendedPeek, _ := ti.PeekReadableBytes(ctx, 8) functionCode := extendedPeek[7] if functionCode == 0 || (functionCode&0x80 == 0x80 && payloadLength != 3) { - m.log.Warn(). - Stringer("data", utils.Base64Stringer(extendedPeek)). - Uint32("len", payloadLength). - Uint8("func", functionCode). - Msg("Huge packet with Invalid Function Code (0). Garbage detected. Discarding 1 byte.") - - // Burn 1 byte to shift the window and try again next cycle - ti.Read(ctx, 1) - return nil, nil + // Connection is desynchronized. Attempt to resync or discard the connection. + return m.attemptResync(ctx, ti, header, num) } } } - // Check for TCP fragmentation + // Yield on TCP fragmentation if num < packetSize { // Wait for more data (standard TCP fragmentation handling) var peekedBytes []byte @@ -207,6 +189,7 @@ func (m *MessageCodec) Receive(ctx context.Context) (spi.Message, error) { // with TCP transport, or when intermediate gateways/proxies truncate or merge // frames. Duplicate reads and out-of-order slicing in buggy drivers can also leave // stale data that makes the reported length disagree with the actual payload. + // ... looking at you old Moxa and Lantronix "transparent" serial-to-tcp gateways. // What is the actual ADU size parsed? actualSize := uint32(extendedAdu.GetLengthInBytes(ctxForModel)) @@ -233,9 +216,6 @@ func (m *MessageCodec) Receive(ctx context.Context) (spi.Message, error) { Stringer("consumedData", utils.Base64Stringer(extendedSlice[:actualSize])). Msg("consumed extended frame with corrected size") - // Check for trailing CRC garbage - m.trailingCRCGarbageCheck(ctx, ti, extendedSlice[:actualSize]) - return extendedAdu, nil } if errors.Is(extendedParseErr, io.EOF) { @@ -249,28 +229,22 @@ func (m *MessageCodec) Receive(ctx context.Context) (spi.Message, error) { m.log.Warn().Err(err). Stringer("data", utils.Base64Stringer(frameSlice)). Uint32("packetSize", packetSize). - Msg("error parsing frame, discarding") + Msg("Error parsing frame. Discarding invalid frame.") // Discard the unparsable frame from the buffer if _, discardErr := ti.Read(ctx, packetSize); discardErr != nil { m.log.Debug().Err(discardErr).Uint32("packetSize", packetSize).Msg("error discarding unparsable frame") } - // Check if the unparsable junk also had a CRC tail - m.trailingCRCGarbageCheck(ctx, ti, frameSlice) - return nil, nil } - // First parse attempt successful, consume the bytes from the buffer + // First parse attempt successful, double check we used all the bytes if _, consumeErr := ti.Read(ctx, packetSize); consumeErr != nil { m.log.Debug().Err(consumeErr).Uint32("packetSize", packetSize).Msg("error consuming parsed frame") return nil, nil } - // Check for trailing CRC garbage - m.trailingCRCGarbageCheck(ctx, ti, frameSlice) - return tcpAdu, nil } @@ -278,52 +252,62 @@ func (m *MessageCodec) Receive(ctx context.Context) (spi.Message, error) { return nil, nil } -// trailingCRCGarbageCheck checks for trailing Modbus RTU CRC bytes that may have leaked -// into the TCP stream, and discards them to maintain synchronization. -func (m *MessageCodec) trailingCRCGarbageCheck(ctx context.Context, ti transports.TransportInstance, consumedBytes []byte) { - // We only check if we have consumed at least 7 bytes (Header 6 + UnitID 1) - // and if there are at least 2 bytes waiting in the buffer. - if len(consumedBytes) > 6 { - bytesAvailable, err := ti.GetNumBytesAvailableInBuffer() - if err == nil && bytesAvailable >= 2 { - // Peek the next 2 bytes - nextTwoBytes, peekErr := ti.PeekReadableBytes(ctx, 2) - if peekErr == nil { - // Calculate what the CRC *would* be for the RTU portion of the frame we just read. - // RTU Frame = [UnitID] + [PDU] - // This corresponds to consumedBytes[6:] (Skipping 6-byte MBAP header) - rtuPayload := consumedBytes[6:] - expectedCRC := m.calculateModbusCRC(rtuPayload) - - // Compare: If the next 2 bytes in the buffer match the calculated CRC, it's garbage. - if nextTwoBytes[0] == expectedCRC[0] && nextTwoBytes[1] == expectedCRC[1] { +func (m *MessageCodec) attemptResync(ctx context.Context, ti transports.TransportInstance, peekedBytes []byte, availableBytes uint32) (spi.Message, error) { + // Can we peek more? + if availableBytes > uint32(len(peekedBytes)) { + peekedBytes, _ = ti.PeekReadableBytes(ctx, availableBytes) + } + + // Some kernels leak tcp keep alive ethernet frame padding ot the stream. + // If we see a series of zero bytes, just discard them. + if allZeros(peekedBytes) { + len := len(peekedBytes) + m.log.Warn(). + Int("length", len). + Msgf("Stream desynchronized. Discarding %d byte NIL packet to to realign.", len) + + // Discard all zero bytes + ti.Read(ctx, uint32(len)) + + // Keep using the stream + return nil, nil + } + + // Search for a valid-ish MBAP header in the stream + for i := 1; i < len(peekedBytes)-8; i++ { + // Check Protocol ID + if peekedBytes[i+2] == 0x00 && peekedBytes[i+3] == 0x00 { + // Check Length field + payloadLength := (uint32(peekedBytes[i+4]) << 8) + uint32(peekedBytes[i+5]) + if payloadLength >= 2 { + // Check for a valid Function Code + functionCode := peekedBytes[i+7] + if functionCode != 0 && (functionCode < 0x80 || payloadLength == 3) { m.log.Warn(). - Stringer("data", utils.Base64Stringer(consumedBytes)). - Hex("crc", nextTwoBytes). - Msg("Detected leaked Modbus RTU CRC at end of TCP frame. Discarding to maintain sync.") + Int("offset", i). + Uint32("len", payloadLength). + Uint8("func", functionCode). + Msgf("Stream desynchronized. Found potential MBAP header at offset %d. Discarding %d bytes to realign.", i, i) - // Discard the 2 CRC bytes - if _, discardErr := ti.Read(ctx, 2); discardErr != nil { - m.log.Warn().Err(discardErr).Msg("Error discarding CRC bytes") - } + // Discard up to the found header + ti.Read(ctx, uint32(i)) + + // Keep using the stream + return nil, nil } } } } + + m.log.Warn().Msgf("Stream desynchronized. No valid MBAP header found. Giving up on connection.") + return nil, errors.New("stream desynchronized") } -func (m *MessageCodec) calculateModbusCRC(data []byte) []byte { - crc := uint16(0xFFFF) - for _, b := range data { - crc ^= uint16(b) - for i := 0; i < 8; i++ { - if (crc & 0x0001) != 0 { - crc = (crc >> 1) ^ 0xA001 - } else { - crc >>= 1 - } +func allZeros(b []byte) bool { + for _, v := range b { + if v != 0 { + return false } } - // Return as Little Endian (Low Byte, High Byte) as per Modbus RTU - return []byte{uint8(crc & 0xFF), uint8(crc >> 8)} + return true } From dfaedcaa4781afe58d68b97f88e88b717906f59a Mon Sep 17 00:00:00 2001 From: Shaun Cooley Date: Sun, 23 Nov 2025 13:22:53 -0800 Subject: [PATCH 05/10] refactor: attempting to simply logic while still covering desync handling --- plc4go/internal/modbus/MessageCodec.go | 391 ++++++++++++++----------- 1 file changed, 217 insertions(+), 174 deletions(-) diff --git a/plc4go/internal/modbus/MessageCodec.go b/plc4go/internal/modbus/MessageCodec.go index e041071d25..f67654e0e6 100644 --- a/plc4go/internal/modbus/MessageCodec.go +++ b/plc4go/internal/modbus/MessageCodec.go @@ -21,7 +21,7 @@ package modbus import ( "context" - "io" + "fmt" "github.com/pkg/errors" "github.com/rs/zerolog" @@ -84,6 +84,7 @@ func (m *MessageCodec) Receive(ctx context.Context) (spi.Message, error) { return nil, errors.New("Transport instance not connected") } + // 1. Fill the buffer if err := ti.FillBuffer(ctx, func(pos uint, currentByte byte, reader transports.ExtendedReader) bool { m.log.Trace().Uint("pos", pos).Uint8("currentByte", currentByte).Msg("filling") numBytesAvailable, err := ti.GetNumBytesAvailableInBuffer() @@ -91,223 +92,265 @@ func (m *MessageCodec) Receive(ctx context.Context) (spi.Message, error) { m.log.Debug().Err(err).Msg("error getting available bytes") return false } - m.log.Trace().Uint32("numBytesAvailable", numBytesAvailable).Msg("check available bytes < 6") return numBytesAvailable < 6 }); err != nil { m.log.Debug().Err(err).Msg("error filling buffer") } - // We need at least 6 bytes in order to know how big the packet is in total - num, err := ti.GetNumBytesAvailableInBuffer() - if err == nil { - if num < 6 { - // Don't have enough data yet... - return nil, nil - } - m.log.Trace().Uint32("num", num).Msgf("we got %d readable bytes", num) + // 2. Check buffer status + numBytesAvail, err := ti.GetNumBytesAvailableInBuffer() + if err != nil { + m.log.Warn().Err(err).Msg("Error getting bytes in buffer") + return nil, nil + } - // Peek the MBAP header to extract the txcnid, protocol, and length + // ------------------------------------------------------------------------- + // Discard NIL Packets (Keep-Alives w/padding that leaked into stream) + // ------------------------------------------------------------------------- + for { + if numBytesAvail < 6 { + break + } header, err := ti.PeekReadableBytes(ctx, 6) if err != nil { m.log.Warn().Err(err).Msg("error peeking header") return nil, nil } - // --- SANITY CHECK 1: Validate Protocol ID --- - // Modbus TCP Protocol ID (bytes 2 and 3) must be 0x0000. - if header[2] != 0x00 || header[3] != 0x00 { - // Connection is desynchronized. Attempt to resync or discard the connection. - return m.attemptResync(ctx, ti, header, num) - } - - // Interpret the length field (big endian) to determine the full packet size - payloadLength := (uint32(header[4]) << 8) + uint32(header[5]) - packetSize := payloadLength + 6 + // Check for 6 bytes of zeros + if header[0] == 0 && header[1] == 0 && header[2] == 0 && + header[3] == 0 && header[4] == 0 && header[5] == 0 { - // --- SANITY CHECK 2: Minimum Length --- - // Payload must contain at least UnitID (1 byte) + FunctionCode (1 byte). - // Any length < 2 is invalid (e.g. 0 or 1). - if payloadLength < 2 { - // Connection is desynchronized. Attempt to resync or discard the connection. - return m.attemptResync(ctx, ti, header, num) - } + m.log.Debug().Msg("Detected NIL Packet (Keep-Alive). Discarding 6 bytes.") + if _, err := ti.Read(ctx, 6); err != nil { + m.log.Warn().Err(err).Msg("Error discarding NIL packet") + return nil, nil + } - // --- SANITY CHECK 3: High-Probablity Garbage Detection --- - // If the packet is "Huge" (larger than standard Modbus TCP frame), we verify the Function Code. - // Valid Modbus functions are 1-127 (requests) or 129-255 (exceptions). 0 is never valid. - // If we see a huge length AND Function Code 0, it is 100% garbage masquerading as a header. - // Similarly, exceptions are always 9 bytes, so we check that as well. - if payloadLength > 260 { - // Peek the Function Code (Byte 7 of the full frame, or Byte 7 of the header peek if we had enough) - // We already peeked 6 bytes. We need to peek the next 2 (UnitID + Func) to check validity. - if num >= 8 { - extendedPeek, _ := ti.PeekReadableBytes(ctx, 8) - functionCode := extendedPeek[7] - if functionCode == 0 || (functionCode&0x80 == 0x80 && payloadLength != 3) { - // Connection is desynchronized. Attempt to resync or discard the connection. - return m.attemptResync(ctx, ti, header, num) - } + // Refresh num and loop + numBytesAvail, err = ti.GetNumBytesAvailableInBuffer() + if err != nil { + return nil, nil } + continue } + break + } - // Yield on TCP fragmentation - if num < packetSize { - // Wait for more data (standard TCP fragmentation handling) - var peekedBytes []byte - if m.log.Debug().Enabled() { - peekedBytes, _ = ti.PeekReadableBytes(ctx, num) - } - m.log.Debug(). - Stringer("currentData", utils.Base64Stringer(peekedBytes)). - Uint32("num", num). - Uint32("packetSize", packetSize).Msgf("Not enough bytes. Got: %d Need: %d. Waiting for more data...", num, packetSize) - return nil, nil + if numBytesAvail < 6 { + return nil, nil + } + + // Re-peek the header at the current head + header, err := ti.PeekReadableBytes(ctx, 6) + if err != nil { + m.log.Warn().Err(err).Msg("error peeking header") + return nil, nil + } + + // ------------------------------------------------------------------------- + // MBAP SANITY CHECKS + // ------------------------------------------------------------------------- + + // --- CHECK 1: Protocol ID Validation --- + if header[2] != 0x00 || header[3] != 0x00 { + return m.handleDesync(ctx, "Invalid Protocol ID", map[string]interface{}{ + "p1": header[2], + "p2": header[3], + "data": utils.Base64Stringer(header), + }) + } + + // Length field is big endian encoded WORD + payloadLength := (uint32(header[4]) << 8) + uint32(header[5]) + packetSize := payloadLength + 6 + + // --- CHECK 2: Minimum Length --- + if payloadLength < 2 { + return m.handleDesync(ctx, "Invalid packet length (<2)", map[string]interface{}{ + "len": payloadLength, + "data": utils.Base64Stringer(header), + }) + } + + // --- CHECK 3: Function Code 0 (False Header) --- + if numBytesAvail >= 8 { + peekBytes, err := ti.PeekReadableBytes(ctx, 8) + if err == nil && peekBytes[7] == 0 { + return m.handleDesync(ctx, "Invalid Function Code (0)", map[string]interface{}{ + "len": payloadLength, + "data": utils.Base64Stringer(peekBytes), + }) } + } - // Peek read the entire frame - frameSlice, err := ti.PeekReadableBytes(ctx, packetSize) - if err != nil { - m.log.Warn().Err(err).Uint32("packetSize", packetSize).Msg("error peeking packet") - return nil, nil + // ------------------------------------------------------------------------- + // PARSING + // ------------------------------------------------------------------------- + + // Yield on TCP fragmentation + if numBytesAvail < packetSize { + // Wait for more data (standard TCP fragmentation handling) + var peekedBytes []byte + if m.log.Debug().Enabled() { + peekedBytes, _ = ti.PeekReadableBytes(ctx, numBytesAvail) } + m.log.Debug(). + Stringer("dataFragment", utils.Base64Stringer(peekedBytes)). + Uint32("num", numBytesAvail). + Uint32("packetSize", packetSize).Msgf("Received fragment. Got: %d Need: %d. Waiting for more data...", numBytesAvail, packetSize) + return nil, nil + } - // Parse the frame - ctxForModel := options.GetLoggerContextForModel(ctx, m.log, options.WithPassLoggerToModel(m.passLogToModel)) - tcpAdu, err := model.ModbusADUParse[model.ModbusTcpADU](ctxForModel, frameSlice, model.DriverType_MODBUS_TCP, true) - if err != nil { - // Did we perhaps only read part of a bigger frame? - if num > packetSize { - // Try reading everything - extendedSlice, extendedErr := ti.PeekReadableBytes(ctx, num) - if extendedErr == nil { - // Try parsing it all - extendedAdu, extendedParseErr := model.ModbusADUParse[model.ModbusTcpADU](ctxForModel, extendedSlice, model.DriverType_MODBUS_TCP, true) - if extendedParseErr == nil { - // Parse succeded... - // NOTE: MBAP length errors are rare in well-behaved devices, but they happen when - // firmware miscomputes the length field (or hard-codes to 6), mixes RTU framing - // with TCP transport, or when intermediate gateways/proxies truncate or merge - // frames. Duplicate reads and out-of-order slicing in buggy drivers can also leave - // stale data that makes the reported length disagree with the actual payload. - // ... looking at you old Moxa and Lantronix "transparent" serial-to-tcp gateways. - - // What is the actual ADU size parsed? - actualSize := uint32(extendedAdu.GetLengthInBytes(ctxForModel)) - if actualSize > num { - // We still need more data - m.log.Trace(). - Uint32("available", num). - Uint32("required", actualSize). - Msg("extended parsed frame requires more bytes, awaiting more data") - return nil, nil - } - - // Consume the actual size in bytes from the buffer (all peeks to here) - if _, consumeErr := ti.Read(ctx, actualSize); consumeErr != nil { - m.log.Debug().Err(consumeErr).Uint32("actualSize", actualSize).Msg("error consuming parsed frame") - return nil, nil - } - - // Success - m.log.Warn(). - Uint32("reportedSize", packetSize). - Uint32("actualSize", actualSize). - Stringer("extendedData", utils.Base64Stringer(extendedSlice)). - Stringer("consumedData", utils.Base64Stringer(extendedSlice[:actualSize])). - Msg("consumed extended frame with corrected size") - - return extendedAdu, nil + // Read the entire frame + frameSlice, err := ti.PeekReadableBytes(ctx, packetSize) + if err != nil { + m.log.Warn().Err(err).Msg("Error peeking frame slice") + return nil, nil + } + + // Parse the frame + ctxForModel := options.GetLoggerContextForModel(ctx, m.log, options.WithPassLoggerToModel(m.passLogToModel)) + tcpAdu, err := model.ModbusADUParse[model.ModbusTcpADU](ctxForModel, frameSlice, model.DriverType_MODBUS_TCP, true) + if err != nil { + // Parser wasn't happy at packetSize, if there is more available, try parsing all of it + if numBytesAvail > packetSize { + extendedSlice, extendedErr := ti.PeekReadableBytes(ctx, numBytesAvail) + if extendedErr == nil { + // Try parsing it all + extendedAdu, extendedParseErr := model.ModbusADUParse[model.ModbusTcpADU](ctxForModel, extendedSlice, model.DriverType_MODBUS_TCP, true) + if extendedParseErr == nil { + // Parse succeded... + // NOTE: MBAP length errors are rare in well-behaved devices, but they happen when + // firmware miscomputes the length field (or hard-codes to 6), mixes RTU framing + // with TCP transport, or when intermediate gateways/proxies truncate or merge + // frames. Duplicate reads and out-of-order slicing in buggy drivers can also leave + // stale data that makes the reported length disagree with the actual payload. + // ... looking at you old Moxa and Lantronix "transparent" serial-to-tcp gateways. + + // What is the actual ADU size parsed? + actualSize := uint32(extendedAdu.GetLengthInBytes(ctxForModel)) + if actualSize > numBytesAvail { + return nil, nil } - if errors.Is(extendedParseErr, io.EOF) { - m.log.Trace().Uint32("buffered", num).Msg("extended parse incomplete, awaiting more data") + + m.log.Info(). + Uint32("reportedSize", packetSize). + Uint32("actualSize", actualSize). + Stringer("extendedData", utils.Base64Stringer(extendedSlice)). + Stringer("consumedData", utils.Base64Stringer(extendedSlice[:actualSize])). + Msg("MBAP had wrong/hardcoded length. Consumed extended frame with corrected size") + if _, consumeErr := ti.Read(ctx, actualSize); consumeErr != nil { + m.log.Debug().Err(consumeErr).Msg("error consuming extended frame") return nil, nil } + return extendedAdu, nil } } + } - // Seems unparsable - log and discard - m.log.Warn().Err(err). - Stringer("data", utils.Base64Stringer(frameSlice)). - Uint32("packetSize", packetSize). - Msg("Error parsing frame. Discarding invalid frame.") - - // Discard the unparsable frame from the buffer - if _, discardErr := ti.Read(ctx, packetSize); discardErr != nil { - m.log.Debug().Err(discardErr).Uint32("packetSize", packetSize).Msg("error discarding unparsable frame") - } + // Seems unparsable - log and discard + m.log.Warn().Err(err). + Stringer("data", utils.Base64Stringer(frameSlice)). + Uint32("packetSize", packetSize). + Msg("Error parsing frame. Discarding invalid frame.") - return nil, nil + // Discard the unparsable frame from the buffer + if _, discardErr := ti.Read(ctx, packetSize); discardErr != nil { + m.log.Debug().Err(discardErr).Uint32("packetSize", packetSize).Msg("error discarding unparsable frame") } - // First parse attempt successful, double check we used all the bytes - if _, consumeErr := ti.Read(ctx, packetSize); consumeErr != nil { - m.log.Debug().Err(consumeErr).Uint32("packetSize", packetSize).Msg("error consuming parsed frame") - return nil, nil - } + // Yield + return nil, nil + } - return tcpAdu, nil + // ------------------------------------------------------------------------- + // SUCCESS + // ------------------------------------------------------------------------- + if _, consumeErr := ti.Read(ctx, packetSize); consumeErr != nil { + m.log.Debug().Err(consumeErr).Msg("error consuming parsed frame") + return nil, nil } - m.log.Warn().Err(err).Msg("Got error reading") - return nil, nil + return tcpAdu, nil } -func (m *MessageCodec) attemptResync(ctx context.Context, ti transports.TransportInstance, peekedBytes []byte, availableBytes uint32) (spi.Message, error) { - // Can we peek more? - if availableBytes > uint32(len(peekedBytes)) { - peekedBytes, _ = ti.PeekReadableBytes(ctx, availableBytes) - } +// handleDesync handles stream realignment when an invalid header is detected at the head. +// It strictly scans the available buffer for a valid MBAP header. +// If one is found, it realigns the stream. +// If NO valid header is found in the *entire* buffer, it treats the connection as dead. +func (m *MessageCodec) handleDesync(ctx context.Context, reason string, fields map[string]interface{}) (spi.Message, error) { + ti := m.GetTransportInstance() - // Some kernels leak tcp keep alive ethernet frame padding ot the stream. - // If we see a series of zero bytes, just discard them. - if allZeros(peekedBytes) { - len := len(peekedBytes) - m.log.Warn(). - Int("length", len). - Msgf("Stream desynchronized. Discarding %d byte NIL packet to to realign.", len) + // Get total available bytes + numBytesAvail, err := ti.GetNumBytesAvailableInBuffer() + if err != nil { + return nil, nil // Yield if we can't check buffer + } - // Discard all zero bytes - ti.Read(ctx, uint32(len)) + // Create a logger with context + fields["reason"] = reason + fields["bytesAvail"] = numBytesAvail + opLog := m.log.With(). + Interface("desyncContext", fields). + Logger() + + opLog.Warn().Msg("Desync detected at stream head.") + + // CASE 1: Small Buffer (< 10 bytes). + // We don't have enough data to scan for a full header+function (need ~8-9 bytes min). + // We can't definitively say the stream is dead, so we just Discard 1 and Yield. + if numBytesAvail < 10 { + opLog.Trace().Msg("Small buffer desync. Discarding 1 byte.") + if _, err := ti.Read(ctx, 1); err != nil { + opLog.Debug().Err(err).Msg("Error reading byte during discard") + } + return nil, nil + } - // Keep using the stream + // CASE 2: Scan for Recovery. + // We peek everything we have. + allBytes, err := ti.PeekReadableBytes(ctx, numBytesAvail) + if err != nil { return nil, nil } - // Search for a valid-ish MBAP header in the stream - for i := 1; i < len(peekedBytes)-8; i++ { - // Check Protocol ID - if peekedBytes[i+2] == 0x00 && peekedBytes[i+3] == 0x00 { - // Check Length field - payloadLength := (uint32(peekedBytes[i+4]) << 8) + uint32(peekedBytes[i+5]) - if payloadLength >= 2 { - // Check for a valid Function Code - functionCode := peekedBytes[i+7] - if functionCode != 0 && (functionCode < 0x80 || payloadLength == 3) { - m.log.Warn(). - Int("offset", i). - Uint32("len", payloadLength). - Uint8("func", functionCode). - Msgf("Stream desynchronized. Found potential MBAP header at offset %d. Discarding %d bytes to realign.", i, i) - - // Discard up to the found header - ti.Read(ctx, uint32(i)) - - // Keep using the stream - return nil, nil + // Scan Loop: Start at offset 1 (since offset 0 is known bad). + // We must stop at (num - 8) because we need to inspect the Function Code at (i + 7). + // Index 'i' is the start of the candidate MBAP Header. + for i := uint32(1); i <= numBytesAvail-8; i++ { + // Check Protocol ID (Bytes 2-3 of the candidate header) + if allBytes[i+2] == 0x00 && allBytes[i+3] == 0x00 { + // Check Length (Bytes 4-5) + length := (uint32(allBytes[i+4]) << 8) + uint32(allBytes[i+5]) + + // Check Function Code (Byte 7) + // MBAP(6) + UnitID(1) + FuncCode(1) -> Offset 7 + fc := allBytes[i+7] + + // VALIDATION LOGIC: + // 1. Length >= 2 (Must have UnitID + FuncCode) + // 2. FuncCode != 0 (Modbus function 0 doesn't exist) + // 3. Strict Packet Structure: + // - Standard Function (< 0x80): Allow any length. + // - Exception Function (>= 0x80): PDU is always 2 bytes (Func + Code). + // Modbus TCP Length = UnitID(1) + PDU(2) = 3 bytes. + if length >= 2 && fc != 0 && (fc < 0x80 || length == 3) { + opLog.Debug().Uint32("offset", i).Msg("Found MBAP candidate in stream. Discarding garbage prefix.") + + // Discard 'i' bytes to align the stream to this candidate + if _, err := ti.Read(ctx, i); err != nil { + opLog.Debug().Err(err).Msg("Error discarding garbage during recovery") } + + // Yield. Next Receive() will pick up this valid header. + return nil, nil } } } - m.log.Warn().Msgf("Stream desynchronized. No valid MBAP header found. Giving up on connection.") - return nil, errors.New("stream desynchronized") -} - -func allZeros(b []byte) bool { - for _, v := range b { - if v != 0 { - return false - } - } - return true + // CASE 3: Connection is unrecoverable with the tools we have... + // We scanned the entire available buffer and found NO valid candidates. + // The connection is sending garbage. Destroy it. + return nil, fmt.Errorf("stream desynchronized: %d bytes of garbage with no valid MBAP header found", numBytesAvail) } From 3f48021c2d56e57a9baf75a918b7ea2fe98864cf Mon Sep 17 00:00:00 2001 From: Shaun Cooley Date: Sun, 23 Nov 2025 15:32:36 -0800 Subject: [PATCH 06/10] refactor: more robust consistency checks --- plc4go/internal/modbus/MessageCodec.go | 202 ++++++++++++++++++------- 1 file changed, 144 insertions(+), 58 deletions(-) diff --git a/plc4go/internal/modbus/MessageCodec.go b/plc4go/internal/modbus/MessageCodec.go index f67654e0e6..a6552d4d7e 100644 --- a/plc4go/internal/modbus/MessageCodec.go +++ b/plc4go/internal/modbus/MessageCodec.go @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * https://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an @@ -22,6 +22,7 @@ package modbus import ( "context" "fmt" + "io" "github.com/pkg/errors" "github.com/rs/zerolog" @@ -100,8 +101,11 @@ func (m *MessageCodec) Receive(ctx context.Context) (spi.Message, error) { // 2. Check buffer status numBytesAvail, err := ti.GetNumBytesAvailableInBuffer() if err != nil { - m.log.Warn().Err(err).Msg("Error getting bytes in buffer") - return nil, nil + // Yield if we can't check buffer + if err == io.EOF { + return nil, nil + } + return nil, fmt.Errorf("error getting buffer length") } // ------------------------------------------------------------------------- @@ -152,12 +156,18 @@ func (m *MessageCodec) Receive(ctx context.Context) (spi.Message, error) { // MBAP SANITY CHECKS // ------------------------------------------------------------------------- - // --- CHECK 1: Protocol ID Validation --- - if header[2] != 0x00 || header[3] != 0x00 { - return m.handleDesync(ctx, "Invalid Protocol ID", map[string]interface{}{ - "p1": header[2], - "p2": header[3], - "data": utils.Base64Stringer(header), + // We now peek up to 9 bytes to perform the full Consistency Check used in recovery. + // This catches "False Headers" (like the 04 Func example) immediately. + var checkBytes []byte + if numBytesAvail >= 9 { + checkBytes, _ = ti.PeekReadableBytes(ctx, 9) + } else { + checkBytes = header // Just the 6 bytes we have + } + + if !m.checkPacketConsistency(checkBytes) { + return m.handleDesync(ctx, "Sanity Check Failed", map[string]interface{}{ + "data": utils.Base64Stringer(checkBytes), }) } @@ -165,25 +175,6 @@ func (m *MessageCodec) Receive(ctx context.Context) (spi.Message, error) { payloadLength := (uint32(header[4]) << 8) + uint32(header[5]) packetSize := payloadLength + 6 - // --- CHECK 2: Minimum Length --- - if payloadLength < 2 { - return m.handleDesync(ctx, "Invalid packet length (<2)", map[string]interface{}{ - "len": payloadLength, - "data": utils.Base64Stringer(header), - }) - } - - // --- CHECK 3: Function Code 0 (False Header) --- - if numBytesAvail >= 8 { - peekBytes, err := ti.PeekReadableBytes(ctx, 8) - if err == nil && peekBytes[7] == 0 { - return m.handleDesync(ctx, "Invalid Function Code (0)", map[string]interface{}{ - "len": payloadLength, - "data": utils.Base64Stringer(peekBytes), - }) - } - } - // ------------------------------------------------------------------------- // PARSING // ------------------------------------------------------------------------- @@ -250,6 +241,9 @@ func (m *MessageCodec) Receive(ctx context.Context) (spi.Message, error) { } // Seems unparsable - log and discard + // If the packet PASSED checkPacketConsistency but failed parsing, it is likely a Valid MBAP + // with invalid content (e.g. unsupported function, bad data). + // We should DISCARD it to maintain sync, NOT trigger Desync Recovery. m.log.Warn().Err(err). Stringer("data", utils.Base64Stringer(frameSlice)). Uint32("packetSize", packetSize). @@ -275,8 +269,111 @@ func (m *MessageCodec) Receive(ctx context.Context) (spi.Message, error) { return tcpAdu, nil } +// checkPacketConsistency validates a candidate Modbus TCP packet against the protocol spec. +// It enforces strict relationship rules between the MBAP Header Length and the PDU content (Function Code & Byte Count). +// Returns true if the packet looks valid, false if it is definitely garbage. +// NOTE: When new functions are added to the mspec, this needs to be updated to reflect their structure. +func (m *MessageCodec) checkPacketConsistency(data []byte) bool { + // 1. Check Protocol ID immediately (Need 4 bytes) + if len(data) < 4 { + return false + } + // Modbus TCP Protocol ID must be 0 + if data[2] != 0x00 || data[3] != 0x00 { + return false + } + + // 2. Check Header Length (Need 6 bytes) + if len(data) < 6 { + // Not enough data to check length, but Protocol ID was OK. + // We shouldn't reject yet if we just don't have the bytes. + // However, this function is typically called with a slice that *should* contain the header. + return false + } + + length := (uint32(data[4]) << 8) + uint32(data[5]) + if length < 2 { + return false + } + + // 3. Check Function Code (Need 8 bytes: 6 Header + 1 Unit + 1 Func) + if len(data) < 8 { + // We have a valid ProtoID and Length, but not enough data to check content. + // Return true (benefit of the doubt) because we can't prove it's bad yet. + return true + } + + fc := data[7] + if fc == 0 { + return false + } + + // VALIDATION: Check Internal Consistency of Response Structure + // NOTE: These rules are primarily for Modbus TCP RESPONSES (Client Mode). + // If acting as a Server (receiving Requests), some rules (like Write Multiple) would be different. + + if fc >= 0x80 { + // --- EXCEPTION --- + // Structure: [UnitID] [FuncCode+0x80] [ExceptionCode] + // Length must be exactly 3. + return length == 3 + } + + // --- STANDARD FUNCTIONS --- + switch fc { + case 0x01, 0x02, 0x03, 0x04, 0x17, 0x14, 0x15: + // Variable Length Responses + // Rule: Header Length == ByteCount + 3 + // We need 9 bytes to see the ByteCount at offset 8. + if len(data) >= 9 { + byteCount := uint32(data[8]) + if length != byteCount+3 { + return false + } + } + // Implicit Max Check: Max ByteCount 255 -> Max Length 258. + if length > 258 { + return false + } + + case 0x05, 0x06, 0x0F, 0x10: + // Fixed Length Responses (Write Single, Write Multi) + // Structure: [UnitID] [FuncCode] [AddrHi] [AddrLo] [ValHi] [ValLo] + // Length must be exactly 6. + if length != 6 { + // SERVER MODE CAVEAT: + // If we are a Server receiving a Write Multiple Request (FC 15/16), + // the length will be > 6. If we strictly return false here, we break Server mode. + // As a heuristic, if length is > 6 for these codes, we treat it as potentially valid + // (assuming it's a Request) to be safe, unless it's huge. + if length > 260 { + return false + } + // Ideally we would enforce Request structure (Len = ByteCount + 7), + // but that requires more bytes than we might have peeked. + return true + } + + case 0x16: + // Mask Write Register + // Length must be exactly 8. + if length != 8 { + return false + } + + default: + // Other/Custom Functions. + // Max PDU size is 253 -> Length 254. Allow margin. + if length > 260 { + return false + } + } + + return true +} + // handleDesync handles stream realignment when an invalid header is detected at the head. -// It strictly scans the available buffer for a valid MBAP header. +// It strictly scans the available buffer for a valid MBAP header using checkPacketConsistency. // If one is found, it realigns the stream. // If NO valid header is found in the *entire* buffer, it treats the connection as dead. func (m *MessageCodec) handleDesync(ctx context.Context, reason string, fields map[string]interface{}) (spi.Message, error) { @@ -312,40 +409,29 @@ func (m *MessageCodec) handleDesync(ctx context.Context, reason string, fields m // We peek everything we have. allBytes, err := ti.PeekReadableBytes(ctx, numBytesAvail) if err != nil { - return nil, nil + return nil, fmt.Errorf("error peeking bytes during desync recovery") } // Scan Loop: Start at offset 1 (since offset 0 is known bad). - // We must stop at (num - 8) because we need to inspect the Function Code at (i + 7). - // Index 'i' is the start of the candidate MBAP Header. - for i := uint32(1); i <= numBytesAvail-8; i++ { - // Check Protocol ID (Bytes 2-3 of the candidate header) - if allBytes[i+2] == 0x00 && allBytes[i+3] == 0x00 { - // Check Length (Bytes 4-5) - length := (uint32(allBytes[i+4]) << 8) + uint32(allBytes[i+5]) - - // Check Function Code (Byte 7) - // MBAP(6) + UnitID(1) + FuncCode(1) -> Offset 7 - fc := allBytes[i+7] - - // VALIDATION LOGIC: - // 1. Length >= 2 (Must have UnitID + FuncCode) - // 2. FuncCode != 0 (Modbus function 0 doesn't exist) - // 3. Strict Packet Structure: - // - Standard Function (< 0x80): Allow any length. - // - Exception Function (>= 0x80): PDU is always 2 bytes (Func + Code). - // Modbus TCP Length = UnitID(1) + PDU(2) = 3 bytes. - if length >= 2 && fc != 0 && (fc < 0x80 || length == 3) { - opLog.Debug().Uint32("offset", i).Msg("Found MBAP candidate in stream. Discarding garbage prefix.") - - // Discard 'i' bytes to align the stream to this candidate - if _, err := ti.Read(ctx, i); err != nil { - opLog.Debug().Err(err).Msg("Error discarding garbage during recovery") - } + // We verify candidates up to the end of the buffer. + // We stop when we don't have enough bytes left to even check the Protocol ID (4 bytes). + limit := uint32(0) + if numBytesAvail >= 4 { + limit = numBytesAvail - 4 + } - // Yield. Next Receive() will pick up this valid header. - return nil, nil + for i := uint32(1); i <= limit; i++ { + // Use our robust consistency check on the slice starting at i + if m.checkPacketConsistency(allBytes[i:]) { + opLog.Debug().Uint32("offset", i).Msg("Found MBAP candidate in stream. Discarding garbage prefix.") + + // Discard 'i' bytes to align the stream to this candidate + if _, err := ti.Read(ctx, i); err != nil { + opLog.Debug().Err(err).Msg("Error discarding garbage during recovery") } + + // Yield. Next Receive() will pick up this valid header. + return nil, nil } } From 8de94d731ba58c297e199c9f8ad1350b44608d9e Mon Sep 17 00:00:00 2001 From: Shaun Cooley Date: Sun, 23 Nov 2025 16:48:13 -0800 Subject: [PATCH 07/10] fix: Final consistency check case should discard all available bytes --- plc4go/internal/modbus/MessageCodec.go | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/plc4go/internal/modbus/MessageCodec.go b/plc4go/internal/modbus/MessageCodec.go index a6552d4d7e..c0b3f5d8e1 100644 --- a/plc4go/internal/modbus/MessageCodec.go +++ b/plc4go/internal/modbus/MessageCodec.go @@ -128,7 +128,8 @@ func (m *MessageCodec) Receive(ctx context.Context) (spi.Message, error) { m.log.Debug().Msg("Detected NIL Packet (Keep-Alive). Discarding 6 bytes.") if _, err := ti.Read(ctx, 6); err != nil { m.log.Warn().Err(err).Msg("Error discarding NIL packet") - return nil, nil + // If we can't read, we are dead. + return nil, err } // Refresh num and loop @@ -165,6 +166,7 @@ func (m *MessageCodec) Receive(ctx context.Context) (spi.Message, error) { checkBytes = header // Just the 6 bytes we have } + // Perform Consistency Check if !m.checkPacketConsistency(checkBytes) { return m.handleDesync(ctx, "Sanity Check Failed", map[string]interface{}{ "data": utils.Base64Stringer(checkBytes), @@ -241,9 +243,11 @@ func (m *MessageCodec) Receive(ctx context.Context) (spi.Message, error) { } // Seems unparsable - log and discard - // If the packet PASSED checkPacketConsistency but failed parsing, it is likely a Valid MBAP - // with invalid content (e.g. unsupported function, bad data). - // We should DISCARD it to maintain sync, NOT trigger Desync Recovery. + // NOTE: If checkPacketConsistency PASSED, but Parse FAILED, it means the header + // structure is valid (Length, Proto, Func), but the Content is bad. + // We MUST discard this frame to advance the stream. + // We DO NOT call handleDesync here because we don't want to scan ahead; + // we just want to eat the bad packet and try the next one. m.log.Warn().Err(err). Stringer("data", utils.Base64Stringer(frameSlice)). Uint32("packetSize", packetSize). @@ -252,6 +256,7 @@ func (m *MessageCodec) Receive(ctx context.Context) (spi.Message, error) { // Discard the unparsable frame from the buffer if _, discardErr := ti.Read(ctx, packetSize); discardErr != nil { m.log.Debug().Err(discardErr).Uint32("packetSize", packetSize).Msg("error discarding unparsable frame") + return nil, discardErr } // Yield @@ -382,7 +387,7 @@ func (m *MessageCodec) handleDesync(ctx context.Context, reason string, fields m // Get total available bytes numBytesAvail, err := ti.GetNumBytesAvailableInBuffer() if err != nil { - return nil, nil // Yield if we can't check buffer + return nil, err // Yield if we can't check buffer } // Create a logger with context @@ -401,6 +406,7 @@ func (m *MessageCodec) handleDesync(ctx context.Context, reason string, fields m opLog.Trace().Msg("Small buffer desync. Discarding 1 byte.") if _, err := ti.Read(ctx, 1); err != nil { opLog.Debug().Err(err).Msg("Error reading byte during discard") + return nil, err // Return error to kill connection if we can't consume } return nil, nil } @@ -428,6 +434,7 @@ func (m *MessageCodec) handleDesync(ctx context.Context, reason string, fields m // Discard 'i' bytes to align the stream to this candidate if _, err := ti.Read(ctx, i); err != nil { opLog.Debug().Err(err).Msg("Error discarding garbage during recovery") + return nil, err // Return error to kill connection } // Yield. Next Receive() will pick up this valid header. @@ -437,6 +444,13 @@ func (m *MessageCodec) handleDesync(ctx context.Context, reason string, fields m // CASE 3: Connection is unrecoverable with the tools we have... // We scanned the entire available buffer and found NO valid candidates. - // The connection is sending garbage. Destroy it. + // The connection is sending garbage. Would be nice to destroy it, but DefaultCodec + // doesn't have any such ability. For now we'll just consume all available bytes + // and return to make sure we don't spin endlessly. + if _, err := ti.Read(ctx, numBytesAvail); err != nil { + opLog.Debug().Err(err).Msg("Error discarding garbage during recovery") + return nil, err // Return error to kill connection + } + return nil, fmt.Errorf("stream desynchronized: %d bytes of garbage with no valid MBAP header found", numBytesAvail) } From 6735c7d2108beacca378c564e3066a3371b6b3d7 Mon Sep 17 00:00:00 2001 From: Shaun Cooley Date: Sun, 23 Nov 2025 19:20:25 -0800 Subject: [PATCH 08/10] refactor: reduce log spam --- plc4go/internal/modbus/MessageCodec.go | 34 ++++++++++++++++++-------- 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/plc4go/internal/modbus/MessageCodec.go b/plc4go/internal/modbus/MessageCodec.go index c0b3f5d8e1..c3587975a7 100644 --- a/plc4go/internal/modbus/MessageCodec.go +++ b/plc4go/internal/modbus/MessageCodec.go @@ -95,7 +95,10 @@ func (m *MessageCodec) Receive(ctx context.Context) (spi.Message, error) { } return numBytesAvailable < 6 }); err != nil { - m.log.Debug().Err(err).Msg("error filling buffer") + if err != io.EOF { + m.log.Debug().Err(err).Msg("error filling buffer") + } + // Fall through on errors, we might have enough data... } // 2. Check buffer status @@ -157,14 +160,16 @@ func (m *MessageCodec) Receive(ctx context.Context) (spi.Message, error) { // MBAP SANITY CHECKS // ------------------------------------------------------------------------- - // We now peek up to 9 bytes to perform the full Consistency Check used in recovery. - // This catches "False Headers" (like the 04 Func example) immediately. + // We now peek up to 9 bytes (or whatever is available) to perform the full Consistency Check. + // IMPORTANT: If we have 8 bytes (Header + Unit + Func), we MUST pass all 8 bytes + // so that checkPacketConsistency can validate the Function Code and Length limits. + // If we only pass 6, it will skip the Function check and potentially accept huge lengths. var checkBytes []byte - if numBytesAvail >= 9 { - checkBytes, _ = ti.PeekReadableBytes(ctx, 9) - } else { - checkBytes = header // Just the 6 bytes we have + peekLen := uint32(9) + if numBytesAvail < 9 { + peekLen = numBytesAvail } + checkBytes, _ = ti.PeekReadableBytes(ctx, peekLen) // Perform Consistency Check if !m.checkPacketConsistency(checkBytes) { @@ -397,13 +402,19 @@ func (m *MessageCodec) handleDesync(ctx context.Context, reason string, fields m Interface("desyncContext", fields). Logger() - opLog.Warn().Msg("Desync detected at stream head.") + // Log on the way out to minimize log spam + dispMsg := "" + dispLevel := zerolog.WarnLevel + defer func() { + opLog.WithLevel(dispLevel).Msg("Desync detected at stream head: " + dispMsg) + }() // CASE 1: Small Buffer (< 10 bytes). // We don't have enough data to scan for a full header+function (need ~8-9 bytes min). // We can't definitively say the stream is dead, so we just Discard 1 and Yield. if numBytesAvail < 10 { - opLog.Trace().Msg("Small buffer desync. Discarding 1 byte.") + dispMsg = "Small buffer - Discard 1" + dispLevel = zerolog.DebugLevel if _, err := ti.Read(ctx, 1); err != nil { opLog.Debug().Err(err).Msg("Error reading byte during discard") return nil, err // Return error to kill connection if we can't consume @@ -429,7 +440,8 @@ func (m *MessageCodec) handleDesync(ctx context.Context, reason string, fields m for i := uint32(1); i <= limit; i++ { // Use our robust consistency check on the slice starting at i if m.checkPacketConsistency(allBytes[i:]) { - opLog.Debug().Uint32("offset", i).Msg("Found MBAP candidate in stream. Discarding garbage prefix.") + dispMsg = fmt.Sprintf("Found MBAP candidate at +%d", i) + dispLevel = zerolog.DebugLevel // Discard 'i' bytes to align the stream to this candidate if _, err := ti.Read(ctx, i); err != nil { @@ -447,6 +459,8 @@ func (m *MessageCodec) handleDesync(ctx context.Context, reason string, fields m // The connection is sending garbage. Would be nice to destroy it, but DefaultCodec // doesn't have any such ability. For now we'll just consume all available bytes // and return to make sure we don't spin endlessly. + dispMsg = fmt.Sprintf("No MBAP candidate found. Discarding all (%d) available bytes", numBytesAvail) + dispLevel = zerolog.InfoLevel if _, err := ti.Read(ctx, numBytesAvail); err != nil { opLog.Debug().Err(err).Msg("Error discarding garbage during recovery") return nil, err // Return error to kill connection From 6727c484a455cd8b0dd8e8523478672f9728af96 Mon Sep 17 00:00:00 2001 From: Shaun Cooley Date: Sun, 23 Nov 2025 19:30:51 -0800 Subject: [PATCH 09/10] fix: handleDesync more robustly handles the padding leak issue --- plc4go/internal/modbus/MessageCodec.go | 45 ++++---------------------- 1 file changed, 7 insertions(+), 38 deletions(-) diff --git a/plc4go/internal/modbus/MessageCodec.go b/plc4go/internal/modbus/MessageCodec.go index c3587975a7..7e8c3391d5 100644 --- a/plc4go/internal/modbus/MessageCodec.go +++ b/plc4go/internal/modbus/MessageCodec.go @@ -111,40 +111,7 @@ func (m *MessageCodec) Receive(ctx context.Context) (spi.Message, error) { return nil, fmt.Errorf("error getting buffer length") } - // ------------------------------------------------------------------------- - // Discard NIL Packets (Keep-Alives w/padding that leaked into stream) - // ------------------------------------------------------------------------- - for { - if numBytesAvail < 6 { - break - } - header, err := ti.PeekReadableBytes(ctx, 6) - if err != nil { - m.log.Warn().Err(err).Msg("error peeking header") - return nil, nil - } - - // Check for 6 bytes of zeros - if header[0] == 0 && header[1] == 0 && header[2] == 0 && - header[3] == 0 && header[4] == 0 && header[5] == 0 { - - m.log.Debug().Msg("Detected NIL Packet (Keep-Alive). Discarding 6 bytes.") - if _, err := ti.Read(ctx, 6); err != nil { - m.log.Warn().Err(err).Msg("Error discarding NIL packet") - // If we can't read, we are dead. - return nil, err - } - - // Refresh num and loop - numBytesAvail, err = ti.GetNumBytesAvailableInBuffer() - if err != nil { - return nil, nil - } - continue - } - break - } - + // Need at least 6 bytes for MBAP header if numBytesAvail < 6 { return nil, nil } @@ -178,14 +145,14 @@ func (m *MessageCodec) Receive(ctx context.Context) (spi.Message, error) { }) } - // Length field is big endian encoded WORD - payloadLength := (uint32(header[4]) << 8) + uint32(header[5]) - packetSize := payloadLength + 6 - // ------------------------------------------------------------------------- // PARSING // ------------------------------------------------------------------------- + // Length field is big endian encoded WORD + payloadLength := (uint32(header[4]) << 8) + uint32(header[5]) + packetSize := payloadLength + 6 + // Yield on TCP fragmentation if numBytesAvail < packetSize { // Wait for more data (standard TCP fragmentation handling) @@ -432,6 +399,8 @@ func (m *MessageCodec) handleDesync(ctx context.Context, reason string, fields m // Scan Loop: Start at offset 1 (since offset 0 is known bad). // We verify candidates up to the end of the buffer. // We stop when we don't have enough bytes left to even check the Protocol ID (4 bytes). + // ... This deals with any gateway forwarding RTU CRC16 extra bytes or leaky kernels that + // shove TCP keep-alive Ethernet II frame padding bytes into the stream. limit := uint32(0) if numBytesAvail >= 4 { limit = numBytesAvail - 4 From 92cc90928bd92f741d9fc1f569eff92236d51dd4 Mon Sep 17 00:00:00 2001 From: Shaun Cooley Date: Sun, 23 Nov 2025 20:10:33 -0800 Subject: [PATCH 10/10] fix: keep the last 5 bytes to avoid breaking fragmentation --- plc4go/internal/modbus/MessageCodec.go | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/plc4go/internal/modbus/MessageCodec.go b/plc4go/internal/modbus/MessageCodec.go index 7e8c3391d5..1f71f65b2e 100644 --- a/plc4go/internal/modbus/MessageCodec.go +++ b/plc4go/internal/modbus/MessageCodec.go @@ -423,17 +423,21 @@ func (m *MessageCodec) handleDesync(ctx context.Context, reason string, fields m } } - // CASE 3: Connection is unrecoverable with the tools we have... - // We scanned the entire available buffer and found NO valid candidates. - // The connection is sending garbage. Would be nice to destroy it, but DefaultCodec - // doesn't have any such ability. For now we'll just consume all available bytes - // and return to make sure we don't spin endlessly. - dispMsg = fmt.Sprintf("No MBAP candidate found. Discarding all (%d) available bytes", numBytesAvail) + // CASE 3: We scanned the entire available buffer and found NO valid candidates. + // We want to keep the last 5 bytes, as they might be the start of a header + // (TransID[2] + ProtoID[2] + Len[1 here, 1 missing]) that completes in the + // next TCP packet of at least 1 byte. + bytesToDiscard := numBytesAvail + if numBytesAvail > 5 { + bytesToDiscard = numBytesAvail - 5 + } + + dispMsg = fmt.Sprintf("No MBAP candidate found. Discarding (%d) bytes, keeping 5-byte tail", bytesToDiscard) dispLevel = zerolog.InfoLevel - if _, err := ti.Read(ctx, numBytesAvail); err != nil { + if _, err := ti.Read(ctx, bytesToDiscard); err != nil { opLog.Debug().Err(err).Msg("Error discarding garbage during recovery") return nil, err // Return error to kill connection } - return nil, fmt.Errorf("stream desynchronized: %d bytes of garbage with no valid MBAP header found", numBytesAvail) + return nil, fmt.Errorf("stream desynchronized: discarded %d bytes of garbage", bytesToDiscard) }