-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathauth.py
More file actions
142 lines (118 loc) · 5.74 KB
/
auth.py
File metadata and controls
142 lines (118 loc) · 5.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
"""
Shared-secret HMAC challenge-response handshake.
Why this module exists
----------------------
The original server let anyone who could reach the TCP port send commands.
For a purely local ``127.0.0.1`` lab that is fine, but the moment a student
binds the server to a non-loopback interface, anyone on the network can
connect and shell-exec on the client machine.
This module adds a minimum authentication layer built out of stdlib only:
1. Operator and client share a secret (passphrase, random bytes, ...).
2. When a new connection arrives, the server generates a 16-byte random
``challenge`` and sends it to the client.
3. The client computes ``HMAC-SHA256(secret, challenge)`` and sends the
digest back.
4. The server computes the same HMAC and compares with the client's reply
using ``hmac.compare_digest``, which is timing-safe.
5. If the digests match, the server sends ``AUTH_OK`` and the connection is
allowed to proceed. Otherwise it sends ``AUTH_FAIL`` and closes.
What this protocol teaches
--------------------------
- **HMAC semantics**: why we use ``HMAC(secret, challenge)`` instead of
sending the secret directly or using a naive ``H(secret || challenge)``
(the naive hash-then-concatenate construction is vulnerable to
length-extension attacks; HMAC is not).
- **Challenge freshness**: using ``secrets.token_bytes(16)`` on every
connection means the wire protocol never repeats, so a captured exchange
cannot be replayed on a new connection.
- **Timing-safe comparison**: Python's ``==`` on bytes short-circuits at
the first differing byte, so comparison time reveals how many leading
bytes of the attacker's guess match the real digest. An attacker who can
send many guesses and measure microsecond differences can recover the
correct digest one byte at a time. ``hmac.compare_digest`` always
inspects every byte in constant time, removing this side channel.
- **Defence in depth**: even with TLS (see ``tls_utils.py``), adding an
application-layer auth step protects against misconfigured or disabled
TLS.
What this protocol does NOT teach
---------------------------------
Real production auth uses mutual TLS with certificate pinning or a proper
key-exchange protocol (Noise, TLS 1.3 with client certs, SSH keys). This
module is a deliberate simplification so the full handshake fits in
roughly 75 lines of substantive Python code.
For production-grade inspiration check:
- The Noise Protocol Framework ``https://noiseprotocol.org/``
- OpenSSH's key-exchange (``ssh -vvv``)
- The TLS 1.3 handshake RFC 8446
"""
from __future__ import annotations
import hmac
import secrets
from hashlib import sha256
from protocol import FrameType, ProtocolError, SocketLike, recv_frame, send_frame
#: Size in bytes of the random challenge the server issues. 16 bytes == 128
#: bits of entropy, which is overkill for replay-resistance and cheap to
#: send.
CHALLENGE_SIZE = 16
#: HMAC digest size for SHA-256, cached so we never hardcode 32.
HMAC_SIZE = sha256().digest_size # == 32
def compute_response(secret: bytes, challenge: bytes) -> bytes:
"""
Return the HMAC-SHA256 of ``challenge`` under ``secret``.
Both inputs are raw bytes. Callers that have a passphrase should
encode it to UTF-8 first. The return value is a 32-byte digest.
"""
return hmac.new(secret, challenge, sha256).digest()
def server_handshake(sock: SocketLike, secret: bytes) -> None:
"""
Run the server side of the handshake.
Sends an ``AUTH_CHALLENGE``, reads the client's ``AUTH_RESPONSE``, and
either sends ``AUTH_OK`` on success or ``AUTH_FAIL`` and raises
``ProtocolError`` on failure. The caller should treat the exception as
fatal for that connection.
"""
challenge = secrets.token_bytes(CHALLENGE_SIZE)
send_frame(sock, FrameType.AUTH_CHALLENGE, challenge)
reply = recv_frame(sock)
if reply.frame_type is not FrameType.AUTH_RESPONSE:
send_frame(sock, FrameType.AUTH_FAIL, b"expected AUTH_RESPONSE")
raise ProtocolError(
f"auth: expected AUTH_RESPONSE, got {reply.frame_type.name}"
)
if len(reply.payload) != HMAC_SIZE:
send_frame(sock, FrameType.AUTH_FAIL, b"bad response length")
raise ProtocolError(
f"auth: response length {len(reply.payload)} != {HMAC_SIZE}"
)
expected = compute_response(secret, challenge)
if not hmac.compare_digest(expected, reply.payload):
send_frame(sock, FrameType.AUTH_FAIL, b"digest mismatch")
raise ProtocolError("auth: digest mismatch")
send_frame(sock, FrameType.AUTH_OK, b"")
def client_handshake(sock: SocketLike, secret: bytes) -> None:
"""
Run the client side of the handshake.
Reads the server's ``AUTH_CHALLENGE``, sends the HMAC response, and
waits for ``AUTH_OK``. Raises ``ProtocolError`` on any deviation. The
caller should treat the exception as fatal for that connection.
"""
challenge_frame = recv_frame(sock)
if challenge_frame.frame_type is not FrameType.AUTH_CHALLENGE:
raise ProtocolError(
f"auth: expected AUTH_CHALLENGE, got {challenge_frame.frame_type.name}"
)
if len(challenge_frame.payload) != CHALLENGE_SIZE:
raise ProtocolError(
f"auth: challenge length {len(challenge_frame.payload)} != {CHALLENGE_SIZE}"
)
response = compute_response(secret, challenge_frame.payload)
send_frame(sock, FrameType.AUTH_RESPONSE, response)
verdict = recv_frame(sock)
if verdict.frame_type is FrameType.AUTH_OK:
return
if verdict.frame_type is FrameType.AUTH_FAIL:
reason = verdict.payload.decode("utf-8", errors="replace")
raise ProtocolError(f"auth rejected by server: {reason}")
raise ProtocolError(
f"auth: unexpected verdict frame {verdict.frame_type.name}"
)