Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ identification_pipeline.py | The core python script uniting this modular pipelin
openai-playground | Scripts for accessing the openai API on PDAP's shared account
source_collectors| Tools for extracting metadata from different sources, including CKAN data portals and Common Crawler
collector_db | Database for storing data from source collectors
collector_manager | A module which provides a unified interface for interacting with source collectors and relevant data

## How to use

Expand Down
29 changes: 29 additions & 0 deletions collector_manager/CollectorBase.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
"""
Base class for all collectors
"""
import abc
import threading
from abc import ABC

from collector_manager.enums import Status


class CollectorBase(ABC):

Check warning on line 11 in collector_manager/CollectorBase.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_manager/CollectorBase.py#L11 <101>

Missing docstring in public class
Raw output
./collector_manager/CollectorBase.py:11:1: D101 Missing docstring in public class
def __init__(self, name: str, config: dict) -> None:

Check warning on line 12 in collector_manager/CollectorBase.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_manager/CollectorBase.py#L12 <107>

Missing docstring in __init__
Raw output
./collector_manager/CollectorBase.py:12:1: D107 Missing docstring in __init__
self.name = name
self.config = config
self.data = {}
self.logs = []
self.status = Status.RUNNING
# # TODO: Determine how to update this in some of the other collectors
self._stop_event = threading.Event()

@abc.abstractmethod
def run(self) -> None:

Check warning on line 22 in collector_manager/CollectorBase.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_manager/CollectorBase.py#L22 <102>

Missing docstring in public method
Raw output
./collector_manager/CollectorBase.py:22:1: D102 Missing docstring in public method
raise NotImplementedError

def log(self, message: str) -> None:

Check warning on line 25 in collector_manager/CollectorBase.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_manager/CollectorBase.py#L25 <102>

Missing docstring in public method
Raw output
./collector_manager/CollectorBase.py:25:1: D102 Missing docstring in public method
self.logs.append(message)

def stop(self) -> None:

Check warning on line 28 in collector_manager/CollectorBase.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_manager/CollectorBase.py#L28 <102>

Missing docstring in public method
Raw output
./collector_manager/CollectorBase.py:28:1: D102 Missing docstring in public method
self._stop_event.set()

Check warning on line 29 in collector_manager/CollectorBase.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_manager/CollectorBase.py#L29 <292>

no newline at end of file
Raw output
./collector_manager/CollectorBase.py:29:31: W292 no newline at end of file
70 changes: 70 additions & 0 deletions collector_manager/CollectorManager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""
Manager for all collectors
Can start, stop, and get info on running collectors
And manages the retrieval of collector info
"""

import threading
import uuid
from typing import Dict, List, Optional

from collector_manager.ExampleCollector import ExampleCollector
from collector_manager.enums import Status


# Collector Manager Class
class CollectorManager:

Check warning on line 16 in collector_manager/CollectorManager.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_manager/CollectorManager.py#L16 <101>

Missing docstring in public class
Raw output
./collector_manager/CollectorManager.py:16:1: D101 Missing docstring in public class
def __init__(self):

Check warning on line 17 in collector_manager/CollectorManager.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_manager/CollectorManager.py#L17 <107>

Missing docstring in __init__
Raw output
./collector_manager/CollectorManager.py:17:1: D107 Missing docstring in __init__
self.collectors: Dict[str, ExampleCollector] = {}

def list_collectors(self) -> List[str]:

Check warning on line 20 in collector_manager/CollectorManager.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_manager/CollectorManager.py#L20 <102>

Missing docstring in public method
Raw output
./collector_manager/CollectorManager.py:20:1: D102 Missing docstring in public method
return ["example_collector"]

def start_collector(

Check warning on line 23 in collector_manager/CollectorManager.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_manager/CollectorManager.py#L23 <102>

Missing docstring in public method
Raw output
./collector_manager/CollectorManager.py:23:1: D102 Missing docstring in public method
self,
name: str,
config: Optional[dict] = None
) -> str:
cid = str(uuid.uuid4())
# The below would need to become more sophisticated
# As we may load different collectors depending on the name
collector = ExampleCollector(name, config)
self.collectors[cid] = collector
thread = threading.Thread(target=collector.run, daemon=True)
thread.start()
return cid

def get_status(self, cid: Optional[str] = None) -> str | List[str]:

Check warning on line 37 in collector_manager/CollectorManager.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_manager/CollectorManager.py#L37 <102>

Missing docstring in public method
Raw output
./collector_manager/CollectorManager.py:37:1: D102 Missing docstring in public method
if cid:
collector = self.collectors.get(cid)
if not collector:
return f"Collector with CID {cid} not found."
return f"{cid} ({collector.name}) - {collector.status}"
else:
return [
f"{cid} ({collector.name}) - {collector.status}"
for cid, collector in self.collectors.items()
]

def get_info(self, cid: str) -> str:

Check warning on line 49 in collector_manager/CollectorManager.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_manager/CollectorManager.py#L49 <102>

Missing docstring in public method
Raw output
./collector_manager/CollectorManager.py:49:1: D102 Missing docstring in public method
collector = self.collectors.get(cid)
if not collector:
return f"Collector with CID {cid} not found."
logs = "\n".join(collector.logs[-3:]) # Show the last 3 logs
return f"{cid} ({collector.name}) - {collector.status}\nLogs:\n{logs}"

def close_collector(self, cid: str) -> str:

Check warning on line 56 in collector_manager/CollectorManager.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_manager/CollectorManager.py#L56 <102>

Missing docstring in public method
Raw output
./collector_manager/CollectorManager.py:56:1: D102 Missing docstring in public method
collector = self.collectors.get(cid)
if not collector:
return f"Collector with CID {cid} not found."
match collector.status:
case Status.RUNNING:
collector.stop()
return f"Collector {cid} stopped."
case Status.COMPLETED:
data = collector.data
del self.collectors[cid]
return f"Collector {cid} harvested. Data: {data}"
case _:
return f"Cannot close collector {cid} with status {collector.status}."

Check warning on line 70 in collector_manager/CollectorManager.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_manager/CollectorManager.py#L70 <391>

blank line at end of file
Raw output
./collector_manager/CollectorManager.py:70:1: W391 blank line at end of file
74 changes: 74 additions & 0 deletions collector_manager/CommandHandler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""
Command Handler

This module provides a command handler for the Collector Manager CLI.
"""

from typing import List

from collector_manager.CollectorManager import CollectorManager


class CommandHandler:

Check warning on line 12 in collector_manager/CommandHandler.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_manager/CommandHandler.py#L12 <101>

Missing docstring in public class
Raw output
./collector_manager/CommandHandler.py:12:1: D101 Missing docstring in public class
def __init__(self, cm: CollectorManager):

Check warning on line 13 in collector_manager/CommandHandler.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_manager/CommandHandler.py#L13 <107>

Missing docstring in __init__
Raw output
./collector_manager/CommandHandler.py:13:1: D107 Missing docstring in __init__
self.cm = cm
self.commands = {
"list": self.list_collectors,
"start": self.start_collector,
"status": self.get_status,
"info": self.get_info,
"close": self.close_collector,
"exit": self.exit_manager,
}
self.running = True

def handle_command(self, command: str):

Check warning on line 25 in collector_manager/CommandHandler.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_manager/CommandHandler.py#L25 <102>

Missing docstring in public method
Raw output
./collector_manager/CommandHandler.py:25:1: D102 Missing docstring in public method
parts = command.split()
if not parts:
return

cmd = parts[0]
func = self.commands.get(cmd, self.unknown_command)
func(parts)

def list_collectors(self, args: List[str]):

Check warning on line 34 in collector_manager/CommandHandler.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_manager/CommandHandler.py#L34 <102>

Missing docstring in public method
Raw output
./collector_manager/CommandHandler.py:34:1: D102 Missing docstring in public method

Check warning on line 34 in collector_manager/CommandHandler.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_manager/CommandHandler.py#L34 <100>

Unused argument 'args'
Raw output
./collector_manager/CommandHandler.py:34:31: U100 Unused argument 'args'
print("\n".join(self.cm.list_collectors()))

def start_collector(self, args: List[str]):

Check warning on line 37 in collector_manager/CommandHandler.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_manager/CommandHandler.py#L37 <102>

Missing docstring in public method
Raw output
./collector_manager/CommandHandler.py:37:1: D102 Missing docstring in public method
if len(args) < 2:
print("Usage: start {collector_name}")
return
collector_name = args[1]
config = None
if len(args) > 3 and args[2] == "--config":
config = args[3]
cid = self.cm.start_collector(collector_name, config)
print(f"Started collector with CID: {cid}")

def get_status(self, args: List[str]):

Check warning on line 48 in collector_manager/CommandHandler.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_manager/CommandHandler.py#L48 <102>

Missing docstring in public method
Raw output
./collector_manager/CommandHandler.py:48:1: D102 Missing docstring in public method
if len(args) > 1:
cid = args[1]
print(self.cm.get_status(cid))
else:
print("\n".join(self.cm.get_status()))

def get_info(self, args: List[str]):

Check warning on line 55 in collector_manager/CommandHandler.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_manager/CommandHandler.py#L55 <102>

Missing docstring in public method
Raw output
./collector_manager/CommandHandler.py:55:1: D102 Missing docstring in public method
if len(args) < 2:
print("Usage: info {cid}")
return
cid = args[1]
print(self.cm.get_info(cid))

def close_collector(self, args: List[str]):

Check warning on line 62 in collector_manager/CommandHandler.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_manager/CommandHandler.py#L62 <102>

Missing docstring in public method
Raw output
./collector_manager/CommandHandler.py:62:1: D102 Missing docstring in public method
if len(args) < 2:
print("Usage: close {cid}")
return
cid = args[1]
print(self.cm.close_collector(cid))

def exit_manager(self, args: List[str]):

Check warning on line 69 in collector_manager/CommandHandler.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_manager/CommandHandler.py#L69 <102>

Missing docstring in public method
Raw output
./collector_manager/CommandHandler.py:69:1: D102 Missing docstring in public method

Check warning on line 69 in collector_manager/CommandHandler.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_manager/CommandHandler.py#L69 <100>

Unused argument 'args'
Raw output
./collector_manager/CommandHandler.py:69:28: U100 Unused argument 'args'
print("Exiting Collector Manager.")
self.running = False

def unknown_command(self, args: List[str]):

Check warning on line 73 in collector_manager/CommandHandler.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_manager/CommandHandler.py#L73 <102>

Missing docstring in public method
Raw output
./collector_manager/CommandHandler.py:73:1: D102 Missing docstring in public method

Check warning on line 73 in collector_manager/CommandHandler.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_manager/CommandHandler.py#L73 <100>

Unused argument 'args'
Raw output
./collector_manager/CommandHandler.py:73:31: U100 Unused argument 'args'
print("Unknown command.")
28 changes: 28 additions & 0 deletions collector_manager/ExampleCollector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
"""
Example collector
Exists as a proof of concept for collector functionality

"""
import time

from collector_manager.CollectorBase import CollectorBase
from collector_manager.enums import Status


class ExampleCollector(CollectorBase):

Check warning on line 12 in collector_manager/ExampleCollector.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_manager/ExampleCollector.py#L12 <101>

Missing docstring in public class
Raw output
./collector_manager/ExampleCollector.py:12:1: D101 Missing docstring in public class

def run(self):

Check warning on line 14 in collector_manager/ExampleCollector.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_manager/ExampleCollector.py#L14 <102>

Missing docstring in public method
Raw output
./collector_manager/ExampleCollector.py:14:1: D102 Missing docstring in public method
try:
for i in range(10): # Simulate a task
if self._stop_event.is_set():
self.log("Collector stopped.")
self.status = Status.ERRORED
return
self.log(f"Step {i+1}/10")
time.sleep(1) # Simulate work
self.data = {"message": f"Data collected by {self.name}"}
self.status = Status.COMPLETED
self.log("Collector completed successfully.")
except Exception as e:
self.status = Status.ERRORED
self.log(f"Error: {e}")
23 changes: 23 additions & 0 deletions collector_manager/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
The Collector Manager is a class used to manage collectors. It can start, stop, and get info on running collectors.

The following commands are available:

| Command | Description |
|------------------------------------------|----------------------------------------------------------------|
| list | List running collectors |
| start {collector_name} --config {config} | Start a collector, optionally with a given configuration |
| status {collector_id} | Get status of a collector, or all collectors if no id is given |
| info {collector_id} | Get info on a collector, including recent log updates |
| close {collector_id} | Close a collector |
| exit | Exit the collector manager |

This directory consists of the following files:

| File | Description |
|-------------------|----------------------------------------------------------------|
| CollectorManager.py | Main collector manager class |
| CommandHandler.py | Class used to handle commands from the command line interface |
| CollectorBase.py | Base class for collectors |
|enums.py | Enumerations used in the collector manager |
| ExampleCollector.py | Example collector |
| main.py | Main function for the collector manager |
Empty file added collector_manager/__init__.py
Empty file.
4 changes: 4 additions & 0 deletions collector_manager/enums.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
class Status:

Check warning on line 1 in collector_manager/enums.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_manager/enums.py#L1 <100>

Missing docstring in public module
Raw output
./collector_manager/enums.py:1:1: D100 Missing docstring in public module

Check warning on line 1 in collector_manager/enums.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_manager/enums.py#L1 <101>

Missing docstring in public class
Raw output
./collector_manager/enums.py:1:1: D101 Missing docstring in public class
RUNNING = "RUNNING"
COMPLETED = "COMPLETED"
ERRORED = "ERRORED"
19 changes: 19 additions & 0 deletions collector_manager/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
"""
This starts up the collector manager Command Line Interface (CLI)
"""

from collector_manager.CollectorManager import CollectorManager
from collector_manager.CommandHandler import CommandHandler


def main():

Check warning on line 9 in collector_manager/main.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_manager/main.py#L9 <103>

Missing docstring in public function
Raw output
./collector_manager/main.py:9:1: D103 Missing docstring in public function
cm = CollectorManager()
handler = CommandHandler(cm)
print("Collector Manager CLI")
while handler.running:
command = input("Enter command: ")
handler.handle_command(command)


if __name__ == "__main__":
main()
Loading