-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathblock.py
319 lines (288 loc) · 11.8 KB
/
block.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
from crypto_lib import *
from crypto_op import *
from datetime import datetime
import os
import mmap
class BlockFile:
"""The block file class, which holds a file pointer."""
def __init__(self, block_filename):
self.block_filename = block_filename
with open(block_filename, 'rb', buffering=16 * 1024 * 1024) as f:
size = os.path.getsize(f.name)
self.blockchain = mmap.mmap(f.fileno(), size, access=mmap.ACCESS_READ)
def get_next_block(self):
while True:
block = Block(self.blockchain)
if block.is_ready:
yield block
else:
break
class BlockHeader:
def __init__(self, blockchain):
self.version = uint4(blockchain)
self.previous_hash = hash32(blockchain)
self.merkle_hash = hash32(blockchain)
self.time = uint4(blockchain)
self.bits = uint4(blockchain)
self.nonce = uint4(blockchain)
def to_string(self):
sb = []
sb.append("Version: %d" % self.version)
sb.append("Previous Hash: %s" % hashStr(self.previous_hash))
sb.append("Merkle Root: %s" % hashStr(self.merkle_hash))
sb.append("Time stamp: %s" % self.decode_time(self.time))
sb.append("Difficulty: %d" % self.bits)
sb.append("Nonce: %s" % self.nonce)
return sb
def decode_time(self, time):
utc_time = datetime.utcfromtimestamp(time)
return utc_time.strftime("%Y-%m-%d %H:%M:%S.%f+00:00 (UTC)")
class Block:
def __init__(self, blockchain):
self.blockchain = blockchain
self.is_ready = True
self.magic_num = 0
self.block_size = 0
self.block_header = ''
self.tx_count = 0
self.txs = []
if self.has_length(8):
self.magic_num = uint4(self.blockchain)
self.block_size = uint4(self.blockchain)
else:
self.is_ready = False
return
if self.has_length(self.block_size):
self.set_header()
self.tx_count = varint(self.blockchain)
self.txs = []
else:
self.is_ready = False
self.tx_pos = self.blockchain.tell()
# Parse all the TX in the block here.
for i in range(0, self.tx_count):
tx = Tx(self.blockchain)
self.txs.append(tx)
def get_block_size(self):
return self.block_size
def is_ready(self):
return self.is_ready;
def has_length(self, size):
cur_pos = self.blockchain.tell()
self.blockchain.seek(0, 2)
total_file_size = self.blockchain.tell()
self.blockchain.seek(cur_pos)
if total_file_size - cur_pos < size:
return False
else:
return True
def set_header(self):
self.block_header = BlockHeader(self.blockchain)
def to_string(self):
sb = []
sb.append("Magic No: \t%8x" % self.magic_num)
sb.append("Block size: \t%d" % self.block_size)
sb.append("### Block Header ###")
sb.extend(self.block_header.to_string())
sb.append("### Tx Count: %d ###" % self.tx_count)
for tx in self.txs:
sb.extend(tx.to_string())
sb.append("###### End of all %d transactins #######" % self.tx_count)
sb.append('')
return sb
class Tx:
def __init__(self, blockchain):
start_pos = blockchain.tell()
self.version = uint4(blockchain)
check_pos = blockchain.tell()
# Segwit - https://en.bitcoin.it/wiki/Transaction
# BIP141 - https://github.com/bitcoin/bips/blob/master/bip-0141.mediawiki#specification
tx_in_pos = blockchain.tell()
marker = uint1(blockchain)
flag = uint1(blockchain)
# The data structure is a little bit different after the segwit.
is_segwit = False
if marker != 0 or flag != 1:
blockchain.seek(check_pos)
else:
is_segwit = True
tx_in_pos = blockchain.tell()
self.inCount = varint(blockchain)
self.inputs = []
self.seq = 1
for i in range(0, self.inCount):
self.inputs.append(TxInput(blockchain, i))
self.outCount = varint(blockchain)
self.outputs = []
if self.outCount > 0:
for i in range(0, self.outCount):
self.outputs.append(TxOutput(blockchain, i))
segwit_pos = blockchain.tell()
# For segwit
if is_segwit:
for i in range(0, self.inCount):
num_op = varint(blockchain)
for n in range(0, num_op):
op_code = varint(blockchain)
_ = hashStr(blockchain.read(op_code))
raw_lock_time = blockchain.read(4)
self.lock_time = struct.unpack('I', raw_lock_time)[0]
cur_pos = blockchain.tell()
blockchain.seek(start_pos)
if is_segwit:
raw_version = blockchain.read(4)
blockchain.read(2)
raw_in_out = blockchain.read(segwit_pos - tx_in_pos)
self.raw_bytes = raw_version + raw_in_out + raw_lock_time
else:
self.raw_bytes = blockchain.read(cur_pos - start_pos)
blockchain.seek(cur_pos)
self.tx_hash = hash_tx(self.raw_bytes)
def to_string(self):
sb = []
sb.append("=" * 20 + " No. %s " % self.seq + "Transaction " + "=" * 20)
sb.append("=%s=" % self.tx_hash)
sb.append("Tx Version: %d" % self.version)
sb.append("Inputs: %d" % self.inCount)
for i in self.inputs:
sb.extend(i.to_string())
sb.append("Outputs: %d" % self.outCount)
for o in self.outputs:
sb.extend(o.to_string())
sb.append("Lock Time: %d" % self.lock_time)
return sb
class TxInput:
def __init__(self, blockchain, idx):
self.idx = idx
self.prev_hash = hashStr(hash32(blockchain))
self.tx_outId = uint4(blockchain)
self.script_len = varint(blockchain)
self.script_raw = blockchain.read(self.script_len)
self.seqNo = uint4(blockchain)
# coinbase's script is arbitary.
if 0xffffffff == self.tx_outId: # Coinbase
self.segments = hashStr(self.script_raw)
else:
self.segments = segment(self.script_raw)
self.decode_script_sig(self.script_raw)
def to_string(self):
sb = []
sb.append(" Tx Previous has: %s - %d" % (self.prev_hash, self.idx))
sb.append(" Script Length: %d" % self.script_len)
sb.append(" %s" % self.hex_str)
sb.append(" PubKey: %s" % self.pub_key)
sb.append(" Sequence: %8x" % self.seqNo)
return sb
def decode_script_sig(self, script_raw:bytes):
""" Custom logic here. """
sb = []
self.hex_str = hashStr(script_raw)
# segwit
if len(self.hex_str) == 0:
self.pub_key = ""
return
if 0xffffffff == self.tx_outId: # Coinbase
self.pub_key = str(bytes.fromhex(self.hex_str))
return
script_len = int(self.hex_str[0:2], 16)
script_len *= 2
script = self.hex_str[2:2 + script_len]
sb.append(" Script: " + script)
try:
if SIGHASH_ALL != int(self.hex_str[script_len:script_len + 2], 16): # should be 0x01
self.pub_key = ""
else:
self.pub_key = self.hex_str[2 + script_len + 2:2 + script_len + 2 + 66]
except:
self.pub_key = ""
def decode_out_idx(self, idx):
sb = []
if idx == 0xffffffff:
sb.append(" [Coinbase] Text: %s" % self.prev_hash)
else:
sb.append(" Prev. Tx Hash: %s" % self.prev_hash)
return "%8x" % idx, sb
class TxOutput:
""" Handle specific output in the transaction. """
def __init__(self, blockchain, idx):
self.idx = idx
self.value = uint8(blockchain)
self.script_len = varint(blockchain)
self.script_raw = blockchain.read(self.script_len)
self.addr = "UNKNOWN"
self.segments = segment(self.script_raw)
self.decode_script_sig(self.script_raw)
def to_string(self):
sb = []
sb.append("Value: %d" % self.value + " Satoshi")
sb.append("Script Len: %d" % self.script_len)
sb.append("ScriptPubkey: %s" % self.addr)
sb.append("Addr: %s" % self.addr)
return sb
def decode_script_sig(self, script_raw:bytes):
''' Custom analysis logic here. '''
hexstr = hashStr(script_raw)
self.hex_str = hexstr
# Get the first two bytes.
# which might some problem.
# https://www.blockchain.com/btc/tx/7bd54def72825008b4ca0f4aeff13e6be2c5fe0f23430629a9d484a1ac2a29b8
try:
op_idx = int(hexstr[0:2], 16)
except:
self.type = "EXCEPTION"
self.addr = "UNKNOWN"
return
try:
op_code = OPCODE_NAMES[op_idx]
except KeyError:
if op_idx == 65:
self.type = "P2PK"
# Obsoleted pay to pubkey directly
# For detail see: https://en.bitcoin.it/wiki/Script#Obsolete_pay-to-pubkey_transaction
pub_key_len = op_idx
op_code_tail = OPCODE_NAMES[int(hexstr[2 + pub_key_len * 2:2 + pub_key_len * 2 + 2], 16)]
self.pubkey_human = "Pubkey OP_CODE: None Bytes:%s tail_op_code:%s %d" % (
pub_key_len, op_code_tail, op_idx)
self.addr = pubkey_to_address(hexstr[2:2 + pub_key_len * 2])[0]
else:
# Some times people will push data directly
# e.g: https://www.blockchain.com/btc/tx/d65bb24f6289dad27f0f7e75e80e187d9b189a82dcf5a86fb1c6f8ff2b2c190f
self.type = "UN"
pub_key_len = op_idx
self.pubkey_human = "PUSH_DATA:%s" % hexstr[2:2 + pub_key_len * 2]
self.addr = "UNKNOWN"
return
try:
if op_code == "OP_DUP":
self.type = "P2PKHA"
# P2PKHA pay to pubkey hash mode
# For detail see: https://en.bitcoin.it/wiki/Script#Standard_Transaction_to_Bitcoin_address_.28pay-to-pubkey-hash.29
op_code2 = OPCODE_NAMES[int(hexstr[2:4], 16)]
pub_key_len = int(hexstr[4:6], 16)
op_code_tail2 = OPCODE_NAMES[int(hexstr[6 + pub_key_len * 2:6 + pub_key_len * 2 + 2], 16)]
op_code_tail_last = OPCODE_NAMES[int(hexstr[6 + pub_key_len * 2 + 2:6 + pub_key_len * 2 + 4], 16)]
self.pubkey_human = "%s %s %s %s %s" % (
op_code, op_code2, hexstr[6:6 + pub_key_len * 2], op_code_tail2, op_code_tail_last)
self.addr = gen_addr(hexstr[6:6 + pub_key_len * 2])[0]
elif op_code == "OP_HASH160":
self.type = "P2SH"
# P2SHA pay to script hash
# https://en.bitcoin.it/wiki/Transaction#Pay-to-Script-Hash
pub_key_len = int(hexstr[2:4], 16)
op_code_tail = OPCODE_NAMES[int(hexstr[4 + pub_key_len * 2:4 + pub_key_len * 2 + 2], 16)]
hash_code = hexstr[4:4 + pub_key_len * 2]
self.pubkey_human = "%s %s %s" % (op_code, hash_code, op_code_tail)
self.addr = hash_code
elif op_code == "OP_RETURN":
self.type = "OP_RETURN"
pub_key_len = int(hexstr[2:4], 16)
hash_code = hexstr[4:4 + pub_key_len * 2]
self.pubkey_human = "OP_RETURN %s" % (hash_code)
self.addr = hash_code
else: # TODO extend for multi-signature parsing
self.type = "UN"
self.pubkey_human = "Need to extend multi-signaturer parsing %x" % int(hexstr[0:2], 16) + op_code
self.addr = "UNKNOWN"
except:
self.type = "ERROR"
self.addr = "UNKNOWN"