-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathwallet_binding.py
More file actions
285 lines (230 loc) · 8.91 KB
/
wallet_binding.py
File metadata and controls
285 lines (230 loc) · 8.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
from __future__ import annotations
import base64
import inspect
import json
import os
import re
import urllib.request
from typing import Any, Dict, List, Mapping, Optional, Tuple
from urllib.parse import urlparse
from eth_account import Account
from eth_account.messages import encode_defunct
from eth_utils import keccak
# Identifier and revision of the binding format; both are embedded in the
# signed message and in every bundle.
SCHEMA = "ethbind"
VERSION = 2

# Plaintext that a wallet owner signs. Only the first fragment is an f-string
# (it bakes in SCHEMA/VERSION at import time); the remaining fragments keep
# their braces so that str.format() can fill them in later.
MESSAGE_TEMPLATE = (
    f"{SCHEMA} v{VERSION} - Ethereum wallet binding\n"
    "hotkey_ss58: {hotkey_ss58}\n"
    "eth_address: {eth_address}\n"
    "nonce: {nonce}\n"
    "issued_at_unix: {issued_at}\n"
)

# Hostname patterns accepted as Cloudflare R2 endpoints.
R2_ALLOWED_HOST_PATTERNS = [
    re.compile(_expr)
    for _expr in (
        r"^.+\.r2\.dev$",
        r"^.+\.r2\.cloudflarestorage\.com$",
    )
]
def canonical_json_bytes(obj: Any) -> bytes:
    """Serialize *obj* to deterministic JSON bytes suitable for hashing.

    Keys are sorted, separators carry no whitespace and non-ASCII characters
    are escaped, so equal objects always produce the same byte string.
    """
    text = json.dumps(
        obj,
        ensure_ascii=True,
        separators=(",", ":"),
        sort_keys=True,
    )
    return text.encode("utf-8")
def b64url_no_pad(data: bytes) -> str:
    """Return the URL-safe base64 encoding of *data* with the ``=`` padding removed."""
    encoded = base64.urlsafe_b64encode(data)
    return encoded.decode("ascii").rstrip("=")
def keccak256_b64(obj: Any) -> str:
    """Hash *obj* with Keccak-256 over its canonical JSON form.

    The digest is returned as unpadded base64url text (43 chars).
    """
    return b64url_no_pad(keccak(canonical_json_bytes(obj)))
def make_binding_message(
    *,
    hotkey_ss58: str,
    eth_address: str,
    nonce: str,
    issued_at: int,
) -> str:
    """Render the plaintext message an ETH wallet owner signs to bind to a hotkey.

    The address is lower-cased and ``issued_at`` is coerced to ``int`` so that
    signer and verifier render exactly the same text.
    """
    fields = {
        "hotkey_ss58": hotkey_ss58,
        "eth_address": eth_address.lower(),
        "nonce": nonce,
        "issued_at": int(issued_at),
    }
    return MESSAGE_TEMPLATE.format(**fields)
def recover_eth_address(message: str, signature_hex: str) -> str:
    """Recover the ETH address that produced a personal_sign signature.

    ``signature_hex`` may be given with or without a leading ``0x``.
    """
    prefixed = signature_hex if signature_hex.startswith("0x") else "0x" + signature_hex
    signable = encode_defunct(text=message)
    return Account.recover_message(signable, signature=prefixed)
def sign_binding_message(private_key: str, message: str) -> str:
    """Sign a binding message with an ETH private key.

    Returns the signature as 0x-prefixed hex text.
    """
    signed = Account.sign_message(encode_defunct(text=message), private_key=private_key)
    hex_sig = signed.signature.hex()
    # .hex() may or may not already carry the prefix; add it only when missing.
    return hex_sig if hex_sig.startswith("0x") else "0x" + hex_sig
def sign_and_build_bundle(
    *,
    hotkey_ss58: str,
    private_keys: List[str],
    nonce: Optional[str] = None,
    issued_at: Optional[int] = None,
) -> Dict[str, Any]:
    """
    One-shot helper: derive addresses from private keys, sign, and build a bundle.

    Args:
        hotkey_ss58: Hotkey that every wallet is bound to.
        private_keys: ETH private keys as hex text, with or without ``0x``.
        nonce: Value embedded in every signed message; a random 12-byte hex
            string is generated when omitted.
        issued_at: Unix timestamp embedded in every signed message; the
            current time is used when omitted.

    Returns:
        The bundle produced by ``build_bundle``.

    Raises:
        ValueError: If the freshly built bundle fails ``verify_bundle``.
    """
    if nonce is None:
        nonce = os.urandom(12).hex()
    if issued_at is None:
        import time

        issued_at = int(time.time())

    signatures: Dict[str, str] = {}
    for raw_key in private_keys:
        key = raw_key if raw_key.startswith("0x") else "0x" + raw_key
        address = Account.from_key(key).address.lower()
        message = make_binding_message(
            hotkey_ss58=hotkey_ss58,
            eth_address=address,
            nonce=nonce,
            issued_at=issued_at,
        )
        # Keyed by address, so a repeated key simply overwrites its own entry.
        signatures[address] = sign_binding_message(key, message)

    bundle = build_bundle(
        hotkey_ss58=hotkey_ss58,
        nonce=nonce,
        issued_at=issued_at,
        eth_address_to_signature=signatures,
    )

    # Self-check before handing the bundle back to the caller.
    verified, problems = verify_bundle(bundle, expected_hotkey_ss58=hotkey_ss58)
    if verified:
        return bundle
    raise ValueError(f"Bundle self-verification failed: {problems}")
def load_private_keys_from_env(env_path: str) -> List[str]:
    """
    Load ETH private keys from a .env file.

    Supported formats:
        ETH_PRIVATE_KEY=0xsingle_key
        ETH_PRIVATE_KEY_1=0xfirst_key
        ETH_PRIVATE_KEY_2=0xsecond_key

    Each assignment may be preceded by ``export``, the value may be wrapped in
    single or double quotes, and a trailing `` # comment`` after the value is
    ignored. Blank lines, comment lines, lines without ``=`` and unrelated
    variables are skipped.

    Args:
        env_path: Path of the .env file to read.

    Returns:
        The non-empty key values in file order (duplicates are kept).

    Raises:
        OSError: If the file cannot be opened.
    """
    keys: List[str] = []
    # "utf-8-sig" reads plain UTF-8 unchanged but also drops a leading BOM,
    # which would otherwise become part of the first variable name.
    with open(env_path, "r", encoding="utf-8-sig") as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line or line.startswith("#"):
                continue
            name, sep, value = line.partition("=")
            if not sep:
                continue

            # Accept the shell-style "export NAME=value" form.
            name_tokens = name.split()
            if len(name_tokens) == 2 and name_tokens[0] == "export":
                name = name_tokens[1]
            else:
                name = name.strip()
            if name != "ETH_PRIVATE_KEY" and not name.startswith("ETH_PRIVATE_KEY_"):
                continue

            # Cut an inline comment: a '#' at the start of the value or after
            # whitespace. A hex key never contains '#', so nothing is lost.
            value = re.split(r"(?:^|\s)#", value.strip(), maxsplit=1)[0]
            value = value.strip().strip("'\"")
            if value:
                keys.append(value)
    return keys
def build_bundle(
    *,
    hotkey_ss58: str,
    nonce: str,
    issued_at: int,
    eth_address_to_signature: Mapping[str, str],
) -> Dict[str, Any]:
    """Build a bundle containing one or more ETH wallet bindings for a hotkey.

    Args:
        hotkey_ss58: Hotkey the wallets are bound to.
        nonce: Nonce that was embedded in every signed message.
        issued_at: Unix timestamp that was embedded in every signed message.
        eth_address_to_signature: Signature hex keyed by ETH address; the
            signature may be given with or without a ``0x``/``0X`` prefix.

    Returns:
        A dict with ``schema``, ``version``, ``hotkey_ss58``, ``nonce``,
        ``issued_at`` and a ``wallets`` list of
        ``{"eth_address", "signature_hex"}`` entries, both lower-cased and the
        signature 0x-prefixed.
    """
    wallets = []
    for addr, sig in eth_address_to_signature.items():
        # Lower-case BEFORE testing the prefix: checking the raw text would
        # miss an upper-case "0X" and emit a doubly prefixed "0x0x…" value.
        sig_hex = sig.lower()
        if not sig_hex.startswith("0x"):
            sig_hex = "0x" + sig_hex
        wallets.append({
            "eth_address": addr.lower(),
            "signature_hex": sig_hex,
        })
    return {
        "schema": SCHEMA,
        "version": VERSION,
        "hotkey_ss58": hotkey_ss58,
        "nonce": nonce,
        "issued_at": int(issued_at),
        "wallets": wallets,
    }
def verify_bundle(bundle: Dict[str, Any], *, expected_hotkey_ss58: str) -> Tuple[bool, List[str]]:
    """Verify the header fields and every wallet signature in a bundle.

    Malformed content (wrong types, unparsable numbers, undecodable
    signatures) is reported through the returned error list rather than
    raised, so a bundle obtained from an untrusted source cannot crash the
    caller.

    Args:
        bundle: Parsed bundle as produced by ``build_bundle``.
        expected_hotkey_ss58: Hotkey the bundle must reference. The signed
            messages are re-rendered with this value.

    Returns:
        ``(ok, errors)`` where ``ok`` is True only when ``errors`` is empty.
        A bundle with no wallet entries yields no signature errors.
    """
    if not isinstance(bundle, Mapping):
        return (False, ["bundle is not a JSON object"])

    errors: List[str] = []

    if bundle.get("schema") != SCHEMA:
        errors.append("schema mismatch")

    try:
        version = int(bundle.get("version", -1))
    except (TypeError, ValueError, OverflowError):
        version = None
    if version != VERSION:
        errors.append("version mismatch")

    if bundle.get("hotkey_ss58") != expected_hotkey_ss58:
        errors.append("hotkey mismatch (bundle does not reference expected hotkey)")

    nonce = bundle.get("nonce")
    try:
        issued_at = int(bundle.get("issued_at", 0))
    except (TypeError, ValueError, OverflowError):
        errors.append("issued_at is not an integer")
        issued_at = 0

    wallets = bundle.get("wallets", [])
    if not isinstance(wallets, (list, tuple)):
        errors.append("wallets is not a list")
        wallets = []

    for index, w in enumerate(wallets):
        if not isinstance(w, Mapping):
            errors.append(f"wallet entry {index} is not an object")
            continue
        addr = w.get("eth_address", "")
        sig = w.get("signature_hex", "")
        if not isinstance(addr, str) or not isinstance(sig, str):
            errors.append(f"wallet entry {index} has a non-string eth_address or signature_hex")
            continue
        addr = addr.lower()
        msg = make_binding_message(
            hotkey_ss58=expected_hotkey_ss58,
            eth_address=addr,
            nonce=nonce,
            issued_at=issued_at,
        )
        try:
            recovered = recover_eth_address(msg, sig).lower()
        except Exception as exc:  # noqa: BLE001 - verification boundary
            # The signature library raises several unrelated exception types
            # for bad hex / wrong length / invalid curve points; any of them
            # means "this signature does not verify".
            errors.append(f"invalid signature for {addr} ({exc})")
            continue
        if recovered != addr:
            errors.append(f"bad signature for {addr} (recovered {recovered})")

    return (len(errors) == 0, errors)
def is_r2_host(hostname: str) -> bool:
    """Return True when *hostname* matches one of the allowed Cloudflare R2 patterns."""
    return any(rx.match(hostname) for rx in R2_ALLOWED_HOST_PATTERNS)
def validate_commit_url(url: str, expected_hotkey: str) -> Tuple[bool, List[str]]:
    """Validate that a commit URL is HTTPS, on R2, and that its path is the hotkey.

    Every check is run and all failures are collected. A URL that cannot be
    parsed at all is reported as a single error instead of raising.

    Args:
        url: URL to validate.
        expected_hotkey: Hotkey the single path segment must equal.

    Returns:
        ``(ok, errors)`` where ``ok`` is True only when ``errors`` is empty.
    """
    try:
        parsed = urlparse(url)
    except ValueError as exc:
        # urlparse rejects e.g. an unbalanced "[" in the host part.
        return (False, [f"URL is malformed ({exc})"])

    errors: List[str] = []
    if parsed.scheme != "https":
        errors.append(f"URL must be HTTPS (got {parsed.scheme})")
    if not parsed.hostname or not is_r2_host(parsed.hostname):
        errors.append(f"URL must be on Cloudflare R2 (*.r2.dev or *.r2.cloudflarestorage.com), got {parsed.hostname}")
    # Leading/trailing slashes are ignored before comparing with the hotkey.
    path = parsed.path.strip("/")
    if "/" in path:
        errors.append(f"URL path must be a single segment /<hotkey>, got {parsed.path}")
    if path != expected_hotkey:
        errors.append(f"URL path must match hotkey (expected /{expected_hotkey}, got /{path})")
    return (len(errors) == 0, errors)
def make_commit_url(r2_public_base: str, hotkey: str) -> str:
    """Build the public URL for a binding bundle: <base>/<hotkey>

    Trailing slashes on the base are dropped so exactly one slash separates
    the base from the hotkey.
    """
    base = r2_public_base.rstrip("/")
    return "{}/{}".format(base, hotkey)
def upload_bundle_to_r2(
    *,
    account_id: str,
    access_key_id: str,
    secret_key: str,
    bucket: str,
    hotkey: str,
    bundle: Dict[str, Any],
) -> None:
    """Upload a binding bundle to Cloudflare R2 through its S3-compatible API.

    The object key is the hotkey and the body is the canonical JSON form of
    *bundle*, stored with content type ``application/json``.
    """
    # Imported lazily so boto3 is only required by callers that upload.
    import boto3
    from botocore.config import Config

    client_config = Config(signature_version="s3v4", s3={"addressing_style": "path"})
    client = boto3.client(
        "s3",
        endpoint_url=f"https://{account_id}.r2.cloudflarestorage.com",
        aws_access_key_id=access_key_id,
        aws_secret_access_key=secret_key,
        region_name="auto",
        config=client_config,
    )
    payload = canonical_json_bytes(bundle)
    client.put_object(
        Bucket=bucket,
        Key=hotkey,
        Body=payload,
        ContentType="application/json",
    )
def download_bundle(url: str, *, timeout_sec: int = 20) -> Dict[str, Any]:
    """Download a binding bundle from a public HTTPS URL.

    Args:
        url: Location of the bundle; must use the ``https`` scheme.
        timeout_sec: Timeout in seconds passed to ``urlopen``.

    Returns:
        The response body decoded as UTF-8 and parsed as JSON.

    Raises:
        ValueError: If *url* is not an HTTPS URL, or the body is not valid
            JSON (``json.JSONDecodeError`` is a ``ValueError`` subclass).
        urllib.error.URLError: If the request fails.
    """
    # urlopen also understands file:, ftp: and plain http: URLs. The URL is
    # read from an external source, so refuse everything except HTTPS before
    # any I/O happens.
    scheme = urlparse(url).scheme
    if scheme != "https":
        raise ValueError(f"Bundle URL must be HTTPS (got {scheme or 'no scheme'})")

    req = urllib.request.Request(url, headers={"User-Agent": "ethbind-verifier/1.0"})
    with urllib.request.urlopen(req, timeout=timeout_sec) as resp:
        # NOTE(review): the body is read without a size cap.
        raw = resp.read()
    return json.loads(raw.decode("utf-8"))
async def _maybe_await(x: Any) -> Any:
    """Await *x* when it is awaitable; otherwise return it unchanged."""
    if inspect.isawaitable(x):
        return await x
    return x
async def set_chain_commitment(subtensor: Any, *, wallet: Any, netuid: int, data: str, **kwargs: Any) -> Any:
    """Write a commitment (the public URL) to chain via subtensor.

    Uses ``subtensor.set_commitment`` when present, otherwise
    ``subtensor.commit``. The chosen method may be sync or async; its result
    is awaited only when it is awaitable.

    Raises:
        AttributeError: If *subtensor* offers neither method.
    """
    for candidate in ("set_commitment", "commit"):
        if hasattr(subtensor, candidate):
            fn = getattr(subtensor, candidate)
            break
    else:
        raise AttributeError("Subtensor has neither set_commitment nor commit")

    outcome = fn(wallet=wallet, netuid=int(netuid), data=data, **kwargs)
    return await _maybe_await(outcome)