Skip to content

feat: add webhook integration test #1474

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
May 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,11 @@ config :sequin,
nats_module: Sequin.Sinks.NatsMock,
# Arbitrarily high memory limit for testing
max_memory_bytes: 100 * 1024 * 1024 * 1024,
slot_message_store: [flush_batch_size: 8]
slot_message_store: [flush_batch_size: 8],
jepsen_http_host: System.get_env("JEPSEN_HTTP_HOST", "127.0.0.1"),
jepsen_http_port: "JEPSEN_HTTP_PORT" |> System.get_env("4040") |> String.to_integer(),
jepsen_transactions_count: "JEPSEN_TRANSACTIONS_COUNT" |> System.get_env("10") |> String.to_integer(),
jepsen_transaction_queries_count: "JEPSEN_TRANSACTION_QUERIES_COUNT" |> System.get_env("10") |> String.to_integer()

# In AES.GCM, it is important to specify 12-byte IV length for
# interoperability with other encryption software. See this GitHub
Expand Down
92 changes: 91 additions & 1 deletion test/sequin/postgres_replication_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ defmodule Sequin.PostgresReplicationTest do
alias Sequin.TestSupport.Models.CharacterMultiPK
alias Sequin.TestSupport.Models.TestEventLogPartitioned
alias Sequin.TestSupport.ReplicationSlots
alias Sequin.TestSupport.SimpleHttpServer

@moduletag :unboxed

Expand Down Expand Up @@ -122,6 +123,39 @@ defmodule Sequin.PostgresReplicationTest do
)
)

http_endpoint =
ConsumersFactory.insert_http_endpoint!(
account_id: account_id,
scheme: :http,
host: Application.get_env(:sequin, :jepsen_http_host),
port: Application.get_env(:sequin, :jepsen_http_port),
path: "/",
headers: %{"Content-Type" => "application/json"}
)

test_event_log_partitioned_consumer_http =
ConsumersFactory.insert_sink_consumer!(
account_id: account_id,
type: :http_push,
legacy_transform: :none,
sink: %{
type: :http_push,
http_endpoint_id: http_endpoint.id,
http_endpoint: http_endpoint,
batch: true
},
replication_slot_id: pg_replication.id,
message_kind: :event,
status: :active,
sequence_id: test_event_log_partitioned_sequence.id,
sequence_filter:
ConsumersFactory.sequence_filter_attrs(
actions: [:insert, :update, :delete],
column_filters: [],
group_column_attnums: [1]
)
)

event_character_ident_consumer =
ConsumersFactory.insert_sink_consumer!(
name: "event_character_ident_consumer",
Expand Down Expand Up @@ -246,6 +280,7 @@ defmodule Sequin.PostgresReplicationTest do
{:ok, _} = Runtime.Supervisor.start_replication(sup, pg_replication, test_pid: self())

%{
sup: sup,
pg_replication: pg_replication,
source_db: source_db,
event_character_consumer: event_character_consumer,
Expand All @@ -254,7 +289,8 @@ defmodule Sequin.PostgresReplicationTest do
record_character_consumer: record_character_consumer,
record_character_ident_consumer: record_character_ident_consumer,
record_character_multi_pk_consumer: record_character_multi_pk_consumer,
test_event_log_partitioned_consumer: test_event_log_partitioned_consumer
test_event_log_partitioned_consumer: test_event_log_partitioned_consumer,
test_event_log_partitioned_consumer_http: test_event_log_partitioned_consumer_http
}
end

Expand Down Expand Up @@ -816,6 +852,42 @@ defmodule Sequin.PostgresReplicationTest do
assert consumer_event.consumer_id == consumer.id
assert consumer_event.table_oid == TestEventLogPartitioned.table_oid()
end

@tag :jepsen
@tag capture_log: true
test "batch updates are delivered in sequence order via HTTP", %{sup: sup} do
{:ok, pid} = SimpleHttpServer.start_link(%{caller: self()})

on_exit(fn ->
try do
GenServer.stop(pid)
catch
_, _ -> :ok
end
end)

ref = make_ref()
initial_seq = 0
event = [seq: initial_seq, source_table_schema: inspect(ref)]
event = TestEventLogFactory.insert_test_event_log_partitioned!(event, repo: UnboxedRepo)

transactions_count = Application.get_env(:sequin, :jepsen_transactions_count)
transaction_queries_count = Application.get_env(:sequin, :jepsen_transaction_queries_count)

Enum.reduce(1..transactions_count, initial_seq, fn _, seq ->
{events, seq} =
Enum.reduce(1..transaction_queries_count, {[], seq}, fn _, {acc, seq} ->
seq = seq + 1
{[%{seq: seq} | acc], seq}
end)

TestEventLogFactory.update_test_event_log_partitioned!(event, Enum.reverse(events), repo: UnboxedRepo)
seq
end)

assert wait_and_validate_data(inspect(ref), -1, transactions_count * transaction_queries_count)
stop_supervised!(sup)
end
end

describe "PostgresReplication end-to-end with http push" do
Expand Down Expand Up @@ -1993,4 +2065,22 @@ defmodule Sequin.PostgresReplicationTest do
defp list_messages(consumer) do
SlotMessageStore.peek_messages(consumer, 1000)
end

@spec wait_and_validate_data(binary(), non_neg_integer(), non_neg_integer()) :: any()
defp wait_and_validate_data(_, _, 0), do: true

defp wait_and_validate_data(ref, prev_seq, expected_count) do
receive do
%{"action" => "update", "record" => %{"source_table_schema" => ^ref, "seq" => current_seq}} ->
if current_seq > prev_seq,
do: wait_and_validate_data(ref, current_seq, expected_count - 1),
else: flunk("Received message with seq #{current_seq} which is less than prev_seq #{prev_seq}")

_ ->
wait_and_validate_data(ref, prev_seq, expected_count)
after
5_000 ->
flunk("Did not receive :simple_http_server_loaded within 5 seconds")
end
end
end
11 changes: 11 additions & 0 deletions test/support/factory/test_event_log_factory.ex
Original file line number Diff line number Diff line change
Expand Up @@ -93,5 +93,16 @@ if Mix.env() == :test do
|> TestEventLogPartitioned.changeset(attrs)
|> repo.insert!()
end

@spec update_test_event_log_partitioned!(map(), [map()], Keyword.t()) :: [map()]
def update_test_event_log_partitioned!(%TestEventLogPartitioned{} = event, attrs, opts \\ []) do
repo = Keyword.get(opts, :repo, Repo)

for attr <- attrs do
event
|> TestEventLogPartitioned.changeset(attr)
|> repo.update!()
end
end
end
end
58 changes: 58 additions & 0 deletions test/support/simple_http_server.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
defmodule Sequin.TestSupport.SimpleHttpServer do
@moduledoc """
This module is a naive single-threaded HTTP server for webhook testing.
It accepts HTTP requests and forwards the parsed JSON body to the caller process.
Only intended for use in test environments.
"""
use GenServer

require Logger

@listen_options [:binary, packet: :raw, active: false, reuseaddr: true]
@response """
HTTP/1.1 204 No Content\r
Content-Length: 0\r
\r
"""

def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

def init(opts) do
port = opts[:port] || Application.get_env(:sequin, :jepsen_http_port)
{:ok, listen_socket} = :gen_tcp.listen(port, @listen_options)

state = %{
caller: opts.caller,
listen_socket: listen_socket
}

{:ok, state, {:continue, :accept}}
end

def handle_continue(:accept, state) do
{:ok, socket} = :gen_tcp.accept(state.listen_socket)
handle_connection(state.caller, socket)
{:noreply, state, {:continue, :accept}}
end

## Internal Functions

@spec handle_connection(pid(), :gen_tcp.socket()) :: any()
defp handle_connection(caller, socket) do
with {:ok, request} <- :gen_tcp.recv(socket, 0),
[_headers, body] <- String.split(request, "\r\n\r\n", parts: 2),
:ok <- :gen_tcp.send(socket, @response),
:ok <- :gen_tcp.close(socket),
{:ok, event} <- Jason.decode(body) do
case event["data"] do
records when is_list(records) ->
Enum.each(records, fn record ->
send(caller, record)
end)

_ ->
send(caller, event)
end
end
end
end
Loading