Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 53 additions & 44 deletions hive-interface.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class Hive {
return new Promise(async (resolve, reject) => {
try {
resolve(await this.rpcCall(client => client.rc.getRCMana(account_name), 'get_rc_mana', [account_name]));
} catch(err) {
} catch(err) {
if(!utils.isTxError(err))
utils.log(`All nodes failed making API call [rc_api.get_rc_mana].`, 1, 'Red');

Expand All @@ -53,7 +53,7 @@ class Hive {
return new Promise(async (resolve, reject) => {
try {
resolve(await this.rpcCall(client => client.call(api, method_name, params), method_name, params));
} catch(err) {
} catch(err) {
if(!utils.isTxError(err))
utils.log(`All nodes failed making API call [${api}.${method_name}].`, 1, 'Red');

Expand All @@ -66,7 +66,7 @@ class Hive {
return new Promise(async (resolve, reject) => {
try {
resolve(await this.rpcCall(client => client.database.call(method_name, params), method_name, params));
} catch(err) {
} catch(err) {
if(!utils.isTxError(err))
utils.log(`All nodes failed making API call [${method_name}].`, 1, 'Red');

Expand All @@ -82,7 +82,7 @@ class Hive {
} catch(err) {
if(!utils.isTxError(err))
utils.log(`All nodes failed broadcasting operation [${method_name}].`, 1, 'Red');

reject(err);
}
});
Expand Down Expand Up @@ -149,7 +149,7 @@ class Hive {

try {
return resolve(await call(client));
} catch(err) {
} catch(err) {
err.is_tx_error = utils.isTxError(err);
if(err.is_tx_error)
return reject(err);
Expand All @@ -168,7 +168,7 @@ class Hive {
updateClientErrors(client) {
// Check if the client has had errors within the last 10 minutes
if(client.last_error_date && client.last_error_date > Date.now() - 10 * 60 * 1000)
client.errors++;
client.errors++;
else
client.errors = 1;

Expand All @@ -193,7 +193,7 @@ class Hive {
// Left for backwards compatibility
async custom_json(id, json, account, key, use_active) {
var data = {
id,
id,
json: JSON.stringify(json),
required_auths: use_active ? [account] : [],
required_posting_auths: use_active ? [] : [account]
Expand All @@ -216,12 +216,12 @@ class Hive {

// Left for backwards compatibility
async customJsonNoQueue(id, json, account, key, use_active) {

if(this.checkAccountUsageLimit(account))
return this.custom_json(id, json, account, key, use_active);

var data = {
id: id,
id: id,
json: JSON.stringify(json),
required_auths: use_active ? [account] : [],
required_posting_auths: use_active ? [] : [account]
Expand Down Expand Up @@ -418,54 +418,63 @@ class Hive {
});
}

getAccountUsage(account) {
return this.accounts_used.has(account)
? this.accounts_used.get(account)
: 0
}

setAccountUsage(account,count) {
this.accounts_used.set(account,count)
}

// Check if you have already broadcast 4 transactions from an account, in which case return true.
// Otherwise, increment the count and return false.
checkAccountUsageLimit(account) {
if(this.accounts_used.has(account)) {
let count = this.accounts_used.get(account);
if(4 <= count)
return true;
else
this.accounts_used.set(account, count++);
const count = this.getAccountUsage(account)

if(count <= 4) {
this.setAccountUsage(account,count+1)
return true
}
else
{
this.accounts_used.set(account, 1);
}
return false;

return false;
}

async queueTx(data, key, tx_call) {
this.tx_queue.push({ data, key, tx_call });
}

async processTxQueue() {
this.clear_ctr++;
// Every 3 seconds, or one block, clear the transaction counts
if(this.clear_ctr >= 3)
{
for (const key of this.accounts_used.keys()) {
this.accounts_used.set(key, 0);
}
this.clear_ctr = 0;
if( ++this.clear_ctr >= 3 ) {
// Every 3 seconds, or one block, clear the transaction usage map
this.accounts_used = new Map()
this.clear_ctr = 0
}
// If the queue is empty, exit here
if(this.tx_queue.length <= 0) return;
let exit = false;
let next_account = this.tx_queue[0].data.required_auths.length > 0 ? this.tx_queue[0].data.required_auths[0] : this.tx_queue[0].data.required_posting_auths[0];
exit = this.checkAccountUsageLimit(next_account);

while(exit == false)
{
const item = this.tx_queue.shift();
utils.log(`Processing queue item ${item.data.id}`, 3);

item.tx_call(item.data, item.key);
if(this.tx_queue.length <= 0) exit = true;
else {
next_account = this.tx_queue[0].data.required_auths.length > 0 ? this.tx_queue[0].data.required_auths[0] : this.tx_queue[0].data.required_posting_auths[0];
exit = this.checkAccountUsageLimit(next_account);

const requeuedItems = []

// If the queue contains items, then keep processing them
while( this.tx_queue.length > 0 ) {
const item = this.tx_queue.shift();

// Use required_auth if specified, fall back to required_posting_auths
const account = item.data.required_auths[0] || item.data.required_posting_auths[0];

if( !this.checkAccountUsageLimit(account) ) {
// if this account has reached its limit, then
// requeue item and wait for next tick
utils.log(`Account usage limit reached for ${account}`, 3);
requeuedItems.push(item)
}

utils.log(`Processing queue item ${item.data.id}`, 3);
item.tx_call(item.data, item.key);
}

if( requeuedItems.length > 0 ) {
// add requeued items back to the front of the queue for next run
this.tx_queue.unshift(...requeuedItems)
}
}
}
Expand Down