diff --git a/augur/application/cli/backend.py b/augur/application/cli/backend.py index 439b359abf..548c1eeff4 100644 --- a/augur/application/cli/backend.py +++ b/augur/application/cli/backend.py @@ -170,21 +170,21 @@ def determine_worker_processes(ratio,maximum): sleep_time += 6 #60% of estimate, Maximum value of 45 - core_num_processes = determine_worker_processes(.6, 80) + core_num_processes = determine_worker_processes(.6, 45) logger.info(f"Starting core worker processes with concurrency={core_num_processes}") core_worker = f"celery -A augur.tasks.init.celery_app.celery_app worker -l info --concurrency={core_num_processes} -n core:{uuid.uuid4().hex}@%h" process_list.append(subprocess.Popen(core_worker.split(" "))) sleep_time += 6 #20% of estimate, Maximum value of 25 - secondary_num_processes = determine_worker_processes(.2, 26) + secondary_num_processes = determine_worker_processes(.2, 25) logger.info(f"Starting secondary worker processes with concurrency={secondary_num_processes}") secondary_worker = f"celery -A augur.tasks.init.celery_app.celery_app worker -l info --concurrency={secondary_num_processes} -n secondary:{uuid.uuid4().hex}@%h -Q secondary" process_list.append(subprocess.Popen(secondary_worker.split(" "))) sleep_time += 6 #15% of estimate, Maximum value of 20 - facade_num_processes = determine_worker_processes(.2, 40) + facade_num_processes = determine_worker_processes(.2, 20) logger.info(f"Starting facade worker processes with concurrency={facade_num_processes}") facade_worker = f"celery -A augur.tasks.init.celery_app.celery_app worker -l info --concurrency={facade_num_processes} -n facade:{uuid.uuid4().hex}@%h -Q facade" diff --git a/augur/application/config.py b/augur/application/config.py index c9aff085b1..7cf1eca3fb 100644 --- a/augur/application/config.py +++ b/augur/application/config.py @@ -68,7 +68,7 @@ def get_development_flag(): }, "Celery": { "worker_process_vmem_cap": 0.25, - "refresh_materialized_views_interval_in_days": 7 + "refresh_materialized_views_interval_in_days": 1 }, "Redis": { "cache_group": 0, diff --git a/augur/application/schema/alembic/versions/25_unique_on_mataview.py b/augur/application/schema/alembic/versions/25_unique_on_mataview.py new file mode 100644 index 0000000000..b9bc11e787 --- /dev/null +++ b/augur/application/schema/alembic/versions/25_unique_on_mataview.py @@ -0,0 +1,174 @@ +"""a unique index on a materialized view allows it to be refreshed concurrently, preventing blocking behavior + +Revision ID: 25 +Revises: 24 +Create Date: 2023-08-23 18:17:22.651191 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql +from sqlalchemy import text + +# revision identifiers, used by Alembic. +revision = '25' +down_revision = '24' +branch_labels = None +depends_on = None + + +def upgrade(): + + add_fix_keys_25() + +def downgrade(): + + upgrade=False + + add_fix_keys_25(upgrade) + +def add_fix_keys_25(upgrade=True): + + if upgrade: + + conn = op.get_bind() + conn.execute(text("""CREATE UNIQUE INDEX ON augur_data.api_get_all_repo_prs(repo_id);""")) + + conn = op.get_bind() + conn.execute(text(""" + CREATE UNIQUE INDEX ON augur_data.api_get_all_repos_commits(repo_id); """)) + + conn = op.get_bind() + conn.execute(text(""" + CREATE UNIQUE INDEX ON augur_data.api_get_all_repos_issues(repo_id); """)) + + conn = op.get_bind() + conn.execute(text(""" + CREATE UNIQUE INDEX ON augur_data.augur_new_contributors( cntrb_id, repo_id, month, login, year, rank); """)) + + conn = op.get_bind() + conn.execute(text(""" + CREATE UNIQUE INDEX ON augur_data.explorer_commits_and_committers_daily_count( repo_id, cmt_committer_date); """)) + + conn = op.get_bind() + conn.execute(text(""" + CREATE UNIQUE INDEX ON augur_data.explorer_new_contributors(cntrb_id, created_at, month, year, repo_id, login, rank); """)) + + conn = op.get_bind() + conn.execute(text(""" + CREATE UNIQUE INDEX ON augur_data.explorer_entry_list(repo_id); """)) + + conn = op.get_bind() + conn.execute(text(""" + drop MATERIALIZED VIEW if exists augur_data.explorer_libyear_all; + drop MATERIALIZED VIEW if exists augur_data.explorer_libyear_detail; + drop MATERIALIZED VIEW if exists augur_data.explorer_libyear_summary; + drop MATERIALIZED VIEW if exists augur_data.explorer_contributor_actions; """)) + + conn = op.get_bind() + conn.execute(text(""" + create materialized view augur_data.explorer_contributor_actions as + SELECT a.id AS cntrb_id, + a.created_at, + a.repo_id, + a.action, + repo.repo_name, + a.login, + row_number() OVER (PARTITION BY a.id, a.repo_id ORDER BY a.created_at desc) AS rank + FROM ( SELECT commits.cmt_ght_author_id AS id, + commits.cmt_author_timestamp AS created_at, + commits.repo_id, + 'commit'::text AS action, + contributors.cntrb_login AS login + FROM (augur_data.commits + LEFT JOIN augur_data.contributors ON (((contributors.cntrb_id)::text = (commits.cmt_ght_author_id)::text))) + GROUP BY commits.cmt_commit_hash, commits.cmt_ght_author_id, commits.repo_id, commits.cmt_author_timestamp, 'commit'::text, contributors.cntrb_login + UNION ALL + SELECT issues.reporter_id AS id, + issues.created_at, + issues.repo_id, + 'issue_opened'::text AS action, + contributors.cntrb_login AS login + FROM (augur_data.issues + LEFT JOIN augur_data.contributors ON ((contributors.cntrb_id = issues.reporter_id))) + WHERE (issues.pull_request IS NULL) + UNION ALL + SELECT pull_request_events.cntrb_id AS id, + pull_request_events.created_at, + pull_requests.repo_id, + 'pull_request_closed'::text AS action, + contributors.cntrb_login AS login + FROM augur_data.pull_requests, + (augur_data.pull_request_events + LEFT JOIN augur_data.contributors ON ((contributors.cntrb_id = pull_request_events.cntrb_id))) + WHERE ((pull_requests.pull_request_id = pull_request_events.pull_request_id) AND (pull_requests.pr_merged_at IS NULL) AND ((pull_request_events.action)::text = 'closed'::text)) + UNION ALL + SELECT pull_request_events.cntrb_id AS id, + pull_request_events.created_at, + pull_requests.repo_id, + 'pull_request_merged'::text AS action, + contributors.cntrb_login AS login + FROM augur_data.pull_requests, + (augur_data.pull_request_events + LEFT JOIN augur_data.contributors ON ((contributors.cntrb_id = pull_request_events.cntrb_id))) + WHERE ((pull_requests.pull_request_id = pull_request_events.pull_request_id) AND ((pull_request_events.action)::text = 'merged'::text)) + UNION ALL + SELECT issue_events.cntrb_id AS id, + issue_events.created_at, + issues.repo_id, + 'issue_closed'::text AS action, + contributors.cntrb_login AS login + FROM augur_data.issues, + (augur_data.issue_events + LEFT JOIN augur_data.contributors ON ((contributors.cntrb_id = issue_events.cntrb_id))) + WHERE ((issues.issue_id = issue_events.issue_id) AND (issues.pull_request IS NULL) AND ((issue_events.action)::text = 'closed'::text)) + UNION ALL + SELECT pull_request_reviews.cntrb_id AS id, + pull_request_reviews.pr_review_submitted_at AS created_at, + pull_requests.repo_id, + ('pull_request_review_'::text || (pull_request_reviews.pr_review_state)::text) AS action, + contributors.cntrb_login AS login + FROM augur_data.pull_requests, + (augur_data.pull_request_reviews + LEFT JOIN augur_data.contributors ON ((contributors.cntrb_id = pull_request_reviews.cntrb_id))) + WHERE (pull_requests.pull_request_id = pull_request_reviews.pull_request_id) + UNION ALL + SELECT pull_requests.pr_augur_contributor_id AS id, + pull_requests.pr_created_at AS created_at, + pull_requests.repo_id, + 'pull_request_open'::text AS action, + contributors.cntrb_login AS login + FROM (augur_data.pull_requests + LEFT JOIN augur_data.contributors ON ((pull_requests.pr_augur_contributor_id = contributors.cntrb_id))) + UNION ALL + SELECT message.cntrb_id AS id, + message.msg_timestamp AS created_at, + pull_requests.repo_id, + 'pull_request_comment'::text AS action, + contributors.cntrb_login AS login + FROM augur_data.pull_requests, + augur_data.pull_request_message_ref, + (augur_data.message + LEFT JOIN augur_data.contributors ON ((contributors.cntrb_id = message.cntrb_id))) + WHERE ((pull_request_message_ref.pull_request_id = pull_requests.pull_request_id) AND (pull_request_message_ref.msg_id = message.msg_id)) + UNION ALL + SELECT issues.reporter_id AS id, + message.msg_timestamp AS created_at, + issues.repo_id, + 'issue_comment'::text AS action, + contributors.cntrb_login AS login + FROM augur_data.issues, + augur_data.issue_message_ref, + (augur_data.message + LEFT JOIN augur_data.contributors ON ((contributors.cntrb_id = message.cntrb_id))) + WHERE ((issue_message_ref.msg_id = message.msg_id) AND (issues.issue_id = issue_message_ref.issue_id) AND (issues.closed_at <> message.msg_timestamp))) a, + augur_data.repo + WHERE (a.repo_id = repo.repo_id) + ORDER BY a.created_at DESC; + + update augur_operations.config set value='1' where setting_name = 'refresh_materialized_views_interval_in_days';""")) + + conn = op.get_bind() + conn.execute(text(""" + CREATE UNIQUE INDEX ON augur_data.explorer_contributor_actions(cntrb_id,created_at,repo_id, action, repo_name,login, rank); """)) + diff --git a/augur/tasks/db/refresh_materialized_views.py b/augur/tasks/db/refresh_materialized_views.py index 53b29ddbd2..a5ea157c4d 100644 --- a/augur/tasks/db/refresh_materialized_views.py +++ b/augur/tasks/db/refresh_materialized_views.py @@ -16,15 +16,15 @@ def refresh_materialized_views(): logger = logging.getLogger(refresh_materialized_views.__name__) refresh_view_query = s.sql.text(""" - REFRESH MATERIALIZED VIEW augur_data.api_get_all_repos_issues with data; - REFRESH MATERIALIZED VIEW augur_data.explorer_commits_and_committers_daily_count with data; - REFRESH MATERIALIZED VIEW augur_data.api_get_all_repos_commits with data; - REFRESH MATERIALIZED VIEW augur_data.augur_new_contributors with data; - REFRESH MATERIALIZED VIEW augur_data.explorer_contributor_actions with data; - REFRESH MATERIALIZED VIEW augur_data.explorer_libyear_all with data; - REFRESH MATERIALIZED VIEW augur_data.explorer_libyear_detail with data; - REFRESH MATERIALIZED VIEW augur_data.explorer_new_contributors with data; - REFRESH MATERIALIZED VIEW augur_data.explorer_libyear_summary with data; + REFRESH MATERIALIZED VIEW concurrently augur_data.api_get_all_repo_prs with data; + REFRESH MATERIALIZED VIEW concurrently augur_data.api_get_all_repos_commits with data; + REFRESH MATERIALIZED VIEW concurrently augur_data.api_get_all_repos_issues with data; + REFRESH MATERIALIZED VIEW concurrently augur_data.augur_new_contributors with data; + REFRESH MATERIALIZED VIEW concurrently augur_data.explorer_commits_and_committers_daily_count with data; + REFRESH MATERIALIZED VIEW concurrently augur_data.explorer_new_contributors with data; + REFRESH MATERIALIZED VIEW concurrently augur_data.explorer_entry_list with data; + REFRESH MATERIALIZED VIEW concurrently augur_data.explorer_new_contributors with data; + REFRESH MATERIALIZED VIEW concurrently augur_data.explorer_contributor_actions with data; """) with DatabaseSession(logger, engine) as session: diff --git a/augur/tasks/github/issues/tasks.py b/augur/tasks/github/issues/tasks.py index cb2c9787b6..5380b8bf10 100644 --- a/augur/tasks/github/issues/tasks.py +++ b/augur/tasks/github/issues/tasks.py @@ -5,6 +5,7 @@ from sqlalchemy.exc import IntegrityError +from augur.tasks.github.util.github_api_key_handler import GithubApiKeyHandler from augur.tasks.init.celery_app import celery_app as celery from augur.tasks.init.celery_app import AugurCoreRepoCollectionTask @@ -29,16 +30,29 @@ def collect_issues(repo_git : str) -> int: augur_db = manifest.augur_db + logger.info(f'this is the manifest.key_auth value: {str(manifest.key_auth)}') + try: query = augur_db.session.query(Repo).filter(Repo.repo_git == repo_git) repo_obj = execute_session_query(query, 'one') repo_id = repo_obj.repo_id + #try this + # the_key = manifest.key_auth + # try: + # randomon = GithubApiKeyHandler(augur_db.session) + # the_key = randomon.get_random_key() + # logger.info(f'The Random Key {the_key}') + # except Exception as e: + # logger.info(f'error: {e}') + # the_key = manifest.key_auth + # pass + owner, repo = get_owner_repo(repo_git) issue_data = retrieve_all_issue_data(repo_git, logger, manifest.key_auth) - + #issue_data = retrieve_all_issue_data(repo_git, logger, the_key) if issue_data: total_issues = len(issue_data) diff --git a/augur/tasks/github/repo_info/core.py b/augur/tasks/github/repo_info/core.py index 50fa88068e..50142f614e 100644 --- a/augur/tasks/github/repo_info/core.py +++ b/augur/tasks/github/repo_info/core.py @@ -150,10 +150,10 @@ def repo_info_model(augur_db, key_auth, repo_orm_obj, logger): pr_merged: pullRequests(states: MERGED) { totalCount } - ref(qualifiedName: "master") { + defaultBranchRef { target { ... on Commit { - history(first: 0){ + history { totalCount } } @@ -248,7 +248,7 @@ def repo_info_model(augur_db, key_auth, repo_orm_obj, logger): 'security_audit_file': None, 'status': None, 'keywords': None, - 'commit_count': data['ref']['target']['history']['totalCount'] if data['ref'] else None, + 'commit_count': data['defaultBranchRef']['target']['history']['totalCount'] if data['defaultBranchRef'] else None, 'issues_count': data['issue_count']['totalCount'] if data['issue_count'] else None, 'issues_closed': data['issues_closed']['totalCount'] if data['issues_closed'] else None, 'pull_request_count': data['pr_count']['totalCount'] if data['pr_count'] else None, @@ -256,7 +256,7 @@ def repo_info_model(augur_db, key_auth, repo_orm_obj, logger): 'pull_requests_closed': data['pr_closed']['totalCount'] if data['pr_closed'] else None, 'pull_requests_merged': data['pr_merged']['totalCount'] if data['pr_merged'] else None, 'tool_source': 'Repo_info Model', - 'tool_version': '0.42', + 'tool_version': '0.50.0', 'data_source': "Github" } diff --git a/augur/tasks/github/util/github_api_key_handler.py b/augur/tasks/github/util/github_api_key_handler.py index 2406ecef00..2a56e9c1c7 100644 --- a/augur/tasks/github/util/github_api_key_handler.py +++ b/augur/tasks/github/util/github_api_key_handler.py @@ -7,6 +7,7 @@ from augur.tasks.util.redis_list import RedisList from augur.application.db.session import DatabaseSession from augur.application.config import AugurConfig +from sqlalchemy import func class NoValidKeysError(Exception): @@ -39,7 +40,7 @@ def __init__(self, session: DatabaseSession): self.keys = self.get_api_keys() - # self.logger.debug(f"Retrieved {len(self.keys)} github api keys for use") + self.logger.info(f"Retrieved {len(self.keys)} github api keys for use") def get_random_key(self): """Retrieves a random key from the list of keys @@ -71,8 +72,11 @@ def get_api_keys_from_database(self) -> List[str]: from augur.application.db.models import WorkerOauth select = WorkerOauth.access_token + # randomizing the order at db time + #select.order_by(func.random()) where = [WorkerOauth.access_token != self.config_key, WorkerOauth.platform == 'github'] + #return [key_tuple[0] for key_tuple in self.session.query(select).filter(*where).order_by(func.random()).all()] return [key_tuple[0] for key_tuple in self.session.query(select).filter(*where).all()] @@ -130,6 +134,18 @@ def get_api_keys(self) -> List[str]: if not valid_keys: raise NoValidKeysError("No valid github api keys found in the config or worker oauth table") + + # shuffling the keys so not all processes get the same keys in the same order + valid_now = valid_keys + #try: + #self.logger.info(f'valid keys before shuffle: {valid_keys}') + #valid_keys = random.sample(valid_keys, len(valid_keys)) + #self.logger.info(f'valid keys AFTER shuffle: {valid_keys}') + #except Exception as e: + # self.logger.debug(f'{e}') + # valid_keys = valid_now + # pass + return valid_keys def is_bad_api_key(self, client: httpx.Client, oauth_key: str) -> bool: diff --git a/augur/tasks/github/util/github_random_key_auth.py b/augur/tasks/github/util/github_random_key_auth.py index 158d578a7c..926ac04216 100644 --- a/augur/tasks/github/util/github_random_key_auth.py +++ b/augur/tasks/github/util/github_random_key_auth.py @@ -3,6 +3,7 @@ from augur.tasks.util.random_key_auth import RandomKeyAuth from augur.tasks.github.util.github_api_key_handler import GithubApiKeyHandler from augur.application.db.session import DatabaseSession +import random class GithubRandomKeyAuth(RandomKeyAuth): @@ -16,6 +17,7 @@ def __init__(self, session: DatabaseSession, logger): # gets the github api keys from the database via the GithubApiKeyHandler github_api_keys = GithubApiKeyHandler(session).keys + #github_api_keys = random.sample(github_api_keys, len(github_api_keys)) if not github_api_keys: print("Failed to find github api keys. This is usually because your key has expired") diff --git a/augur/tasks/start_tasks.py b/augur/tasks/start_tasks.py index 1918efbf8a..6d52db8299 100644 --- a/augur/tasks/start_tasks.py +++ b/augur/tasks/start_tasks.py @@ -261,16 +261,16 @@ def augur_collection_monitor(): enabled_phase_names = get_enabled_phase_names_from_config(session.logger, session) if primary_repo_collect_phase.__name__ in enabled_phase_names: - start_primary_collection(session, max_repo=40) + start_primary_collection(session, max_repo=30) if secondary_repo_collect_phase.__name__ in enabled_phase_names: start_secondary_collection(session, max_repo=10) if facade_phase.__name__ in enabled_phase_names: - start_facade_collection(session, max_repo=30) + start_facade_collection(session, max_repo=20) if machine_learning_phase.__name__ in enabled_phase_names: - start_ml_collection(session,max_repo=5) + start_ml_collection(session,max_repo=1) # have a pipe of 180 diff --git a/augur/tasks/util/random_key_auth.py b/augur/tasks/util/random_key_auth.py index 345067ec18..7f7bd65557 100644 --- a/augur/tasks/util/random_key_auth.py +++ b/augur/tasks/util/random_key_auth.py @@ -33,7 +33,7 @@ def auth_flow(self, request: Request) -> Generator[Request, Response, None]: if self.list_of_keys: key_value = choice(self.list_of_keys) - + self.logger.debug(f'Key value used: {key_value}') # formats the key string into a format GitHub will accept if self.key_format: @@ -43,6 +43,7 @@ def auth_flow(self, request: Request) -> Generator[Request, Response, None]: # set the headers of the request with the new key request.headers[self.header_name] = key_string + #self.logger.info(f"List of Keys: {self.list_of_keys}") else: self.logger.error(f"There are no valid keys to make a request with: {self.list_of_keys}") diff --git a/augur/tasks/util/redis_list.py b/augur/tasks/util/redis_list.py index 0a3eaa79fa..0380cc6267 100644 --- a/augur/tasks/util/redis_list.py +++ b/augur/tasks/util/redis_list.py @@ -168,8 +168,10 @@ def pop(self, index: int = None): """ if index is None: - + # This will get a random index from the list and remove it, + # decreasing the likelihood of everyone using the same key all the time redis.rpop(self.redis_list_key) + #redis.spop(self.redis_list_key) else: # calls __delitem__