From f9449627386849248c40f74d51380376803929d1 Mon Sep 17 00:00:00 2001 From: Stephen Poserina Date: Tue, 9 Jul 2024 20:54:32 -0400 Subject: [PATCH 1/9] publish payloads one at a time and correct schema for the json embedded in updated_contacts_json__c of platform message --- src/server/api/API_ingest/updated_data.py | 19 +++++++++---------- .../pub_sub/salesforce_message_publisher.py | 10 ++++++---- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/src/server/api/API_ingest/updated_data.py b/src/server/api/API_ingest/updated_data.py index 7d669a7f..1b13da54 100644 --- a/src/server/api/API_ingest/updated_data.py +++ b/src/server/api/API_ingest/updated_data.py @@ -14,21 +14,21 @@ def get_updated_contact_data(): select json_agg (upd) as "cd" from ( select - sf.source_id as "Id" , -- long salesforce string - array_agg(sl.source_id) filter (where sl.source_id is not null) as "Person_Id__c", -- short PAWS-local shelterluv id + sf.source_id as "contactId" , -- long salesforce string + array_agg(sl.source_id) filter (where sl.source_id is not null) as "personIds", -- short PAWS-local shelterluv id case when (extract(epoch from now())::bigint - max(foster_out) < 365*86400) -- foster out in last year or (extract(epoch from now())::bigint - max(foster_return) < 365*86400) -- foster return then 'Active' else 'Inactive' - end as "Foster_Activity__c", - max(foster_out) as "Foster_Start_Date__c", - max(foster_return) as "Foster_End_Date__c", - min(vol.first_date) "First_volunteer_date__c", - max(vol.last_date) "Last_volunteer_date__c", - sum(vol.hours) as "Total_volunteer_hours__c", - array_agg(vc.source_id::integer) filter(where vc.source_id is not null) as "Volgistics_Id__c" + end as "volunteerStatus", + to_timestamp(max(foster_out) / 1000)::date as "fosterStartDate", + to_timestamp(max(foster_return) / 1000)::date as "fosterEndDate", + to_timestamp(min(vol.first_date) / 1000)::date "firstVolunteerDate", + to_timestamp(max(vol.last_date) / 1000)::date "lastVolunteerDate", + sum(vol.hours) as "totalVolunteerHours", + array_agg(vc.source_id::integer) filter(where vc.source_id is not null) as "volgisticIds" from ( select source_id, matching_id from pdp_contacts sf where sf.source_type = 'salesforcecontacts' @@ -63,6 +63,5 @@ def get_updated_contact_data(): result = session.execute(qry) sfdata = result.fetchone()[0] if sfdata: - logger.debug(sfdata) logger.debug("Query for Salesforce update returned %d records", len(sfdata)) return sfdata \ No newline at end of file diff --git a/src/server/pub_sub/salesforce_message_publisher.py b/src/server/pub_sub/salesforce_message_publisher.py index 5d239090..37340128 100644 --- a/src/server/pub_sub/salesforce_message_publisher.py +++ b/src/server/pub_sub/salesforce_message_publisher.py @@ -60,7 +60,8 @@ def send_pipeline_update_messages(contacts_list): schema_id = stub.GetTopic(pb2.TopicRequest(topic_name=UPDATE_TOPIC), metadata=auth_meta_data).schema_id schema = stub.GetSchema(pb2.SchemaRequest(schema_id=schema_id), metadata=auth_meta_data).schema_json - payloads = [] + + batches = 0 while len(contacts_list) > 0: if len(contacts_list) > BATCH_SIZE: current_batch = contacts_list[:BATCH_SIZE] @@ -85,9 +86,10 @@ def send_pipeline_update_messages(contacts_list): "schema_id": schema_id, "payload": buf.getvalue() } - payloads.append(payload) - stub.Publish(pb2.PublishRequest(topic_name=UPDATE_TOPIC, events=payloads), metadata=auth_meta_data) + stub.Publish(pb2.PublishRequest(topic_name=UPDATE_TOPIC, events=[payload]), metadata=auth_meta_data) + logger.info('Sent %s contacts in message', len(current_batch)) + batches = batches + 1 - logger.info("%s total pipeline update messages sent", len(payloads)) + logger.info('completed sending platform messages, %s messages sent', batches) From 8ecbcb09165a445f7e095f17d69be8c5be7bbe44 Mon Sep 17 00:00:00 2001 From: Stephen Poserina Date: Sun, 8 Sep 2024 19:45:32 -0400 Subject: [PATCH 2/9] update get_updated_data endpoint to return data and length, use volunteer dates to determine volunteer status --- src/server/api/API_ingest/updated_data.py | 3 +-- src/server/api/internal_api.py | 6 +++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/server/api/API_ingest/updated_data.py b/src/server/api/API_ingest/updated_data.py index 1b13da54..6e3b5cf0 100644 --- a/src/server/api/API_ingest/updated_data.py +++ b/src/server/api/API_ingest/updated_data.py @@ -18,8 +18,7 @@ def get_updated_contact_data(): array_agg(sl.source_id) filter (where sl.source_id is not null) as "personIds", -- short PAWS-local shelterluv id case when - (extract(epoch from now())::bigint - max(foster_out) < 365*86400) -- foster out in last year - or (extract(epoch from now())::bigint - max(foster_return) < 365*86400) -- foster return + (extract(epoch from now())::bigint - (max(vol.last_date)/1000) < 365*86400) -- volunteered in last year then 'Active' else 'Inactive' end as "volunteerStatus", diff --git a/src/server/api/internal_api.py b/src/server/api/internal_api.py index adc40f8a..80e5bed5 100644 --- a/src/server/api/internal_api.py +++ b/src/server/api/internal_api.py @@ -49,7 +49,11 @@ def get_contact_data(): logger.debug("Returning %d contact records", len(contact_json)) else: logger.debug("No contact records found") - return jsonify({'outcome': 'OK'}), 200 + return jsonify({ + 'outcome': 'OK', + 'data': contact_json, + 'length': len(contact_json) if contact_json else 0 + }), 200 @internal_api.route("/api/internal/start_flow", methods=["GET"]) From 09defdfc6ef216624a15d07ad00946477fa9528d Mon Sep 17 00:00:00 2001 From: Stephen Poserina Date: Sat, 14 Sep 2024 13:36:12 -0400 Subject: [PATCH 3/9] return empty array instead of null for person and volgistics ids in platform messages --- src/server/api/API_ingest/updated_data.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/src/server/api/API_ingest/updated_data.py b/src/server/api/API_ingest/updated_data.py index 6e3b5cf0..e2d661ad 100644 --- a/src/server/api/API_ingest/updated_data.py +++ b/src/server/api/API_ingest/updated_data.py @@ -15,7 +15,12 @@ def get_updated_contact_data(): from ( select sf.source_id as "contactId" , -- long salesforce string - array_agg(sl.source_id) filter (where sl.source_id is not null) as "personIds", -- short PAWS-local shelterluv id + case + when + array_agg(sl.source_id) filter (where sl.source_id is not null) is null + then '{}'::varchar[] + else array_agg(sl.source_id) filter (where sl.source_id is not null) + end as "personIds", -- short PAWS-local shelterluv id case when (extract(epoch from now())::bigint - (max(vol.last_date)/1000) < 365*86400) -- volunteered in last year @@ -27,7 +32,12 @@ def get_updated_contact_data(): to_timestamp(min(vol.first_date) / 1000)::date "firstVolunteerDate", to_timestamp(max(vol.last_date) / 1000)::date "lastVolunteerDate", sum(vol.hours) as "totalVolunteerHours", - array_agg(vc.source_id::integer) filter(where vc.source_id is not null) as "volgisticIds" + case + when + (array_agg(vc.source_id::integer) filter(where vc.source_id is not null)) is null + then '{}'::int[] + else array_agg(vc.source_id::integer) filter(where vc.source_id is not null) + end as "volgisticIds" from ( select source_id, matching_id from pdp_contacts sf where sf.source_type = 'salesforcecontacts' From 3062d00a36a679af8e79c8baed07e2a5528438e6 Mon Sep 17 00:00:00 2001 From: Steve Carroll Date: Tue, 17 Sep 2024 18:41:24 -0400 Subject: [PATCH 4/9] 571: Add pool_pre_ping to db engine --- src/server/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server/config.py b/src/server/config.py index 5783158d..6f6550a2 100644 --- a/src/server/config.py +++ b/src/server/config.py @@ -67,7 +67,7 @@ + POSTGRES_DATABASE ) -engine = db.create_engine(DB) +engine = db.create_engine(DB, pool_pre_ping=True) # Run Alembic to create managed tables # from alembic.config import Config From 52d3953183116942696c2b77f0ff5375044fee15 Mon Sep 17 00:00:00 2001 From: Stephen Poserina Date: Mon, 14 Oct 2024 09:47:33 -0400 Subject: [PATCH 5/9] remove sleep when sending platform messages and change 'lastVolunteerDate' to 'lastShiftDate' in platform message --- src/server/api/API_ingest/updated_data.py | 2 +- src/server/pub_sub/salesforce_message_publisher.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/server/api/API_ingest/updated_data.py b/src/server/api/API_ingest/updated_data.py index e2d661ad..781630b4 100644 --- a/src/server/api/API_ingest/updated_data.py +++ b/src/server/api/API_ingest/updated_data.py @@ -30,7 +30,7 @@ def get_updated_contact_data(): to_timestamp(max(foster_out) / 1000)::date as "fosterStartDate", to_timestamp(max(foster_return) / 1000)::date as "fosterEndDate", to_timestamp(min(vol.first_date) / 1000)::date "firstVolunteerDate", - to_timestamp(max(vol.last_date) / 1000)::date "lastVolunteerDate", + to_timestamp(max(vol.last_date) / 1000)::date "lastShiftDate", sum(vol.hours) as "totalVolunteerHours", case when diff --git a/src/server/pub_sub/salesforce_message_publisher.py b/src/server/pub_sub/salesforce_message_publisher.py index 37340128..6b23c4b5 100644 --- a/src/server/pub_sub/salesforce_message_publisher.py +++ b/src/server/pub_sub/salesforce_message_publisher.py @@ -91,5 +91,6 @@ def send_pipeline_update_messages(contacts_list): logger.info('Sent %s contacts in message', len(current_batch)) batches = batches + 1 + logger.info('completed sending platform messages, %s messages sent', batches) From b90299596bdb2f7e60e3906ee2997f39ca825c61 Mon Sep 17 00:00:00 2001 From: Stephen Poserina Date: Mon, 14 Oct 2024 12:07:44 -0400 Subject: [PATCH 6/9] return person id instead of internal id --- src/server/api/API_ingest/updated_data.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/server/api/API_ingest/updated_data.py b/src/server/api/API_ingest/updated_data.py index 781630b4..995ed9cc 100644 --- a/src/server/api/API_ingest/updated_data.py +++ b/src/server/api/API_ingest/updated_data.py @@ -17,9 +17,9 @@ def get_updated_contact_data(): sf.source_id as "contactId" , -- long salesforce string case when - array_agg(sl.source_id) filter (where sl.source_id is not null) is null + array_agg(sp.id) filter (where sp.id is not null) is null then '{}'::varchar[] - else array_agg(sl.source_id) filter (where sl.source_id is not null) + else array_agg(sp.id) filter (where sp.id is not null) end as "personIds", -- short PAWS-local shelterluv id case when @@ -63,6 +63,7 @@ def get_updated_contact_data(): from volgisticsshifts group by volg_id ) vol on vol.volg_id::text = vc.source_id + left join shelterluvpeople sp on sp.internal_id = sl.source_id where sl.matching_id is not null or vc.matching_id is not null group by sf.source_id ) upd; From db4cddb26a2be8f599c1e61bbe43e18a57be45c3 Mon Sep 17 00:00:00 2001 From: Stephen Poserina Date: Sun, 17 Nov 2024 19:47:35 -0500 Subject: [PATCH 7/9] log status code errors pulling shelterluv people, return 500 if failure when pulling shelterluv people data, display human-readable timestamp in logs... mostly --- .../api/API_ingest/shelterluv_people.py | 20 +++++++++++-------- src/server/api/internal_api.py | 7 +------ src/server/config.py | 9 ++++----- 3 files changed, 17 insertions(+), 19 deletions(-) diff --git a/src/server/api/API_ingest/shelterluv_people.py b/src/server/api/API_ingest/shelterluv_people.py index 76932a7e..26ab0339 100644 --- a/src/server/api/API_ingest/shelterluv_people.py +++ b/src/server/api/API_ingest/shelterluv_people.py @@ -1,8 +1,11 @@ -import requests, os -from models import ShelterluvPeople -from config import engine -from sqlalchemy.orm import sessionmaker +import os +import requests import structlog +from sqlalchemy.orm import sessionmaker + +from config import engine +from models import ShelterluvPeople + logger = structlog.get_logger() try: @@ -44,17 +47,18 @@ def store_shelterluv_people_all(): with Session() as session: logger.debug("Truncating table shelterluvpeople") - session.execute("TRUNCATE TABLE shelterluvpeople") - logger.debug("Start getting shelterluv contacts from people table") while has_more: r = requests.get("http://shelterluv.com/api/v1/people?limit={}&offset={}".format(LIMIT, offset), - headers={"x-api-key": SHELTERLUV_SECRET_TOKEN}) + headers={"x-api-key": SHELTERLUV_SECRET_TOKEN}) + if r.status_code != 200: + logger.error("HTTP status code: %s Error detail: %s", r.status_code, r.text) + raise Exception("Error pulling Shelterluv people") + response = r.json() for person in response["people"]: - #todo: Does this need more "null checks"? session.add(ShelterluvPeople(firstname=person["Firstname"], lastname=person["Lastname"], id=person["ID"] if "ID" in person else None, diff --git a/src/server/api/internal_api.py b/src/server/api/internal_api.py index 80e5bed5..b62e2a9c 100644 --- a/src/server/api/internal_api.py +++ b/src/server/api/internal_api.py @@ -33,14 +33,9 @@ def user_test2(): @internal_api.route("/api/internal/ingestRawData", methods=["GET"]) def ingest_raw_data(): - try: - ingest_sources_from_api.start() - except Exception as e: - logger.error(e) - + ingest_sources_from_api.start() return jsonify({'outcome': 'OK'}), 200 - @internal_api.route("/api/internal/get_updated_data", methods=["GET"]) def get_contact_data(): logger.debug("Calling get_updated_contact_data()") diff --git a/src/server/config.py b/src/server/config.py index 6f6550a2..2af0abb0 100644 --- a/src/server/config.py +++ b/src/server/config.py @@ -1,13 +1,12 @@ +import logging import os import sys -import sqlalchemy as db -import models -from constants import IS_LOCAL, BASE_PATH, RAW_DATA_PATH, OUTPUT_PATH, LOGS_PATH, REPORT_PATH, ZIPPED_FILES -import logging +import sqlalchemy as db import structlog from structlog.processors import CallsiteParameter +from constants import IS_LOCAL, BASE_PATH, RAW_DATA_PATH, OUTPUT_PATH, LOGS_PATH, REPORT_PATH, ZIPPED_FILES # structlog setup for complete app @@ -17,7 +16,7 @@ structlog.processors.add_log_level, structlog.processors.StackInfoRenderer(), structlog.dev.set_exc_info, - structlog.processors.TimeStamper(fmt=None, utc=True ), + structlog.processors.TimeStamper(fmt="iso", utc=True), structlog.processors.CallsiteParameterAdder( [ CallsiteParameter.FILENAME, From b96c1fc64bccc953f4d532a9e52eb64f157f2fa5 Mon Sep 17 00:00:00 2001 From: Stephen Poserina Date: Sun, 17 Nov 2024 20:40:03 -0500 Subject: [PATCH 8/9] rewrite get updated data query to fix duplicates --- src/server/api/API_ingest/updated_data.py | 78 +++++++++-------------- 1 file changed, 30 insertions(+), 48 deletions(-) diff --git a/src/server/api/API_ingest/updated_data.py b/src/server/api/API_ingest/updated_data.py index 995ed9cc..cb7a9ab6 100644 --- a/src/server/api/API_ingest/updated_data.py +++ b/src/server/api/API_ingest/updated_data.py @@ -13,59 +13,41 @@ def get_updated_contact_data(): qry = """ -- Collect latest foster/volunteer dates select json_agg (upd) as "cd" from ( - select - sf.source_id as "contactId" , -- long salesforce string - case - when - array_agg(sp.id) filter (where sp.id is not null) is null - then '{}'::varchar[] - else array_agg(sp.id) filter (where sp.id is not null) - end as "personIds", -- short PAWS-local shelterluv id + select + salesforce.source_id as "contactId", + shelterluv.person_ids as "personIds", case - when - (extract(epoch from now())::bigint - (max(vol.last_date)/1000) < 365*86400) -- volunteered in last year - then 'Active' - else 'Inactive' - end as "volunteerStatus", - to_timestamp(max(foster_out) / 1000)::date as "fosterStartDate", - to_timestamp(max(foster_return) / 1000)::date as "fosterEndDate", - to_timestamp(min(vol.first_date) / 1000)::date "firstVolunteerDate", - to_timestamp(max(vol.last_date) / 1000)::date "lastShiftDate", - sum(vol.hours) as "totalVolunteerHours", - case - when - (array_agg(vc.source_id::integer) filter(where vc.source_id is not null)) is null - then '{}'::int[] - else array_agg(vc.source_id::integer) filter(where vc.source_id is not null) - end as "volgisticIds" + when volgistics.last_shift_date > now() - interval '1 year' then 'Active' else 'InActive' + end as "volunteerStatus", + shelterluv.foster_start as "fosterStartDate", + shelterluv.foster_end as "fosterEndDate", + volgistics.first_volunteer_date as "firstVolunteerDate", + volgistics.last_shift_date as "lastShiftDate", + volgistics.total_hours as "totalVolunteerHours", + volgistics.volg_ids as "volgisticIds" from ( - select source_id, matching_id from pdp_contacts sf - where sf.source_type = 'salesforcecontacts' - ) sf - left join pdp_contacts sl on sl.matching_id = sf.matching_id and sl.source_type = 'shelterluvpeople' + select * from pdp_contacts pc where source_type = 'salesforcecontacts' + ) salesforce left join ( - select - person_id, - max(case when event_type=1 then time else null end) * 1000 adopt, - max(case when event_type=2 then time else null end) * 1000 foster_out, - -- max(case when event_type=3 then time else null end) rto, - max(case when event_type=5 then time else null end) * 1000 foster_return - from sl_animal_events - group by person_id - ) sle on sle.person_id::text = sl.source_id - left join pdp_contacts vc on vc.matching_id = sf.matching_id and vc.source_type = 'volgistics' + select matching_id, array_agg(distinct v."number"::int) volg_ids, sum(hours) total_hours, + min(from_date) first_volunteer_date, max(from_date) last_shift_date + from volgistics v + left join volgisticsshifts v2 on v2.volg_id::varchar = v.number + inner join pdp_contacts pc on pc.source_id = v2.volg_id::varchar and pc.source_type = 'volgistics' + group by matching_id + ) volgistics on volgistics.matching_id = salesforce.matching_id left join ( select - volg_id, - sum(hours) as hours, - extract(epoch from min(from_date)) * 1000 as first_date, - extract(epoch from max(from_date)) * 1000 as last_date - from volgisticsshifts - group by volg_id - ) vol on vol.volg_id::text = vc.source_id - left join shelterluvpeople sp on sp.internal_id = sl.source_id - where sl.matching_id is not null or vc.matching_id is not null - group by sf.source_id + matching_id, array_agg(distinct p.internal_id) as person_ids, + max(case when event_type=1 then to_timestamp(time) else null end) adopt, + min(case when event_type=2 then to_timestamp(time) else null end) foster_start, + max(case when event_type=5 then to_timestamp(time) else null end) foster_end + from shelterluvpeople p + left join sl_animal_events sae on sae.person_id::varchar = p.internal_id + inner join pdp_contacts pc on pc.source_id = p.internal_id + group by matching_id + ) shelterluv on shelterluv.matching_id = salesforce.matching_id + where volgistics.matching_id is not null or shelterluv.matching_id is not null ) upd; """ From de15af20dbdfb3991e3561849723d9cea4391e2e Mon Sep 17 00:00:00 2001 From: Stephen Poserina Date: Thu, 6 Feb 2025 21:06:29 -0500 Subject: [PATCH 9/9] add retry logic and fix query parameter typo in SL animal events pull --- .../api/API_ingest/shelterluv_people.py | 29 ++++++++++++++--- src/server/api/API_ingest/sl_animal_events.py | 32 ++++++++++++------- 2 files changed, 46 insertions(+), 15 deletions(-) diff --git a/src/server/api/API_ingest/shelterluv_people.py b/src/server/api/API_ingest/shelterluv_people.py index 26ab0339..67afedd9 100644 --- a/src/server/api/API_ingest/shelterluv_people.py +++ b/src/server/api/API_ingest/shelterluv_people.py @@ -1,6 +1,7 @@ import os import requests import structlog +import time from sqlalchemy.orm import sessionmaker from config import engine @@ -26,6 +27,7 @@ TEST_MODE=os.getenv("TEST_MODE") # if not present, has value None LIMIT = 100 +MAX_RETRIES = 10 ################################# # This script is used to fetch data from shelterluv API. # Please be mindful of your usage. @@ -44,6 +46,7 @@ def store_shelterluv_people_all(): offset = 0 has_more = True Session = sessionmaker(engine) + retries = 0 with Session() as session: logger.debug("Truncating table shelterluvpeople") @@ -51,13 +54,29 @@ def store_shelterluv_people_all(): logger.debug("Start getting shelterluv contacts from people table") while has_more: - r = requests.get("http://shelterluv.com/api/v1/people?limit={}&offset={}".format(LIMIT, offset), + if retries > MAX_RETRIES: + raise Exception("reached max retries for get store_shelterluv_people_all") + + try: + r = requests.get("http://shelterluv.com/api/v1/people?limit={}&offset={}".format(LIMIT, offset), headers={"x-api-key": SHELTERLUV_SECRET_TOKEN}) + except Exception as e: + logger.error("store_shelterluv_people_all failed with %s, retrying...", e) + retries += 1 + continue + if r.status_code != 200: - logger.error("HTTP status code: %s Error detail: %s", r.status_code, r.text) - raise Exception("Error pulling Shelterluv people") + logger.error("store_shelterluv_people_all %s code, retrying...", r.status_code) + retries += 1 + continue + + try: + response = r.json() + except Exception as e: + logger.error("store_shelterluv_people_all JSON decode failed with %s", e) + retries += 1 + continue - response = r.json() for person in response["people"]: session.add(ShelterluvPeople(firstname=person["Firstname"], lastname=person["Lastname"], @@ -73,9 +92,11 @@ def store_shelterluv_people_all(): phone=person["Phone"], animal_ids=person["Animal_ids"])) offset += LIMIT + retries = 0 has_more = response["has_more"] if not TEST_MODE else response["has_more"] and offset < 1000 if offset % 1000 == 0: logger.debug("Reading offset %s", str(offset)) + time.sleep(0.2) session.commit() logger.debug("Finished getting shelterluv contacts from people table") diff --git a/src/server/api/API_ingest/sl_animal_events.py b/src/server/api/API_ingest/sl_animal_events.py index 43170184..8a747792 100644 --- a/src/server/api/API_ingest/sl_animal_events.py +++ b/src/server/api/API_ingest/sl_animal_events.py @@ -1,6 +1,7 @@ import json import os import posixpath as path +import time import structlog @@ -25,6 +26,7 @@ BASE_URL = "http://shelterluv.com/api/" MAX_COUNT = 100 # Max records the API will return for one call +MAX_RETRY = 10 # Get the API key try: @@ -75,8 +77,9 @@ def get_event_count(): """Test that server is operational and get total event count.""" - events = "v1/events&offset=0&limit=1" + events = "v1/events?offset=0&limit=1" URL = path.join(BASE_URL, events) + logger.info("making call: %s", URL) try: response = requests.request("GET", URL, headers=headers) @@ -85,7 +88,7 @@ def get_event_count(): return -2 if response.status_code != 200: - logger.error("get_event_count ", response.status_code, "code") + logger.error("get_event_count status code: %s", response.status_code) return -3 try: @@ -111,30 +114,36 @@ def get_events_bulk(): event_records = [] - raw_url = path.join(BASE_URL, "v1/events&offset={0}&limit={1}") + raw_url = path.join(BASE_URL, "v1/events?offset={0}&limit={1}") offset = 0 limit = MAX_COUNT more_records = True + retries = 0 while more_records: + if retries > MAX_RETRY: + raise Exception("get_events_bulk failed, max retries reached") url = raw_url.format(offset, limit) try: response = requests.request("GET", url, headers=headers) except Exception as e: - logger.error("get_events failed with ", e) - return -2 + logger.error("get_events_buk failed with %s, retrying...", e) + retries += 1 + continue if response.status_code != 200: - logger.error("get_event_count ", response.status_code, "code") - return -3 + logger.error("get_events_bulk %s code, retrying...", response.status_code) + retries += 1 + continue try: decoded = json.loads(response.text) except json.decoder.JSONDecodeError as e: - logger.error("get_event_count JSON decode failed with", e) - return -4 + logger.error("get_events_bulk JSON decode failed with %s", e) + retries += 1 + continue if decoded["success"]: for evrec in decoded["events"]: @@ -143,13 +152,14 @@ def get_events_bulk(): more_records = decoded["has_more"] # if so, we'll make another pass offset += limit + retries = 0 if offset % 1000 == 0: logger.debug("Reading offset %s", str(offset)) if TEST_MODE and offset > 1000: - more_records=False # Break out early - + more_records=False # Break out early else: return -5 # AFAICT, this means URL was bad + time.sleep(0.2) return event_records