forked from steem-monsters/hive-interface
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhive-engine.js
More file actions
85 lines (70 loc) · 2.61 KB
/
hive-engine.js
File metadata and controls
85 lines (70 loc) · 2.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
const fs = require('fs');
const utils = require('./utils');
const SSC = require('sscjs');
let ssc = null;
module.exports = class HiveEngine {
ssc = null;
_options = {
rpc_url: "https://api.hive-engine.com/rpc",
chain_id: "ssc-mainnet-hive",
save_state: last_block => this.saveState(last_block),
load_state: () => this.loadState(),
state_file: 'state_he.json',
on_op: null
};
constructor(options) {
this._options = Object.assign(this._options, options);
this.ssc = new SSC(this._options.rpc_url);
}
async stream(on_op) {
this._options.on_op = on_op;
let last_block = 0;
// Load saved state (last block read)
if(this._options.load_state)
last_block = await this._options.load_state();
// Start streaming blocks
if(last_block > 0)
this.ssc.streamFromTo(last_block + 1, null, (err, block) => this.processBlock(err, block));
else
this.ssc.stream((err, block) => this.processBlock(err, block));
}
async processBlock(err, block) {
if(err)
utils.log('Error processing block: ' + err);
if(!block)
return;
utils.log('Processing block [' + block.blockNumber + ']...', block.blockNumber % 1000 == 0 ? 1 : 4);
try {
for(var i = 0; i < block.transactions.length; i++)
await this.processTransaction(block.transactions[i], block.blockNumber, new Date(block.timestamp + 'Z'), block.refSteemBlockNumber, block.refSteemBlockId, block.prevRefSteemBlockId);
} catch(err) { utils.log('Error processing block: ' + block.blockNumber + ', Error: ' + err.message); }
if(this._options.save_state)
this._options.save_state(block.blockNumber);
}
async processTransaction(tx, ssc_block_num, ssc_block_time, block_num, block_id, prev_block_id) {
let logs = utils.tryParse(tx.logs);
// The transaction was unsuccessful
if(!logs || logs.errors || !logs.events || logs.events.length == 0)
return;
if(this._options.on_op) {
try {
await this._options.on_op(tx, ssc_block_num, ssc_block_time, block_num, block_id, prev_block_id, utils.tryParse(tx.payload), logs.events);
} catch(err) { utils.log(`Error processing Hive Engine transaction [${tx.transactionId}]: ${err}`, 1, 'Red'); }
}
}
async loadState() {
// Check if state has been saved to disk, in which case load it
if (fs.existsSync(this._options.state_file)) {
let state = JSON.parse(fs.readFileSync(this._options.state_file));
utils.log('Restored saved state: ' + JSON.stringify(state));
return state.last_block;
}
}
saveState(last_block) {
// Save the last block read to disk
fs.writeFile(this._options.state_file, JSON.stringify({ last_block }), function (err) {
if (err)
utils.log(err);
});
}
}