From 5c4a43152bdb50e72884aa7aba716757134bb98e Mon Sep 17 00:00:00 2001 From: Jonas Frost Date: Sat, 17 Sep 2022 01:12:22 +0700 Subject: [PATCH] Restructured processTxQueue() and cleanup When processTxQueue() hits an account limit it bails, with this change it will skip the transactions with limited accounts and continue as far down the queue as it can, for each tick. General cleanup (sorry) --- hive-interface.js | 97 ++++++++++++++++++++++++++--------------------- 1 file changed, 53 insertions(+), 44 deletions(-) diff --git a/hive-interface.js b/hive-interface.js index 6984bbe..fda1c7f 100644 --- a/hive-interface.js +++ b/hive-interface.js @@ -40,7 +40,7 @@ class Hive { return new Promise(async (resolve, reject) => { try { resolve(await this.rpcCall(client => client.rc.getRCMana(account_name), 'get_rc_mana', [account_name])); - } catch(err) { + } catch(err) { if(!utils.isTxError(err)) utils.log(`All nodes failed making API call [rc_api.get_rc_mana].`, 1, 'Red'); @@ -53,7 +53,7 @@ class Hive { return new Promise(async (resolve, reject) => { try { resolve(await this.rpcCall(client => client.call(api, method_name, params), method_name, params)); - } catch(err) { + } catch(err) { if(!utils.isTxError(err)) utils.log(`All nodes failed making API call [${api}.${method_name}].`, 1, 'Red'); @@ -66,7 +66,7 @@ class Hive { return new Promise(async (resolve, reject) => { try { resolve(await this.rpcCall(client => client.database.call(method_name, params), method_name, params)); - } catch(err) { + } catch(err) { if(!utils.isTxError(err)) utils.log(`All nodes failed making API call [${method_name}].`, 1, 'Red'); @@ -82,7 +82,7 @@ class Hive { } catch(err) { if(!utils.isTxError(err)) utils.log(`All nodes failed broadcasting operation [${method_name}].`, 1, 'Red'); - + reject(err); } }); @@ -149,7 +149,7 @@ class Hive { try { return resolve(await call(client)); - } catch(err) { + } catch(err) { err.is_tx_error = utils.isTxError(err); if(err.is_tx_error) return reject(err); @@ -168,7 +168,7 @@ class Hive { updateClientErrors(client) { // Check if the client has had errors within the last 10 minutes if(client.last_error_date && client.last_error_date > Date.now() - 10 * 60 * 1000) - client.errors++; + client.errors++; else client.errors = 1; @@ -193,7 +193,7 @@ class Hive { // Left for backwards compatibility async custom_json(id, json, account, key, use_active) { var data = { - id, + id, json: JSON.stringify(json), required_auths: use_active ? [account] : [], required_posting_auths: use_active ? [] : [account] @@ -216,12 +216,12 @@ class Hive { // Left for backwards compatibility async customJsonNoQueue(id, json, account, key, use_active) { - + if(this.checkAccountUsageLimit(account)) return this.custom_json(id, json, account, key, use_active); var data = { - id: id, + id: id, json: JSON.stringify(json), required_auths: use_active ? [account] : [], required_posting_auths: use_active ? [] : [account] @@ -418,21 +418,27 @@ class Hive { }); } + getAccountUsage(account) { + return this.accounts_used.has(account) + ? this.accounts_used.get(account) + : 0 + } + + setAccountUsage(account,count) { + this.accounts_used.set(account,count) + } + // Check if you have already broadcast 4 transactions from an account, in which case return true. // Otherwise, increment the count and return false. checkAccountUsageLimit(account) { - if(this.accounts_used.has(account)) { - let count = this.accounts_used.get(account); - if(4 <= count) - return true; - else - this.accounts_used.set(account, count++); + const count = this.getAccountUsage(account) + + if(count <= 4) { + this.setAccountUsage(account,count+1) + return true } - else - { - this.accounts_used.set(account, 1); - } - return false; + + return false; } async queueTx(data, key, tx_call) { @@ -440,32 +446,35 @@ class Hive { } async processTxQueue() { - this.clear_ctr++; - // Every 3 seconds, or one block, clear the transaction counts - if(this.clear_ctr >= 3) - { - for (const key of this.accounts_used.keys()) { - this.accounts_used.set(key, 0); - } - this.clear_ctr = 0; + if( ++this.clear_ctr >= 3 ) { + // Every 3 seconds, or one block, clear the transaction usage map + this.accounts_used = new Map() + this.clear_ctr = 0 } - // If the queue is empty, exit here - if(this.tx_queue.length <= 0) return; - let exit = false; - let next_account = this.tx_queue[0].data.required_auths.length > 0 ? this.tx_queue[0].data.required_auths[0] : this.tx_queue[0].data.required_posting_auths[0]; - exit = this.checkAccountUsageLimit(next_account); - - while(exit == false) - { - const item = this.tx_queue.shift(); - utils.log(`Processing queue item ${item.data.id}`, 3); - - item.tx_call(item.data, item.key); - if(this.tx_queue.length <= 0) exit = true; - else { - next_account = this.tx_queue[0].data.required_auths.length > 0 ? this.tx_queue[0].data.required_auths[0] : this.tx_queue[0].data.required_posting_auths[0]; - exit = this.checkAccountUsageLimit(next_account); + + const requeuedItems = [] + + // If the queue contains items, then keep processing them + while( this.tx_queue.length > 0 ) { + const item = this.tx_queue.shift(); + + // Use required_auth if specified, fall back to required_posting_auths + const account = item.data.required_auths[0] || item.data.required_posting_auths[0]; + + if( !this.checkAccountUsageLimit(account) ) { + // if this account has reached its limit, then + // requeue item and wait for next tick + utils.log(`Account usage limit reached for ${account}`, 3); + requeuedItems.push(item) } + + utils.log(`Processing queue item ${item.data.id}`, 3); + item.tx_call(item.data, item.key); + } + + if( requeuedItems.length > 0 ) { + // add requeued items back to the front of the queue for next run + this.tx_queue.unshift(...requeuedItems) } } }