forked from nessshon/tonapi
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstreaming_websocket.py
More file actions
74 lines (57 loc) · 2.38 KB
/
streaming_websocket.py
File metadata and controls
74 lines (57 loc) · 2.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import asyncio
from pytonapi.streaming import TonapiStreaming
from pytonapi.types import Network, Opcode, Workchain
# TONAPI key handed to TonapiStreaming — get one at https://tonconsole.com/
API_KEY = "YOUR_API_KEY"
# Target network — Network.MAINNET or Network.TESTNET
NETWORK = Network.MAINNET
# Account address to track — raw format; used below as the `accounts`
# filter of the transaction and trace subscriptions
ACCOUNT_ID = "0:408da3b28b6c065a593e10391269baaa9c5f8caebc0c69d9f0aabbab2a99256b"
# Duration for each subscription demo (seconds); passed to make_stop()
DURATION = 30
def make_stop(seconds: float) -> asyncio.Event:
    """Create a stop event that fires after *seconds*.

    Must be called while an event loop is running (for example from inside
    a coroutine), because the timer is scheduled on that loop.

    Args:
        seconds: Delay, in seconds, before the returned event is set.

    Returns:
        An ``asyncio.Event`` that starts unset and is set once the delay
        has elapsed.

    Raises:
        RuntimeError: If no event loop is running in the current thread.
    """
    event = asyncio.Event()
    # A plain timer callback is enough here: it avoids spawning a
    # fire-and-forget task whose only reference would be dropped right
    # away (asyncio advises callers to keep a reference to such tasks).
    asyncio.get_running_loop().call_later(seconds, event.set)
    return event
async def main() -> None:
    """Run the four WebSocket subscription demos one after another.

    A single ``TonapiStreaming`` session is opened for ``API_KEY`` and
    ``NETWORK``. The transactions, blocks, traces and mempool
    subscriptions are then iterated in that order, printing one line per
    received item. Every subscription receives its own stop event from
    ``make_stop(DURATION)``.
    """
    async with TonapiStreaming(API_KEY, NETWORK) as streaming:
        # Demo 1 — finalized transactions over WebSocket JSON-RPC.
        #   accounts=None would subscribe to ALL accounts;
        #   operations narrows by opcode (e.g., Opcode.JETTON_TRANSFER).
        print("— Subscribing to transactions...")
        tx_stream = streaming.ws.subscribe_transactions(
            accounts=[ACCOUNT_ID],
            operations=[Opcode.TEXT_COMMENT],
            stop=make_stop(DURATION),
        )
        async for tx in tx_stream:
            print(f"TX: {tx.account_id} | {tx.tx_hash}")

        # Demo 2 — new blocks over WebSocket.
        #   workchain=None would subscribe to blocks from all workchains.
        print("— Subscribing to blocks...")
        block_stream = streaming.ws.subscribe_blocks(
            workchain=Workchain.MASTERCHAIN,
            stop=make_stop(DURATION),
        )
        async for new_block in block_stream:
            print(f"Block: {new_block.workchain} | {new_block.seqno}")

        # Demo 3 — completed traces over WebSocket.
        #   accounts=None would subscribe to ALL traces.
        print("— Subscribing to traces...")
        trace_stream = streaming.ws.subscribe_traces(
            accounts=[ACCOUNT_ID],
            stop=make_stop(DURATION),
        )
        async for finished_trace in trace_stream:
            print(f"Trace: {finished_trace.hash}")

        # Demo 4 — pending messages over WebSocket.
        #   An accounts filter matches involved accounts after emulation;
        #   with no filter (as here) ALL pending messages arrive, which
        #   is high volume!
        print("— Subscribing to mempool...")
        mempool_stream = streaming.ws.subscribe_mempool(
            stop=make_stop(DURATION),
        )
        async for pending in mempool_stream:
            print(f"Mempool: {pending.boc}")
# Script entry point: run the demo coroutine only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    asyncio.run(main())