-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_gateway_v01.py
More file actions
108 lines (87 loc) · 3.97 KB
/
test_gateway_v01.py
File metadata and controls
108 lines (87 loc) · 3.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import audit_logger
from constants import ErrorCode, EventType
from models import RequestState, ToolCallRequest,DecisionType
from security_gateway import handle_request
def setup_function():
    """Reset the audit log before each test runs."""
    # Module-level pytest hook: clears whatever events audit_logger holds.
    audit_logger.reset_events()
def test_deny_non_whitelisted_tool():
    """Phase 6: a call to a tool that is not on the whitelist is denied."""
    outcome = handle_request(
        ToolCallRequest(
            tool_name="delete_file",
            args={"path": "/safe/demo.txt"},
        )
    )
    recorded = audit_logger.list_events_by_request_id(outcome.request_id)

    # The request must end in DENIED with the tool-whitelist reason.
    assert outcome.state == RequestState.DENIED
    verdict = outcome.decision
    assert verdict is not None
    assert verdict.reason == "tool_not_allowed"

    # A policy-decision event must have been logged for this request id.
    wanted = EventType.POLICY_DECIDED.value
    assert any(entry.event_type == wanted for entry in recorded)
def test_deny_path_outside_safe_dir():
    """Phase 6: an allowed tool with a dangerous argument is denied."""
    outcome = handle_request(
        ToolCallRequest(
            tool_name="read_file",
            args={"path": "/etc/passwd"},
        )
    )
    recorded = audit_logger.list_events_by_request_id(outcome.request_id)

    # The request must end in DENIED with the safe-directory reason.
    assert outcome.state == RequestState.DENIED
    verdict = outcome.decision
    assert verdict is not None
    assert verdict.reason == "path_outside_safe_dir"

    # A policy-decision event must have been logged for this request id.
    wanted = EventType.POLICY_DECIDED.value
    assert any(entry.event_type == wanted for entry in recorded)
def test_allow_read_file_in_safe_dir():
    """Phase 7: a legitimate request is allowed and executes successfully."""
    outcome = handle_request(
        ToolCallRequest(
            tool_name="read_file",
            args={"path": "/safe/demo.txt"},
        )
    )
    recorded = audit_logger.list_events_by_request_id(outcome.request_id)

    assert outcome.state == RequestState.SUCCEEDED

    # Policy side: the decision exists and is ALLOW.
    verdict = outcome.decision
    assert verdict is not None
    assert verdict.decision == DecisionType.ALLOW

    # Execution side: success flag set and the expected output returned.
    execution = outcome.execution_result
    assert execution is not None
    assert execution.success is True
    assert execution.output == "mock-read:/safe/demo.txt"

    # Both the start and the finish of the execution must be in the audit log.
    for expected in (EventType.EXECUTION_STARTED, EventType.EXECUTION_FINISHED):
        assert any(entry.event_type == expected.value for entry in recorded)
def test_failed_on_normalization_error():
    """Phase 8: a malformed request ends up in the FAILED state."""
    malformed = ToolCallRequest(
        tool_name="read_file",
        args="not-a-dict",  # intentionally the wrong argument type (a str, not a dict)
    )
    outcome = handle_request(malformed)
    recorded = audit_logger.list_events_by_request_id(outcome.request_id)

    # The request must be FAILED and carry a normalization error code.
    assert outcome.state == RequestState.FAILED
    failure = outcome.error
    assert failure is not None
    assert failure.code == "NORMALIZATION_ERROR"

    # A request-failed event must have been logged for this request id.
    wanted = EventType.REQUEST_FAILED.value
    assert any(entry.event_type == wanted for entry in recorded)
def test_failed_on_execution_error():
    """Phase 8: a request whose execution fails ends up in the FAILED state."""
    outcome = handle_request(
        ToolCallRequest(
            tool_name="read_file",
            args={"path": "/safe/raise.txt"},
        )
    )
    recorded = audit_logger.list_events_by_request_id(outcome.request_id)

    assert outcome.state == RequestState.FAILED

    # Policy still allowed the call; the failure is expected at execution time.
    verdict = outcome.decision
    assert verdict is not None
    assert verdict.decision == DecisionType.ALLOW

    # An execution result is present but flagged as unsuccessful.
    execution = outcome.execution_result
    assert execution is not None
    assert execution.success is False

    # The error must carry the read_file execution error code.
    failure = outcome.error
    assert failure is not None
    assert failure.code == ErrorCode.READ_FILE_EXEC_ERROR.value

    # An execution-failed event must have been logged for this request id.
    wanted = EventType.EXECUTION_FAILED.value
    assert any(entry.event_type == wanted for entry in recorded)
def test_deny_private_http_fetch():
    """Phase 9: http_fetch against a private/loopback address is denied."""
    outcome = handle_request(
        ToolCallRequest(
            tool_name="http_fetch",
            args={"url": "http://127.0.0.1/admin"},
        )
    )
    recorded = audit_logger.list_events_by_request_id(outcome.request_id)

    # The request must end in DENIED with the private/loopback reason.
    assert outcome.state == RequestState.DENIED
    verdict = outcome.decision
    assert verdict is not None
    assert verdict.reason == "private_or_loopback_target"

    # A request-denied event must have been logged for this request id.
    wanted = EventType.REQUEST_DENIED.value
    assert any(entry.event_type == wanted for entry in recorded)