diff --git a/src/controller/index.js b/src/controller/index.js index a56dd7c..228df57 100644 --- a/src/controller/index.js +++ b/src/controller/index.js @@ -37,8 +37,7 @@ class Controller extends ENIP { }; this.workers = { - read: new Queue(compare, queue_max_size), - write: new Queue(compare, queue_max_size), + readWrite: new Queue(compare, queue_max_size), group: new Queue(compare, queue_max_size) }; } @@ -336,7 +335,7 @@ class Controller extends ENIP { * @memberof Controller */ readTag(tag, size = null) { - return this.workers.read.schedule(this._readTag.bind(this), [tag, size], { + return this.workers.readWrite.schedule(this._readTag.bind(this), [tag, size], { priority: 1, timestamp: new Date() }); @@ -352,7 +351,7 @@ class Controller extends ENIP { * @memberof Controller */ writeTag(tag, value = null, size = 0x01) { - return this.workers.write.schedule(this._writeTag.bind(this), [tag, value, size], { + return this.workers.readWrite.schedule(this._writeTag.bind(this), [tag, value, size], { priority: 1, timestamp: new Date() }); @@ -476,7 +475,6 @@ class Controller extends ENIP { async _readTag(tag, size = null) { const MR = tag.generateReadMessageRequest(size); - this.write_cip(MR); const readTagErr = new Error(`TIMEOUT occurred while writing Reading Tag: ${tag.name}.`); @@ -487,12 +485,11 @@ class Controller extends ENIP { if (err) reject(err); resolve(data); }); + this.write_cip(MR); }), 10000, readTagErr - ); - - this.removeAllListeners("Read Tag"); + ).finally(() => this.removeAllListeners("Read Tag")); tag.parseReadMessageResponse(data); } @@ -509,14 +506,11 @@ class Controller extends ENIP { async _writeTag(tag, value = null, size = 0x01) { const MR = tag.generateWriteMessageRequest(value, size); - this.write_cip(MR); - const writeTagErr = new Error(`TIMEOUT occurred while writing Writing Tag: ${tag.name}.`); // Wait for Response await promiseTimeout( new Promise((resolve, reject) => { - // Full Tag Writing this.on("Write Tag", (err, data) => { if (err) reject(err); @@ -532,13 +526,15 @@ class Controller extends ENIP { tag.unstageWriteRequest(); resolve(data); }); + this.write_cip(MR); }), 10000, writeTagErr - ); + ).finally(() => { + this.removeAllListeners("Write Tag"); + this.removeAllListeners("Read Modify Write Tag"); + }); - this.removeAllListeners("Write Tag"); - this.removeAllListeners("Read Modify Write Tag"); } /** @@ -555,7 +551,6 @@ class Controller extends ENIP { // Send Each Multi Service Message for (let msg of messages) { - this.write_cip(msg.data); // Wait for Controller to Respond const data = await promiseTimeout( @@ -565,12 +560,11 @@ class Controller extends ENIP { resolve(data); }); + this.write_cip(msg.data); }), 10000, readTagGroupErr - ); - - this.removeAllListeners("Multiple Service Packet"); + ).finally(() => this.removeAllListeners("Multiple Service Packet")); // Parse Messages group.parseReadMessageResponses(data, msg.tag_ids); @@ -591,7 +585,6 @@ class Controller extends ENIP { // Send Each Multi Service Message for (let msg of messages) { - this.write_cip(msg.data); // Wait for Controller to Respond const data = await promiseTimeout( @@ -601,12 +594,11 @@ class Controller extends ENIP { resolve(data); }); + this.write_cip(msg.data); }), 10000, writeTagGroupErr - ); - - this.removeAllListeners("Multiple Service Packet"); + ).finally(() => this.removeAllListeners("Multiple Service Packet")); group.parseWriteMessageRequests(data, msg.tag_ids); } diff --git a/src/enip/index.js b/src/enip/index.js index 6bdc7c5..ef15394 100644 --- a/src/enip/index.js +++ b/src/enip/index.js @@ -240,6 +240,11 @@ class ENIP extends Socket { const encapsulatedData = header.parse(data); const { statusCode, status, commandCode } = encapsulatedData; + // Handle buffers with multiple responses + if(data.length > 24 + encapsulatedData.length) { + setImmediate(() => this._handleDataEvent(data.subarray(24 + encapsulatedData.length))); + } + if (statusCode !== 0) { console.log(`Error <${statusCode}>:`.red, status.red);