-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpolicy_engine.py
More file actions
99 lines (81 loc) · 3.23 KB
/
policy_engine.py
File metadata and controls
99 lines (81 loc) · 3.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
from urllib.parse import urlparse
import ipaddress
from constants import RuleName
from models import AuthorizeRequest, PolicyDecision, DecisionType
#阶段7只让“会执行的工具”进入 allow
ALLOWED_TOOLS = {"read_file", "http_fetch"}
SAFE_READ_PREFIX = "/safe/"
ALLOWED_HTTP_DOMAINS = {"example.com", "api.example.com"}
def _is_private_or_loopback_host(hostname: str | None) -> bool:
if not hostname:
return True
lowered = hostname.lower()
if lowered in {"localhost"}:
return True
try:
ip = ipaddress.ip_address(lowered)
return ip.is_private or ip.is_loopback or ip.is_link_local
except ValueError:
return False
def evaluate(request: AuthorizeRequest) -> PolicyDecision:
if request.tool_name not in ALLOWED_TOOLS:
return PolicyDecision(
request_id=request.request_id,
decision=DecisionType.DENY,
reason="tool_not_allowed",
match_rules=[RuleName.TOOL_WHITELIST.value],
)
if request.tool_name == "read_file":
path = request.normalized_args.get("path")
if not isinstance(path, str):
return PolicyDecision(
request_id=request.request_id,
decision=DecisionType.DENY,
reason="invalid_path_type",
match_rules=[RuleName.READ_FILE_PATH_TYPE.value],
)
if not path.startswith(SAFE_READ_PREFIX):
return PolicyDecision(
request_id=request.request_id,
decision=DecisionType.DENY,
reason="path_outside_safe_dir",
match_rules=[RuleName.READ_FILE_PATH_PREFIX.value],
)
if request.tool_name == "http_fetch":
url = request.normalized_args.get("url")
if not isinstance(url, str):
return PolicyDecision(
request_id=request.request_id,
decision=DecisionType.DENY,
reason="invalid_url_type",
match_rules=[RuleName.HTTP_FETCH_URL_TYPE.value],
)
parsed = urlparse(url)
hostname = parsed.hostname
if parsed.scheme not in {"http", "https"}:
return PolicyDecision(
request_id=request.request_id,
decision=DecisionType.DENY,
reason="invalid_url_scheme",
match_rules=[RuleName.HTTP_FETCH_SCHEME.value],
)
if _is_private_or_loopback_host(hostname):
return PolicyDecision(
request_id=request.request_id,
decision=DecisionType.DENY,
reason="private_or_loopback_target",
match_rules=[RuleName.HTTP_FETCH_PRIVATE_TARGET.value]
)
if hostname not in ALLOWED_HTTP_DOMAINS:
return PolicyDecision(
request_id=request.request_id,
decision=DecisionType.DENY,
reason="domain_not_allowed",
match_rules=[RuleName.HTTP_FETCH_DOMAIN_WHITELIST.value],
)
return PolicyDecision(
request_id=request.request_id,
decision=DecisionType.ALLOW,
reason="allowed",
match_rules=[RuleName.BASE_ALLOW.value],
)