Skip to content

Commit

Permalink
fix: max event listeners (#3)
Browse files Browse the repository at this point in the history
* chore: update

* chore: update

* chore: update
  • Loading branch information
kybarg authored Jul 2, 2024
1 parent 6dc6d70 commit 03f7628
Showing 1 changed file with 89 additions and 96 deletions.
185 changes: 89 additions & 96 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,35 +72,29 @@ class SSP extends EventEmitter {
async open(port, options = {}) {
const serialOptions = { ...PORT_OPTIONS, ...options }

try {
this.port = new SerialPort({ path: port, ...serialOptions, autoOpen: true })
this.port = new SerialPort({ path: port, ...serialOptions, autoOpen: true })

await Promise.race([once(this.port, 'open'), once(this.port, 'close')])
await once(this.port, 'open')

this.port.on('open', () => {
this.emit('OPEN')
})
this.port.on('open', () => {
this.emit('OPEN')
})

this.port.on('close', () => {
this.emit('CLOSE')
})
this.port.on('close', () => {
this.emit('CLOSE')
})

this.port.on('error', error => {
this.emit('ERROR', error)
this.eventEmitter.emit('error', error)
})

this.parser = this.port.pipe(new SSPParser())
this.parser.on('data', buffer => {
this.eventEmitter.emit('DATA', buffer)
})

return
} catch (error) {
this.port.on('error', error => {
this.emit('ERROR', error)
this.eventEmitter.emit('error', error)
})

throw error
}
this.parser = this.port.pipe(new SSPParser())
this.parser.on('data', buffer => {
this.eventEmitter.emit('DATA', buffer)
})

return
}

async close() {
Expand Down Expand Up @@ -250,84 +244,92 @@ class SSP extends EventEmitter {
}

async sendToDevice(command, txBuffer, txBufferPlain) {
// Set processing state
this.state.processing = true
debug('COM <-', chalk.cyan(txBuffer.toString('hex')), chalk.green(command), this.eCount, Date.now())

const debugData = {
command,
tx: {
createdAt: Date.now(),
encrypted: txBuffer,
plain: txBufferPlain,
},
rx: {
createdAt: null,
encrypted: null,
plain: null,
},
}
for (let i = 0; i < 20; i++) {
// Set processing state
this.state.processing = true
debug('COM <-', chalk.cyan(txBuffer.toString('hex')), chalk.green(command), this.eCount, Date.now())
const debugData = {
command,
tx: {
createdAt: Date.now(),
encrypted: txBuffer,
plain: txBufferPlain,
},
rx: {
createdAt: null,
encrypted: null,
plain: null,
},
}

// Define command timeout
const commandTimeout = setTimeout(() => {
this.eventEmitter.emit('error', new Error('TIMEOUT'))
}, this.config.timeout)
// Define command timeout
const commandTimeout = setTimeout(() => {
this.eventEmitter.emit('error', new Error('TIMEOUT'))
}, this.config.timeout)

try {
// Send command to device
this.port.write(txBuffer)
this.port.drain()
this.commandSendAttempts += 1
try {
// Send command to device
let skipDrain = this.port.write(txBuffer)
if (!skipDrain) this.port.drain()

// Await data from device
const [rxBuffer] = await once(this.eventEmitter, 'DATA')
debugData.rx.createdAt = Date.now()
debugData.rx.encrypted = rxBuffer
this.commandSendAttempts += 1

debug('COM ->', chalk.yellow(rxBuffer.toString('hex')), chalk.green(command), this.eCount, Date.now())
// Await data from device
const [rxBuffer] = await once(this.eventEmitter, 'DATA')
clearTimeout(commandTimeout)
debugData.rx.createdAt = Date.now()
debugData.rx.encrypted = rxBuffer

// Extract packet data bytes omiting packing data
const DATA = extractPacketData(rxBuffer, this.keys.encryptKey, this.eCount)
debugData.rx.plain = Buffer.from([...rxBuffer.slice(0, 2), DATA.length, ...DATA, ...CRC16([rxBuffer[1], DATA.length, ...DATA])])
debug('COM ->', chalk.yellow(rxBuffer.toString('hex')), chalk.green(command), this.eCount, Date.now())

// Check if sequence flag mismatch
if (txBuffer[1] !== rxBuffer[1]) {
throw new Error('Sequence flag mismatch')
}
// Extract packet data bytes omiting packing data
const DATA = extractPacketData(rxBuffer, this.keys.encryptKey, this.eCount)
debugData.rx.plain = Buffer.from([...rxBuffer.slice(0, 2), DATA.length, ...DATA, ...CRC16([rxBuffer[1], DATA.length, ...DATA])])

// Increment counter if encrypted command is received
if (this.keys.encryptKey && rxBuffer[3] === 0x7e) {
this.eCount += 1
}
// Check if sequence flag mismatch
if (txBuffer[1] !== rxBuffer[1]) {
throw new Error('Sequence flag mismatch')
}

// Return parsed packet data
return this.parsePacketData(DATA, command)
} catch (error) {
debugData.rx.createdAt = Date.now()

console.log(error)

// Retry sending the same command
// After 20 retries, host assumes that the slave has crashed.
if (this.commandSendAttempts < 20) {
return this.sendToDevice(command, txBuffer, txBufferPlain)
} else {
throw {
success: false,
error: 'Command failed afte 20 retries',
reason: error,
// Increment counter if encrypted command is received
if (this.keys.encryptKey && rxBuffer[3] === 0x7e) {
this.eCount += 1
}
}
} finally {
// Unset processing state and clear command fail timeout
this.state.processing = false
clearTimeout(commandTimeout)

this.emit('DEBUG', debugData)
// Return parsed packet data
return this.parsePacketData(DATA, command)
} catch (error) {
debugData.rx.createdAt = Date.now()
// Retry sending the same command
// After 20 retries, host assumes that the slave has crashed.
if (this.commandSendAttempts == 20) {
throw {
success: false,
error: 'Command failed afte 20 retries',
reason: error,
}
}
} finally {
// Unset processing state and clear command fail timeout
clearTimeout(commandTimeout)
this.state.processing = false
this.emit('DEBUG', debugData)
}
}
}

async poll(status = null) {
// Wait until processing is finished
if (this.state.processing)
await new Promise(resolve => {
const interval = setInterval(() => {
if (!this.state.processing) {
clearInterval(interval)
resolve()
}
}, 1)
})

// If status is true and polling is already in progress, exit early
if (status === true && this.state.polling === true) return

Expand All @@ -339,15 +341,6 @@ class SSP extends EventEmitter {
else if (status === false) {
this.state.polling = false
clearTimeout(this.pollTimeout)
// Wait until processing is finished
await new Promise(resolve => {
const interval = setInterval(() => {
if (!this.state.processing) {
clearInterval(interval)
resolve()
}
}, 100)
})
}

// Poll only if polling is enabled
Expand Down

0 comments on commit 03f7628

Please sign in to comment.