diff --git a/client.go b/client.go index 5e5ebc8..9b9184f 100644 --- a/client.go +++ b/client.go @@ -793,7 +793,7 @@ func (mc *ModbusClient) WriteRegisterWithRes(addr uint16, value uint16) (bytes [ req.payload = append(req.payload, uint16ToBytes(mc.endianness, value)...) // run the request across the transport and wait for a response - res, err = mc.executeRequest(req) + res, err = mc.executeRequestWithRes(req) if err != nil { return } @@ -1188,15 +1188,9 @@ func (mc *ModbusClient) readRegistersWithFunctionCode(addr uint16, quantity uint unitId: mc.unitId, } - // if functionCode != fcCustomize { - // err = ErrUnexpectedParameters - // mc.logger.Errorf("unexpected function code (%d)", functionCode) - // return - // } - - if functionCode == 0 { + if functionCode != fcCustomize { err = ErrUnexpectedParameters - mc.logger.Errorf("unexpected register type (%v)", functionCode) + mc.logger.Errorf("unexpected function code (%d)", functionCode) return } @@ -1208,9 +1202,10 @@ func (mc *ModbusClient) readRegistersWithFunctionCode(addr uint16, quantity uint return } - if quantity > 1024 { + // 16 * 16 * 40 + if quantity > 10240 { err = ErrUnexpectedParameters - mc.logger.Error("quantity of registers exceeds 1024") + mc.logger.Error("quantity of registers exceeds 10240") return } @@ -1406,3 +1401,29 @@ func (mc *ModbusClient) executeRequest(req *pdu) (res *pdu, err error) { return } + +func (mc *ModbusClient) executeRequestWithRes(req *pdu) (res *pdu, err error) { + // send the request over the wire, wait for and decode the response + res, err = mc.transport.ExecuteRequestWithRes(req) + if err != nil { + // map i/o timeouts to ErrRequestTimedOut + if os.IsTimeout(err) { + err = ErrRequestTimedOut + } + return + } + + // make sure the source unit id matches that of the request + if (res.functionCode&0x80) == 0x00 && res.unitId != req.unitId { + err = ErrBadUnitId + return + } + // accept errors from gateway devices (using special unit id #255) + if (res.functionCode&0x80) == 0x80 && + (res.unitId != req.unitId && res.unitId != 0xff) { + err = ErrBadUnitId + return + } + + return +} diff --git a/modbus.go b/modbus.go index f527f77..e7021a2 100644 --- a/modbus.go +++ b/modbus.go @@ -41,7 +41,7 @@ const ( fcWriteFileRecord uint8 = 0x15 // customize - fcCustomize uint8 = 0x29 + fcCustomize uint8 = 0x41 // exception codes exIllegalFunction uint8 = 0x01 diff --git a/rtu_transport.go b/rtu_transport.go index fe1190f..01d46cb 100644 --- a/rtu_transport.go +++ b/rtu_transport.go @@ -8,7 +8,7 @@ import ( ) const ( - maxRTUFrameLength int = 256 + maxRTUFrameLength int = 256 ) type rtuTransport struct { @@ -21,10 +21,10 @@ type rtuTransport struct { } type rtuLink interface { - Close() (error) - Read([]byte) (int, error) - Write([]byte) (int, error) - SetDeadline(time.Time) (error) + Close() error + Read([]byte) (int, error) + Write([]byte) (int, error) + SetDeadline(time.Time) error } // Returns a new RTU transport. @@ -58,11 +58,11 @@ func (rt *rtuTransport) Close() (err error) { // Runs a request across the rtu link and returns a response. func (rt *rtuTransport) ExecuteRequest(req *pdu) (res *pdu, err error) { var ts time.Time - var t time.Duration - var n int + var t time.Duration + var n int // set an i/o deadline on the link - err = rt.link.SetDeadline(time.Now().Add(rt.timeout)) + err = rt.link.SetDeadline(time.Now().Add(rt.timeout)) if err != nil { return } @@ -78,7 +78,7 @@ func (rt *rtuTransport) ExecuteRequest(req *pdu) (res *pdu, err error) { // build an RTU ADU out of the request object and // send the final ADU+CRC on the wire - n, err = rt.link.Write(rt.assembleRTUFrame(req)) + n, err = rt.link.Write(rt.assembleRTUFrame(req)) if err != nil { return } @@ -109,10 +109,63 @@ func (rt *rtuTransport) ExecuteRequest(req *pdu) (res *pdu, err error) { return } +func (rt *rtuTransport) ExecuteRequestWithRes(req *pdu) (res *pdu, err error) { + var ts time.Time + var t time.Duration + var n int + + // set an i/o deadline on the link + err = rt.link.SetDeadline(time.Now().Add(rt.timeout)) + if err != nil { + return + } + + // if the line was active less than 3.5 char times ago, + // let t3.5 expire before transmitting + t = time.Since(rt.lastActivity.Add(rt.t35)) + if t < 0 { + time.Sleep(t * (-1)) + } + + ts = time.Now() + + // build an RTU ADU out of the request object and + // send the final ADU+CRC on the wire + n, err = rt.link.Write(rt.assembleRTUFrame(req)) + if err != nil { + return + } + + // estimate how long the serial line was busy for. + // note that on most platforms, Write() will be buffered and return + // immediately rather than block until the buffer is drained + rt.lastActivity = ts.Add(time.Duration(n) * rt.t1) + + // observe inter-frame delays + time.Sleep(rt.lastActivity.Add(rt.t35).Sub(time.Now())) + + // read the response back from the wire + res, err = rt.readRTUFrameWithRes() + + if err == ErrBadCRC || err == ErrProtocolError || err == ErrShortFrame { + // wait for and flush any data coming off the link to allow + // devices to re-sync + time.Sleep(time.Duration(maxRTUFrameLength) * rt.t1) + discard(rt.link) + } + + // mark the time if we heard anything back + if err != ErrRequestTimedOut { + rt.lastActivity = time.Now() + } + + return +} + // Reads a request from the rtu link. func (rt *rtuTransport) ReadRequest() (req *pdu, err error) { // reading requests from RTU links is currently unsupported - err = fmt.Errorf("unimplemented") + err = fmt.Errorf("unimplemented") return } @@ -123,7 +176,7 @@ func (rt *rtuTransport) WriteResponse(res *pdu) (err error) { // build an RTU ADU out of the request object and // send the final ADU+CRC on the wire - n, err = rt.link.Write(rt.assembleRTUFrame(res)) + n, err = rt.link.Write(rt.assembleRTUFrame(res)) if err != nil { return } @@ -135,16 +188,16 @@ func (rt *rtuTransport) WriteResponse(res *pdu) (err error) { // Waits for, reads and decodes a frame from the rtu link. func (rt *rtuTransport) readRTUFrame() (res *pdu, err error) { - var rxbuf []byte - var byteCount int - var bytesNeeded int - var crc crc + var rxbuf []byte + var byteCount int + var bytesNeeded int + var crc crc - rxbuf = make([]byte, maxRTUFrameLength) + rxbuf = make([]byte, maxRTUFrameLength) // read the serial ADU header: unit id (1 byte), function code (1 byte) and // PDU length/exception code (1 byte) - byteCount, err = io.ReadFull(rt.link, rxbuf[0:3]) + byteCount, err = io.ReadFull(rt.link, rxbuf[0:3]) if (byteCount > 0 || err == nil) && byteCount != 3 { err = ErrShortFrame return @@ -160,15 +213,15 @@ func (rt *rtuTransport) readRTUFrame() (res *pdu, err error) { } // we need to read 2 additional bytes of CRC after the payload - bytesNeeded += 2 + bytesNeeded += 2 // never read more than the max allowed frame length - if byteCount + bytesNeeded > maxRTUFrameLength { - err = ErrProtocolError + if byteCount+bytesNeeded > maxRTUFrameLength { + err = ErrProtocolError return } - byteCount, err = io.ReadFull(rt.link, rxbuf[3:3 + bytesNeeded]) + byteCount, err = io.ReadFull(rt.link, rxbuf[3:3+bytesNeeded]) if err != nil && err != io.ErrUnexpectedEOF { return } @@ -180,19 +233,88 @@ func (rt *rtuTransport) readRTUFrame() (res *pdu, err error) { // compute the CRC on the entire frame, excluding the CRC crc.init() - crc.add(rxbuf[0:3 + bytesNeeded - 2]) + crc.add(rxbuf[0 : 3+bytesNeeded-2]) // compare CRC values - if !crc.isEqual(rxbuf[3 + bytesNeeded - 2], rxbuf[3 + bytesNeeded - 1]) { + if !crc.isEqual(rxbuf[3+bytesNeeded-2], rxbuf[3+bytesNeeded-1]) { err = ErrBadCRC return } - res = &pdu{ - unitId: rxbuf[0], - functionCode: rxbuf[1], + res = &pdu{ + unitId: rxbuf[0], + functionCode: rxbuf[1], // pass the byte count + trailing data as payload, withtout the CRC - payload: rxbuf[2:3 + bytesNeeded - 2], + payload: rxbuf[2 : 3+bytesNeeded-2], + } + + return +} + +func (rt *rtuTransport) readRTUFrameWithRes() (res *pdu, err error) { + var rxbuf []byte + var byteCount int + var bytesNeeded int + var crc crc + + rxbuf = make([]byte, 4096) + + // read the serial ADU header: unit id (1 byte), function code (1 byte) and + // PDU length/exception code (1 byte) + byteCount, err = io.ReadFull(rt.link, rxbuf[0:3]) + if (byteCount > 0 || err == nil) && byteCount != 3 { + err = ErrShortFrame + return + } + if err != nil && err != io.ErrUnexpectedEOF { + return + } + + // figure out how many further bytes to read + bytesNeeded, err = expectedResponseLenth(uint8(rxbuf[1]), uint8(rxbuf[2])) + if err != nil { + return + } + + // we need to read 2 additional bytes of CRC after the payload + bytesNeeded += 2 + + // never read more than the max allowed frame length + if byteCount+bytesNeeded > maxRTUFrameLength { + err = ErrProtocolError + return + } + + byteCount, err = io.ReadFull(rt.link, rxbuf[3:3+bytesNeeded]) + if err != nil && err != io.ErrUnexpectedEOF { + return + } + if byteCount != bytesNeeded { + rt.logger.Warningf("expected %v bytes, received %v", bytesNeeded, byteCount) + err = ErrShortFrame + return + } + + // compute the CRC on the entire frame, excluding the CRC + crc.init() + crc.add(rxbuf[0 : 3+bytesNeeded-2]) + + // compare CRC values + if !crc.isEqual(rxbuf[3+bytesNeeded-2], rxbuf[3+bytesNeeded-1]) { + err = ErrBadCRC + return + } + + _, err = io.ReadFull(rt.link, rxbuf[3+bytesNeeded:]) + if err != nil && err != io.ErrUnexpectedEOF { + return + } + + res = &pdu{ + unitId: rxbuf[0], + functionCode: rxbuf[1], + // pass the byte count + trailing data as payload, withtout the CRC + payload: rxbuf[3+bytesNeeded:], } return @@ -200,18 +322,18 @@ func (rt *rtuTransport) readRTUFrame() (res *pdu, err error) { // Turns a PDU object into bytes. func (rt *rtuTransport) assembleRTUFrame(p *pdu) (adu []byte) { - var crc crc + var crc crc - adu = append(adu, p.unitId) - adu = append(adu, p.functionCode) - adu = append(adu, p.payload...) + adu = append(adu, p.unitId) + adu = append(adu, p.functionCode) + adu = append(adu, p.payload...) // run the ADU through the CRC generator crc.init() crc.add(adu) // append the CRC to the ADU - adu = append(adu, crc.value()...) + adu = append(adu, crc.value()...) return } @@ -220,24 +342,31 @@ func (rt *rtuTransport) assembleRTUFrame(p *pdu) (adu []byte) { func expectedResponseLenth(responseCode uint8, responseLength uint8) (byteCount int, err error) { switch responseCode { case fcReadHoldingRegisters, - fcReadInputRegisters, - fcReadCoils, - fcReadDiscreteInputs: byteCount = int(responseLength) + fcReadInputRegisters, + fcReadCoils, + fcReadDiscreteInputs, + 0x41: + byteCount = int(responseLength) case fcWriteSingleRegister, - fcWriteMultipleRegisters, - fcWriteSingleCoil, - fcWriteMultipleCoils: byteCount = 3 - case fcMaskWriteRegister: byteCount = 5 + fcWriteMultipleRegisters, + fcWriteSingleCoil, + fcWriteMultipleCoils: + byteCount = 3 + case fcMaskWriteRegister: + byteCount = 5 case fcReadHoldingRegisters | 0x80, - fcReadInputRegisters | 0x80, - fcReadCoils | 0x80, - fcReadDiscreteInputs | 0x80, - fcWriteSingleRegister | 0x80, - fcWriteMultipleRegisters | 0x80, - fcWriteSingleCoil | 0x80, - fcWriteMultipleCoils | 0x80, - fcMaskWriteRegister | 0x80: byteCount = 0 - default: err = ErrProtocolError + fcReadInputRegisters | 0x80, + fcReadCoils | 0x80, + fcReadDiscreteInputs | 0x80, + fcWriteSingleRegister | 0x80, + fcWriteMultipleRegisters | 0x80, + fcWriteSingleCoil | 0x80, + fcWriteMultipleCoils | 0x80, + fcMaskWriteRegister | 0x80, + 0x41 | 0x80: + byteCount = 0 + default: + err = ErrProtocolError } return diff --git a/tcp_transport.go b/tcp_transport.go index b117bb1..bdeeb0a 100644 --- a/tcp_transport.go +++ b/tcp_transport.go @@ -9,23 +9,23 @@ import ( ) const ( - maxTCPFrameLength int = 260 - mbapHeaderLength int = 7 + maxTCPFrameLength int = 260 + mbapHeaderLength int = 7 ) type tcpTransport struct { - logger *logger - socket net.Conn - timeout time.Duration - lastTxnId uint16 + logger *logger + socket net.Conn + timeout time.Duration + lastTxnId uint16 } // Returns a new TCP transport. func newTCPTransport(socket net.Conn, timeout time.Duration, customLogger *log.Logger) (tt *tcpTransport) { tt = &tcpTransport{ - socket: socket, - timeout: timeout, - logger: newLogger(fmt.Sprintf("tcp-transport(%s)", socket.RemoteAddr()), customLogger), + socket: socket, + timeout: timeout, + logger: newLogger(fmt.Sprintf("tcp-transport(%s)", socket.RemoteAddr()), customLogger), } return @@ -33,7 +33,7 @@ func newTCPTransport(socket net.Conn, timeout time.Duration, customLogger *log.L // Closes the underlying tcp socket. func (tt *tcpTransport) Close() (err error) { - err = tt.socket.Close() + err = tt.socket.Close() return } @@ -41,7 +41,7 @@ func (tt *tcpTransport) Close() (err error) { // Runs a request across the socket and returns a response. func (tt *tcpTransport) ExecuteRequest(req *pdu) (res *pdu, err error) { // set an i/o deadline on the socket (read and write) - err = tt.socket.SetDeadline(time.Now().Add(tt.timeout)) + err = tt.socket.SetDeadline(time.Now().Add(tt.timeout)) if err != nil { return } @@ -49,7 +49,7 @@ func (tt *tcpTransport) ExecuteRequest(req *pdu) (res *pdu, err error) { // increase the transaction ID counter tt.lastTxnId++ - _, err = tt.socket.Write(tt.assembleMBAPFrame(tt.lastTxnId, req)) + _, err = tt.socket.Write(tt.assembleMBAPFrame(tt.lastTxnId, req)) if err != nil { return } @@ -59,30 +59,34 @@ func (tt *tcpTransport) ExecuteRequest(req *pdu) (res *pdu, err error) { return } +func (tt *tcpTransport) ExecuteRequestWithRes(req *pdu) (res *pdu, err error) { + return nil, nil +} + // Reads a request from the socket. func (tt *tcpTransport) ReadRequest() (req *pdu, err error) { - var txnId uint16 + var txnId uint16 // set an i/o deadline on the socket (read and write) - err = tt.socket.SetDeadline(time.Now().Add(tt.timeout)) + err = tt.socket.SetDeadline(time.Now().Add(tt.timeout)) if err != nil { return } - req, txnId, err = tt.readMBAPFrame() + req, txnId, err = tt.readMBAPFrame() if err != nil { return } // store the incoming transaction id - tt.lastTxnId = txnId + tt.lastTxnId = txnId return } // Writes a response to the socket. func (tt *tcpTransport) WriteResponse(res *pdu) (err error) { - _, err = tt.socket.Write(tt.assembleMBAPFrame(tt.lastTxnId, res)) + _, err = tt.socket.Write(tt.assembleMBAPFrame(tt.lastTxnId, res)) if err != nil { return } @@ -93,11 +97,11 @@ func (tt *tcpTransport) WriteResponse(res *pdu) (err error) { // Reads as many MBAP+modbus frames as necessary until either the response // matching tt.lastTxnId is received or an error occurs. func (tt *tcpTransport) readResponse() (res *pdu, err error) { - var txnId uint16 + var txnId uint16 for { // grab a frame - res, txnId, err = tt.readMBAPFrame() + res, txnId, err = tt.readMBAPFrame() // ignore unknown protocol identifiers if err == ErrUnknownProtocolId { @@ -111,9 +115,9 @@ func (tt *tcpTransport) readResponse() (res *pdu, err error) { // ignore unknown transaction identifiers if tt.lastTxnId != txnId { - tt.logger.Warningf("received unexpected transaction id " + - "(expected 0x%04x, received 0x%04x)", - tt.lastTxnId, txnId) + tt.logger.Warningf("received unexpected transaction id "+ + "(expected 0x%04x, received 0x%04x)", + tt.lastTxnId, txnId) continue } @@ -125,33 +129,33 @@ func (tt *tcpTransport) readResponse() (res *pdu, err error) { // Reads an entire frame (MBAP header + modbus PDU) from the socket. func (tt *tcpTransport) readMBAPFrame() (p *pdu, txnId uint16, err error) { - var rxbuf []byte - var bytesNeeded int - var protocolId uint16 - var unitId uint8 + var rxbuf []byte + var bytesNeeded int + var protocolId uint16 + var unitId uint8 // read the MBAP header - rxbuf = make([]byte, mbapHeaderLength) - _, err = io.ReadFull(tt.socket, rxbuf) + rxbuf = make([]byte, mbapHeaderLength) + _, err = io.ReadFull(tt.socket, rxbuf) if err != nil { return } // decode the transaction identifier - txnId = bytesToUint16(BIG_ENDIAN, rxbuf[0:2]) + txnId = bytesToUint16(BIG_ENDIAN, rxbuf[0:2]) // decode the protocol identifier - protocolId = bytesToUint16(BIG_ENDIAN, rxbuf[2:4]) + protocolId = bytesToUint16(BIG_ENDIAN, rxbuf[2:4]) // store the source unit id - unitId = rxbuf[6] + unitId = rxbuf[6] // determine how many more bytes we need to read - bytesNeeded = int(bytesToUint16(BIG_ENDIAN, rxbuf[4:6])) + bytesNeeded = int(bytesToUint16(BIG_ENDIAN, rxbuf[4:6])) // the byte count includes the unit ID field, which we already have bytesNeeded-- // never read more than the max allowed frame length - if bytesNeeded + mbapHeaderLength > maxTCPFrameLength { + if bytesNeeded+mbapHeaderLength > maxTCPFrameLength { err = ErrProtocolError return } @@ -163,8 +167,8 @@ func (tt *tcpTransport) readMBAPFrame() (p *pdu, txnId uint16, err error) { } // read the PDU - rxbuf = make([]byte, bytesNeeded) - _, err = io.ReadFull(tt.socket, rxbuf) + rxbuf = make([]byte, bytesNeeded) + _, err = io.ReadFull(tt.socket, rxbuf) if err != nil { return } @@ -178,9 +182,9 @@ func (tt *tcpTransport) readMBAPFrame() (p *pdu, txnId uint16, err error) { // store unit id, function code and payload in the PDU object p = &pdu{ - unitId: unitId, - functionCode: rxbuf[0], - payload: rxbuf[1:], + unitId: unitId, + functionCode: rxbuf[0], + payload: rxbuf[1:], } return @@ -189,17 +193,17 @@ func (tt *tcpTransport) readMBAPFrame() (p *pdu, txnId uint16, err error) { // Turns a PDU into an MBAP frame (MBAP header + PDU) and returns it as bytes. func (tt *tcpTransport) assembleMBAPFrame(txnId uint16, p *pdu) (payload []byte) { // transaction identifier - payload = uint16ToBytes(BIG_ENDIAN, txnId) + payload = uint16ToBytes(BIG_ENDIAN, txnId) // protocol identifier (always 0x0000) - payload = append(payload, 0x00, 0x00) + payload = append(payload, 0x00, 0x00) // length (covers unit identifier + function code + payload fields) - payload = append(payload, uint16ToBytes(BIG_ENDIAN, uint16(2 + len(p.payload)))...) + payload = append(payload, uint16ToBytes(BIG_ENDIAN, uint16(2+len(p.payload)))...) // unit identifier - payload = append(payload, p.unitId) + payload = append(payload, p.unitId) // function code - payload = append(payload, p.functionCode) + payload = append(payload, p.functionCode) // payload - payload = append(payload, p.payload...) + payload = append(payload, p.payload...) return } diff --git a/transport.go b/transport.go index 3e02013..648f976 100644 --- a/transport.go +++ b/transport.go @@ -1,18 +1,20 @@ package modbus type transportType uint + const ( - modbusRTU transportType = 1 - modbusRTUOverTCP transportType = 2 - modbusRTUOverUDP transportType = 3 - modbusTCP transportType = 4 - modbusTCPOverTLS transportType = 5 - modbusTCPOverUDP transportType = 6 + modbusRTU transportType = 1 + modbusRTUOverTCP transportType = 2 + modbusRTUOverUDP transportType = 3 + modbusTCP transportType = 4 + modbusTCPOverTLS transportType = 5 + modbusTCPOverUDP transportType = 6 ) type transport interface { - Close() (error) + Close() error ExecuteRequest(*pdu) (*pdu, error) - ReadRequest() (*pdu, error) - WriteResponse(*pdu) (error) + ExecuteRequestWithRes(*pdu) (*pdu, error) + ReadRequest() (*pdu, error) + WriteResponse(*pdu) error }