From 8318daf99b3a0e6710e5cc7c3404e19fb5251674 Mon Sep 17 00:00:00 2001 From: Fuyao Date: Fri, 14 Nov 2025 15:19:47 +0800 Subject: [PATCH] =?UTF-8?q?=E8=87=AA=E5=AE=9A=E4=B9=89=E8=AF=BB=E5=8F=96?= =?UTF-8?q?=E6=8E=A5=E5=8F=A3&=E8=87=AA=E5=AE=9A=E4=B9=89=E5=86=99?= =?UTF-8?q?=E5=85=A5=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client.go | 30 +++++++++++- rtu_transport.go | 124 ++++++++++++++++++++++++++++++++++++++++++++++- tcp_transport.go | 92 ++++++++++++++++++----------------- transport.go | 20 ++++---- 4 files changed, 210 insertions(+), 56 deletions(-) diff --git a/client.go b/client.go index 8679c1b..9b9184f 100644 --- a/client.go +++ b/client.go @@ -793,7 +793,7 @@ func (mc *ModbusClient) WriteRegisterWithRes(addr uint16, value uint16) (bytes [ req.payload = append(req.payload, uint16ToBytes(mc.endianness, value)...) // run the request across the transport and wait for a response - res, err = mc.executeRequest(req) + res, err = mc.executeRequestWithRes(req) if err != nil { return } @@ -1205,7 +1205,7 @@ func (mc *ModbusClient) readRegistersWithFunctionCode(addr uint16, quantity uint // 16 * 16 * 40 if quantity > 10240 { err = ErrUnexpectedParameters - mc.logger.Error("quantity of registers exceeds 1024") + mc.logger.Error("quantity of registers exceeds 10240") return } @@ -1401,3 +1401,29 @@ func (mc *ModbusClient) executeRequest(req *pdu) (res *pdu, err error) { return } + +func (mc *ModbusClient) executeRequestWithRes(req *pdu) (res *pdu, err error) { + // send the request over the wire, wait for and decode the response + res, err = mc.transport.ExecuteRequestWithRes(req) + if err != nil { + // map i/o timeouts to ErrRequestTimedOut + if os.IsTimeout(err) { + err = ErrRequestTimedOut + } + return + } + + // make sure the source unit id matches that of the request + if (res.functionCode&0x80) == 0x00 && res.unitId != req.unitId { + err = ErrBadUnitId + return + } + // accept errors from gateway devices (using special unit id #255) + if (res.functionCode&0x80) == 0x80 && + (res.unitId != req.unitId && res.unitId != 0xff) { + err = ErrBadUnitId + return + } + + return +} diff --git a/rtu_transport.go b/rtu_transport.go index 08da0aa..01d46cb 100644 --- a/rtu_transport.go +++ b/rtu_transport.go @@ -109,6 +109,59 @@ func (rt *rtuTransport) ExecuteRequest(req *pdu) (res *pdu, err error) { return } +func (rt *rtuTransport) ExecuteRequestWithRes(req *pdu) (res *pdu, err error) { + var ts time.Time + var t time.Duration + var n int + + // set an i/o deadline on the link + err = rt.link.SetDeadline(time.Now().Add(rt.timeout)) + if err != nil { + return + } + + // if the line was active less than 3.5 char times ago, + // let t3.5 expire before transmitting + t = time.Since(rt.lastActivity.Add(rt.t35)) + if t < 0 { + time.Sleep(t * (-1)) + } + + ts = time.Now() + + // build an RTU ADU out of the request object and + // send the final ADU+CRC on the wire + n, err = rt.link.Write(rt.assembleRTUFrame(req)) + if err != nil { + return + } + + // estimate how long the serial line was busy for. + // note that on most platforms, Write() will be buffered and return + // immediately rather than block until the buffer is drained + rt.lastActivity = ts.Add(time.Duration(n) * rt.t1) + + // observe inter-frame delays + time.Sleep(rt.lastActivity.Add(rt.t35).Sub(time.Now())) + + // read the response back from the wire + res, err = rt.readRTUFrameWithRes() + + if err == ErrBadCRC || err == ErrProtocolError || err == ErrShortFrame { + // wait for and flush any data coming off the link to allow + // devices to re-sync + time.Sleep(time.Duration(maxRTUFrameLength) * rt.t1) + discard(rt.link) + } + + // mark the time if we heard anything back + if err != ErrRequestTimedOut { + rt.lastActivity = time.Now() + } + + return +} + // Reads a request from the rtu link. func (rt *rtuTransport) ReadRequest() (req *pdu, err error) { // reading requests from RTU links is currently unsupported @@ -140,7 +193,7 @@ func (rt *rtuTransport) readRTUFrame() (res *pdu, err error) { var bytesNeeded int var crc crc - rxbuf = make([]byte, 10243) + rxbuf = make([]byte, maxRTUFrameLength) // read the serial ADU header: unit id (1 byte), function code (1 byte) and // PDU length/exception code (1 byte) @@ -198,6 +251,75 @@ func (rt *rtuTransport) readRTUFrame() (res *pdu, err error) { return } +func (rt *rtuTransport) readRTUFrameWithRes() (res *pdu, err error) { + var rxbuf []byte + var byteCount int + var bytesNeeded int + var crc crc + + rxbuf = make([]byte, 4096) + + // read the serial ADU header: unit id (1 byte), function code (1 byte) and + // PDU length/exception code (1 byte) + byteCount, err = io.ReadFull(rt.link, rxbuf[0:3]) + if (byteCount > 0 || err == nil) && byteCount != 3 { + err = ErrShortFrame + return + } + if err != nil && err != io.ErrUnexpectedEOF { + return + } + + // figure out how many further bytes to read + bytesNeeded, err = expectedResponseLenth(uint8(rxbuf[1]), uint8(rxbuf[2])) + if err != nil { + return + } + + // we need to read 2 additional bytes of CRC after the payload + bytesNeeded += 2 + + // never read more than the max allowed frame length + if byteCount+bytesNeeded > maxRTUFrameLength { + err = ErrProtocolError + return + } + + byteCount, err = io.ReadFull(rt.link, rxbuf[3:3+bytesNeeded]) + if err != nil && err != io.ErrUnexpectedEOF { + return + } + if byteCount != bytesNeeded { + rt.logger.Warningf("expected %v bytes, received %v", bytesNeeded, byteCount) + err = ErrShortFrame + return + } + + // compute the CRC on the entire frame, excluding the CRC + crc.init() + crc.add(rxbuf[0 : 3+bytesNeeded-2]) + + // compare CRC values + if !crc.isEqual(rxbuf[3+bytesNeeded-2], rxbuf[3+bytesNeeded-1]) { + err = ErrBadCRC + return + } + + _, err = io.ReadFull(rt.link, rxbuf[3+bytesNeeded:]) + if err != nil && err != io.ErrUnexpectedEOF { + return + } + + res = &pdu{ + unitId: rxbuf[0], + functionCode: rxbuf[1], + // pass the byte count + trailing data as payload, withtout the CRC + payload: rxbuf[3+bytesNeeded:], + } + + return +} + // Turns a PDU object into bytes. func (rt *rtuTransport) assembleRTUFrame(p *pdu) (adu []byte) { var crc crc diff --git a/tcp_transport.go b/tcp_transport.go index b117bb1..bdeeb0a 100644 --- a/tcp_transport.go +++ b/tcp_transport.go @@ -9,23 +9,23 @@ import ( ) const ( - maxTCPFrameLength int = 260 - mbapHeaderLength int = 7 + maxTCPFrameLength int = 260 + mbapHeaderLength int = 7 ) type tcpTransport struct { - logger *logger - socket net.Conn - timeout time.Duration - lastTxnId uint16 + logger *logger + socket net.Conn + timeout time.Duration + lastTxnId uint16 } // Returns a new TCP transport. func newTCPTransport(socket net.Conn, timeout time.Duration, customLogger *log.Logger) (tt *tcpTransport) { tt = &tcpTransport{ - socket: socket, - timeout: timeout, - logger: newLogger(fmt.Sprintf("tcp-transport(%s)", socket.RemoteAddr()), customLogger), + socket: socket, + timeout: timeout, + logger: newLogger(fmt.Sprintf("tcp-transport(%s)", socket.RemoteAddr()), customLogger), } return @@ -33,7 +33,7 @@ func newTCPTransport(socket net.Conn, timeout time.Duration, customLogger *log.L // Closes the underlying tcp socket. func (tt *tcpTransport) Close() (err error) { - err = tt.socket.Close() + err = tt.socket.Close() return } @@ -41,7 +41,7 @@ func (tt *tcpTransport) Close() (err error) { // Runs a request across the socket and returns a response. func (tt *tcpTransport) ExecuteRequest(req *pdu) (res *pdu, err error) { // set an i/o deadline on the socket (read and write) - err = tt.socket.SetDeadline(time.Now().Add(tt.timeout)) + err = tt.socket.SetDeadline(time.Now().Add(tt.timeout)) if err != nil { return } @@ -49,7 +49,7 @@ func (tt *tcpTransport) ExecuteRequest(req *pdu) (res *pdu, err error) { // increase the transaction ID counter tt.lastTxnId++ - _, err = tt.socket.Write(tt.assembleMBAPFrame(tt.lastTxnId, req)) + _, err = tt.socket.Write(tt.assembleMBAPFrame(tt.lastTxnId, req)) if err != nil { return } @@ -59,30 +59,34 @@ func (tt *tcpTransport) ExecuteRequest(req *pdu) (res *pdu, err error) { return } +func (tt *tcpTransport) ExecuteRequestWithRes(req *pdu) (res *pdu, err error) { + return nil, nil +} + // Reads a request from the socket. func (tt *tcpTransport) ReadRequest() (req *pdu, err error) { - var txnId uint16 + var txnId uint16 // set an i/o deadline on the socket (read and write) - err = tt.socket.SetDeadline(time.Now().Add(tt.timeout)) + err = tt.socket.SetDeadline(time.Now().Add(tt.timeout)) if err != nil { return } - req, txnId, err = tt.readMBAPFrame() + req, txnId, err = tt.readMBAPFrame() if err != nil { return } // store the incoming transaction id - tt.lastTxnId = txnId + tt.lastTxnId = txnId return } // Writes a response to the socket. func (tt *tcpTransport) WriteResponse(res *pdu) (err error) { - _, err = tt.socket.Write(tt.assembleMBAPFrame(tt.lastTxnId, res)) + _, err = tt.socket.Write(tt.assembleMBAPFrame(tt.lastTxnId, res)) if err != nil { return } @@ -93,11 +97,11 @@ func (tt *tcpTransport) WriteResponse(res *pdu) (err error) { // Reads as many MBAP+modbus frames as necessary until either the response // matching tt.lastTxnId is received or an error occurs. func (tt *tcpTransport) readResponse() (res *pdu, err error) { - var txnId uint16 + var txnId uint16 for { // grab a frame - res, txnId, err = tt.readMBAPFrame() + res, txnId, err = tt.readMBAPFrame() // ignore unknown protocol identifiers if err == ErrUnknownProtocolId { @@ -111,9 +115,9 @@ func (tt *tcpTransport) readResponse() (res *pdu, err error) { // ignore unknown transaction identifiers if tt.lastTxnId != txnId { - tt.logger.Warningf("received unexpected transaction id " + - "(expected 0x%04x, received 0x%04x)", - tt.lastTxnId, txnId) + tt.logger.Warningf("received unexpected transaction id "+ + "(expected 0x%04x, received 0x%04x)", + tt.lastTxnId, txnId) continue } @@ -125,33 +129,33 @@ func (tt *tcpTransport) readResponse() (res *pdu, err error) { // Reads an entire frame (MBAP header + modbus PDU) from the socket. func (tt *tcpTransport) readMBAPFrame() (p *pdu, txnId uint16, err error) { - var rxbuf []byte - var bytesNeeded int - var protocolId uint16 - var unitId uint8 + var rxbuf []byte + var bytesNeeded int + var protocolId uint16 + var unitId uint8 // read the MBAP header - rxbuf = make([]byte, mbapHeaderLength) - _, err = io.ReadFull(tt.socket, rxbuf) + rxbuf = make([]byte, mbapHeaderLength) + _, err = io.ReadFull(tt.socket, rxbuf) if err != nil { return } // decode the transaction identifier - txnId = bytesToUint16(BIG_ENDIAN, rxbuf[0:2]) + txnId = bytesToUint16(BIG_ENDIAN, rxbuf[0:2]) // decode the protocol identifier - protocolId = bytesToUint16(BIG_ENDIAN, rxbuf[2:4]) + protocolId = bytesToUint16(BIG_ENDIAN, rxbuf[2:4]) // store the source unit id - unitId = rxbuf[6] + unitId = rxbuf[6] // determine how many more bytes we need to read - bytesNeeded = int(bytesToUint16(BIG_ENDIAN, rxbuf[4:6])) + bytesNeeded = int(bytesToUint16(BIG_ENDIAN, rxbuf[4:6])) // the byte count includes the unit ID field, which we already have bytesNeeded-- // never read more than the max allowed frame length - if bytesNeeded + mbapHeaderLength > maxTCPFrameLength { + if bytesNeeded+mbapHeaderLength > maxTCPFrameLength { err = ErrProtocolError return } @@ -163,8 +167,8 @@ func (tt *tcpTransport) readMBAPFrame() (p *pdu, txnId uint16, err error) { } // read the PDU - rxbuf = make([]byte, bytesNeeded) - _, err = io.ReadFull(tt.socket, rxbuf) + rxbuf = make([]byte, bytesNeeded) + _, err = io.ReadFull(tt.socket, rxbuf) if err != nil { return } @@ -178,9 +182,9 @@ func (tt *tcpTransport) readMBAPFrame() (p *pdu, txnId uint16, err error) { // store unit id, function code and payload in the PDU object p = &pdu{ - unitId: unitId, - functionCode: rxbuf[0], - payload: rxbuf[1:], + unitId: unitId, + functionCode: rxbuf[0], + payload: rxbuf[1:], } return @@ -189,17 +193,17 @@ func (tt *tcpTransport) readMBAPFrame() (p *pdu, txnId uint16, err error) { // Turns a PDU into an MBAP frame (MBAP header + PDU) and returns it as bytes. func (tt *tcpTransport) assembleMBAPFrame(txnId uint16, p *pdu) (payload []byte) { // transaction identifier - payload = uint16ToBytes(BIG_ENDIAN, txnId) + payload = uint16ToBytes(BIG_ENDIAN, txnId) // protocol identifier (always 0x0000) - payload = append(payload, 0x00, 0x00) + payload = append(payload, 0x00, 0x00) // length (covers unit identifier + function code + payload fields) - payload = append(payload, uint16ToBytes(BIG_ENDIAN, uint16(2 + len(p.payload)))...) + payload = append(payload, uint16ToBytes(BIG_ENDIAN, uint16(2+len(p.payload)))...) // unit identifier - payload = append(payload, p.unitId) + payload = append(payload, p.unitId) // function code - payload = append(payload, p.functionCode) + payload = append(payload, p.functionCode) // payload - payload = append(payload, p.payload...) + payload = append(payload, p.payload...) return } diff --git a/transport.go b/transport.go index 3e02013..648f976 100644 --- a/transport.go +++ b/transport.go @@ -1,18 +1,20 @@ package modbus type transportType uint + const ( - modbusRTU transportType = 1 - modbusRTUOverTCP transportType = 2 - modbusRTUOverUDP transportType = 3 - modbusTCP transportType = 4 - modbusTCPOverTLS transportType = 5 - modbusTCPOverUDP transportType = 6 + modbusRTU transportType = 1 + modbusRTUOverTCP transportType = 2 + modbusRTUOverUDP transportType = 3 + modbusTCP transportType = 4 + modbusTCPOverTLS transportType = 5 + modbusTCPOverUDP transportType = 6 ) type transport interface { - Close() (error) + Close() error ExecuteRequest(*pdu) (*pdu, error) - ReadRequest() (*pdu, error) - WriteResponse(*pdu) (error) + ExecuteRequestWithRes(*pdu) (*pdu, error) + ReadRequest() (*pdu, error) + WriteResponse(*pdu) error }