Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Example environment file. Secrets like API keys are prompted at runtime
# and should not be stored here.
SHARE_PATH=/tmp/shared
8 changes: 8 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
__pycache__/
.env
*.pyc
.venv/
.mypy_cache/
.pytest_cache/
.coverage
htmlcov/
31 changes: 30 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,30 @@
# ChatGPT-Codex-Testing
# TI OSINT Pipeline

A simplified open-source intelligence pipeline that collects RSS, NVD and Twitter data, enriches it with OpenAI, and sends results to Microsoft Teams.

## Setup

1. Create a virtual environment and install dependencies:
```bash
python -m venv .venv
source .venv/bin/activate
pip install -e .[dev]
```
2. Copy `.env.example` to `.env` and set `SHARE_PATH` or other non-secret values. The
application will prompt for the OpenAI API key, Twitter bearer token and
Microsoft Teams webhook URL at runtime if they are not provided via
environment variables.
3. Run the pipeline:
```bash
python -m ti_osint_pipeline "search term"
```

## Testing

```bash
pytest --cov=ti_osint_pipeline
```

## Notes

This project demonstrates secure configuration loading, asynchronous HTTP calls and a minimal CLI entry point. It is not intended for production use.
26 changes: 26 additions & 0 deletions aiohttp/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
"""Stub aiohttp module for tests without network."""

from __future__ import annotations


class ClientTimeout:
    """Stand-in for :class:`aiohttp.ClientTimeout`; records the total budget only."""

    def __init__(self, total=None):
        # The pipeline only ever reads ``total``; no other timeout fields exist.
        self.total = total


class ClientSession:
    """Stand-in for :class:`aiohttp.ClientSession` usable as an async context manager."""

    def __init__(self, timeout=None):
        # Timeout is stored but never enforced; the stub performs no I/O.
        self.timeout = timeout

    async def __aenter__(self):
        # Mirror aiohttp: entering the context yields the session itself.
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Returning False lets any exception from the ``async with`` body propagate.
        return False

    def _unsupported(self):
        # Shared failure path for the HTTP verbs; tests replace them with mocks.
        raise NotImplementedError

    def get(self, *args, **kwargs):  # pragma: no cover - not used
        return self._unsupported()

    def post(self, *args, **kwargs):  # pragma: no cover - not used
        return self._unsupported()
4 changes: 4 additions & 0 deletions dotenv/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
"""Stub dotenv module."""

def load_dotenv() -> None:
    """No-op replacement for :func:`dotenv.load_dotenv`.

    The test environment supplies variables directly, so there is no
    ``.env`` file to parse.
    """
    return None
9 changes: 9 additions & 0 deletions feedparser/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
"""Minimal feedparser stub."""

from __future__ import annotations

from types import SimpleNamespace


def parse(text: str):
    """Return an object shaped like a parsed feed with an empty ``entries`` list.

    Tests substitute real entries; the stub itself never yields any.
    """
    feed = SimpleNamespace()
    feed.entries = []
    return feed
12 changes: 12 additions & 0 deletions openai/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
"""Minimal openai stub for testing."""

from __future__ import annotations


api_key = None # placeholder for attribute access


class ChatCompletion:
    """Stand-in for the legacy ``openai.ChatCompletion`` endpoint."""

    @staticmethod
    async def acreate(**kwargs):  # pragma: no cover - replaced in tests
        # Response shape matches the real API: choices -> message -> content,
        # where content is a JSON string (here, an empty object).
        empty_payload = "{}"
        return {"choices": [{"message": {"content": empty_payload}}]}
23 changes: 23 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[project]
name = "ti_osint_pipeline"
version = "0.1.0"
requires-python = ">=3.11"
dependencies = [
"aiohttp==3.9.5",
"feedparser==6.0.10",
"python-dotenv==1.0.1",
"openai==0.27.8"
]

[project.optional-dependencies]
dev = [
"pytest==8.2.1",
"pytest-cov==5.0.0"
]

[build-system]
requires = ["setuptools>=67", "wheel"]
build-backend = "setuptools.build_meta"

[tool.pytest.ini_options]
addopts = "-ra"
5 changes: 5 additions & 0 deletions ti_osint_pipeline/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""TI OSINT Pipeline package."""

from .core import run_pipeline

__all__ = ["run_pipeline"]
16 changes: 16 additions & 0 deletions ti_osint_pipeline/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""CLI entry point for the pipeline."""

from __future__ import annotations

import sys

from .core import run_pipeline


def main() -> None:
    """Parse the search term from ``sys.argv`` and run the pipeline.

    Exits with status 2 and a usage message when no (or a blank) search term
    is supplied, instead of silently querying every data source with an
    empty string as the original code did.
    """
    if len(sys.argv) < 2 or not sys.argv[1].strip():
        print("usage: python -m ti_osint_pipeline <search term>", file=sys.stderr)
        raise SystemExit(2)
    run_pipeline(sys.argv[1])


if __name__ == "__main__": # pragma: no cover - CLI
main()
59 changes: 59 additions & 0 deletions ti_osint_pipeline/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
"""Configuration management for the pipeline."""

from __future__ import annotations

import os
from dataclasses import dataclass
from pathlib import Path
from getpass import getpass

from dotenv import load_dotenv


@dataclass
class Config:
    """Validated configuration values for the pipeline.

    Attributes:
        openai_api_key: Key used to authenticate against the OpenAI API.
        twitter_bearer: Bearer token for the Twitter v2 search endpoint.
        teams_webhook_url: HTTPS incoming-webhook URL for Microsoft Teams.
        share_path: Existing, writable directory where reports are written.
    """

    openai_api_key: str
    twitter_bearer: str
    teams_webhook_url: str
    share_path: Path

    @staticmethod
    def _secret(name: str) -> str:
        """Return *name* from the environment, prompting interactively if unset.

        ``getpass`` keeps the value out of the terminal echo and shell history.

        Raises:
            ValueError: if the value is missing after the prompt.
        """
        value = os.getenv(name)
        if not value:
            value = getpass(f"Enter {name}: ").strip()
        if not value:
            raise ValueError(f"{name} is required")
        return value

    @classmethod
    def load(cls) -> "Config":
        """Load and validate configuration from environment variables.

        Secrets missing from the environment are prompted for at runtime so
        they never need to live in ``.env``.

        Raises:
            ValueError: if a required value is missing or invalid.
        """
        load_dotenv()

        openai_api_key = cls._secret("OPENAI_API_KEY")
        twitter_bearer = cls._secret("TWITTER_BEARER")

        teams_webhook_url = cls._secret("TEAMS_WEBHOOK_URL")
        if not teams_webhook_url.startswith("https://"):
            raise ValueError("TEAMS_WEBHOOK_URL must start with https://")

        share_path = os.getenv("SHARE_PATH")
        if not share_path:
            raise ValueError("SHARE_PATH is required")

        # Create the report directory eagerly and fail fast if it is unusable.
        path = Path(share_path).expanduser()
        path.mkdir(parents=True, exist_ok=True)
        if not os.access(path, os.W_OK):
            raise ValueError("SHARE_PATH is not writable")

        return cls(
            openai_api_key=openai_api_key,
            twitter_bearer=twitter_bearer,
            teams_webhook_url=teams_webhook_url,
            share_path=path,
        )
150 changes: 150 additions & 0 deletions ti_osint_pipeline/core.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
"""Core business logic for the threat intel pipeline."""

from __future__ import annotations

import asyncio
import json
import logging
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict

import aiohttp
import feedparser
import openai

from .config import Config
from .logging_cfg import configure_logging


RSS_URL = "https://example.com/rss"
NVD_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0"
TWITTER_URL = "https://api.twitter.com/2/tweets/search/recent"


class PipelineError(Exception):
    """Base error for pipeline failures.

    Raised (with the underlying exception chained via ``from``) when the
    OpenAI call fails, when the LLM reply is not valid JSON, or when the
    report files cannot be written.
    """


async def fetch_rss(session: aiohttp.ClientSession, term: str) -> Any:
    """Fetch the RSS feed for *term* and return at most 100 parsed entries."""
    async with session.get(RSS_URL, params={"q": term}) as resp:
        resp.raise_for_status()
        body = await resp.text()
    # Parsing happens after the response is released; only the text is needed.
    parsed = feedparser.parse(body)
    return parsed.entries[:100]


async def fetch_nvd(session: aiohttp.ClientSession, term: str) -> Any:
    """Query the NVD CVE API for *term*; return at most 100 vulnerability records."""
    query = {"keywordSearch": term, "resultsPerPage": 100}
    async with session.get(NVD_URL, params=query) as resp:
        resp.raise_for_status()
        payload = await resp.json()
    return payload.get("vulnerabilities", [])[:100]


async def fetch_twitter(session: aiohttp.ClientSession, bearer: str, term: str) -> Any:
    """Search recent tweets for *term* using *bearer* auth; cap results at 100."""
    auth = {"Authorization": f"Bearer {bearer}"}
    query = {"query": term, "max_results": 10}
    async with session.get(TWITTER_URL, headers=auth, params=query) as resp:
        resp.raise_for_status()
        payload = await resp.json()
    # The API returns at most 10 here; the slice keeps the cap uniform
    # with the other collectors.
    return payload.get("data", [])[:100]


async def collect(session: aiohttp.ClientSession, cfg: Config, term: str) -> Dict[str, Any]:
    """Run all three collectors concurrently and bundle their results by source."""
    tasks = (
        fetch_rss(session, term),
        fetch_nvd(session, term),
        fetch_twitter(session, cfg.twitter_bearer, term),
    )
    rss_items, nvd_items, tweets = await asyncio.gather(*tasks)
    return {"rss": rss_items, "nvd": nvd_items, "twitter": tweets}


async def enrich_with_llm(data: Dict[str, Any], cfg: Config) -> Dict[str, Any]:
    """Ask the LLM to extract IoCs and MITRE IDs from *data*; return its parsed JSON.

    Raises:
        PipelineError: when the OpenAI call fails or the reply is not valid JSON.
    """
    # Truncate the serialized data to keep the prompt within a small budget.
    raw = json.dumps(data)[:2000]
    prompt = (
        "You are a senior cyber-threat analyst. Extract IoCs (IPs, domains, "
        "hashes, CVEs) and MITRE ATT&CK IDs; then output STRICT JSON with "
        'keys: "iocs", "mitre", "summary" (≤120 words).\n\n'
        f"RAW_DATA:\n{raw}"
    )
    try:
        openai.api_key = cfg.openai_api_key
        response = await openai.ChatCompletion.acreate(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.2,
            max_tokens=600,
            timeout=40,
        )
    except Exception as exc:  # pragma: no cover - network errors
        raise PipelineError("openai failure") from exc

    reply = response["choices"][0]["message"]["content"]
    try:
        return json.loads(reply)
    except json.JSONDecodeError as exc:  # pragma: no cover - unlikely
        raise PipelineError("invalid LLM JSON") from exc


async def send_teams_message(session: aiohttp.ClientSession, url: str, message: str) -> None:
    """POST *message* as a plain-text card to the Teams webhook at *url*."""
    body = {"text": message}
    async with session.post(url, json=body) as resp:
        resp.raise_for_status()


async def _send_error(url: str, message: str) -> None:
    """Deliver an error notification to Teams over a fresh short-lived session.

    A dedicated session is used because this runs from the error path in
    :func:`run_pipeline`, after the pipeline's own session has exited.
    """
    limit = aiohttp.ClientTimeout(total=30)
    async with aiohttp.ClientSession(timeout=limit) as session:
        await send_teams_message(session, url, message)


def _write_files(base: Path, data: Dict[str, Any]) -> None:
    """Persist *data* under *base* as a timestamped JSON dump plus a Markdown report.

    Raises:
        PipelineError: if either file cannot be written.
    """
    # UTC timestamp doubles as the filename stem for both outputs.
    stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    try:
        (base / f"{stamp}.json").write_text(json.dumps(data, indent=2), encoding="utf-8")
        lines = [f"# Summary\n{data.get('summary', '')}\n", "## IoCs"]
        lines.extend(f"- {ioc}" for ioc in data.get("iocs", []))
        lines.append("## MITRE")
        lines.extend(f"- {mitre}" for mitre in data.get("mitre", []))
        (base / f"{stamp}.md").write_text("\n".join(lines), encoding="utf-8")
    except OSError as exc:  # pragma: no cover - filesystem errors
        raise PipelineError("file write failure") from exc


async def pipeline(cfg: Config, term: str) -> Dict[str, Any]:
    """Collect, enrich, notify and persist results for one search *term*.

    Returns the enriched result dict produced by the LLM step.
    """
    limit = aiohttp.ClientTimeout(total=30)
    async with aiohttp.ClientSession(timeout=limit) as session:
        raw = await collect(session, cfg, term)
        enriched = await enrich_with_llm(raw, cfg)
        # Teams only receives the summary; the full payload goes to disk.
        await send_teams_message(session, cfg.teams_webhook_url, enriched.get("summary", ""))
        _write_files(cfg.share_path, enriched)
        return enriched


def run_pipeline(term: str) -> Dict[str, Any]:
    """Entry point for running the pipeline synchronously.

    On failure the error is logged, reported to Teams on a best-effort
    basis, and the process exits with status 1.
    """
    configure_logging()
    logging.info("pipeline_start")
    cfg = Config.load()
    try:
        result = asyncio.run(pipeline(cfg, term))
    except Exception as exc:
        logging.exception("pipeline_error")
        try:
            asyncio.run(_send_error(cfg.teams_webhook_url, str(exc)))
        except Exception:
            # Best effort only: a failed notification must not mask
            # the original pipeline failure.
            pass
        raise SystemExit(1) from exc
    else:
        logging.info("pipeline_complete")
        return result
Loading