#!/usr/bin/env python3
"""
MultiEngineDorkFramework v5.1 - Ethical OSINT Dork Scanner (Dec 19, 2025)
WARNING: Authorized recon only. Respect ToS/rate limits/robots.txt.
No API keys required – pure HTML scraping only.
"""
# Provenance: utils/dork_scanner.py as introduced by commit 4a4631c5abd0
# ("Add resilient multi-engine dork scanner").

import argparse
import asyncio
import json
import logging
import random
import re
import time
import urllib.parse
from dataclasses import asdict, dataclass, field
from functools import lru_cache, wraps
from typing import Dict, List, Optional, Set

import aiohttp
from bs4 import BeautifulSoup
from tqdm.asyncio import tqdm

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)


@dataclass
class SearchResult:
    """One search-engine hit plus the annotations attached to it later."""

    url: str
    title: str
    snippet: str
    engine: str
    # The remaining fields default to "not yet computed" values.
    score: int = 0
    sqlmap_cli: str = ""
    nuclei_cli: str = ""


class MultiEngineDorkFramework:
    """Scrape several search engines' HTML result pages for dork queries."""

    # Pool of browser User-Agent strings; one is picked at random per session.
    USER_AGENTS = [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
    ]

    def __init__(
        self,
        proxies: Optional[List[str]] = None,
        output_format: str = "json",
        cache_file: str = "dork_cache.json",
    ):
        """Set up configuration, load the on-disk cache and define the engines.

        Args:
            proxies: Optional list of proxy URLs; ``None`` means no proxies.
            output_format: Name of the output format (stored as given).
            cache_file: Path of the JSON file used to persist cached results.
        """
        self.proxies = proxies or []
        self.output_format = output_format
        # Created lazily by _get_session() so no event loop is needed here.
        self.session: Optional[aiohttp.ClientSession] = None
        # cache_file must be set before _load_cache() runs, which reads it.
        self.cache_file = cache_file
        self.cache: Dict[str, List[SearchResult]] = self._load_cache()
        # Per-engine scraping configuration: search URL prefix, CSS selectors
        # for result link / title / snippet, a rate limit value (presumably
        # seconds between requests - confirm against the caller) and the
        # query-string fragment used for pagination.
        self.engines = {
            "google": {
                "base_url": "https://www.google.com/search?q=",
                "result_selector": 'div.g a[href^="http"]',
                "title_selector": "h3",
                "snippet_selector": ".VwiC3b",
                "rate_limit": 2.0,
                "pagination_param": "&start=",
            },
            "bing": {
                "base_url": "https://www.bing.com/search?q=",
                "result_selector": 'li.b_algo h2 a[href^="http"]',
                "title_selector": "h2",
                "snippet_selector": ".b_caption p",
                "rate_limit": 1.5,
                "pagination_param": "&first=",
            },
            "duckduckgo": {
                "base_url": "https://html.duckduckgo.com/html/?q=",
                "result_selector": '.result__a[href^="http"]',
                "title_selector": ".result__title",
                "snippet_selector": ".result__snippet",
                "rate_limit": 1.0,
                "pagination_param": "&s=",
            },
            "yandex": {
                "base_url": "https://yandex.com/search/?text=",
                "result_selector": '.OrganicResults a[href^="http"]',
                "title_selector": ".organic__title",
                "snippet_selector": ".organic__text",
                "rate_limit": 1.2,
                "pagination_param": "&p=",
            },
        }

    def _load_cache(self) -> Dict[str, List[SearchResult]]:
        """Load cached results from ``self.cache_file``.

        Returns:
            Mapping of cache key to ``SearchResult`` lists. An empty dict is
            returned when the file does not exist or cannot be interpreted.
        """
        try:
            with open(self.cache_file, "r", encoding="utf-8") as f:
                data = json.load(f)
            return {
                eng: [SearchResult(**r) for r in eng_results]
                for eng, eng_results in data.items()
            }
        except FileNotFoundError:
            return {}
        except (ValueError, TypeError, AttributeError) as exc:
            # ValueError covers json.JSONDecodeError and bad encodings;
            # TypeError/AttributeError cover a file whose structure does not
            # match SearchResult. A cache is disposable, so start fresh
            # instead of making the constructor fail.
            logger.warning("Ignoring unreadable cache file %s: %s", self.cache_file, exc)
            return {}

    def _save_cache(self) -> None:
        """Write ``self.cache`` to ``self.cache_file`` as indented JSON."""
        serializable = {
            eng: [asdict(r) for r in results] for eng, results in self.cache.items()
        }
        with open(self.cache_file, "w", encoding="utf-8") as f:
            json.dump(serializable, f, indent=2)

    async def _get_session(self) -> aiohttp.ClientSession:
        """Return the shared HTTP session, creating it on first use.

        A session that has been closed is replaced with a fresh one so callers
        never receive an unusable session.
        """
        if self.session is None or self.session.closed:
            connector = aiohttp.TCPConnector(limit=40, limit_per_host=20, ttl_dns_cache=300)
            self.session = aiohttp.ClientSession(
                connector=connector, headers={"User-Agent": self._get_random_ua()}
            )
        return self.session

    def _get_random_ua(self) -> str:
        """Return a randomly chosen entry from ``USER_AGENTS``."""
        return random.choice(self.USER_AGENTS)

@lru_cache(maxsize=10000) + def _normalize_url(self, url: str) -> str: + parsed = urllib.parse.urlparse(url.lower()) + return urllib.parse.urlunparse((parsed.scheme, parsed.netloc, parsed.path.rstrip("/"), "", "", "")) + + def _score_dork_result(self, result: SearchResult) -> int: + score = 0 + low_snip = result.snippet.lower() + if any( + err in low_snip + for err in [ + "sql syntax", + "mysql_fetch", + "ora-", + "incorrect syntax", + "