Skip to content

Commit

Permalink
feat: workers can report to queue
Browse files Browse the repository at this point in the history
  • Loading branch information
aatmanvaidya committed Mar 6, 2024
1 parent ead206d commit 3c59999
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 12 deletions.
3 changes: 2 additions & 1 deletion src/worker/audiovec/audio_payload_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
try:
feluda = Feluda("worker/audiovec/config.yml")
feluda.setup()
audio_index_queue = feluda.config.queue.parameters.queues[0]['name']
feluda.start_component(ComponentType.STORE)
feluda.start_component(ComponentType.QUEUE)

Expand All @@ -14,7 +15,7 @@
"id": str(12345),
"path": 'https://raw.githubusercontent.com/tattle-made/feluda/main/src/core/operators/sample_data/audio.wav'
}
feluda.queue.message("tattle-search-index-queue", dummy_payload)
feluda.queue.message(audio_index_queue, dummy_payload)
sleep(0.3)

except Exception as e:
Expand Down
30 changes: 26 additions & 4 deletions src/worker/audiovec/audio_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,22 @@
from datetime import datetime
log = Logger(__name__)

def make_report_indexed(data, status):
report = {}
report["indexer_id"] = 1
report["post_id"] = data["id"]
report["status"] = status
report["status_code"] = 200
return json.dumps(report)

def make_report_failed(data, status):
report = {}
report["indexer_id"] = 1
report["post_id"] = data["id"]
report["status"] = status
report["status_code"] = 400
return json.dumps(report)

def indexer(feluda):
def worker(ch, method, properties, body):
print("MESSAGE RECEIVED")
Expand All @@ -24,12 +40,17 @@ def worker(ch, method, properties, body):
"audio_vec": audio_vec,
"date_added": datetime.utcnow(),
}
# result = feluda.store.store(media_type, doc)
# print(result)
result = feluda.store.store(media_type, doc)
print(result)
report = make_report_indexed(file_content, "indexed")
print(report)
feluda.queue.message(feluda.config.queue.parameters.queues[1]['name'], report)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print("Error indexing media", e)
# requeue the media file
report = make_report_failed(file_content, "failed")
feluda.queue.message(feluda.config.queue.parameters.queues[1]['name'], report)
ch.basic_nack(delivery_tag=method.delivery_tag)
return worker

Expand All @@ -52,12 +73,13 @@ def handle_exception(feluda, queue_name, worker_func, retries, max_retries):
try:
feluda = Feluda("worker/audiovec/config.yml")
feluda.setup()
audio_index_queue = feluda.config.queue.parameters.queues[0]['name']
feluda.start_component(ComponentType.STORE)
feluda.start_component(ComponentType.QUEUE)
audio_vec_embedding.initialize(param=None)
feluda.queue.listen("tattle-search-index-queue", indexer(feluda))
feluda.queue.listen(audio_index_queue, indexer(feluda))
except Exception as e:
print("Error Initializing Indexer", e)
retries = 0
max_retries = 10
handle_exception(feluda, "tattle-search-index-queue", indexer(feluda), retries, max_retries)
handle_exception(feluda, audio_index_queue, indexer(feluda), retries, max_retries)
4 changes: 2 additions & 2 deletions src/worker/audiovec/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ queue :
parameters:
host_name : "rabbitmq"
queues:
- name : "tattle-search-index-queue"
- name : "tattle-search-report-queue"
- name : "audio-index-queue"
- name : "report-queue"

operators :
label : "Operators"
Expand Down
4 changes: 2 additions & 2 deletions src/worker/vidvec/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ queue :
parameters:
host_name : "rabbitmq"
queues:
- name : "tattle-search-index-queue"
- name : "tattle-search-report-queue"
- name : "video-index-queue"
- name : "report-queue"

operators :
label : "Operators"
Expand Down
3 changes: 2 additions & 1 deletion src/worker/vidvec/video_payload_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
try:
feluda = Feluda("worker/vidvec/config.yml")
feluda.setup()
video_index_queue = feluda.config.queue.parameters.queues[0]['name']
feluda.start_component(ComponentType.STORE)
feluda.start_component(ComponentType.QUEUE)

Expand All @@ -14,7 +15,7 @@
"id": str(12345),
"path": 'https://raw.githubusercontent.com/tattle-made/feluda/main/src/core/operators/sample_data/sample-cat-video.mp4'
}
feluda.queue.message("tattle-search-index-queue", dummy_payload)
feluda.queue.message(video_index_queue, dummy_payload)
sleep(0.3)

except Exception as e:
Expand Down
25 changes: 23 additions & 2 deletions src/worker/vidvec/video_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,22 @@
import subprocess
log = Logger(__name__)

def make_report_indexed(data, status):
report = {}
report["indexer_id"] = 1
report["post_id"] = data["id"]
report["status"] = status
report["status_code"] = 200
return json.dumps(report)

def make_report_failed(data, status):
report = {}
report["indexer_id"] = 1
report["post_id"] = data["id"]
report["status"] = status
report["status_code"] = 400
return json.dumps(report)

def generate_document(post_id: str, representation: any):
base_doc = {
"e_kosh_id": "",
Expand Down Expand Up @@ -40,9 +56,13 @@ def worker(ch, method, properties, body):
media_type = MediaType.VIDEO
result = feluda.store.store(media_type, doc)
print(result)
report = make_report_indexed(file_content, "indexed")
feluda.queue.message(feluda.config.queue.parameters.queues[1]['name'], report)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print("Error indexing media", e)
report = make_report_failed(file_content, "failed")
feluda.queue.message(feluda.config.queue.parameters.queues[1]['name'], report)
# requeue the media file
ch.basic_nack(delivery_tag=method.delivery_tag)
return worker
Expand All @@ -66,12 +86,13 @@ def handle_exception(feluda, queue_name, worker_func, retries, max_retries):
try:
feluda = Feluda("worker/vidvec/config.yml")
feluda.setup()
video_index_queue = feluda.config.queue.parameters.queues[0]['name']
feluda.start_component(ComponentType.STORE)
feluda.start_component(ComponentType.QUEUE)
vid_vec_rep_resnet.initialize(param=None)
feluda.queue.listen("tattle-search-index-queue", indexer(feluda))
feluda.queue.listen(video_index_queue, indexer(feluda))
except Exception as e:
print("Error Initializing Indexer", e)
retries = 0
max_retries = 10
handle_exception(feluda, "tattle-search-index-queue", indexer(feluda), retries, max_retries)
handle_exception(feluda, video_index_queue, indexer(feluda), retries, max_retries)

0 comments on commit 3c59999

Please sign in to comment.