Files
modbus/rtu_transport.go
2025-11-17 18:26:04 +08:00

560 lines
14 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package modbus
import (
"fmt"
"io"
"log"
"os"
"time"
)
const (
// maxRTUFrameLength is the size, in bytes, of the receive buffer allocated by
// readRTUFrame and the upper bound enforced on standard response frames. It is
// also used to size the wait before flushing the link after a framing error.
maxRTUFrameLength int = 256
)
// rtuTransport carries modbus requests and responses over an RTU link and
// keeps track of line timing so that inter-frame delays can be observed.
type rtuTransport struct {
// logger receives the transport's diagnostic messages.
logger *logger
// link is the underlying connection frames are written to and read from.
link rtuLink
// timeout is added to the current time to form the i/o deadline of a request.
timeout time.Duration
// lastActivity is the transport's estimate of when the line was last busy.
lastActivity time.Time
// t35 is the inter-frame delay: 3.5 character times, or a fixed 1750 µs at
// 19200 bauds and above.
t35 time.Duration
// t1 is the time needed to transmit one character at the configured speed.
t1 time.Duration
}
// rtuLink is the set of operations the RTU transport needs from the
// underlying link.
type rtuLink interface {
// Close closes the link.
Close() error
// Read reads bytes received on the link into the provided buffer.
Read([]byte) (int, error)
// Write sends the provided bytes over the link.
Write([]byte) (int, error)
// SetDeadline sets the deadline applied to subsequent i/o on the link.
SetDeadline(time.Time) error
}
// newRTUTransport builds an RTU transport on top of link.
//
// addr is only used to label the transport's logger, speed is the line rate
// passed to serialCharTime, timeout is the per-request i/o deadline and
// customLogger is forwarded to newLogger.
func newRTUTransport(link rtuLink, addr string, speed uint, timeout time.Duration, customLogger *log.Logger) (rt *rtuTransport) {
	transportLogger := newLogger(fmt.Sprintf("rtu-transport(%s)", addr), customLogger)
	charTime := serialCharTime(speed)

	// t3.5 is a fixed 1750 µs at 19200 bauds and above; for lower rates it is
	// 3.5 character times.
	interFrameDelay := 1750 * time.Microsecond
	if speed < 19200 {
		interFrameDelay = (charTime * 35) / 10
	}

	return &rtuTransport{
		logger:  transportLogger,
		link:    link,
		timeout: timeout,
		t1:      charTime,
		t35:     interFrameDelay,
	}
}
// Close shuts down the underlying rtu link and reports the link's error, if
// any.
func (rt *rtuTransport) Close() (err error) {
	return rt.link.Close()
}
// ExecuteRequest sends req across the rtu link and returns the response
// decoded by readRTUFrame.
func (rt *rtuTransport) ExecuteRequest(req *pdu) (res *pdu, err error) {
	return rt.roundTrip(req, rt.readRTUFrame)
}

// ExecuteRequestWithRes sends req across the rtu link and returns the response
// decoded by readRTUFrameWithRes, i.e. the standard response payload followed
// by any extra bytes the device sent after it.
func (rt *rtuTransport) ExecuteRequestWithRes(req *pdu) (res *pdu, err error) {
	return rt.roundTrip(req, rt.readRTUFrameWithRes)
}

// roundTrip performs one request/response exchange on the link.
//
// It arms the i/o deadline, waits for the inter-frame delay to elapse, writes
// the request frame (assembled exactly once), then calls readFrame to obtain
// the response. readFrame is the only step that differs between the exported
// entry points.
//
// Errors from SetDeadline, Write and readFrame are returned unchanged. When
// readFrame reports ErrBadCRC, ErrProtocolError or ErrShortFrame, the link is
// flushed so that devices can re-synchronize.
func (rt *rtuTransport) roundTrip(req *pdu, readFrame func() (*pdu, error)) (res *pdu, err error) {
	// set an i/o deadline on the link
	if err = rt.link.SetDeadline(time.Now().Add(rt.timeout)); err != nil {
		return nil, err
	}

	// if the line was active less than 3.5 char times ago, let t3.5 expire
	// before transmitting (Sleep returns immediately for durations <= 0)
	time.Sleep(time.Until(rt.lastActivity.Add(rt.t35)))

	sentAt := time.Now()

	// build an RTU ADU out of the request object and send the final ADU+CRC
	// on the wire
	n, err := rt.link.Write(rt.assembleRTUFrame(req))
	if err != nil {
		return nil, err
	}

	// estimate how long the serial line was busy for: Write() may be buffered
	// and return before the bytes are actually on the wire, so derive the end
	// of transmission from the number of bytes written
	rt.lastActivity = sentAt.Add(time.Duration(n) * rt.t1)

	// observe inter-frame delays
	time.Sleep(time.Until(rt.lastActivity.Add(rt.t35)))

	// read the response back from the wire
	res, err = readFrame()
	if err == ErrBadCRC || err == ErrProtocolError || err == ErrShortFrame {
		// wait for and flush any data coming off the link to allow
		// devices to re-sync
		time.Sleep(time.Duration(maxRTUFrameLength) * rt.t1)
		discard(rt.link)
	}

	// mark the time if we heard anything back
	if err != ErrRequestTimedOut {
		rt.lastActivity = time.Now()
	}

	return res, err
}
// ReadRequest is not supported on RTU links: it always returns a nil request
// together with an "unimplemented" error.
func (rt *rtuTransport) ReadRequest() (req *pdu, err error) {
	return nil, fmt.Errorf("unimplemented")
}
// WriteResponse turns res into an RTU ADU (CRC included), writes it to the
// link and, on success, records when the line is expected to go idle again.
// A write error is returned unchanged and leaves lastActivity untouched.
func (rt *rtuTransport) WriteResponse(res *pdu) (err error) {
	frame := rt.assembleRTUFrame(res)

	written, err := rt.link.Write(frame)
	if err != nil {
		return err
	}

	// the line stays busy for one character time per byte written
	busyFor := rt.t1 * time.Duration(written)
	rt.lastActivity = time.Now().Add(busyFor)

	return nil
}
// readRTUFrame waits for, reads and decodes one response frame from the rtu
// link.
//
// The first 3 bytes (unit id, function code, length/exception code) are read
// first. Frames carrying the fcCustomize function code are then handed to
// readCustomRTUFrame; every other frame is sized with expectedResponseLenth,
// read in full and CRC-checked.
//
// Returns ErrShortFrame when fewer bytes than expected arrive, ErrProtocolError
// when the function code is unknown or the frame would exceed
// maxRTUFrameLength, ErrBadCRC on a CRC mismatch, and any other link error
// unchanged. On success the payload spans from the length/exception byte to
// the last byte before the CRC.
func (rt *rtuTransport) readRTUFrame() (res *pdu, err error) {
	var frameCRC crc

	rxbuf := make([]byte, maxRTUFrameLength)

	// read the serial ADU header: unit id (1 byte), function code (1 byte) and
	// PDU length/exception code (1 byte)
	byteCount, err := io.ReadFull(rt.link, rxbuf[0:3])
	if (byteCount > 0 || err == nil) && byteCount != 3 {
		return nil, ErrShortFrame
	}
	if err != nil && err != io.ErrUnexpectedEOF {
		return nil, err
	}

	// the custom function code uses its own frame layout
	if uint8(rxbuf[1]) == fcCustomize {
		return rt.readCustomRTUFrame(rxbuf)
	}

	// standard modbus protocol handling:
	// figure out how many further bytes to read
	bytesNeeded, err := expectedResponseLenth(uint8(rxbuf[1]), uint8(rxbuf[2]))
	if err != nil {
		return nil, err
	}

	// we need to read 2 additional bytes of CRC after the payload
	bytesNeeded += 2

	// never read more than the max allowed frame length
	if byteCount+bytesNeeded > maxRTUFrameLength {
		return nil, ErrProtocolError
	}

	frameEnd := 3 + bytesNeeded
	byteCount, err = io.ReadFull(rt.link, rxbuf[3:frameEnd])
	if err != nil && err != io.ErrUnexpectedEOF {
		return nil, err
	}
	if byteCount != bytesNeeded {
		rt.logger.Warningf("expected %v bytes, received %v", bytesNeeded, byteCount)
		return nil, ErrShortFrame
	}

	// compute the CRC on the entire frame, excluding the CRC itself, and
	// compare it with the 2 trailing bytes
	frameCRC.init()
	frameCRC.add(rxbuf[0 : frameEnd-2])
	if !frameCRC.isEqual(rxbuf[frameEnd-2], rxbuf[frameEnd-1]) {
		return nil, ErrBadCRC
	}

	return &pdu{
		unitId:       rxbuf[0],
		functionCode: rxbuf[1],
		// pass the byte count + trailing data as payload, without the CRC
		payload: rxbuf[2 : frameEnd-2],
	}, nil
}

// readCustomRTUFrame reads the remainder of a custom-function (fcCustomize)
// response. rxbuf must already hold the first 3 bytes of the frame and be at
// least 6 bytes long.
//
// Frame layout:
//
//	[unit id][function code][start address (2 bytes)][byte count (2 bytes)][data...][CRC (2 bytes)]
//
// io.EOF and io.ErrUnexpectedEOF are reported as ErrShortFrame; other link
// errors are returned unchanged. On success the payload spans from the start
// address to the last data byte (CRC excluded).
func (rt *rtuTransport) readCustomRTUFrame(rxbuf []byte) (res *pdu, err error) {
	const headerLen = 6

	// complete the 6-byte header: bytes 0-2 were consumed by the caller, so
	// only bytes 3-5 remain to be read
	if _, err = io.ReadFull(rt.link, rxbuf[3:headerLen]); err != nil {
		if err == io.EOF || err == io.ErrUnexpectedEOF {
			err = ErrShortFrame
		}
		return nil, err
	}

	// the big-endian data length lives in header bytes 4 and 5
	dataLen := int(bytesToUint16(BIG_ENDIAN, rxbuf[4:headerLen]))

	// total frame size: header + data + 2 bytes of CRC
	frameLen := headerLen + dataLen + 2

	// custom frames may exceed the provided buffer: grow it while keeping the
	// header bytes already read
	if frameLen > len(rxbuf) {
		grown := make([]byte, frameLen)
		copy(grown, rxbuf[:headerLen])
		rxbuf = grown
	}

	// read the data field and the CRC; io.ReadFull only succeeds once the
	// whole slice has been filled
	if _, err = io.ReadFull(rt.link, rxbuf[headerLen:frameLen]); err != nil {
		if err == io.EOF || err == io.ErrUnexpectedEOF {
			err = ErrShortFrame
		}
		return nil, err
	}

	// compute the CRC over everything but the 2 trailing CRC bytes.
	// NOTE(review): rejecting custom frames on CRC mismatch was disabled in
	// this code path, so a mismatch is only logged here and the frame is still
	// accepted. Return ErrBadCRC instead once the device's CRC is confirmed to
	// cover this range.
	var frameCRC crc
	frameCRC.init()
	frameCRC.add(rxbuf[:frameLen-2])
	if !frameCRC.isEqual(rxbuf[frameLen-2], rxbuf[frameLen-1]) {
		rt.logger.Warningf("custom frame CRC mismatch (received %v %v), frame accepted anyway",
			rxbuf[frameLen-2], rxbuf[frameLen-1])
	}

	return &pdu{
		unitId:       rxbuf[0],
		functionCode: rxbuf[1],
		// start address (2 bytes) + byte count (2 bytes) + data, without the CRC
		payload: rxbuf[2 : frameLen-2],
	}, nil
}
// readRTUFrameWithRes reads a standard modbus response frame, validates its
// CRC, then keeps reading whatever extra bytes follow it on the link.
//
// Once the standard frame has been validated, the link deadline is re-armed
// to 5 seconds from now and extra bytes are read in chunks of up to 256 bytes
// until a read times out, returns io.EOF, fails, or returns fewer bytes than
// the chunk size. A non-timeout, non-EOF read error is only returned when no
// extra byte was received at all.
//
// The returned payload is the standard payload (length/exception byte up to
// the CRC, excluded) followed by the extra bytes.
func (rt *rtuTransport) readRTUFrameWithRes() (res *pdu, err error) {
	var frameCRC crc

	frame := make([]byte, 4096)

	// serial ADU header: unit id, function code, PDU length/exception code
	headerLen, err := io.ReadFull(rt.link, frame[:3])
	if headerLen != 3 && (headerLen > 0 || err == nil) {
		return nil, ErrShortFrame
	}
	if err != nil && err != io.ErrUnexpectedEOF {
		return nil, err
	}

	// number of bytes left to read: the response body plus 2 bytes of CRC
	tailLen, err := expectedResponseLenth(uint8(frame[1]), uint8(frame[2]))
	if err != nil {
		return nil, err
	}
	tailLen += 2

	// never read more than the max allowed frame length
	if headerLen+tailLen > maxRTUFrameLength {
		return nil, ErrProtocolError
	}

	frameEnd := 3 + tailLen
	received, err := io.ReadFull(rt.link, frame[3:frameEnd])
	if err != nil && err != io.ErrUnexpectedEOF {
		return nil, err
	}
	if received != tailLen {
		rt.logger.Warningf("expected %v bytes, received %v", tailLen, received)
		return nil, ErrShortFrame
	}

	// the CRC covers the whole frame except the 2 CRC bytes themselves
	crcStart := frameEnd - 2
	frameCRC.init()
	frameCRC.add(frame[:crcStart])
	if !frameCRC.isEqual(frame[crcStart], frame[crcStart+1]) {
		return nil, ErrBadCRC
	}

	// the standard response is complete: give the device 5 seconds to send
	// its extra (custom) data
	if err = rt.link.SetDeadline(time.Now().Add(5 * time.Second)); err != nil {
		return nil, err
	}

	var extra []byte
	chunk := make([]byte, 256)
	for {
		n, readErr := rt.link.Read(chunk)
		if n > 0 {
			extra = append(extra, chunk[:n]...)
		}

		if readErr != nil {
			switch {
			case os.IsTimeout(readErr), readErr == io.EOF:
				// a timeout or EOF simply marks the end of the extra data
			case len(extra) == 0:
				// a real failure with nothing received: report it
				return nil, readErr
			}
			// otherwise keep what was received so far
			break
		}

		// a partially filled chunk is taken as the end of the extra data
		if n < len(chunk) {
			break
		}
	}

	// final payload: standard payload followed by the extra data
	standard := frame[2:crcStart]
	payload := make([]byte, 0, len(standard)+len(extra))
	payload = append(payload, standard...)
	payload = append(payload, extra...)

	return &pdu{
		unitId:       frame[0],
		functionCode: frame[1],
		payload:      payload,
	}, nil
}
// assembleRTUFrame serializes p into an RTU ADU: unit id, function code and
// payload, followed by the CRC computed over those bytes.
func (rt *rtuTransport) assembleRTUFrame(p *pdu) (adu []byte) {
	var checksum crc

	// unit id + function code + payload, leaving spare capacity for the CRC
	adu = make([]byte, 0, len(p.payload)+4)
	adu = append(adu, p.unitId, p.functionCode)
	adu = append(adu, p.payload...)

	// run the ADU through the CRC generator and append the result
	checksum.init()
	checksum.add(adu)

	return append(adu, checksum.value()...)
}
// Computes the expected length of a modbus RTU response.
func expectedResponseLenth(responseCode uint8, responseLength uint8) (byteCount int, err error) {
switch responseCode {
case fcReadHoldingRegisters,
fcReadInputRegisters,
fcReadCoils,
fcReadDiscreteInputs,
0x41:
byteCount = int(responseLength)
case fcWriteSingleRegister,
fcWriteMultipleRegisters,
fcWriteSingleCoil,
fcWriteMultipleCoils:
byteCount = 3
case fcMaskWriteRegister:
byteCount = 5
case fcReadHoldingRegisters | 0x80,
fcReadInputRegisters | 0x80,
fcReadCoils | 0x80,
fcReadDiscreteInputs | 0x80,
fcWriteSingleRegister | 0x80,
fcWriteMultipleRegisters | 0x80,
fcWriteSingleCoil | 0x80,
fcWriteMultipleCoils | 0x80,
fcMaskWriteRegister | 0x80,
0x41 | 0x80:
byteCount = 0
default:
err = ErrProtocolError
}
return
}
// discard drains pending data from the link's rx buffer, eating up to 1kB.
//
// A deadline 500 µs in the future is set on the link before reading. Errors
// from both the deadline and the read are ignored: this is a best-effort
// flush.
// NOTE(review): how long the read can actually block depends on how the link
// honours the deadline — confirm against the link implementation.
func discard(link rtuLink) {
	scratch := make([]byte, 1024)

	_ = link.SetDeadline(time.Now().Add(500 * time.Microsecond))
	_, _ = io.ReadFull(link, scratch)
}
// serialCharTime returns how long it takes to send 1 byte on a serial line at
// the specified baud rate.
//
// An RTU byte on the wire is 11 bits long:
//   - 1 start bit,
//   - 8 data bits,
//   - 1 parity or stop bit,
//   - 1 stop bit.
//
// A rate of 0 yields a zero duration instead of panicking on an integer
// division by zero.
func serialCharTime(rateBps uint) (ct time.Duration) {
	if rateBps == 0 {
		return 0
	}

	return 11 * time.Second / time.Duration(rateBps)
}