|
1 |
| -from typing import Optional, Callable, Any, Awaitable |
2 |
| -from pydantic import Field, BaseModel |
3 |
| -import requests |
4 |
| -import time |
5 |
| - |
6 |
| - |
7 |
| -TRANSLATIONS = { |
8 |
| - "en": { |
9 |
| - "network_request_failed": "Network request failed: {error}", |
10 |
| - "request_failed": "Request failed: [{error_type}] {error_msg}", |
11 |
| - "insufficient_balance": "Insufficient balance: Current balance `{balance:.4f}`", |
12 |
| - "unknown_error": "Unknown error", |
13 |
| - "api_key_invalid": "API key validation failed", |
14 |
| - "cost": "Cost: ${cost:.4f}", |
15 |
| - "balance": "Balance: ${balance:.4f}", |
16 |
| - "tokens": "Tokens: {input}+{output}", |
17 |
| - "time_spent": "Time: {time:.2f}s", |
18 |
| - "tokens_per_sec": "{tokens_per_sec:.2f} T/s" |
19 |
| - }, |
20 |
| - "zh": { |
21 |
| - "network_request_failed": "网络请求失败: {error}", |
22 |
| - "request_failed": "请求失败: [{error_type}] {error_msg}", |
23 |
| - "insufficient_balance": "余额不足: 当前余额 `{balance:.4f}`", |
24 |
| - "unknown_error": "未知错误", |
25 |
| - "api_key_invalid": "API密钥验证失败", |
26 |
| - "cost": "费用: ¥{cost:.4f}", |
27 |
| - "balance": "余额: ¥{balance:.4f}", |
28 |
| - "tokens": "Token: {input}+{output}", |
29 |
| - "time_spent": "耗时: {time:.2f}s", |
30 |
| - "tokens_per_sec": "{tokens_per_sec:.2f} T/s" |
31 |
| - } |
32 |
| -} |
| 1 | +""" |
| 2 | +title: Usage Monitor |
| 3 | +author: VariantConst & OVINC CN |
| 4 | +git_url: https://github.com/VariantConst/OpenWebUI-Monitor.git |
| 5 | +version: 0.3.5 |
| 6 | +requirements: httpx |
| 7 | +license: MIT |
| 8 | +""" |
| 9 | + |
| 10 | +import logging |
| 11 | +from typing import Dict, Optional |
| 12 | + |
| 13 | +from httpx import AsyncClient |
| 14 | +from pydantic import BaseModel, Field |
| 15 | + |
| 16 | +logger = logging.getLogger(__name__) |
| 17 | +logger.setLevel(logging.INFO) |
| 18 | + |
| 19 | + |
| 20 | +class CustomException(Exception): |
| 21 | + pass |
| 22 | + |
33 | 23 |
|
34 | 24 | class Filter:
|
35 | 25 | class Valves(BaseModel):
|
36 |
| - API_ENDPOINT: str = Field( |
37 |
| - default="", description="The base URL for the API endpoint." |
38 |
| - ) |
39 |
| - API_KEY: str = Field(default="", description="API key for authentication.") |
40 |
| - priority: int = Field( |
41 |
| - default=5, description="Priority level for the filter operations." |
42 |
| - ) |
43 |
| - show_cost: bool = Field(default=True, description="Display cost information") |
44 |
| - show_balance: bool = Field( |
45 |
| - default=True, description="Display balance information" |
46 |
| - ) |
47 |
| - show_spend_time: bool = Field(default=True, description="Display spend time") |
48 |
| - show_tokens: bool = Field(default=True, description="Display token usage") |
49 |
| - show_tokens_per_sec: bool = Field( |
50 |
| - default=True, description="Display tokens per second" |
51 |
| - ) |
52 |
| - language: str = Field( |
53 |
| - default="en", |
54 |
| - description="Language for messages (en/zh)" |
55 |
| - ) |
| 26 | + api_endpoint: str = Field(default="", description="openwebui-monitor's base url") |
| 27 | + api_key: str = Field(default="", description="openwebui-monitor's api key") |
| 28 | + priority: int = Field(default=5, description="filter priority") |
56 | 29 |
|
57 | 30 | def __init__(self):
|
58 | 31 | self.type = "filter"
|
59 |
| - self.name = "OpenWebUI Monitor" |
60 | 32 | self.valves = self.Valves()
|
61 |
| - self.outage = False |
62 |
| - self.start_time = None |
63 |
| - self.translations = TRANSLATIONS |
64 |
| - self.inlet_temp = None |
65 |
| - |
66 |
| - def get_text(self, key: str, **kwargs) -> str: |
67 |
| - """获取指定语言的文本""" |
68 |
| - lang = self.valves.language |
69 |
| - if lang not in self.translations: |
70 |
| - lang = "en" |
71 |
| - text = self.translations[lang].get(key, self.translations["en"][key]) |
72 |
| - return text.format(**kwargs) if kwargs else text |
73 |
| - |
74 |
| - def _prepare_user_dict(self, __user__: dict) -> dict: |
75 |
| - """将 __user__ 对象转换为可序列化的字典""" |
76 |
| - user_dict = dict(__user__) |
77 |
| - if "valves" in user_dict and hasattr(user_dict["valves"], "model_dump"): |
78 |
| - user_dict["valves"] = user_dict["valves"].model_dump() |
79 |
| - |
80 |
| - return user_dict |
81 |
| - def _prepare_body_dict(self, body: dict) -> dict: |
82 |
| - """将 body 对象转换为可序列化的字典""" |
83 |
| - body_dict = dict(body) |
84 |
| - if "model" in body_dict["metadata"] and hasattr( |
85 |
| - body_dict["metadata"]["model"], "model_dump" |
86 |
| - ): |
87 |
| - body_dict["metadata"]["model"] = body_dict["metadata"]["model"].model_dump() |
88 |
| - |
89 |
| - return body_dict |
90 |
| - def _modify_outlet_body(self, body: dict) -> dict: |
91 |
| - body_modify = dict(body) |
92 |
| - last_message = body_modify["messages"][-1] |
93 |
| - |
94 |
| - if "info" not in last_message and self.inlet_temp is not None: |
95 |
| - body_modify["messages"][:-1] = self.inlet_temp["messages"] |
96 |
| - return body_modify |
97 |
| - |
98 |
| - def inlet( |
99 |
| - self, body: dict, user: Optional[dict] = None, __user__: dict = {} |
100 |
| - ) -> dict: |
101 |
| - self.start_time = time.time() |
| 33 | + self.outage_map: Dict[str, bool] = {} |
102 | 34 |
|
103 |
| - try: |
104 |
| - post_url = f"{self.valves.API_ENDPOINT}/api/v1/inlet" |
105 |
| - headers = {"Authorization": f"Bearer {self.valves.API_KEY}"} |
106 |
| - |
107 |
| - user_dict = self._prepare_user_dict(__user__) |
108 |
| - body_dict = self._prepare_body_dict(body) |
109 |
| - self.inlet_temp = body_dict |
110 |
| - response = requests.post( |
111 |
| - post_url, headers=headers, json={"user": user_dict, "body": body_dict} |
112 |
| - ) |
| 35 | + async def request(self, client: AsyncClient, url: str, headers: dict, json: dict): |
| 36 | + response = await client.post(url=url, headers=headers, json=json) |
| 37 | + response.raise_for_status() |
| 38 | + response_data = response.json() |
| 39 | + if not response_data.get("success"): |
| 40 | + logger.error("[usage_monitor] req monitor failed: %s", response_data) |
| 41 | + raise CustomException("calculate usage failed, please contact administrator") |
| 42 | + return response_data |
113 | 43 |
|
114 |
| - if response.status_code == 401: |
115 |
| - return body |
| 44 | + async def inlet(self, body: dict, __metadata__: Optional[dict] = None, __user__: Optional[dict] = None) -> dict: |
| 45 | + __user__ = __user__ or {} |
| 46 | + __metadata__ = __metadata__ or {} |
| 47 | + user_id = __user__["id"] |
116 | 48 |
|
117 |
| - response.raise_for_status() |
118 |
| - response_data = response.json() |
| 49 | + client = AsyncClient() |
119 | 50 |
|
120 |
| - if not response_data.get("success"): |
121 |
| - error_msg = response_data.get("error", self.get_text("unknown_error")) |
122 |
| - error_type = response_data.get("error_type", "UNKNOWN_ERROR") |
123 |
| - raise Exception(self.get_text("request_failed", error_type=error_type, error_msg=error_msg)) |
| 51 | + try: |
| 52 | + response_data = await self.request( |
| 53 | + client=client, |
| 54 | + url=f"{self.valves.api_endpoint}/api/v1/inlet", |
| 55 | + headers={"Authorization": f"Bearer {self.valves.api_key}"}, |
| 56 | + json={"user": __user__, "body": body}, |
| 57 | + ) |
| 58 | + self.outage_map[user_id] = response_data.get("balance", 0) <= 0 |
| 59 | + if self.outage_map[user_id]: |
| 60 | + logger.info("[usage_monitor] no balance: %s", user_id) |
| 61 | + raise CustomException("no balance, please contact administrator") |
124 | 62 |
|
125 |
| - self.outage = response_data.get("balance", 0) <= 0 |
126 |
| - if self.outage: |
127 |
| - raise Exception(self.get_text("insufficient_balance", balance=response_data['balance'])) |
128 | 63 | return body
|
129 | 64 |
|
130 |
| - except requests.exceptions.RequestException as e: |
131 |
| - if ( |
132 |
| - isinstance(e, requests.exceptions.HTTPError) |
133 |
| - and e.response.status_code == 401 |
134 |
| - ): |
135 |
| - return body |
136 |
| - raise Exception(self.get_text("network_request_failed", error=str(e))) |
137 |
| - except Exception as e: |
138 |
| - raise Exception(f"处理请求时发生错误: {str(e)}") |
| 65 | + except Exception as err: |
| 66 | + logger.exception("[usage_monitor] error calculating usage: %s", err) |
| 67 | + if isinstance(err, CustomException): |
| 68 | + raise err |
| 69 | + raise Exception(f"error calculating usage, {err}") from err |
| 70 | + |
| 71 | + finally: |
| 72 | + await client.aclose() |
139 | 73 |
|
140 | 74 | async def outlet(
|
141 | 75 | self,
|
142 | 76 | body: dict,
|
143 |
| - user: Optional[dict] = None, |
144 |
| - __user__: dict = {}, |
145 |
| - __event_emitter__: Callable[[Any], Awaitable[None]] = None, |
| 77 | + __metadata__: Optional[dict] = None, |
| 78 | + __user__: Optional[dict] = None, |
| 79 | + __event_emitter__: callable = None, |
146 | 80 | ) -> dict:
|
147 |
| - if self.outage: |
| 81 | + __user__ = __user__ or {} |
| 82 | + __metadata__ = __metadata__ or {} |
| 83 | + user_id = __user__["id"] |
| 84 | + |
| 85 | + if self.outage_map[user_id]: |
148 | 86 | return body
|
149 | 87 |
|
| 88 | + client = AsyncClient() |
| 89 | + |
150 | 90 | try:
|
151 |
| - post_url = f"{self.valves.API_ENDPOINT}/api/v1/outlet" |
152 |
| - headers = {"Authorization": f"Bearer {self.valves.API_KEY}"} |
153 |
| - |
154 |
| - user_dict = self._prepare_user_dict(__user__) |
155 |
| - body_modify = self._modify_outlet_body(body) |
156 |
| - |
157 |
| - request_data = { |
158 |
| - "user": user_dict, |
159 |
| - "body": body_modify, |
160 |
| - } |
161 |
| - |
162 |
| - response = requests.post(post_url, headers=headers, json=request_data) |
163 |
| - |
164 |
| - if response.status_code == 401: |
165 |
| - if __event_emitter__: |
166 |
| - await __event_emitter__( |
167 |
| - { |
168 |
| - "type": "status", |
169 |
| - "data": { |
170 |
| - "description": "API密钥验证失败", |
171 |
| - "done": True, |
172 |
| - }, |
173 |
| - } |
174 |
| - ) |
175 |
| - return body |
176 |
| - |
177 |
| - response.raise_for_status() |
178 |
| - result = response.json() |
179 |
| - |
180 |
| - if not result.get("success"): |
181 |
| - error_msg = result.get("error", "未知错误") |
182 |
| - error_type = result.get("error_type", "UNKNOWN_ERROR") |
183 |
| - raise Exception(f"请求失败: [{error_type}] {error_msg}") |
184 |
| - |
185 |
| - input_tokens = result["inputTokens"] |
186 |
| - output_tokens = result["outputTokens"] |
187 |
| - total_cost = result["totalCost"] |
188 |
| - new_balance = result["newBalance"] |
189 |
| - |
190 |
| - stats_array = [] |
191 |
| - |
192 |
| - if self.valves.show_cost: |
193 |
| - stats_array.append(self.get_text("cost", cost=total_cost)) |
194 |
| - if self.valves.show_balance: |
195 |
| - stats_array.append(self.get_text("balance", balance=new_balance)) |
196 |
| - if self.valves.show_tokens: |
197 |
| - stats_array.append(self.get_text("tokens", input=input_tokens, output=output_tokens)) |
198 |
| - |
199 |
| - if self.start_time and self.valves.show_spend_time: |
200 |
| - elapsed_time = time.time() - self.start_time |
201 |
| - stats_array.append(self.get_text("time_spent", time=elapsed_time)) |
202 |
| - |
203 |
| - if self.valves.show_tokens_per_sec: |
204 |
| - stats_array.append(self.get_text("tokens_per_sec", tokens_per_sec=output_tokens/elapsed_time)) |
205 |
| - |
206 |
| - stats = " | ".join(stat for stat in stats_array) |
207 |
| - |
208 |
| - if __event_emitter__: |
209 |
| - await __event_emitter__( |
210 |
| - { |
211 |
| - "type": "status", |
212 |
| - "data": { |
213 |
| - "description": stats, |
214 |
| - "done": True, |
215 |
| - }, |
216 |
| - } |
217 |
| - ) |
| 91 | + response_data = await self.request( |
| 92 | + client=client, |
| 93 | + url=f"{self.valves.api_endpoint}/api/v1/outlet", |
| 94 | + headers={"Authorization": f"Bearer {self.valves.api_key}"}, |
| 95 | + json={"user": __user__, "body": body}, |
| 96 | + ) |
218 | 97 |
|
| 98 | + # pylint: disable=C0209 |
| 99 | + stats = " | ".join( |
| 100 | + [ |
| 101 | + f"Tokens: {response_data['inputTokens']} + {response_data['outputTokens']}", |
| 102 | + "Cost: %.4f" % response_data["totalCost"], |
| 103 | + "Balance: %.4f" % response_data["newBalance"], |
| 104 | + ] |
| 105 | + ) |
| 106 | + |
| 107 | + await __event_emitter__({"type": "status", "data": {"description": stats, "done": True}}) |
| 108 | + |
| 109 | + logger.info("usage_monitor: %s %s", user_id, stats) |
219 | 110 | return body
|
220 | 111 |
|
221 |
| - except requests.exceptions.RequestException as e: |
222 |
| - if ( |
223 |
| - isinstance(e, requests.exceptions.HTTPError) |
224 |
| - and e.response.status_code == 401 |
225 |
| - ): |
226 |
| - if __event_emitter__: |
227 |
| - await __event_emitter__( |
228 |
| - { |
229 |
| - "type": "status", |
230 |
| - "data": { |
231 |
| - "description": "API密钥验证失败", |
232 |
| - "done": True, |
233 |
| - }, |
234 |
| - } |
235 |
| - ) |
236 |
| - return body |
237 |
| - raise Exception(f"网络请求失败: {str(e)}") |
238 |
| - except Exception as e: |
239 |
| - if __event_emitter__: |
240 |
| - await __event_emitter__( |
241 |
| - { |
242 |
| - "type": "status", |
243 |
| - "data": { |
244 |
| - "description": f"错误: {str(e)}", |
245 |
| - "done": True, |
246 |
| - }, |
247 |
| - } |
248 |
| - ) |
249 |
| - raise Exception(f"处理请求时发生错误: {str(e)}") |
| 112 | + except Exception as err: |
| 113 | + logger.exception("[usage_monitor] error calculating usage: %s", err) |
| 114 | + raise Exception(f"error calculating usage, {err}") from err |
| 115 | + |
| 116 | + finally: |
| 117 | + await client.aclose() |
0 commit comments