From b39a50a43b70c73ce7393eadc783408817649b13 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 16 Nov 2023 14:38:08 +0000 Subject: [PATCH 01/20] Remove obsolete `"app"` from worker templates --- .../conf-workers/synapse.supervisord.conf.j2 | 4 ++-- docker/conf-workers/worker.yaml.j2 | 2 +- docker/configure_workers_and_start.py | 18 ------------------ 3 files changed, 3 insertions(+), 21 deletions(-) diff --git a/docker/conf-workers/synapse.supervisord.conf.j2 b/docker/conf-workers/synapse.supervisord.conf.j2 index 481eb4fc92f..9f7dd819797 100644 --- a/docker/conf-workers/synapse.supervisord.conf.j2 +++ b/docker/conf-workers/synapse.supervisord.conf.j2 @@ -6,7 +6,7 @@ command=/usr/local/bin/python -m synapse.app.complement_fork_starter --config-path="{{ main_config_path }}" --config-path=/conf/workers/shared.yaml {%- for worker in workers %} - -- {{ worker.app }} + -- synapse.app.generic_worker --config-path="{{ main_config_path }}" --config-path=/conf/workers/shared.yaml --config-path=/conf/workers/{{ worker.name }}.yaml @@ -36,7 +36,7 @@ exitcodes=0 {% for worker in workers %} [program:synapse_{{ worker.name }}] -command=/usr/local/bin/prefix-log /usr/local/bin/python -m {{ worker.app }} +command=/usr/local/bin/prefix-log /usr/local/bin/python -m synapse.app.generic_worker --config-path="{{ main_config_path }}" --config-path=/conf/workers/shared.yaml --config-path=/conf/workers/{{ worker.name }}.yaml diff --git a/docker/conf-workers/worker.yaml.j2 b/docker/conf-workers/worker.yaml.j2 index 29ec74b4ea0..88e7a33c420 100644 --- a/docker/conf-workers/worker.yaml.j2 +++ b/docker/conf-workers/worker.yaml.j2 @@ -3,7 +3,7 @@ # Values will be change depending on whichever workers are selected when # running that image. 
-worker_app: "{{ app }}" +worker_app: "synapse.app.generic_worker" worker_name: "{{ name }}" worker_listeners: diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index 62952e6b263..48abdae6744 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -90,14 +90,12 @@ # have to attach by instance_map to the master process and have client endpoints. WORKERS_CONFIG: Dict[str, Dict[str, Any]] = { "pusher": { - "app": "synapse.app.generic_worker", "listener_resources": [], "endpoint_patterns": [], "shared_extra_conf": {}, "worker_extra_conf": "", }, "user_dir": { - "app": "synapse.app.generic_worker", "listener_resources": ["client"], "endpoint_patterns": [ "^/_matrix/client/(api/v1|r0|v3|unstable)/user_directory/search$" @@ -108,7 +106,6 @@ "worker_extra_conf": "", }, "media_repository": { - "app": "synapse.app.generic_worker", "listener_resources": ["media"], "endpoint_patterns": [ "^/_matrix/media/", @@ -126,7 +123,6 @@ "worker_extra_conf": "enable_media_repo: true", }, "appservice": { - "app": "synapse.app.generic_worker", "listener_resources": [], "endpoint_patterns": [], "shared_extra_conf": { @@ -135,14 +131,12 @@ "worker_extra_conf": "", }, "federation_sender": { - "app": "synapse.app.generic_worker", "listener_resources": [], "endpoint_patterns": [], "shared_extra_conf": {}, "worker_extra_conf": "", }, "synchrotron": { - "app": "synapse.app.generic_worker", "listener_resources": ["client"], "endpoint_patterns": [ "^/_matrix/client/(v2_alpha|r0|v3)/sync$", @@ -154,7 +148,6 @@ "worker_extra_conf": "", }, "client_reader": { - "app": "synapse.app.generic_worker", "listener_resources": ["client"], "endpoint_patterns": [ "^/_matrix/client/(api/v1|r0|v3|unstable)/publicRooms$", @@ -189,7 +182,6 @@ "worker_extra_conf": "", }, "federation_reader": { - "app": "synapse.app.generic_worker", "listener_resources": ["federation"], "endpoint_patterns": [ "^/_matrix/federation/(v1|v2)/event/", @@ 
-216,21 +208,18 @@ "worker_extra_conf": "", }, "federation_inbound": { - "app": "synapse.app.generic_worker", "listener_resources": ["federation"], "endpoint_patterns": ["/_matrix/federation/(v1|v2)/send/"], "shared_extra_conf": {}, "worker_extra_conf": "", }, "event_persister": { - "app": "synapse.app.generic_worker", "listener_resources": ["replication"], "endpoint_patterns": [], "shared_extra_conf": {}, "worker_extra_conf": "", }, "background_worker": { - "app": "synapse.app.generic_worker", "listener_resources": [], "endpoint_patterns": [], # This worker cannot be sharded. Therefore, there should only ever be one @@ -239,7 +228,6 @@ "worker_extra_conf": "", }, "event_creator": { - "app": "synapse.app.generic_worker", "listener_resources": ["client"], "endpoint_patterns": [ "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/redact", @@ -253,14 +241,12 @@ "worker_extra_conf": "", }, "frontend_proxy": { - "app": "synapse.app.generic_worker", "listener_resources": ["client", "replication"], "endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"], "shared_extra_conf": {}, "worker_extra_conf": "", }, "account_data": { - "app": "synapse.app.generic_worker", "listener_resources": ["client", "replication"], "endpoint_patterns": [ "^/_matrix/client/(r0|v3|unstable)/.*/tags", @@ -270,14 +256,12 @@ "worker_extra_conf": "", }, "presence": { - "app": "synapse.app.generic_worker", "listener_resources": ["client", "replication"], "endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"], "shared_extra_conf": {}, "worker_extra_conf": "", }, "receipts": { - "app": "synapse.app.generic_worker", "listener_resources": ["client", "replication"], "endpoint_patterns": [ "^/_matrix/client/(r0|v3|unstable)/rooms/.*/receipt", @@ -287,14 +271,12 @@ "worker_extra_conf": "", }, "to_device": { - "app": "synapse.app.generic_worker", "listener_resources": ["client", "replication"], "endpoint_patterns": ["^/_matrix/client/(r0|v3|unstable)/sendToDevice/"], 
"shared_extra_conf": {}, "worker_extra_conf": "", }, "typing": { - "app": "synapse.app.generic_worker", "listener_resources": ["client", "replication"], "endpoint_patterns": [ "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing" From a22eb7dc1563f4dce521dd4b42fb53ba8ede3b28 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 16 Nov 2023 15:00:48 +0000 Subject: [PATCH 02/20] Convert worker templates into dataclass --- docker/configure_workers_and_start.py | 312 +++++++++++++------------- 1 file changed, 162 insertions(+), 150 deletions(-) diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index 48abdae6744..8329aab6dca 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -47,12 +47,14 @@ # in the project's README), this script may be run multiple times, and functionality should # continue to work if so. +import dataclasses import os import platform import re import subprocess import sys from collections import defaultdict +from dataclasses import dataclass, field from itertools import chain from pathlib import Path from typing import ( @@ -82,32 +84,41 @@ # during processing with the name of the worker. WORKER_PLACEHOLDER_NAME = "placeholder_name" + # Workers with exposed endpoints needs either "client", "federation", or "media" listener_resources # Watching /_matrix/client needs a "client" listener # Watching /_matrix/federation needs a "federation" listener # Watching /_matrix/media and related needs a "media" listener # Stream Writers require "client" and "replication" listeners because they # have to attach by instance_map to the master process and have client endpoints. 
-WORKERS_CONFIG: Dict[str, Dict[str, Any]] = { - "pusher": { - "listener_resources": [], - "endpoint_patterns": [], - "shared_extra_conf": {}, - "worker_extra_conf": "", - }, - "user_dir": { - "listener_resources": ["client"], - "endpoint_patterns": [ +@dataclass +class WorkerTemplate: + listener_resources: List[str] = field(default_factory=list) + endpoint_patterns: List[str] = field(default_factory=list) + shared_extra_conf: Dict[str, Any] = field(default_factory=dict) + worker_extra_conf: str = "" + + +WORKERS_CONFIG: Dict[str, WorkerTemplate] = { + "pusher": WorkerTemplate( + listener_resources=[], + endpoint_patterns=[], + shared_extra_conf={}, + worker_extra_conf="", + ), + "user_dir": WorkerTemplate( + listener_resources=["client"], + endpoint_patterns=[ "^/_matrix/client/(api/v1|r0|v3|unstable)/user_directory/search$" ], - "shared_extra_conf": { + shared_extra_conf={ "update_user_directory_from_worker": WORKER_PLACEHOLDER_NAME }, - "worker_extra_conf": "", - }, - "media_repository": { - "listener_resources": ["media"], - "endpoint_patterns": [ + worker_extra_conf="", + ), + "media_repository": WorkerTemplate( + listener_resources=["media"], + endpoint_patterns=[ "^/_matrix/media/", "^/_synapse/admin/v1/purge_media_cache$", "^/_synapse/admin/v1/room/.*/media.*$", @@ -116,40 +127,38 @@ "^/_synapse/admin/v1/quarantine_media/.*$", ], # The first configured media worker will run the media background jobs - "shared_extra_conf": { + shared_extra_conf={ "enable_media_repo": False, "media_instance_running_background_jobs": WORKER_PLACEHOLDER_NAME, }, - "worker_extra_conf": "enable_media_repo: true", - }, - "appservice": { - "listener_resources": [], - "endpoint_patterns": [], - "shared_extra_conf": { - "notify_appservices_from_worker": WORKER_PLACEHOLDER_NAME - }, - "worker_extra_conf": "", - }, - "federation_sender": { - "listener_resources": [], - "endpoint_patterns": [], - "shared_extra_conf": {}, - "worker_extra_conf": "", - }, - "synchrotron": { - 
"listener_resources": ["client"], - "endpoint_patterns": [ + worker_extra_conf="enable_media_repo: true", + ), + "appservice": WorkerTemplate( + listener_resources=[], + endpoint_patterns=[], + shared_extra_conf={"notify_appservices_from_worker": WORKER_PLACEHOLDER_NAME}, + worker_extra_conf="", + ), + "federation_sender": WorkerTemplate( + listener_resources=[], + endpoint_patterns=[], + shared_extra_conf={}, + worker_extra_conf="", + ), + "synchrotron": WorkerTemplate( + listener_resources=["client"], + endpoint_patterns=[ "^/_matrix/client/(v2_alpha|r0|v3)/sync$", "^/_matrix/client/(api/v1|v2_alpha|r0|v3)/events$", "^/_matrix/client/(api/v1|r0|v3)/initialSync$", "^/_matrix/client/(api/v1|r0|v3)/rooms/[^/]+/initialSync$", ], - "shared_extra_conf": {}, - "worker_extra_conf": "", - }, - "client_reader": { - "listener_resources": ["client"], - "endpoint_patterns": [ + shared_extra_conf={}, + worker_extra_conf="", + ), + "client_reader": WorkerTemplate( + listener_resources=["client"], + endpoint_patterns=[ "^/_matrix/client/(api/v1|r0|v3|unstable)/publicRooms$", "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/joined_members$", "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/context/.*$", @@ -178,12 +187,12 @@ "^/_matrix/client/(r0|v3|unstable)/capabilities$", "^/_matrix/client/(r0|v3|unstable)/notifications$", ], - "shared_extra_conf": {}, - "worker_extra_conf": "", - }, - "federation_reader": { - "listener_resources": ["federation"], - "endpoint_patterns": [ + shared_extra_conf={}, + worker_extra_conf="", + ), + "federation_reader": WorkerTemplate( + listener_resources=["federation"], + endpoint_patterns=[ "^/_matrix/federation/(v1|v2)/event/", "^/_matrix/federation/(v1|v2)/state/", "^/_matrix/federation/(v1|v2)/state_ids/", @@ -204,32 +213,32 @@ "^/_matrix/federation/(v1|v2)/get_groups_publicised$", "^/_matrix/key/v2/query", ], - "shared_extra_conf": {}, - "worker_extra_conf": "", - }, - "federation_inbound": { - "listener_resources": ["federation"], - 
"endpoint_patterns": ["/_matrix/federation/(v1|v2)/send/"], - "shared_extra_conf": {}, - "worker_extra_conf": "", - }, - "event_persister": { - "listener_resources": ["replication"], - "endpoint_patterns": [], - "shared_extra_conf": {}, - "worker_extra_conf": "", - }, - "background_worker": { - "listener_resources": [], - "endpoint_patterns": [], + shared_extra_conf={}, + worker_extra_conf="", + ), + "federation_inbound": WorkerTemplate( + listener_resources=["federation"], + endpoint_patterns=["/_matrix/federation/(v1|v2)/send/"], + shared_extra_conf={}, + worker_extra_conf="", + ), + "event_persister": WorkerTemplate( + listener_resources=["replication"], + endpoint_patterns=[], + shared_extra_conf={}, + worker_extra_conf="", + ), + "background_worker": WorkerTemplate( + listener_resources=[], + endpoint_patterns=[], # This worker cannot be sharded. Therefore, there should only ever be one # background worker. This is enforced for the safety of your database. - "shared_extra_conf": {"run_background_tasks_on": WORKER_PLACEHOLDER_NAME}, - "worker_extra_conf": "", - }, - "event_creator": { - "listener_resources": ["client"], - "endpoint_patterns": [ + shared_extra_conf={"run_background_tasks_on": WORKER_PLACEHOLDER_NAME}, + worker_extra_conf="", + ), + "event_creator": WorkerTemplate( + listener_resources=["client"], + endpoint_patterns=[ "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/redact", "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/send", "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$", @@ -237,53 +246,51 @@ "^/_matrix/client/(api/v1|r0|v3|unstable)/knock/", "^/_matrix/client/(api/v1|r0|v3|unstable)/profile/", ], - "shared_extra_conf": {}, - "worker_extra_conf": "", - }, - "frontend_proxy": { - "listener_resources": ["client", "replication"], - "endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"], - "shared_extra_conf": {}, - "worker_extra_conf": "", - }, - "account_data": { - 
"listener_resources": ["client", "replication"], - "endpoint_patterns": [ + shared_extra_conf={}, + worker_extra_conf="", + ), + "frontend_proxy": WorkerTemplate( + listener_resources=["client", "replication"], + endpoint_patterns=["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"], + shared_extra_conf={}, + worker_extra_conf="", + ), + "account_data": WorkerTemplate( + listener_resources=["client", "replication"], + endpoint_patterns=[ "^/_matrix/client/(r0|v3|unstable)/.*/tags", "^/_matrix/client/(r0|v3|unstable)/.*/account_data", ], - "shared_extra_conf": {}, - "worker_extra_conf": "", - }, - "presence": { - "listener_resources": ["client", "replication"], - "endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"], - "shared_extra_conf": {}, - "worker_extra_conf": "", - }, - "receipts": { - "listener_resources": ["client", "replication"], - "endpoint_patterns": [ + shared_extra_conf={}, + worker_extra_conf="", + ), + "presence": WorkerTemplate( + listener_resources=["client", "replication"], + endpoint_patterns=["^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"], + shared_extra_conf={}, + worker_extra_conf="", + ), + "receipts": WorkerTemplate( + listener_resources=["client", "replication"], + endpoint_patterns=[ "^/_matrix/client/(r0|v3|unstable)/rooms/.*/receipt", "^/_matrix/client/(r0|v3|unstable)/rooms/.*/read_markers", ], - "shared_extra_conf": {}, - "worker_extra_conf": "", - }, - "to_device": { - "listener_resources": ["client", "replication"], - "endpoint_patterns": ["^/_matrix/client/(r0|v3|unstable)/sendToDevice/"], - "shared_extra_conf": {}, - "worker_extra_conf": "", - }, - "typing": { - "listener_resources": ["client", "replication"], - "endpoint_patterns": [ - "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing" - ], - "shared_extra_conf": {}, - "worker_extra_conf": "", - }, + shared_extra_conf={}, + worker_extra_conf="", + ), + "to_device": WorkerTemplate( + listener_resources=["client", "replication"], + 
endpoint_patterns=["^/_matrix/client/(r0|v3|unstable)/sendToDevice/"], + shared_extra_conf={}, + worker_extra_conf="", + ), + "typing": WorkerTemplate( + listener_resources=["client", "replication"], + endpoint_patterns=["^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing"], + shared_extra_conf={}, + worker_extra_conf="", + ), } # Templates for sections that may be inserted multiple times in config files @@ -425,54 +432,59 @@ def add_worker_roles_to_shared_config( def merge_worker_template_configs( - existing_dict: Optional[Dict[str, Any]], - to_be_merged_dict: Dict[str, Any], -) -> Dict[str, Any]: + existing_template: WorkerTemplate, + to_be_merged_template: WorkerTemplate, +) -> WorkerTemplate: """When given an existing dict of worker template configuration consisting with both dicts and lists, merge new template data from WORKERS_CONFIG(or create) and return new dict. Args: - existing_dict: Either an existing worker template or a fresh blank one. - to_be_merged_dict: The template from WORKERS_CONFIGS to be merged into + existing_template: Either an existing worker template or a fresh blank one. + to_be_merged_template: The template from WORKERS_CONFIGS to be merged into existing_dict. Returns: The newly merged together dict values. """ - new_dict: Dict[str, Any] = {} - if not existing_dict: - # It doesn't exist yet, just use the new dict(but take a copy not a reference) - new_dict = to_be_merged_dict.copy() - else: - for i in to_be_merged_dict.keys(): - if (i == "endpoint_patterns") or (i == "listener_resources"): - # merge the two lists, remove duplicates - new_dict[i] = list(set(existing_dict[i] + to_be_merged_dict[i])) - elif i == "shared_extra_conf": - # merge dictionary's, the worker name will be replaced later - new_dict[i] = {**existing_dict[i], **to_be_merged_dict[i]} - elif i == "worker_extra_conf": - # There is only one worker type that has a 'worker_extra_conf' and it is - # the media_repo. 
Since duplicate worker types on the same worker don't - # work, this is fine. - new_dict[i] = existing_dict[i] + to_be_merged_dict[i] - else: - # Everything else should be identical, like "app", which only works - # because all apps are now generic_workers. - new_dict[i] = to_be_merged_dict[i] - return new_dict + # copy existing_template without any replacements + new_template: WorkerTemplate = dataclasses.replace(existing_template) + + # merge the two lists, remove duplicates + new_template.listener_resources = list( + set(new_template.listener_resources + to_be_merged_template.listener_resources) + ) + + # merge the two lists, remove duplicates + new_template.endpoint_patterns = list( + set(new_template.endpoint_patterns + to_be_merged_template.endpoint_patterns) + ) + + # merge dictionaries; the worker name will be replaced later + new_template.shared_extra_conf = { + **new_template.shared_extra_conf, + **to_be_merged_template.shared_extra_conf, + } + + # There is only one worker type that has a 'worker_extra_conf' and it is + # the media_repo. Since duplicate worker types on the same worker don't + # work, this is fine. + new_template.worker_extra_conf = ( + new_template.worker_extra_conf + to_be_merged_template.worker_extra_conf + ) + + return new_template def insert_worker_name_for_worker_config( - existing_dict: Dict[str, Any], worker_name: str + existing_template: WorkerTemplate, worker_name: str ) -> Dict[str, Any]: """Insert a given worker name into the worker's configuration dict. Args: - existing_dict: The worker_config dict that is imported into shared_config. + existing_template: The WorkerTemplate that is imported into shared_config. worker_name: The name of the worker to insert. 
Returns: Copy of the dict with newly inserted worker name """ - dict_to_edit = existing_dict.copy() + dict_to_edit = dataclasses.asdict(existing_template) for k, v in dict_to_edit["shared_extra_conf"].items(): # Only proceed if it's the placeholder name string if v == WORKER_PLACEHOLDER_NAME: @@ -793,27 +805,27 @@ def generate_worker_files( # Map locations to upstreams (corresponding to worker types) in Nginx # but only if we use the appropriate worker type for worker_type in all_worker_types_in_use: - for endpoint_pattern in WORKERS_CONFIG[worker_type]["endpoint_patterns"]: + for endpoint_pattern in WORKERS_CONFIG[worker_type].endpoint_patterns: nginx_locations[endpoint_pattern] = f"http://{worker_type}" # For each worker type specified by the user, create config values and write it's # yaml config file for worker_name, worker_types_set in requested_worker_types.items(): # The collected and processed data will live here. - worker_config: Dict[str, Any] = {} + worker_template: WorkerTemplate = WorkerTemplate() # Merge all worker config templates for this worker into a single config for worker_type in worker_types_set: - copy_of_template_config = WORKERS_CONFIG[worker_type].copy() - # Merge worker type template configuration data. It's a combination of lists # and dicts, so use this helper. - worker_config = merge_worker_template_configs( - worker_config, copy_of_template_config + worker_template = merge_worker_template_configs( + worker_template, WORKERS_CONFIG[worker_type] ) # Replace placeholder names in the config template with the actual worker name. 
- worker_config = insert_worker_name_for_worker_config(worker_config, worker_name) + worker_config: Dict[str, Any] = insert_worker_name_for_worker_config( + worker_template, worker_name + ) worker_config.update( {"name": worker_name, "port": str(worker_port), "config_path": config_path} From ba3b6a4dfd3905165e5e30da6d552ca02de0bb7b Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 16 Nov 2023 15:05:39 +0000 Subject: [PATCH 03/20] Use a lambda for the worker name rather than search and replace later --- docker/configure_workers_and_start.py | 61 +++++++++++++-------------- 1 file changed, 29 insertions(+), 32 deletions(-) diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index 8329aab6dca..a4dd16018ff 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -59,6 +59,7 @@ from pathlib import Path from typing import ( Any, + Callable, Dict, List, Mapping, @@ -80,10 +81,6 @@ MAIN_PROCESS_UNIX_SOCKET_PUBLIC_PATH = "/run/main_public.sock" MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH = "/run/main_private.sock" -# A simple name used as a placeholder in the WORKERS_CONFIG below. This will be replaced -# during processing with the name of the worker. 
-WORKER_PLACEHOLDER_NAME = "placeholder_name" - # Workers with exposed endpoints needs either "client", "federation", or "media" listener_resources # Watching /_matrix/client needs a "client" listener @@ -95,7 +92,8 @@ class WorkerTemplate: listener_resources: List[str] = field(default_factory=list) endpoint_patterns: List[str] = field(default_factory=list) - shared_extra_conf: Dict[str, Any] = field(default_factory=dict) + # (worker_name) -> {} + shared_extra_conf: Callable[[str], Dict[str, Any]] = lambda _worker_name: {} worker_extra_conf: str = "" @@ -103,7 +101,7 @@ class WorkerTemplate: "pusher": WorkerTemplate( listener_resources=[], endpoint_patterns=[], - shared_extra_conf={}, + shared_extra_conf=lambda _worker_name: {}, worker_extra_conf="", ), "user_dir": WorkerTemplate( @@ -111,8 +109,8 @@ class WorkerTemplate: endpoint_patterns=[ "^/_matrix/client/(api/v1|r0|v3|unstable)/user_directory/search$" ], - shared_extra_conf={ - "update_user_directory_from_worker": WORKER_PLACEHOLDER_NAME + shared_extra_conf=lambda worker_name: { + "update_user_directory_from_worker": worker_name }, worker_extra_conf="", ), @@ -127,22 +125,24 @@ class WorkerTemplate: "^/_synapse/admin/v1/quarantine_media/.*$", ], # The first configured media worker will run the media background jobs - shared_extra_conf={ + shared_extra_conf=lambda worker_name: { "enable_media_repo": False, - "media_instance_running_background_jobs": WORKER_PLACEHOLDER_NAME, + "media_instance_running_background_jobs": worker_name, }, worker_extra_conf="enable_media_repo: true", ), "appservice": WorkerTemplate( listener_resources=[], endpoint_patterns=[], - shared_extra_conf={"notify_appservices_from_worker": WORKER_PLACEHOLDER_NAME}, + shared_extra_conf=lambda worker_name: { + "notify_appservices_from_worker": worker_name + }, worker_extra_conf="", ), "federation_sender": WorkerTemplate( listener_resources=[], endpoint_patterns=[], - shared_extra_conf={}, + shared_extra_conf=lambda _worker_name: {}, 
worker_extra_conf="", ), "synchrotron": WorkerTemplate( @@ -153,7 +153,7 @@ class WorkerTemplate: "^/_matrix/client/(api/v1|r0|v3)/initialSync$", "^/_matrix/client/(api/v1|r0|v3)/rooms/[^/]+/initialSync$", ], - shared_extra_conf={}, + shared_extra_conf=lambda _worker_name: {}, worker_extra_conf="", ), "client_reader": WorkerTemplate( @@ -187,7 +187,7 @@ class WorkerTemplate: "^/_matrix/client/(r0|v3|unstable)/capabilities$", "^/_matrix/client/(r0|v3|unstable)/notifications$", ], - shared_extra_conf={}, + shared_extra_conf=lambda _worker_name: {}, worker_extra_conf="", ), "federation_reader": WorkerTemplate( @@ -213,19 +213,19 @@ class WorkerTemplate: "^/_matrix/federation/(v1|v2)/get_groups_publicised$", "^/_matrix/key/v2/query", ], - shared_extra_conf={}, + shared_extra_conf=lambda _worker_name: {}, worker_extra_conf="", ), "federation_inbound": WorkerTemplate( listener_resources=["federation"], endpoint_patterns=["/_matrix/federation/(v1|v2)/send/"], - shared_extra_conf={}, + shared_extra_conf=lambda _worker_name: {}, worker_extra_conf="", ), "event_persister": WorkerTemplate( listener_resources=["replication"], endpoint_patterns=[], - shared_extra_conf={}, + shared_extra_conf=lambda _worker_name: {}, worker_extra_conf="", ), "background_worker": WorkerTemplate( @@ -233,7 +233,7 @@ class WorkerTemplate: endpoint_patterns=[], # This worker cannot be sharded. Therefore, there should only ever be one # background worker. This is enforced for the safety of your database. 
- shared_extra_conf={"run_background_tasks_on": WORKER_PLACEHOLDER_NAME}, + shared_extra_conf=lambda worker_name: {"run_background_tasks_on": worker_name}, worker_extra_conf="", ), "event_creator": WorkerTemplate( @@ -246,13 +246,13 @@ class WorkerTemplate: "^/_matrix/client/(api/v1|r0|v3|unstable)/knock/", "^/_matrix/client/(api/v1|r0|v3|unstable)/profile/", ], - shared_extra_conf={}, + shared_extra_conf=lambda _worker_name: {}, worker_extra_conf="", ), "frontend_proxy": WorkerTemplate( listener_resources=["client", "replication"], endpoint_patterns=["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"], - shared_extra_conf={}, + shared_extra_conf=lambda _worker_name: {}, worker_extra_conf="", ), "account_data": WorkerTemplate( @@ -261,13 +261,13 @@ class WorkerTemplate: "^/_matrix/client/(r0|v3|unstable)/.*/tags", "^/_matrix/client/(r0|v3|unstable)/.*/account_data", ], - shared_extra_conf={}, + shared_extra_conf=lambda _worker_name: {}, worker_extra_conf="", ), "presence": WorkerTemplate( listener_resources=["client", "replication"], endpoint_patterns=["^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"], - shared_extra_conf={}, + shared_extra_conf=lambda _worker_name: {}, worker_extra_conf="", ), "receipts": WorkerTemplate( @@ -276,19 +276,19 @@ class WorkerTemplate: "^/_matrix/client/(r0|v3|unstable)/rooms/.*/receipt", "^/_matrix/client/(r0|v3|unstable)/rooms/.*/read_markers", ], - shared_extra_conf={}, + shared_extra_conf=lambda _worker_name: {}, worker_extra_conf="", ), "to_device": WorkerTemplate( listener_resources=["client", "replication"], endpoint_patterns=["^/_matrix/client/(r0|v3|unstable)/sendToDevice/"], - shared_extra_conf={}, + shared_extra_conf=lambda _worker_name: {}, worker_extra_conf="", ), "typing": WorkerTemplate( listener_resources=["client", "replication"], endpoint_patterns=["^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing"], - shared_extra_conf={}, + shared_extra_conf=lambda _worker_name: {}, worker_extra_conf="", ), } @@ 
-459,9 +459,9 @@ def merge_worker_template_configs( ) # merge dictionaries; the worker name will be replaced later - new_template.shared_extra_conf = { - **new_template.shared_extra_conf, - **to_be_merged_template.shared_extra_conf, + new_template.shared_extra_conf = lambda worker_name: { + **new_template.shared_extra_conf(worker_name), + **to_be_merged_template.shared_extra_conf(worker_name), } # There is only one worker type that has a 'worker_extra_conf' and it is @@ -485,10 +485,7 @@ def insert_worker_name_for_worker_config( Returns: Copy of the dict with newly inserted worker name """ dict_to_edit = dataclasses.asdict(existing_template) - for k, v in dict_to_edit["shared_extra_conf"].items(): - # Only proceed if it's the placeholder name string - if v == WORKER_PLACEHOLDER_NAME: - dict_to_edit["shared_extra_conf"][k] = worker_name + dict_to_edit["shared_extra_conf"] = existing_template.shared_extra_conf(worker_name) return dict_to_edit From 67d4fc8b996bf487b3cc7ed396c95c9509c5ed7f Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 16 Nov 2023 15:07:13 +0000 Subject: [PATCH 04/20] Collapse WORKERS_CONFIG by removing entries with defaults --- docker/configure_workers_and_start.py | 46 ++------------------------- 1 file changed, 2 insertions(+), 44 deletions(-) diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index a4dd16018ff..b6428601ccd 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -98,12 +98,7 @@ class WorkerTemplate: WORKERS_CONFIG: Dict[str, WorkerTemplate] = { - "pusher": WorkerTemplate( - listener_resources=[], - endpoint_patterns=[], - shared_extra_conf=lambda _worker_name: {}, - worker_extra_conf="", - ), + "pusher": WorkerTemplate(), "user_dir": WorkerTemplate( listener_resources=["client"], endpoint_patterns=[ @@ -112,7 +107,6 @@ class WorkerTemplate: shared_extra_conf=lambda worker_name: { "update_user_directory_from_worker": 
worker_name }, - worker_extra_conf="", ), "media_repository": WorkerTemplate( listener_resources=["media"], @@ -132,19 +126,11 @@ class WorkerTemplate: worker_extra_conf="enable_media_repo: true", ), "appservice": WorkerTemplate( - listener_resources=[], - endpoint_patterns=[], shared_extra_conf=lambda worker_name: { "notify_appservices_from_worker": worker_name }, - worker_extra_conf="", - ), - "federation_sender": WorkerTemplate( - listener_resources=[], - endpoint_patterns=[], - shared_extra_conf=lambda _worker_name: {}, - worker_extra_conf="", ), + "federation_sender": WorkerTemplate(), "synchrotron": WorkerTemplate( listener_resources=["client"], endpoint_patterns=[ @@ -153,8 +139,6 @@ class WorkerTemplate: "^/_matrix/client/(api/v1|r0|v3)/initialSync$", "^/_matrix/client/(api/v1|r0|v3)/rooms/[^/]+/initialSync$", ], - shared_extra_conf=lambda _worker_name: {}, - worker_extra_conf="", ), "client_reader": WorkerTemplate( listener_resources=["client"], @@ -187,8 +171,6 @@ class WorkerTemplate: "^/_matrix/client/(r0|v3|unstable)/capabilities$", "^/_matrix/client/(r0|v3|unstable)/notifications$", ], - shared_extra_conf=lambda _worker_name: {}, - worker_extra_conf="", ), "federation_reader": WorkerTemplate( listener_resources=["federation"], @@ -213,28 +195,18 @@ class WorkerTemplate: "^/_matrix/federation/(v1|v2)/get_groups_publicised$", "^/_matrix/key/v2/query", ], - shared_extra_conf=lambda _worker_name: {}, - worker_extra_conf="", ), "federation_inbound": WorkerTemplate( listener_resources=["federation"], endpoint_patterns=["/_matrix/federation/(v1|v2)/send/"], - shared_extra_conf=lambda _worker_name: {}, - worker_extra_conf="", ), "event_persister": WorkerTemplate( listener_resources=["replication"], - endpoint_patterns=[], - shared_extra_conf=lambda _worker_name: {}, - worker_extra_conf="", ), "background_worker": WorkerTemplate( - listener_resources=[], - endpoint_patterns=[], # This worker cannot be sharded. 
Therefore, there should only ever be one # background worker. This is enforced for the safety of your database. shared_extra_conf=lambda worker_name: {"run_background_tasks_on": worker_name}, - worker_extra_conf="", ), "event_creator": WorkerTemplate( listener_resources=["client"], @@ -246,14 +218,10 @@ class WorkerTemplate: "^/_matrix/client/(api/v1|r0|v3|unstable)/knock/", "^/_matrix/client/(api/v1|r0|v3|unstable)/profile/", ], - shared_extra_conf=lambda _worker_name: {}, - worker_extra_conf="", ), "frontend_proxy": WorkerTemplate( listener_resources=["client", "replication"], endpoint_patterns=["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"], - shared_extra_conf=lambda _worker_name: {}, - worker_extra_conf="", ), "account_data": WorkerTemplate( listener_resources=["client", "replication"], @@ -261,14 +229,10 @@ class WorkerTemplate: "^/_matrix/client/(r0|v3|unstable)/.*/tags", "^/_matrix/client/(r0|v3|unstable)/.*/account_data", ], - shared_extra_conf=lambda _worker_name: {}, - worker_extra_conf="", ), "presence": WorkerTemplate( listener_resources=["client", "replication"], endpoint_patterns=["^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"], - shared_extra_conf=lambda _worker_name: {}, - worker_extra_conf="", ), "receipts": WorkerTemplate( listener_resources=["client", "replication"], @@ -276,20 +240,14 @@ class WorkerTemplate: "^/_matrix/client/(r0|v3|unstable)/rooms/.*/receipt", "^/_matrix/client/(r0|v3|unstable)/rooms/.*/read_markers", ], - shared_extra_conf=lambda _worker_name: {}, - worker_extra_conf="", ), "to_device": WorkerTemplate( listener_resources=["client", "replication"], endpoint_patterns=["^/_matrix/client/(r0|v3|unstable)/sendToDevice/"], - shared_extra_conf=lambda _worker_name: {}, - worker_extra_conf="", ), "typing": WorkerTemplate( listener_resources=["client", "replication"], endpoint_patterns=["^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing"], - shared_extra_conf=lambda _worker_name: {}, - worker_extra_conf="", ), } 
From 94a85b36f7df8a0bf9a864ca5ca94b136a42d39a Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 16 Nov 2023 15:12:18 +0000 Subject: [PATCH 05/20] Convert listener_resources and endpoint_patterns to Set[str] --- docker/configure_workers_and_start.py | 90 +++++++++++++-------------- 1 file changed, 44 insertions(+), 46 deletions(-) diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index b6428601ccd..47292eecc1b 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -90,8 +90,8 @@ # have to attach by instance_map to the master process and have client endpoints. @dataclass class WorkerTemplate: - listener_resources: List[str] = field(default_factory=list) - endpoint_patterns: List[str] = field(default_factory=list) + listener_resources: Set[str] = field(default_factory=set) + endpoint_patterns: Set[str] = field(default_factory=set) # (worker_name) -> {} shared_extra_conf: Callable[[str], Dict[str, Any]] = lambda _worker_name: {} worker_extra_conf: str = "" @@ -100,24 +100,24 @@ class WorkerTemplate: WORKERS_CONFIG: Dict[str, WorkerTemplate] = { "pusher": WorkerTemplate(), "user_dir": WorkerTemplate( - listener_resources=["client"], - endpoint_patterns=[ + listener_resources={"client"}, + endpoint_patterns={ "^/_matrix/client/(api/v1|r0|v3|unstable)/user_directory/search$" - ], + }, shared_extra_conf=lambda worker_name: { "update_user_directory_from_worker": worker_name }, ), "media_repository": WorkerTemplate( - listener_resources=["media"], - endpoint_patterns=[ + listener_resources={"media"}, + endpoint_patterns={ "^/_matrix/media/", "^/_synapse/admin/v1/purge_media_cache$", "^/_synapse/admin/v1/room/.*/media.*$", "^/_synapse/admin/v1/user/.*/media.*$", "^/_synapse/admin/v1/media/.*$", "^/_synapse/admin/v1/quarantine_media/.*$", - ], + }, # The first configured media worker will run the media background jobs shared_extra_conf=lambda worker_name: { 
"enable_media_repo": False, @@ -132,17 +132,17 @@ class WorkerTemplate: ), "federation_sender": WorkerTemplate(), "synchrotron": WorkerTemplate( - listener_resources=["client"], - endpoint_patterns=[ + listener_resources={"client"}, + endpoint_patterns={ "^/_matrix/client/(v2_alpha|r0|v3)/sync$", "^/_matrix/client/(api/v1|v2_alpha|r0|v3)/events$", "^/_matrix/client/(api/v1|r0|v3)/initialSync$", "^/_matrix/client/(api/v1|r0|v3)/rooms/[^/]+/initialSync$", - ], + }, ), "client_reader": WorkerTemplate( - listener_resources=["client"], - endpoint_patterns=[ + listener_resources={"client"}, + endpoint_patterns={ "^/_matrix/client/(api/v1|r0|v3|unstable)/publicRooms$", "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/joined_members$", "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/context/.*$", @@ -170,11 +170,11 @@ class WorkerTemplate: "^/_matrix/client/(api/v1|r0|v3|unstable)/directory/room/.*$", "^/_matrix/client/(r0|v3|unstable)/capabilities$", "^/_matrix/client/(r0|v3|unstable)/notifications$", - ], + }, ), "federation_reader": WorkerTemplate( - listener_resources=["federation"], - endpoint_patterns=[ + listener_resources={"federation"}, + endpoint_patterns={ "^/_matrix/federation/(v1|v2)/event/", "^/_matrix/federation/(v1|v2)/state/", "^/_matrix/federation/(v1|v2)/state_ids/", @@ -194,14 +194,14 @@ class WorkerTemplate: "^/_matrix/federation/(v1|v2)/user/devices/", "^/_matrix/federation/(v1|v2)/get_groups_publicised$", "^/_matrix/key/v2/query", - ], + }, ), "federation_inbound": WorkerTemplate( - listener_resources=["federation"], - endpoint_patterns=["/_matrix/federation/(v1|v2)/send/"], + listener_resources={"federation"}, + endpoint_patterns={"/_matrix/federation/(v1|v2)/send/"}, ), "event_persister": WorkerTemplate( - listener_resources=["replication"], + listener_resources={"replication"}, ), "background_worker": WorkerTemplate( # This worker cannot be sharded. 
Therefore, there should only ever be one @@ -209,45 +209,45 @@ class WorkerTemplate: shared_extra_conf=lambda worker_name: {"run_background_tasks_on": worker_name}, ), "event_creator": WorkerTemplate( - listener_resources=["client"], - endpoint_patterns=[ + listener_resources={"client"}, + endpoint_patterns={ "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/redact", "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/send", "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$", "^/_matrix/client/(api/v1|r0|v3|unstable)/join/", "^/_matrix/client/(api/v1|r0|v3|unstable)/knock/", "^/_matrix/client/(api/v1|r0|v3|unstable)/profile/", - ], + }, ), "frontend_proxy": WorkerTemplate( - listener_resources=["client", "replication"], - endpoint_patterns=["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"], + listener_resources={"client", "replication"}, + endpoint_patterns={"^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"}, ), "account_data": WorkerTemplate( - listener_resources=["client", "replication"], - endpoint_patterns=[ + listener_resources={"client", "replication"}, + endpoint_patterns={ "^/_matrix/client/(r0|v3|unstable)/.*/tags", "^/_matrix/client/(r0|v3|unstable)/.*/account_data", - ], + }, ), "presence": WorkerTemplate( - listener_resources=["client", "replication"], - endpoint_patterns=["^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"], + listener_resources={"client", "replication"}, + endpoint_patterns={"^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"}, ), "receipts": WorkerTemplate( - listener_resources=["client", "replication"], - endpoint_patterns=[ + listener_resources={"client", "replication"}, + endpoint_patterns={ "^/_matrix/client/(r0|v3|unstable)/rooms/.*/receipt", "^/_matrix/client/(r0|v3|unstable)/rooms/.*/read_markers", - ], + }, ), "to_device": WorkerTemplate( - listener_resources=["client", "replication"], - endpoint_patterns=["^/_matrix/client/(r0|v3|unstable)/sendToDevice/"], + 
listener_resources={"client", "replication"}, + endpoint_patterns={"^/_matrix/client/(r0|v3|unstable)/sendToDevice/"}, ), "typing": WorkerTemplate( - listener_resources=["client", "replication"], - endpoint_patterns=["^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing"], + listener_resources={"client", "replication"}, + endpoint_patterns={"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing"}, ), } @@ -406,15 +406,11 @@ def merge_worker_template_configs( # copy existing_template without any replacements new_template: WorkerTemplate = dataclasses.replace(existing_template) - # merge the two lists, remove duplicates - new_template.listener_resources = list( - set(new_template.listener_resources + to_be_merged_template.listener_resources) - ) + # add listener resources from the other set + new_template.listener_resources |= to_be_merged_template.listener_resources - # merge the two lists, remove duplicates - new_template.endpoint_patterns = list( - set(new_template.endpoint_patterns + to_be_merged_template.endpoint_patterns) - ) + # add endpoint patterns from the other set + new_template.endpoint_patterns |= to_be_merged_template.endpoint_patterns # merge dictionaries; the worker name will be replaced later new_template.shared_extra_conf = lambda worker_name: { @@ -444,6 +440,8 @@ def insert_worker_name_for_worker_config( """ dict_to_edit = dataclasses.asdict(existing_template) dict_to_edit["shared_extra_conf"] = existing_template.shared_extra_conf(worker_name) + dict_to_edit["endpoint_patterns"] = sorted(existing_template.endpoint_patterns) + dict_to_edit["listener_resources"] = sorted(existing_template.listener_resources) return dict_to_edit @@ -760,7 +758,7 @@ def generate_worker_files( # Map locations to upstreams (corresponding to worker types) in Nginx # but only if we use the appropriate worker type for worker_type in all_worker_types_in_use: - for endpoint_pattern in WORKERS_CONFIG[worker_type].endpoint_patterns: + for endpoint_pattern in 
sorted(WORKERS_CONFIG[worker_type].endpoint_patterns): nginx_locations[endpoint_pattern] = f"http://{worker_type}" # For each worker type specified by the user, create config values and write it's From 26073fa77889836833e870ff1d0847638fe4ebe8 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 16 Nov 2023 15:26:11 +0000 Subject: [PATCH 06/20] Tweak comments --- docker/configure_workers_and_start.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index 47292eecc1b..98958d52afa 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -82,21 +82,21 @@ MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH = "/run/main_private.sock" -# Workers with exposed endpoints needs either "client", "federation", or "media" listener_resources -# Watching /_matrix/client needs a "client" listener -# Watching /_matrix/federation needs a "federation" listener -# Watching /_matrix/media and related needs a "media" listener -# Stream Writers require "client" and "replication" listeners because they -# have to attach by instance_map to the master process and have client endpoints. @dataclass class WorkerTemplate: listener_resources: Set[str] = field(default_factory=set) endpoint_patterns: Set[str] = field(default_factory=set) - # (worker_name) -> {} + # (worker_name) -> {config} shared_extra_conf: Callable[[str], Dict[str, Any]] = lambda _worker_name: {} worker_extra_conf: str = "" +# Workers with exposed endpoints needs either "client", "federation", or "media" listener_resources +# Watching /_matrix/client needs a "client" listener +# Watching /_matrix/federation needs a "federation" listener +# Watching /_matrix/media and related needs a "media" listener +# Stream Writers require "client" and "replication" listeners because they +# have to attach by instance_map to the master process and have client endpoints. 
WORKERS_CONFIG: Dict[str, WorkerTemplate] = { "pusher": WorkerTemplate(), "user_dir": WorkerTemplate( From 8b7463957fa6642c5a6fd2a6685b00cc7acfd35c Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 16 Nov 2023 15:26:17 +0000 Subject: [PATCH 07/20] Add `merge_into` --- docker/configure_workers_and_start.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index 98958d52afa..5c15b4b9fde 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -283,6 +283,31 @@ def flush_buffers() -> None: sys.stderr.flush() +def merge_into(dest: Any, new: Any) -> None: + """ + Merges `new` into `dest` with the following rules: + + - dicts: values with the same key will be merged recursively + - lists: `new` will be appended to `dest` + - primitives: they will be checked for equality and inequality will result + in a ValueError + + It is an error for `dest` and `new` to be of different types. 
+ """ + if isinstance(dest, dict) and isinstance(new, dict): + for k, v in new.items(): + if k in dest: + merge_into(dest[k], v) + else: + dest[k] = v + elif isinstance(dest, list) and isinstance(new, list): + dest.extend(new) + elif type(dest) != type(new): + raise TypeError(f"Cannot merge {type(dest).__name__} and {type(new).__name__}") + elif dest != new: + raise ValueError(f"Cannot merge primitive values: {dest!r} != {new!r}") + + def convert(src: str, dst: str, **template_vars: object) -> None: """Generate a file from a template From f38297b619d527c7c77d32d8877b1184db759977 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 16 Nov 2023 15:47:28 +0000 Subject: [PATCH 08/20] Remove special logic for adding stream_writers: just make it part of the extra config template --- docker/configure_workers_and_start.py | 120 ++++++++++++-------------- 1 file changed, 53 insertions(+), 67 deletions(-) diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index 5c15b4b9fde..548392ccb0a 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -98,7 +98,11 @@ class WorkerTemplate: # Stream Writers require "client" and "replication" listeners because they # have to attach by instance_map to the master process and have client endpoints. 
WORKERS_CONFIG: Dict[str, WorkerTemplate] = { - "pusher": WorkerTemplate(), + "pusher": WorkerTemplate( + shared_extra_conf=lambda worker_name: { + "pusher_instances": [worker_name], + } + ), "user_dir": WorkerTemplate( listener_resources={"client"}, endpoint_patterns={ @@ -130,7 +134,11 @@ class WorkerTemplate: "notify_appservices_from_worker": worker_name }, ), - "federation_sender": WorkerTemplate(), + "federation_sender": WorkerTemplate( + shared_extra_conf=lambda worker_name: { + "federation_sender_instances": [worker_name], + } + ), "synchrotron": WorkerTemplate( listener_resources={"client"}, endpoint_patterns={ @@ -202,6 +210,9 @@ class WorkerTemplate: ), "event_persister": WorkerTemplate( listener_resources={"replication"}, + shared_extra_conf=lambda worker_name: { + "stream_writers": {"events": [worker_name]} + }, ), "background_worker": WorkerTemplate( # This worker cannot be sharded. Therefore, there should only ever be one @@ -229,10 +240,16 @@ class WorkerTemplate: "^/_matrix/client/(r0|v3|unstable)/.*/tags", "^/_matrix/client/(r0|v3|unstable)/.*/account_data", }, + shared_extra_conf=lambda worker_name: { + "stream_writers": {"account_data": [worker_name]} + }, ), "presence": WorkerTemplate( listener_resources={"client", "replication"}, endpoint_patterns={"^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"}, + shared_extra_conf=lambda worker_name: { + "stream_writers": {"presence": [worker_name]} + }, ), "receipts": WorkerTemplate( listener_resources={"client", "replication"}, @@ -240,14 +257,23 @@ class WorkerTemplate: "^/_matrix/client/(r0|v3|unstable)/rooms/.*/receipt", "^/_matrix/client/(r0|v3|unstable)/rooms/.*/read_markers", }, + shared_extra_conf=lambda worker_name: { + "stream_writers": {"receipts": [worker_name]} + }, ), "to_device": WorkerTemplate( listener_resources={"client", "replication"}, endpoint_patterns={"^/_matrix/client/(r0|v3|unstable)/sendToDevice/"}, + shared_extra_conf=lambda worker_name: { + "stream_writers": {"to_device": 
[worker_name]} + }, ), "typing": WorkerTemplate( listener_resources={"client", "replication"}, endpoint_patterns={"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing"}, + shared_extra_conf=lambda worker_name: { + "stream_writers": {"typing": [worker_name]} + }, ), } @@ -308,6 +334,14 @@ def merge_into(dest: Any, new: Any) -> None: raise ValueError(f"Cannot merge primitive values: {dest!r} != {new!r}") +def merged(a: Dict[str, Any], b: Dict[str, Any]) -> Dict[str, Any]: + """ + Merges `b` into `a` and returns `a`. + """ + merge_into(a, b) + return a + + def convert(src: str, dst: str, **template_vars: object) -> None: """Generate a file from a template @@ -338,7 +372,6 @@ def convert(src: str, dst: str, **template_vars: object) -> None: def add_worker_roles_to_shared_config( shared_config: dict, - worker_types_set: Set[str], worker_name: str, worker_port: int, ) -> None: @@ -348,8 +381,6 @@ def add_worker_roles_to_shared_config( Args: shared_config: The config dict that all worker instances share (after being converted to YAML) - worker_types_set: The type of worker (one of those defined in WORKERS_CONFIG). - This list can be a single worker type or multiple. worker_name: The name of the worker instance. worker_port: The HTTP replication port that the worker instance is listening on. """ @@ -357,61 +388,18 @@ def add_worker_roles_to_shared_config( # streams instance_map = shared_config.setdefault("instance_map", {}) - # This is a list of the stream_writers that there can be only one of. Events can be - # sharded, and therefore doesn't belong here. - singular_stream_writers = [ - "account_data", - "presence", - "receipts", - "to_device", - "typing", - ] - - # Worker-type specific sharding config. Now a single worker can fulfill multiple - # roles, check each. 
- if "pusher" in worker_types_set: - shared_config.setdefault("pusher_instances", []).append(worker_name) - - if "federation_sender" in worker_types_set: - shared_config.setdefault("federation_sender_instances", []).append(worker_name) - - if "event_persister" in worker_types_set: - # Event persisters write to the events stream, so we need to update - # the list of event stream writers - shared_config.setdefault("stream_writers", {}).setdefault("events", []).append( - worker_name - ) - - # Map of stream writer instance names to host/ports combos - if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False): - instance_map[worker_name] = { - "path": f"/run/worker.{worker_port}", - } - else: - instance_map[worker_name] = { - "host": "localhost", - "port": worker_port, - } - # Update the list of stream writers. It's convenient that the name of the worker - # type is the same as the stream to write. Iterate over the whole list in case there - # is more than one. - for worker in worker_types_set: - if worker in singular_stream_writers: - shared_config.setdefault("stream_writers", {}).setdefault( - worker, [] - ).append(worker_name) - - # Map of stream writer instance names to host/ports combos - # For now, all stream writers need http replication ports - if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False): - instance_map[worker_name] = { - "path": f"/run/worker.{worker_port}", - } - else: - instance_map[worker_name] = { - "host": "localhost", - "port": worker_port, - } + # Add all workers to the `instance_map` + # Technically only certain types of workers, such as stream writers, are needed + # here but it is simpler just to be consistent. 
+ if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False): + instance_map[worker_name] = { + "path": f"/run/worker.{worker_port}", + } + else: + instance_map[worker_name] = { + "host": "localhost", + "port": worker_port, + } def merge_worker_template_configs( @@ -438,10 +426,10 @@ def merge_worker_template_configs( new_template.endpoint_patterns |= to_be_merged_template.endpoint_patterns # merge dictionaries; the worker name will be replaced later - new_template.shared_extra_conf = lambda worker_name: { - **new_template.shared_extra_conf(worker_name), - **to_be_merged_template.shared_extra_conf(worker_name), - } + new_template.shared_extra_conf = lambda worker_name: merged( + existing_template.shared_extra_conf(worker_name), + to_be_merged_template.shared_extra_conf(worker_name), + ) # There is only one worker type that has a 'worker_extra_conf' and it is # the media_repo. Since duplicate worker types on the same worker don't @@ -821,9 +809,7 @@ def generate_worker_files( healthcheck_urls.append("http://localhost:%d/health" % (worker_port,)) # Update the shared config with sharding-related options if necessary - add_worker_roles_to_shared_config( - shared_config, worker_types_set, worker_name, worker_port - ) + add_worker_roles_to_shared_config(shared_config, worker_name, worker_port) # Enable the worker in supervisord worker_descriptors.append(worker_config) From 7d8824e2dc7178375fc776a27f4eeea69ea97428 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 16 Nov 2023 15:49:20 +0000 Subject: [PATCH 09/20] Rename function to add_worker_to_instance_map given reduction of scope --- docker/configure_workers_and_start.py | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index 548392ccb0a..f3e5a389743 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -370,13 +370,13 @@ def convert(src: str, dst: 
str, **template_vars: object) -> None: outfile.write(rendered) -def add_worker_roles_to_shared_config( +def add_worker_to_instance_map( shared_config: dict, worker_name: str, worker_port: int, ) -> None: - """Given a dictionary representing a config file shared across all workers, - append appropriate worker information to it for the current worker_type instance. + """ + Update the shared config map to add the worker in the instance_map. Args: shared_config: The config dict that all worker instances share (after being @@ -384,13 +384,8 @@ def add_worker_roles_to_shared_config( worker_name: The name of the worker instance. worker_port: The HTTP replication port that the worker instance is listening on. """ - # The instance_map config field marks the workers that write to various replication - # streams instance_map = shared_config.setdefault("instance_map", {}) - # Add all workers to the `instance_map` - # Technically only certain types of workers, such as stream writers, are needed - # here but it is simpler just to be consistent. if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False): instance_map[worker_name] = { "path": f"/run/worker.{worker_port}", @@ -808,8 +803,10 @@ def generate_worker_files( else: healthcheck_urls.append("http://localhost:%d/health" % (worker_port,)) - # Update the shared config with sharding-related options if necessary - add_worker_roles_to_shared_config(shared_config, worker_name, worker_port) + # Add all workers to the `instance_map` + # Technically only certain types of workers, such as stream writers, are needed + # here but it is simpler just to be consistent. 
+ add_worker_to_instance_map(shared_config, worker_name, worker_port) # Enable the worker in supervisord worker_descriptors.append(worker_config) From f49dbc7ba7e125d0891ea30617b74230ade1a242 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 16 Nov 2023 15:52:52 +0000 Subject: [PATCH 10/20] Add `sharding_allowed` to the WorkerTemplate rather than having a separate function for that --- docker/configure_workers_and_start.py | 28 ++++++++++----------------- 1 file changed, 10 insertions(+), 18 deletions(-) diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index f3e5a389743..3d5fe3a73ca 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -90,6 +90,9 @@ class WorkerTemplate: shared_extra_conf: Callable[[str], Dict[str, Any]] = lambda _worker_name: {} worker_extra_conf: str = "" + # True if and only if multiple of this worker type are allowed. + sharding_allowed: bool = True + # Workers with exposed endpoints needs either "client", "federation", or "media" listener_resources # Watching /_matrix/client needs a "client" listener @@ -218,6 +221,7 @@ class WorkerTemplate: # This worker cannot be sharded. Therefore, there should only ever be one # background worker. This is enforced for the safety of your database. 
shared_extra_conf=lambda worker_name: {"run_background_tasks_on": worker_name}, + sharding_allowed=False, ), "event_creator": WorkerTemplate( listener_resources={"client"}, @@ -243,6 +247,7 @@ class WorkerTemplate: shared_extra_conf=lambda worker_name: { "stream_writers": {"account_data": [worker_name]} }, + sharding_allowed=False, ), "presence": WorkerTemplate( listener_resources={"client", "replication"}, @@ -250,6 +255,7 @@ class WorkerTemplate: shared_extra_conf=lambda worker_name: { "stream_writers": {"presence": [worker_name]} }, + sharding_allowed=False, ), "receipts": WorkerTemplate( listener_resources={"client", "replication"}, @@ -260,6 +266,7 @@ class WorkerTemplate: shared_extra_conf=lambda worker_name: { "stream_writers": {"receipts": [worker_name]} }, + sharding_allowed=False, ), "to_device": WorkerTemplate( listener_resources={"client", "replication"}, @@ -267,6 +274,7 @@ class WorkerTemplate: shared_extra_conf=lambda worker_name: { "stream_writers": {"to_device": [worker_name]} }, + sharding_allowed=False, ), "typing": WorkerTemplate( listener_resources={"client", "replication"}, @@ -274,6 +282,7 @@ class WorkerTemplate: shared_extra_conf=lambda worker_name: { "stream_writers": {"typing": [worker_name]} }, + sharding_allowed=False, ), } @@ -495,23 +504,6 @@ def apply_requested_multiplier_for_worker(worker_types: List[str]) -> List[str]: return new_worker_types -def is_sharding_allowed_for_worker_type(worker_type: str) -> bool: - """Helper to check to make sure worker types that cannot have multiples do not. - - Args: - worker_type: The type of worker to check against. 
- Returns: True if allowed, False if not - """ - return worker_type not in [ - "background_worker", - "account_data", - "presence", - "receipts", - "typing", - "to_device", - ] - - def split_and_strip_string( given_string: str, split_char: str, max_split: SupportsIndex = -1 ) -> List[str]: @@ -637,7 +629,7 @@ def parse_worker_types( ) if worker_type in worker_type_shard_counter: - if not is_sharding_allowed_for_worker_type(worker_type): + if not WORKERS_CONFIG[worker_type].sharding_allowed: error( f"There can be only a single worker with {worker_type} " "type. Please recount and remove." From 3bb21a9c265f24c5855c72aac3cb222b8fb3daff Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 16 Nov 2023 15:55:37 +0000 Subject: [PATCH 11/20] Use `merge_into` when adding workers to the shared config --- docker/configure_workers_and_start.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index 3d5fe3a73ca..a9f48311947 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -784,10 +784,9 @@ def generate_worker_files( {"name": worker_name, "port": str(worker_port), "config_path": config_path} ) - # Update the shared config with any worker_type specific options. The first of a - # given worker_type needs to stay assigned and not be replaced. - worker_config["shared_extra_conf"].update(shared_config) - shared_config = worker_config["shared_extra_conf"] + # Update the shared config with any options needed to enable this worker. 
+ merge_into(shared_config, worker_config["shared_extra_conf"]) + if using_unix_sockets: healthcheck_urls.append( f"--unix-socket /run/worker.{worker_port} http://localhost/health" From fbafde81a1c55667b57447ed8882af81aff271b6 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 16 Nov 2023 16:07:05 +0000 Subject: [PATCH 12/20] Promote mark_filepath to constant --- docker/configure_workers_and_start.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index a9f48311947..8fd11935015 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -81,6 +81,10 @@ MAIN_PROCESS_UNIX_SOCKET_PUBLIC_PATH = "/run/main_public.sock" MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH = "/run/main_private.sock" +# We place a file at this path to indicate that the script has already been +# run and should not be run again. +MARKER_FILE_PATH = "/conf/workers_have_been_configured" + @dataclass class WorkerTemplate: @@ -980,8 +984,8 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None: log("Base homeserver config exists—not regenerating") # This script may be run multiple times (mostly by Complement, see note at top of # file). Don't re-configure workers in this instance. 
- mark_filepath = "/conf/workers_have_been_configured" - if not os.path.exists(mark_filepath): + + if not os.path.exists(MARKER_FILE_PATH): # Collect and validate worker_type requests # Read the desired worker configuration from the environment worker_types_env = environ.get("SYNAPSE_WORKER_TYPES", "").strip() @@ -1000,7 +1004,7 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None: generate_worker_files(environ, config_path, data_dir, requested_worker_types) # Mark workers as being configured - with open(mark_filepath, "w") as f: + with open(MARKER_FILE_PATH, "w") as f: f.write("") else: log("Worker config exists—not regenerating") From 321d3590feb533504337edfbd1905b9382de34a7 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 16 Nov 2023 16:07:29 +0000 Subject: [PATCH 13/20] Add a --generate-only option --- docker/configure_workers_and_start.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index 8fd11935015..f936a71cc3b 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -53,6 +53,7 @@ import re import subprocess import sys +from argparse import ArgumentParser from collections import defaultdict from dataclasses import dataclass, field from itertools import chain @@ -968,6 +969,14 @@ def generate_worker_log_config( def main(args: List[str], environ: MutableMapping[str, str]) -> None: + parser = ArgumentParser() + parser.add_argument( + "--generate-only", + action="store_true", + help="Only generate configuration; don't run Synapse.", + ) + opts = parser.parse_args(args) + config_dir = environ.get("SYNAPSE_CONFIG_DIR", "/data") config_path = environ.get("SYNAPSE_CONFIG_PATH", config_dir + "/homeserver.yaml") data_dir = environ.get("SYNAPSE_DATA_DIR", "/data") @@ -1009,6 +1018,10 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None: else: 
log("Worker config exists—not regenerating") + if opts.generate_only: + log("--generate-only: won't run Synapse") + return + # Lifted right out of start.py jemallocpath = "/usr/lib/%s-linux-gnu/libjemalloc.so.2" % (platform.machine(),) @@ -1031,4 +1044,4 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None: if __name__ == "__main__": - main(sys.argv, os.environ) + main(sys.argv[1:], os.environ) From 259a808dc2c63dc37f363ab6123ab1dc95936ca0 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Wed, 6 Dec 2023 15:14:41 +0000 Subject: [PATCH 14/20] Docstring on WorkerTemplate --- docker/configure_workers_and_start.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index f936a71cc3b..89b77d44fdc 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -89,6 +89,14 @@ @dataclass class WorkerTemplate: + """ + A definition of individual settings for a specific worker type. + A worker name can be fed into the template in order to generate a config. + + These worker templates can be merged with `merge_worker_template_configs` + in order for a single worker to be made from multiple templates. 
+ """ + listener_resources: Set[str] = field(default_factory=set) endpoint_patterns: Set[str] = field(default_factory=set) # (worker_name) -> {config} From 3a46cf08e9fdae2d9cbb4a81bb76abfb02994442 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Wed, 6 Dec 2023 15:14:58 +0000 Subject: [PATCH 15/20] Fix comment and mutation bug on merge_worker_template_configs --- docker/configure_workers_and_start.py | 53 +++++++++++---------------- 1 file changed, 22 insertions(+), 31 deletions(-) diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index 89b77d44fdc..ba47a579642 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -420,43 +420,34 @@ def add_worker_to_instance_map( def merge_worker_template_configs( - existing_template: WorkerTemplate, - to_be_merged_template: WorkerTemplate, + left: WorkerTemplate, + right: WorkerTemplate, ) -> WorkerTemplate: - """When given an existing dict of worker template configuration consisting with both - dicts and lists, merge new template data from WORKERS_CONFIG(or create) and - return new dict. + """Merges two templates together, returning a new template that includes + the listeners, endpoint patterns and configuration from both. - Args: - existing_template: Either an existing worker template or a fresh blank one. - to_be_merged_template: The template from WORKERS_CONFIGS to be merged into - existing_dict. - Returns: The newly merged together dict values. + Does not mutate the input templates. 
""" - # copy existing_template without any replacements - new_template: WorkerTemplate = dataclasses.replace(existing_template) - - # add listener resources from the other set - new_template.listener_resources |= to_be_merged_template.listener_resources - # add endpoint patterns from the other set - new_template.endpoint_patterns |= to_be_merged_template.endpoint_patterns - - # merge dictionaries; the worker name will be replaced later - new_template.shared_extra_conf = lambda worker_name: merged( - existing_template.shared_extra_conf(worker_name), - to_be_merged_template.shared_extra_conf(worker_name), - ) - - # There is only one worker type that has a 'worker_extra_conf' and it is - # the media_repo. Since duplicate worker types on the same worker don't - # work, this is fine. - new_template.worker_extra_conf = ( - new_template.worker_extra_conf + to_be_merged_template.worker_extra_conf + return WorkerTemplate( + # include listener resources from both + listener_resources=left.listener_resources | right.listener_resources, + # include endpoint patterns from both + endpoint_patterns=left.endpoint_patterns | right.endpoint_patterns, + # merge shared config dictionaries; the worker name will be replaced later + shared_extra_conf=lambda worker_name: merged( + left.shared_extra_conf(worker_name), + right.shared_extra_conf(worker_name), + ), + # There is only one worker type that has a 'worker_extra_conf' and it is + # the media_repo. Since duplicate worker types on the same worker don't + # work, this is fine. 
+        worker_extra_conf=(left.worker_extra_conf + right.worker_extra_conf),
+        # (This is unused, but in principle sharding this hybrid worker type
+        # would be allowed if both constituent types are shardable)
+        sharding_allowed=left.sharding_allowed and right.sharding_allowed,
     )
 
-    return new_template
-
 
 def insert_worker_name_for_worker_config(
     existing_template: WorkerTemplate, worker_name: str

From 2f1d7270381e6fb46eaae6726cc2d72d31e1e935 Mon Sep 17 00:00:00 2001
From: "Olivier Wilkinson (reivilibre)" 
Date: Wed, 6 Dec 2023 15:17:55 +0000
Subject: [PATCH 16/20] Update comment on `merged`

---
 docker/configure_workers_and_start.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py
index ba47a579642..90ce377d0df 100755
--- a/docker/configure_workers_and_start.py
+++ b/docker/configure_workers_and_start.py
@@ -358,7 +358,8 @@ def merge_into(dest: Any, new: Any) -> None:
 
 def merged(a: Dict[str, Any], b: Dict[str, Any]) -> Dict[str, Any]:
     """
-    Merges `b` into `a` and returns `a`.
+    Merges `b` into `a` and returns `a`. Here because we can't use `merge_into`
+    in a lambda conveniently.
""" merge_into(a, b) return a From ad4bb0e6b5cb586e129d007f71bc559395c822f7 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Wed, 6 Dec 2023 15:18:12 +0000 Subject: [PATCH 17/20] Tweak `instantiate_worker_template`, both in name, description and variable names --- docker/configure_workers_and_start.py | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index 90ce377d0df..0dba889f966 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -450,21 +450,22 @@ def merge_worker_template_configs( ) -def insert_worker_name_for_worker_config( - existing_template: WorkerTemplate, worker_name: str +def instantiate_worker_template( + template: WorkerTemplate, worker_name: str ) -> Dict[str, Any]: - """Insert a given worker name into the worker's configuration dict. + """Given a worker template, instantiate it into a worker configuration + (which is currently represented as a dictionary). Args: - existing_template: The WorkerTemplate that is imported into shared_config. - worker_name: The name of the worker to insert. - Returns: Copy of the dict with newly inserted worker name + template: The WorkerTemplate to template + worker_name: The name of the worker to use. 
+ Returns: worker configuration dictionary """ - dict_to_edit = dataclasses.asdict(existing_template) - dict_to_edit["shared_extra_conf"] = existing_template.shared_extra_conf(worker_name) - dict_to_edit["endpoint_patterns"] = sorted(existing_template.endpoint_patterns) - dict_to_edit["listener_resources"] = sorted(existing_template.listener_resources) - return dict_to_edit + worker_config_dict = dataclasses.asdict(template) + worker_config_dict["shared_extra_conf"] = template.shared_extra_conf(worker_name) + worker_config_dict["endpoint_patterns"] = sorted(template.endpoint_patterns) + worker_config_dict["listener_resources"] = sorted(template.listener_resources) + return worker_config_dict def apply_requested_multiplier_for_worker(worker_types: List[str]) -> List[str]: @@ -781,7 +782,7 @@ def generate_worker_files( ) # Replace placeholder names in the config template with the actual worker name. - worker_config: Dict[str, Any] = insert_worker_name_for_worker_config( + worker_config: Dict[str, Any] = instantiate_worker_template( worker_template, worker_name ) From 2ff1de3b3c977b2a89d188f0100d8dc66156a148 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Wed, 10 Jan 2024 12:15:14 +0000 Subject: [PATCH 18/20] Newsfile Signed-off-by: Olivier Wilkinson (reivilibre) --- changelog.d/16803.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/16803.misc diff --git a/changelog.d/16803.misc b/changelog.d/16803.misc new file mode 100644 index 00000000000..40db4e232ab --- /dev/null +++ b/changelog.d/16803.misc @@ -0,0 +1 @@ +Refactor the `configure_workers_and_start.py` script used internally by Complement. 
\ No newline at end of file From 29541fd994200eebeba7d22dae9f428a2a7c2a27 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Wed, 17 Jan 2024 14:39:57 +0000 Subject: [PATCH 19/20] Move `stream_writers` to their own field in the WorkerTemplate --- docker/configure_workers_and_start.py | 33 ++++++++++++--------------- 1 file changed, 14 insertions(+), 19 deletions(-) diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index 0dba889f966..b2a03f075a9 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -103,6 +103,8 @@ class WorkerTemplate: shared_extra_conf: Callable[[str], Dict[str, Any]] = lambda _worker_name: {} worker_extra_conf: str = "" + stream_writers: Set[str] = field(default_factory=set) + # True if and only if multiple of this worker type are allowed. sharding_allowed: bool = True @@ -226,9 +228,7 @@ class WorkerTemplate: ), "event_persister": WorkerTemplate( listener_resources={"replication"}, - shared_extra_conf=lambda worker_name: { - "stream_writers": {"events": [worker_name]} - }, + stream_writers={"events"}, ), "background_worker": WorkerTemplate( # This worker cannot be sharded. 
Therefore, there should only ever be one @@ -257,17 +257,13 @@ class WorkerTemplate: "^/_matrix/client/(r0|v3|unstable)/.*/tags", "^/_matrix/client/(r0|v3|unstable)/.*/account_data", }, - shared_extra_conf=lambda worker_name: { - "stream_writers": {"account_data": [worker_name]} - }, + stream_writers={"account_data"}, sharding_allowed=False, ), "presence": WorkerTemplate( listener_resources={"client", "replication"}, endpoint_patterns={"^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"}, - shared_extra_conf=lambda worker_name: { - "stream_writers": {"presence": [worker_name]} - }, + stream_writers={"presence"}, sharding_allowed=False, ), "receipts": WorkerTemplate( @@ -276,25 +272,19 @@ class WorkerTemplate: "^/_matrix/client/(r0|v3|unstable)/rooms/.*/receipt", "^/_matrix/client/(r0|v3|unstable)/rooms/.*/read_markers", }, - shared_extra_conf=lambda worker_name: { - "stream_writers": {"receipts": [worker_name]} - }, + stream_writers={"receipts"}, sharding_allowed=False, ), "to_device": WorkerTemplate( listener_resources={"client", "replication"}, endpoint_patterns={"^/_matrix/client/(r0|v3|unstable)/sendToDevice/"}, - shared_extra_conf=lambda worker_name: { - "stream_writers": {"to_device": [worker_name]} - }, + stream_writers={"to_device"}, sharding_allowed=False, ), "typing": WorkerTemplate( listener_resources={"client", "replication"}, endpoint_patterns={"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing"}, - shared_extra_conf=lambda worker_name: { - "stream_writers": {"typing": [worker_name]} - }, + stream_writers={"typing"}, sharding_allowed=False, ), } @@ -447,6 +437,8 @@ def merge_worker_template_configs( # (This is unused, but in principle sharding this hybrid worker type # would be allowed if both constituent types are shardable) sharding_allowed=left.sharding_allowed and right.sharding_allowed, + # include stream writers from both + stream_writers=left.stream_writers | right.stream_writers, ) @@ -462,7 +454,10 @@ def instantiate_worker_template( 
Returns: worker configuration dictionary """ worker_config_dict = dataclasses.asdict(template) - worker_config_dict["shared_extra_conf"] = template.shared_extra_conf(worker_name) + stream_writers_dict = { + writer: worker_name for writer in template.stream_writers + } + worker_config_dict["shared_extra_conf"] = merged(template.shared_extra_conf(worker_name), stream_writers_dict) worker_config_dict["endpoint_patterns"] = sorted(template.endpoint_patterns) worker_config_dict["listener_resources"] = sorted(template.listener_resources) return worker_config_dict From c91ab4bc55f2c31b7bfcb5e76aa0993d3a2cb040 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Wed, 17 Jan 2024 15:02:07 +0000 Subject: [PATCH 20/20] Remove `merge_into` and just have `merged` which copies inputs to avoid footguns --- docker/configure_workers_and_start.py | 60 +++++++++++++++------------ 1 file changed, 33 insertions(+), 27 deletions(-) diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index b2a03f075a9..80f0a2e542d 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -55,6 +55,7 @@ import sys from argparse import ArgumentParser from collections import defaultdict +from copy import deepcopy from dataclasses import dataclass, field from itertools import chain from pathlib import Path @@ -321,37 +322,42 @@ def flush_buffers() -> None: sys.stderr.flush() -def merge_into(dest: Any, new: Any) -> None: +def merged(a: Any, b: Any) -> Any: """ - Merges `new` into `dest` with the following rules: + Merges `a` and `b` together, returning the result. + + The merge is performed with the following rules: - dicts: values with the same key will be merged recursively - lists: `new` will be appended to `dest` - primitives: they will be checked for equality and inequality will result in a ValueError - It is an error for `dest` and `new` to be of different types. 
- """ - if isinstance(dest, dict) and isinstance(new, dict): - for k, v in new.items(): - if k in dest: - merge_into(dest[k], v) - else: - dest[k] = v - elif isinstance(dest, list) and isinstance(new, list): - dest.extend(new) - elif type(dest) != type(new): - raise TypeError(f"Cannot merge {type(dest).__name__} and {type(new).__name__}") - elif dest != new: - raise ValueError(f"Cannot merge primitive values: {dest!r} != {new!r}") - -def merged(a: Dict[str, Any], b: Dict[str, Any]) -> Dict[str, Any]: + It is an error for `a` and `b` to be of different types. """ - Merges `b` into `a` and returns `a`. Here because we can't use `merge_into` - in a lamba conveniently. - """ - merge_into(a, b) + if isinstance(a, dict) and isinstance(b, dict): + result = {} + for key in set(a.keys()) | set(b.keys()): + if key in a and key in b: + result[key] = merged(a[key], b[key]) + elif key in a: + result[key] = deepcopy(a[key]) + else: + result[key] = deepcopy(b[key]) + + return result + elif isinstance(a, list) and isinstance(b, list): + return deepcopy(a) + deepcopy(b) + elif type(a) != type(b): + raise TypeError(f"Cannot merge {type(a).__name__} and {type(b).__name__}") + elif a != b: + raise ValueError(f"Cannot merge primitive values: {a!r} != {b!r}") + + if type(a) not in {str, int, float, bool, None.__class__}: + raise TypeError( + f"Cannot use `merged` on type {a} as it may not be safe (must either be an immutable primitive or must have special copy/merge logic)" + ) return a @@ -454,10 +460,10 @@ def instantiate_worker_template( Returns: worker configuration dictionary """ worker_config_dict = dataclasses.asdict(template) - stream_writers_dict = { - writer: worker_name for writer in template.stream_writers - } - worker_config_dict["shared_extra_conf"] = merged(template.shared_extra_conf(worker_name), stream_writers_dict) + stream_writers_dict = {writer: worker_name for writer in template.stream_writers} + worker_config_dict["shared_extra_conf"] = merged( + 
template.shared_extra_conf(worker_name), stream_writers_dict + ) worker_config_dict["endpoint_patterns"] = sorted(template.endpoint_patterns) worker_config_dict["listener_resources"] = sorted(template.listener_resources) return worker_config_dict @@ -786,7 +792,7 @@ def generate_worker_files( ) # Update the shared config with any options needed to enable this worker. - merge_into(shared_config, worker_config["shared_extra_conf"]) + shared_config = merged(shared_config, worker_config["shared_extra_conf"]) if using_unix_sockets: healthcheck_urls.append(