-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmain_demo.py
More file actions
44 lines (36 loc) · 1.71 KB
/
main_demo.py
File metadata and controls
44 lines (36 loc) · 1.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import asyncio
import json
import os
from playcast.Parser import ParseOzon, ParseCitilink, ParseDNS, ParseAvito, ParseBit28
from typing import cast, Any
# Пример работы
async def main():
print("Начинаю работу с multiparser...")
# Пример использования:
ozon_results = await ParseOzon.get_cards_by_placeholder("RTX 5080")
print(f"OZON results: {len(ozon_results)} items")
# Можно запустить несколько параллельно
tasks = [
ParseCitilink.get_cards_by_placeholder("RTX 5080"),
ParseDNS.get_cards_by_placeholder("RTX 5080"),
ParseOzon.get_cards_by_placeholder("RTX 5080"),
ParseAvito.get_cards_by_placeholder("RTX 5080 Видеокарта"),
ParseBit28.get_cards_by_placeholder("RTX 5080")
]
results = await asyncio.gather(*tasks, return_exceptions=True)
clean_results = [cast(list[Any], res) for res in results if not isinstance(res, Exception)]
for res in results:
if isinstance(res, Exception):
print(f"Ошибка в парсере: {res}")
print(f"Все парсеры отработали. Общее количество результатов: {sum(len(r) for r in clean_results)}")
# Сохраняем результаты
if clean_results:
file_path = "all_data.json"
if not os.path.exists(file_path):
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(clean_results, f, ensure_ascii=False, indent=4)
print("Файл успешно создан.")
else:
print(f"Файл {file_path} уже существует.")
if __name__ == "__main__":
asyncio.run(main())