Skip to content

feat(taskworker): Add compression in taskworker producer and worker #95153

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

enochtangg
Copy link
Member

The taskbroker system does not handle large payloads well. We first experienced the effects of this with ingest-profiles when rolling out to smaller environments. We also observed slow throughput and sqlite issues for processing pools handling sentryapp and seer tasks (2-8MB payloads). This PR is responsible for adding the ability to enable task parameter compression in taskworkers.

Flow:

  1. User defines a CompressionType attribute on the @instrumented_task decorator which chooses the compression algorithm (only supports ZSTD and PLAINTEXT). Defaults to PLAINTEXT.
  2. In the taskworker producer layer, parameters gets compression and serialized. A task header is added indicating the compression type
  3. Parameters stay compressed in kafka and taskbroker storage.
  4. Using the tasks header, the worker determines whether the task needs to be decompressed. If so, base64 decode and decompress the message.

Rollout:
To rollout this change, we update a task's decorator with CompressionType, then update the sentry option's compression rollout rate. Depending on the rate, tasks will be sampled for compression. By using the task header, this enables same tasks to be incrementally rolled out.

Testing:

  • Local testing
  • Unit tests
  • Will start with 1% of a single task in S4S. Observe compression metrics (duration and size) to ensure similar performance.

@enochtangg enochtangg requested a review from a team as a code owner July 9, 2025 18:47
@github-actions github-actions bot added the Scope: Backend Automatically applied to PRs that change backend components label Jul 9, 2025
Copy link

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Compression Rollout Bug Causes UnboundLocalError

An UnboundLocalError occurs for the parameters_str variable when compression_type is ZSTD but the compression rollout condition is not met. The code's else block only assigns parameters_str when compression_type is not ZSTD, leaving parameters_str undefined for tasks configured with ZSTD compression that do not pass the rollout check.

src/sentry/taskworker/task.py#L180-L207

parameters_json = orjson.dumps({"args": args, "kwargs": kwargs})
if self.compression_type == CompressionType.ZSTD:
option_flag = f"taskworker.{self._namespace.name}.compression.rollout"
compression_rollout_rate = options.get(option_flag)
# TODO(taskworker): This option is for added safety and requires a rollout
# percentage to be set to actually use compression.
# We can remove this later.
if compression_rollout_rate and compression_rollout_rate > random.random():
# Worker uses this header to determine if the parameters are decompressed
headers["compression-type"] = CompressionType.ZSTD.value
start_time = time.perf_counter()
parameters_data = zstd.compress(parameters_json)
# Compressed data is binary and needs base64 encoding for transport
parameters_str = base64.b64encode(parameters_data).decode("utf8")
end_time = time.perf_counter()
metrics.distribution(
"taskworker.producer.compressed_parameters_size",
len(parameters_str),
tags={"namespace": self._namespace.name, "taskname": self.name},
)
metrics.distribution(
"taskworker.producer.compression_time",
end_time - start_time,
tags={"namespace": self._namespace.name, "taskname": self.name},
)
else:
parameters_str = parameters_json.decode("utf8")

Fix in CursorFix in Web


Bug: Task Name Conflict Overwrites Existing Task

The new simple_task_compressed function is registered with the name "examples.simple_task", which conflicts with an existing simple_task function. This overwrites the original task in the registry, rendering it unreachable. The new task requires a unique name.

src/sentry/taskworker/tasks/examples.py#L92-L96

@exampletasks.register(name="examples.simple_task", compression_type=CompressionType.ZSTD)
def simple_task_compressed(*args: list[Any], **kwargs: dict[str, Any]) -> None:
sleep(0.1)
logger.debug("simple_task_compressed complete")

Fix in CursorFix in Web


Was this report helpful? Give feedback by reacting with 👍 or 👎

Copy link
Member

@evanh evanh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks good to me. I'm interested to see how this will impact things.

Comment on lines +3462 to +3466
register(
"taskworker.deletions.compression.rollout",
default=0.0,
flags=FLAG_AUTOMATOR_MODIFIABLE,
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to enable compression per namespace? Once we know that the general compression flow works, we could opt-tasks into compression via deploys.

@@ -125,6 +126,7 @@ def wrapped(func: Callable[P, R]) -> Task[P, R]:
),
at_most_once=at_most_once,
wait_for_delivery=wait_for_delivery,
compression_type=compression_type,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add this to the docstring so it shows up in LSP tooltips?

@@ -169,12 +177,41 @@ def create_activation(
f"The `{key}` header value is of type {type(value)}"
)

parameters_json = orjson.dumps({"args": args, "kwargs": kwargs})
if self.compression_type == CompressionType.ZSTD:
option_flag = f"taskworker.{self._namespace.name}.compression.rollout"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could have a single option for enabling compression. We might also want the option check to be done with the self.compression_type comparison so that both the task has to have enabled compression and compression needs to be enabled.


@pytest.mark.django_db
@mock.patch("sentry.taskworker.workerchild.capture_checkin")
def test_child_process_decompression(mock_capture_checkin) -> None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to have a task covering payload compression when an activation is created as well. There are some existing tests in tests/sentry/taskworker/test_task.py and test_registry.py covering activation creation.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Scope: Backend Automatically applied to PRs that change backend components
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants