1
+ #!/usr/bin/env python
2
+ # -*- coding: utf-8 -*-
3
+ """
4
+ Time : 2024/10/12 下午2:13
5
+ Author : xuzh
6
+ Project : hemera_indexer
7
+ """
8
+ import logging
9
+ from typing import Any , Dict , List , Optional , Sequence , Tuple , Union , cast
10
+
11
+ import eth_abi
12
+ from ens .utils import get_abi_output_types
13
+ from eth_abi import abi
14
+ from eth_abi .codec import ABICodec
15
+ from eth_typing import HexStr
16
+ from eth_utils import encode_hex , to_hex
17
+ from hexbytes import HexBytes
18
+ from web3 ._utils .abi import (
19
+ exclude_indexed_event_inputs ,
20
+ get_abi_input_types ,
21
+ get_indexed_event_inputs ,
22
+ map_abi_data ,
23
+ named_tree ,
24
+ )
25
+ from web3 ._utils .contracts import decode_transaction_data
26
+ from web3 ._utils .normalizers import BASE_RETURN_NORMALIZERS
27
+ from web3 .types import ABIEvent , ABIFunction
28
+
29
+ from common .utils .format_utils import bytes_to_hex_str , convert_bytes_to_hex , convert_dict , hex_str_to_bytes
30
+ from indexer .utils .abi import (
31
+ abi_address_to_hex ,
32
+ abi_bytes_to_bytes ,
33
+ abi_string_to_text ,
34
+ codec ,
35
+ event_log_abi_to_topic ,
36
+ function_abi_to_4byte_selector_str ,
37
+ get_types_from_abi_type_list ,
38
+ )
39
+
40
+ abi_codec = ABICodec (eth_abi .registry .registry )
41
+
42
+
43
+ class Event :
44
+ def __init__ (self , event_abi : ABIEvent ):
45
+ self ._event_abi = event_abi
46
+ self ._signature = event_log_abi_to_topic (event_abi )
47
+
48
+ def get_abi (self ) -> ABIEvent :
49
+ return self ._event_abi
50
+
51
+ def get_signature (self ) -> str :
52
+ return self ._signature
53
+
54
+ def decode_log (self , log ) -> Optional [Dict [str , Any ]]:
55
+ return decode_log (self ._event_abi , log )
56
+
57
+ def decode_log_ignore_indexed (self , log ) -> Optional [Dict [str , Any ]]:
58
+ return decode_log_ignore_indexed (self ._event_abi , log )
59
+
60
+
61
+ def decode_log_ignore_indexed (
62
+ fn_abi : ABIEvent ,
63
+ log ,
64
+ ) -> Optional [Dict [str , Any ]]:
65
+ from indexer .domain .log import Log
66
+
67
+ if not isinstance (log , Log ):
68
+ raise ValueError (f"log: { log } is not a Log instance" )
69
+
70
+ data_types = get_indexed_event_inputs (fn_abi ) + exclude_indexed_event_inputs (fn_abi )
71
+ decoded_data = decode_data ([t ["type" ] for t in data_types ], log .get_topic_with_data ())
72
+ data = named_tree (data_types , decoded_data )
73
+ return data
74
+
75
+
76
+ def decode_log (
77
+ fn_abi : ABIEvent ,
78
+ log ,
79
+ ) -> Optional [Dict [str , Any ]]:
80
+ from indexer .domain .log import Log
81
+
82
+ if not isinstance (log , Log ):
83
+ raise ValueError (f"log: { log } is not a Log instance" )
84
+
85
+ try :
86
+ indexed_types = get_indexed_event_inputs (fn_abi )
87
+ for indexed_type in indexed_types :
88
+ if indexed_type ["type" ] == "string" :
89
+ indexed_type ["type" ] = "bytes32"
90
+
91
+ data_types = exclude_indexed_event_inputs (fn_abi )
92
+
93
+ decode_indexed = decode_data (get_types_from_abi_type_list (indexed_types ), log .get_bytes_topics ())
94
+ indexed = named_tree (indexed_types , decode_indexed )
95
+
96
+ decoded_data = decode_data (get_types_from_abi_type_list (data_types ), log .get_bytes_data ())
97
+ data = named_tree (data_types , decoded_data )
98
+ except Exception as e :
99
+ logging .warning (f"Failed to decode log: { e } , log: { log } " )
100
+ return None
101
+
102
+ return {** indexed , ** data }
103
+
104
+
105
+ class Function :
106
+ def __init__ (self , function_abi : ABIFunction ):
107
+ self ._function_abi = function_abi
108
+ self ._signature = function_abi_to_4byte_selector_str (function_abi )
109
+ self ._inputs_type = get_abi_input_types (function_abi )
110
+ self ._outputs_type = get_abi_output_types (function_abi )
111
+
112
+ def get_abi (self ) -> ABIFunction :
113
+ return self ._function_abi
114
+
115
+ def get_signature (self ) -> str :
116
+ return self ._signature
117
+
118
+ def get_inputs_type (self ) -> List [str ]:
119
+ return self ._inputs_type
120
+
121
+ def get_outputs_type (self ) -> List [str ]:
122
+ return self ._outputs_type
123
+
124
+ def decode_data (self , data : str ) -> Optional [Dict [str , Any ]]:
125
+ try :
126
+ decoded = decode_data (self ._inputs_type , hex_str_to_bytes (data )[4 :])
127
+ decoded = named_tree (self ._function_abi ["inputs" ], decoded )
128
+ return decoded
129
+ except Exception as e :
130
+ logging .warning (f"Failed to decode transaction input data: { e } , input data: { data } " )
131
+ return None
132
+
133
+
134
+ def decode_transaction_data (
135
+ fn_abi : ABIFunction ,
136
+ data : str ,
137
+ ) -> Optional [Dict [str , Any ]]:
138
+ try :
139
+ types = get_abi_input_types (fn_abi )
140
+ decoded = decode_data (types , hex_str_to_bytes (data [4 :]))
141
+ decoded = named_tree (fn_abi ["inputs" ], decoded )
142
+ return decoded
143
+ except Exception as e :
144
+ logging .warning (f"Failed to decode transaction input data: { e } , input data: { data } " )
145
+ return None
146
+
147
+
148
+ def decode_data (decode_type : Union [Sequence [str ], List [str ], str ], data : bytes ) -> Tuple [Any , ...]:
149
+ if isinstance (decode_type , str ):
150
+ data = abi_codec .decode ([decode_type ], data )
151
+ elif isinstance (decode_type , list ):
152
+ for tpe in decode_type :
153
+ if not isinstance (tpe , str ):
154
+ raise ValueError (f"Invalid decode_type: { decode_type } is not a List[str]" )
155
+ try :
156
+ data = abi_codec .decode (decode_type , data )
157
+ except Exception as e :
158
+ print (f"Failed to decode data: { e } " )
159
+ else :
160
+ raise ValueError (f"Invalid decode_type: { decode_type } , it should be str or list[str]" )
161
+ return data
162
+
163
+
164
+ def encode_data (
165
+ abi : ABIFunction ,
166
+ arguments : Sequence [Any ],
167
+ data : str = None ,
168
+ ) -> HexStr :
169
+ argument_types = get_abi_input_types (abi )
170
+
171
+ normalizers = [
172
+ abi_address_to_hex ,
173
+ abi_bytes_to_bytes ,
174
+ abi_string_to_text ,
175
+ ]
176
+
177
+ normalized_arguments = map_abi_data (
178
+ normalizers ,
179
+ argument_types ,
180
+ arguments ,
181
+ )
182
+ encoded_arguments = codec .encode (
183
+ argument_types ,
184
+ normalized_arguments ,
185
+ )
186
+ if data :
187
+ return to_hex (HexBytes (data ) + encoded_arguments )
188
+ else :
189
+ return encode_hex (encoded_arguments )
190
+
191
+
192
+ def decode_log_data (types , data_str ):
193
+ data_hex_str = hex_str_to_bytes (data_str )
194
+ decoded_abi = decode_data (types , data_hex_str )
195
+
196
+ encoded_abi = []
197
+ decoded_abi_real = []
198
+ for index in range (len (types )):
199
+ encoded_abi .append (bytes_to_hex_str (abi .encode (types [index : index + 1 ], decoded_abi [index : index + 1 ])))
200
+
201
+ if types [index ].startswith ("byte" ):
202
+ if type (decoded_abi [index ]) is tuple :
203
+ encode_tuple = []
204
+ for element in decoded_abi [index ]:
205
+ encode_tuple .append (bytes_to_hex_str (element ))
206
+ decoded_abi_real .append (encode_tuple )
207
+ else :
208
+ decoded_abi_real .append (bytes_to_hex_str (decoded_abi [index ]))
209
+ else :
210
+ decoded_abi_real .append (str (decoded_abi [index ]))
211
+
212
+ return decoded_abi_real , encoded_abi
213
+
214
+
215
+ def decode_function (function_abi_json , data_str , output_str ):
216
+ if data_str is not None and len (data_str ) > 0 :
217
+ input = decode_transaction_data (
218
+ cast (ABIFunction , function_abi_json ),
219
+ data_str ,
220
+ normalizers = BASE_RETURN_NORMALIZERS ,
221
+ )
222
+ input = convert_dict (convert_bytes_to_hex (input ))
223
+ else :
224
+ input = []
225
+
226
+ if output_str is not None and len (output_str ) > 0 :
227
+ types = get_abi_output_types (cast (ABIFunction , function_abi_json ))
228
+ data = hex_str_to_bytes (output_str )
229
+ value = decode_data (types , data )
230
+ output = named_tree (function_abi_json ["outputs" ], value )
231
+ output = convert_dict (convert_bytes_to_hex (output ))
232
+ else :
233
+ output = []
234
+ return input , output
0 commit comments