From 7ef9ce3268815102338e0fb5aa2fa979c6ed9b90 Mon Sep 17 00:00:00 2001 From: Max Chis Date: Sat, 21 Dec 2024 12:04:41 -0500 Subject: [PATCH 1/3] Create Collector Manager Prototype --- collector_manager/CollectorBase.py | 29 +++++++++++ collector_manager/CollectorManager.py | 70 +++++++++++++++++++++++++ collector_manager/CommandHandler.py | 74 +++++++++++++++++++++++++++ collector_manager/ExampleCollector.py | 28 ++++++++++ collector_manager/__init__.py | 0 collector_manager/enums.py | 4 ++ collector_manager/main.py | 19 +++++++ 7 files changed, 224 insertions(+) create mode 100644 collector_manager/CollectorBase.py create mode 100644 collector_manager/CollectorManager.py create mode 100644 collector_manager/CommandHandler.py create mode 100644 collector_manager/ExampleCollector.py create mode 100644 collector_manager/__init__.py create mode 100644 collector_manager/enums.py create mode 100644 collector_manager/main.py diff --git a/collector_manager/CollectorBase.py b/collector_manager/CollectorBase.py new file mode 100644 index 00000000..fe2efe5e --- /dev/null +++ b/collector_manager/CollectorBase.py @@ -0,0 +1,29 @@ +""" +Base class for all collectors +""" +import abc +import threading +from abc import ABC + +from collector_manager.enums import Status + + +class CollectorBase(ABC): + def __init__(self, name: str, config: dict) -> None: + self.name = name + self.config = config + self.data = {} + self.logs = [] + self.status = Status.RUNNING + # # TODO: Determine how to update this in some of the other collectors + self._stop_event = threading.Event() + + @abc.abstractmethod + def run(self) -> None: + raise NotImplementedError + + def log(self, message: str) -> None: + self.logs.append(message) + + def stop(self) -> None: + self._stop_event.set() \ No newline at end of file diff --git a/collector_manager/CollectorManager.py b/collector_manager/CollectorManager.py new file mode 100644 index 00000000..02f85f30 --- /dev/null +++ 
b/collector_manager/CollectorManager.py @@ -0,0 +1,70 @@ +""" +Manager for all collectors +Can start, stop, and get info on running collectors +And manages the retrieval of collector info +""" + +import threading +import uuid +from typing import Dict, List, Optional + +from collector_manager.ExampleCollector import ExampleCollector +from collector_manager.enums import Status + + +# Collector Manager Class +class CollectorManager: + def __init__(self): + self.collectors: Dict[str, ExampleCollector] = {} + + def list_collectors(self) -> List[str]: + return ["example_collector"] + + def start_collector( + self, + name: str, + config: Optional[dict] = None + ) -> str: + cid = str(uuid.uuid4()) + # The below would need to become more sophisticated + # As we may load different collectors depending on the name + collector = ExampleCollector(name, config) + self.collectors[cid] = collector + thread = threading.Thread(target=collector.run, daemon=True) + thread.start() + return cid + + def get_status(self, cid: Optional[str] = None) -> str | List[str]: + if cid: + collector = self.collectors.get(cid) + if not collector: + return f"Collector with CID {cid} not found." + return f"{cid} ({collector.name}) - {collector.status}" + else: + return [ + f"{cid} ({collector.name}) - {collector.status}" + for cid, collector in self.collectors.items() + ] + + def get_info(self, cid: str) -> str: + collector = self.collectors.get(cid) + if not collector: + return f"Collector with CID {cid} not found." + logs = "\n".join(collector.logs[-3:]) # Show the last 3 logs + return f"{cid} ({collector.name}) - {collector.status}\nLogs:\n{logs}" + + def close_collector(self, cid: str) -> str: + collector = self.collectors.get(cid) + if not collector: + return f"Collector with CID {cid} not found." + match collector.status: + case Status.RUNNING: + collector.stop() + return f"Collector {cid} stopped." 
+ case Status.COMPLETED: + data = collector.data + del self.collectors[cid] + return f"Collector {cid} harvested. Data: {data}" + case _: + return f"Cannot close collector {cid} with status {collector.status}." + diff --git a/collector_manager/CommandHandler.py b/collector_manager/CommandHandler.py new file mode 100644 index 00000000..5a1fe033 --- /dev/null +++ b/collector_manager/CommandHandler.py @@ -0,0 +1,74 @@ +""" +Command Handler + +This module provides a command handler for the Collector Manager CLI. +""" + +from typing import List + +from collector_manager.CollectorManager import CollectorManager + + +class CommandHandler: + def __init__(self, cm: CollectorManager): + self.cm = cm + self.commands = { + "list": self.list_collectors, + "start": self.start_collector, + "status": self.get_status, + "info": self.get_info, + "close": self.close_collector, + "exit": self.exit_manager, + } + self.running = True + + def handle_command(self, command: str): + parts = command.split() + if not parts: + return + + cmd = parts[0] + func = self.commands.get(cmd, self.unknown_command) + func(parts) + + def list_collectors(self, args: List[str]): + print("\n".join(self.cm.list_collectors())) + + def start_collector(self, args: List[str]): + if len(args) < 2: + print("Usage: start {collector_name}") + return + collector_name = args[1] + config = None + if len(args) > 3 and args[2] == "--config": + config = args[3] + cid = self.cm.start_collector(collector_name, config) + print(f"Started collector with CID: {cid}") + + def get_status(self, args: List[str]): + if len(args) > 1: + cid = args[1] + print(self.cm.get_status(cid)) + else: + print("\n".join(self.cm.get_status())) + + def get_info(self, args: List[str]): + if len(args) < 2: + print("Usage: info {cid}") + return + cid = args[1] + print(self.cm.get_info(cid)) + + def close_collector(self, args: List[str]): + if len(args) < 2: + print("Usage: close {cid}") + return + cid = args[1] + print(self.cm.close_collector(cid)) + 
+ def exit_manager(self, args: List[str]): + print("Exiting Collector Manager.") + self.running = False + + def unknown_command(self, args: List[str]): + print("Unknown command.") diff --git a/collector_manager/ExampleCollector.py b/collector_manager/ExampleCollector.py new file mode 100644 index 00000000..55438aff --- /dev/null +++ b/collector_manager/ExampleCollector.py @@ -0,0 +1,28 @@ +""" +Example collector +Exists as a proof of concept for collector functionality + +""" +import time + +from collector_manager.CollectorBase import CollectorBase +from collector_manager.enums import Status + + +class ExampleCollector(CollectorBase): + + def run(self): + try: + for i in range(10): # Simulate a task + if self._stop_event.is_set(): + self.log("Collector stopped.") + self.status = Status.ERRORED + return + self.log(f"Step {i+1}/10") + time.sleep(1) # Simulate work + self.data = {"message": f"Data collected by {self.name}"} + self.status = Status.COMPLETED + self.log("Collector completed successfully.") + except Exception as e: + self.status = Status.ERRORED + self.log(f"Error: {e}") diff --git a/collector_manager/__init__.py b/collector_manager/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/collector_manager/enums.py b/collector_manager/enums.py new file mode 100644 index 00000000..85a2f08b --- /dev/null +++ b/collector_manager/enums.py @@ -0,0 +1,4 @@ +class Status: + RUNNING = "RUNNING" + COMPLETED = "COMPLETED" + ERRORED = "ERRORED" diff --git a/collector_manager/main.py b/collector_manager/main.py new file mode 100644 index 00000000..76295fb8 --- /dev/null +++ b/collector_manager/main.py @@ -0,0 +1,19 @@ +""" +This starts up the collector manager Command Line Interface (CLI) +""" + +from collector_manager.CollectorManager import CollectorManager +from collector_manager.CommandHandler import CommandHandler + + +def main(): + cm = CollectorManager() + handler = CommandHandler(cm) + print("Collector Manager CLI") + while handler.running: + 
command = input("Enter command: ") + handler.handle_command(command) + + +if __name__ == "__main__": + main() From d433fa7bb5c5683d2dfce257e542b2b86abd0192 Mon Sep 17 00:00:00 2001 From: Max Chis Date: Sun, 22 Dec 2024 15:45:22 -0500 Subject: [PATCH 2/3] Add mention of Collector Manager --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 33440d64..2ba235c8 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,7 @@ html_tag_collector | Collects HTML header, meta, and title tags and appends them hugging_face | Utilities for interacting with our machine learning space at [Hugging Face](https://huggingface.co/PDAP) identification_pipeline.py | The core python script uniting this modular pipeline. More details below. openai-playground | Scripts for accessing the openai API on PDAP's shared account +collector_manager | A module which provides a unified interface for interacting with source collectors and relevant data ## How to use From 5cd1d43bfa804f38ff7b6f2a1196e2e4ad36a536 Mon Sep 17 00:00:00 2001 From: Max Chis Date: Sun, 22 Dec 2024 16:05:36 -0500 Subject: [PATCH 3/3] Build Collector Manager README.md --- collector_manager/README.md | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 collector_manager/README.md diff --git a/collector_manager/README.md b/collector_manager/README.md new file mode 100644 index 00000000..8c37fb6e --- /dev/null +++ b/collector_manager/README.md @@ -0,0 +1,23 @@ +The Collector Manager is a class used to manage collectors. It can start, stop, and get info on running collectors. 
+ +The following commands are available: + +| Command | Description | +|------------------------------------------|----------------------------------------------------------------| +| list | List the available collector types | +| start {collector_name} --config {config} | Start a collector, optionally with a given configuration | +| status {collector_id} | Get status of a collector, or all collectors if no id is given | +| info {collector_id} | Get info on a collector, including recent log updates | +| close {collector_id} | Stop a running collector, or harvest the data of a completed collector and remove it | +| exit | Exit the collector manager | + +This directory consists of the following files: + +| File | Description | +|-------------------|----------------------------------------------------------------| +| CollectorManager.py | Main collector manager class | +| CommandHandler.py | Class used to handle commands from the command line interface | +| CollectorBase.py | Base class for collectors | +| enums.py | Enumerations used in the collector manager | +| ExampleCollector.py | Example collector | +| main.py | Main function for the collector manager | \ No newline at end of file