Skip to content

refactor: remove legacy_event_singleton_transform #1587

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 3 additions & 11 deletions lib/sequin/consumers/consumers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -448,20 +448,12 @@ defmodule Sequin.Consumers do
|> Repo.all()
end

@legacy_event_singleton_transform_cutoff_date ~D[2024-11-06]
def consumer_features(%SinkConsumer{} = consumer) do
consumer = Repo.lazy_preload(consumer, [:account])

cond do
Accounts.has_feature?(consumer.account, :legacy_event_transform) ->
[legacy_event_transform: true]

Date.before?(consumer.account.inserted_at, @legacy_event_singleton_transform_cutoff_date) ->
[legacy_event_singleton_transform: true]

true ->
[]
end
if Accounts.has_feature?(consumer.account, :legacy_event_transform),
do: [legacy_event_transform: true],
else: []
end

# ConsumerEvent
Expand Down
4 changes: 0 additions & 4 deletions lib/sequin/runtime/http_push_pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,6 @@ defmodule Sequin.Runtime.HttpPushPipeline do
[message] = messages
legacy_event_transform_message(consumer, message.data)

features[:legacy_event_singleton_transform] && length(messages) == 1 ->
[message] = messages
message.data

consumer.sink.batch == false ->
[message] = messages
Transforms.Message.to_external(consumer, message)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
defmodule Sequin.Repo.Migrations.RemoveLegacyEventSingletonTransformConfig do
use Ecto.Migration

@config_schema Application.compile_env(:sequin, [Sequin.Repo, :config_schema_prefix])

def change do
execute(
"""
update #{@config_schema}.sink_consumers
set
sink = jsonb_set(sink, '{batch}', 'false'::jsonb),
batch_size = 1
where
type = 'http_push'
and account_id in (
select id
from #{@config_schema}.accounts
where inserted_at <= '2024-11-06'
)
""",
""
)
end
end
4 changes: 2 additions & 2 deletions test/sequin/consumers_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2271,7 +2271,7 @@ defmodule Sequin.ConsumersTest do
assert Consumers.consumer_features(consumer) == []
end

test "returns legacy_event_singleton_transform when account is old enough" do
test "not return legacy_event_singleton_transform when account is old enough" do
account = AccountsFactory.account(features: [], inserted_at: ~D[2024-11-01])

event_table = DatabasesFactory.event_table()
Expand All @@ -2285,7 +2285,7 @@ defmodule Sequin.ConsumersTest do
type: :http_push
)

assert Consumers.consumer_features(consumer) == [{:legacy_event_singleton_transform, true}]
refute Consumers.consumer_features(consumer) == [{:legacy_event_singleton_transform, true}]
end
end

Expand Down
27 changes: 0 additions & 27 deletions test/sequin/http_push_pipeline_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -139,33 +139,6 @@ defmodule Sequin.Runtime.HttpPushPipelineTest do
assert_receive :sent, 1_000
end

test "legacy_event_singleton_transform sends unwrapped single messages", %{
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it right to remove it, or should we keep this test?

consumer: consumer,
http_endpoint: http_endpoint
} do
test_pid = self()
event = ConsumersFactory.insert_consumer_event!(consumer_id: consumer.id, action: :insert)

adapter = fn %Req.Request{} = req ->
assert to_string(req.url) == HttpEndpoint.url(http_endpoint)
json = Jason.decode!(req.body)

# Should NOT be wrapped in a list
refute is_list(json)
assert json["action"] == "insert"

send(test_pid, :sent)
{req, Req.Response.new(status: 200)}
end

# Start pipeline with legacy_event_singleton_transform enabled
start_pipeline!(consumer, adapter, features: [legacy_event_singleton_transform: true])

ref = send_test_event(consumer, event)
assert_receive {:ack, ^ref, [%{data: %{data: %{action: :insert}}}], []}, 1_000
assert_receive :sent, 1_000
end

test "when all messages are rejected due to idempotency, the pipeline does not invoke the adapter", %{
consumer: consumer
} do
Expand Down
Loading