document_processing.py
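"""Example: a document processing pipeline built with OrchestratorBuilder.

Mock worker handlers cover text extraction, language detection, summarization,
topic extraction, sentiment analysis, and report formatting; Claude is used as
the task-decomposition LLM.
"""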
import asyncio
from typing import Dict, Any

from dotenv import load_dotenv
import anthropic

from ..orchestrator import OrchestratorBuilder


async def llm_call(prompt: str) -> str:
    """Call Anthropic's Claude API."""
    # Use the async client so the call can be awaited
    client = anthropic.AsyncAnthropic()
    response = await client.messages.create(
        model="claude-3-opus-20240229",
        max_tokens=1000,
        temperature=0.7,
        system="You are a helpful task decomposition assistant.",
        messages=[{"role": "user", "content": prompt}],
    )
    return response.content[0].text

# Worker handlers for different document processing tasks
async def handle_text_extraction(data: Dict[str, Any]) -> Dict[str, Any]:
    """Extract text from a document (mock implementation)."""
    document = data["document"]
    return {
        "text": f"Extracted text from {document}",
        "pages": 5,
        "language": "en",
    }


async def handle_language_detection(data: Dict[str, Any]) -> Dict[str, Any]:
    """Detect the language of the text (mock implementation)."""
    text = data["text"]
    return {
        "language": "en",
        "confidence": 0.95,
    }


async def handle_summarization(data: Dict[str, Any]) -> Dict[str, Any]:
    """Summarize the text content (mock implementation)."""
    text = data["text"]
    return {
        "summary": f"Summary of: {text}",
        "length": "medium",
    }


async def handle_topic_extraction(data: Dict[str, Any]) -> Dict[str, Any]:
    """Extract the main topics from the text (mock implementation)."""
    text = data["text"]
    return {
        "topics": ["topic1", "topic2", "topic3"],
        "confidence_scores": [0.9, 0.8, 0.7],
    }


async def handle_sentiment_analysis(data: Dict[str, Any]) -> Dict[str, Any]:
    """Analyze the sentiment of the text (mock implementation)."""
    text = data["text"]
    return {
        "sentiment": "positive",
        "score": 0.8,
        "aspects": {
            "tone": "professional",
            "emotion": "confident",
        },
    }


async def handle_formatting(data: Dict[str, Any]) -> Dict[str, Any]:
    """Format the final document analysis report."""
    return {
        "report": {
            "summary": data["summary"],
            "topics": data["topics"],
            "sentiment": data["sentiment"],
            "language": data["language"],
        },
        "format": "json",
    }

async def main():
    # Load environment variables
    load_dotenv()

    # Build the document processor
    processor = (
        OrchestratorBuilder()
        .add_worker(
            name="extractor",
            task_types=["text_extraction"],
            handler=handle_text_extraction,
            concurrency_limit=2,
        )
        .add_worker(
            name="language_detector",
            task_types=["language_detection"],
            handler=handle_language_detection,
            concurrency_limit=1,
        )
        .add_worker(
            name="summarizer",
            task_types=["summarization"],
            handler=handle_summarization,
            concurrency_limit=1,
        )
        .add_worker(
            name="topic_extractor",
            task_types=["topic_extraction"],
            handler=handle_topic_extraction,
            concurrency_limit=1,
        )
        .add_worker(
            name="sentiment_analyzer",
            task_types=["sentiment_analysis"],
            handler=handle_sentiment_analysis,
            concurrency_limit=1,
        )
        .add_worker(
            name="formatter",
            task_types=["formatting"],
            handler=handle_formatting,
            concurrency_limit=1,
        )
        .build(llm_caller=llm_call)
    )

    # Example document to process
    document = {
        "document": "sample_document.pdf",
        "options": {
            "include_summary": True,
            "include_topics": True,
            "include_sentiment": True,
        },
    }

    try:
        # Process the document
        print("\nProcessing document...")
        results = await processor.execute(document)

        # Print results
        print("\nProcessing Results:")
        for task_id, result in results.items():
            print(f"\n{task_id}:")
            print(result)
    except Exception as e:
        print(f"Error processing document: {str(e)}")


if __name__ == "__main__":
    asyncio.run(main())