Skip to content

Commit adcac77

Browse files
committed
🗂️ SinkConsumer SchemaFilter
1 parent 9584e0a commit adcac77

File tree

8 files changed

+155
-34
lines changed

8 files changed

+155
-34
lines changed

lib/sequin/consumers/consumers.ex

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ defmodule Sequin.Consumers do
1212
alias Sequin.Consumers.ConsumerRecord
1313
alias Sequin.Consumers.Function
1414
alias Sequin.Consumers.HttpEndpoint
15+
alias Sequin.Consumers.SchemaFilter
1516
alias Sequin.Consumers.SequenceFilter
1617
alias Sequin.Consumers.SequenceFilter.CiStringValue
1718
alias Sequin.Consumers.SequenceFilter.ColumnFilter
@@ -1098,12 +1099,24 @@ defmodule Sequin.Consumers do
10981099
end
10991100
end
11001101

1101-
# Source Table Matching
1102+
# Schema Matching
1103+
def matches_message?(
1104+
%SinkConsumer{schema_filter: %SchemaFilter{} = schema_filter} = consumer,
1105+
%SlotProcessor.Message{} = message
1106+
) do
1107+
matches? = message_matches_schema?(schema_filter, message)
1108+
1109+
Health.put_event(consumer, %Event{slug: :messages_filtered, status: :success})
1110+
1111+
matches?
1112+
end
1113+
1114+
# Sequence Matching
11021115
def matches_message?(
11031116
%{sequence: %Sequence{} = sequence, sequence_filter: %SequenceFilter{} = sequence_filter} = consumer,
11041117
%SlotProcessor.Message{} = message
11051118
) do
1106-
matches? = matches_message?(sequence, sequence_filter, message)
1119+
matches? = message_matches_sequence?(sequence, sequence_filter, message)
11071120

11081121
Health.put_event(consumer, %Event{slug: :messages_filtered, status: :success})
11091122

@@ -1123,27 +1136,14 @@ defmodule Sequin.Consumers do
11231136
reraise error, __STACKTRACE__
11241137
end
11251138

1139+
# Source Table Matching
11261140
def matches_message?(consumer_or_wal_pipeline, %SlotProcessor.Message{} = message) do
11271141
matches? =
11281142
Enum.any?(consumer_or_wal_pipeline.source_tables, fn %SourceTable{} = source_table ->
11291143
table_matches = source_table.oid == message.table_oid
11301144
action_matches = action_matches?(source_table.actions, message.action)
11311145
column_filters_match = column_filters_match_message?(source_table.column_filters, message)
11321146

1133-
# Logger.debug("""
1134-
# [Consumers]
1135-
# matches?: #{table_matches && action_matches && column_filters_match}
1136-
# table_matches: #{table_matches}
1137-
# action_matches: #{action_matches}
1138-
# column_filters_match: #{column_filters_match}
1139-
1140-
# consumer_or_wal_pipeline:
1141-
# #{inspect(consumer_or_wal_pipeline, pretty: true)}
1142-
1143-
# message:
1144-
# #{inspect(message, pretty: true)}
1145-
# """)
1146-
11471147
table_matches && action_matches && column_filters_match
11481148
end)
11491149

@@ -1168,13 +1168,20 @@ defmodule Sequin.Consumers do
11681168
reraise error, __STACKTRACE__
11691169
end
11701170

1171-
def matches_message?(%Sequence{} = _sequence, %SequenceFilter{} = sequence_filter, %SlotProcessor.Message{} = message) do
1172-
# table_matches? = sequence.table_oid == message.table_oid
1171+
defp message_matches_schema?(%SchemaFilter{} = schema_filter, %SlotProcessor.Message{} = message) do
1172+
message.table_schema == schema_filter.schema
1173+
end
1174+
1175+
defp message_matches_sequence?(
1176+
%Sequence{} = sequence,
1177+
%SequenceFilter{} = sequence_filter,
1178+
%SlotProcessor.Message{} = message
1179+
) do
1180+
table_matches? = sequence.table_oid == message.table_oid
11731181
actions_match? = action_matches?(sequence_filter.actions, message.action)
11741182
column_filters_match? = column_filters_match_message?(sequence_filter.column_filters, message)
11751183

1176-
# table_matches? and actions_match? and column_filters_match?
1177-
actions_match? and column_filters_match?
1184+
table_matches? and actions_match? and column_filters_match?
11781185
end
11791186

11801187
def matches_record?(

lib/sequin/consumers/schema_filter.ex

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
defmodule Sequin.Consumers.SchemaFilter do
2+
@moduledoc false
3+
use Ecto.Schema
4+
5+
import Ecto.Changeset
6+
7+
@derive {Jason.Encoder, only: [:schema]}
8+
9+
@type t :: %__MODULE__{
10+
schema: String.t()
11+
}
12+
13+
@primary_key false
14+
embedded_schema do
15+
field :schema, :string
16+
end
17+
18+
def create_changeset(schema_filter, attrs) do
19+
schema_filter
20+
|> cast(attrs, [:schema])
21+
|> validate_required([:schema])
22+
|> validate_format(:schema, ~r/^[a-zA-Z_][a-zA-Z0-9_]*$/, message: "must be a valid PostgreSQL identifier")
23+
end
24+
end

lib/sequin/consumers/sink_consumer.ex

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ defmodule Sequin.Consumers.SinkConsumer do
2020
alias Sequin.Consumers.RabbitMqSink
2121
alias Sequin.Consumers.RedisStreamSink
2222
alias Sequin.Consumers.RedisStringSink
23+
alias Sequin.Consumers.SchemaFilter
2324
alias Sequin.Consumers.SequenceFilter
2425
alias Sequin.Consumers.SequinStreamSink
2526
alias Sequin.Consumers.SnsSink
@@ -103,6 +104,7 @@ defmodule Sequin.Consumers.SinkConsumer do
103104
# FIXME: Refactor / remove both of these?
104105
belongs_to :sequence, Sequence
105106
embeds_one :sequence_filter, SequenceFilter, on_replace: :delete
107+
embeds_one :schema_filter, SchemaFilter, on_replace: :delete
106108

107109
belongs_to :account, Account
108110
belongs_to :replication_slot, PostgresReplicationSlot
@@ -150,6 +152,7 @@ defmodule Sequin.Consumers.SinkConsumer do
150152
])
151153
|> changeset(attrs)
152154
|> cast_embed(:sequence_filter, with: &SequenceFilter.create_changeset/2)
155+
|> cast_embed(:schema_filter, with: &SchemaFilter.create_changeset/2)
153156
|> foreign_key_constraint(:sequence_id)
154157
|> foreign_key_constraint(:transform_id)
155158
|> foreign_key_constraint(:routing_id)
@@ -160,17 +163,20 @@ defmodule Sequin.Consumers.SinkConsumer do
160163
name: "ensure_batch_size_one",
161164
message: "batch_size must be 1 when batch is false for webhook sinks"
162165
)
166+
|> validate_filter_constraints()
163167
|> Sequin.Changeset.validate_name()
164168
end
165169

166170
def update_changeset(consumer, attrs) do
167171
consumer
168172
|> changeset(attrs)
169173
|> cast_embed(:sequence_filter, with: &SequenceFilter.create_changeset/2)
174+
|> cast_embed(:schema_filter, with: &SchemaFilter.create_changeset/2)
170175
|> check_constraint(:batch_size,
171176
name: "ensure_batch_size_one",
172177
message: "batch_size must be 1 when batch is false for webhook sinks"
173178
)
179+
|> validate_filter_constraints()
174180
end
175181

176182
def changeset(consumer, attrs) do
@@ -236,6 +242,22 @@ defmodule Sequin.Consumers.SinkConsumer do
236242
end
237243
end
238244

245+
defp validate_filter_constraints(changeset) do
246+
sequence_filter = get_field(changeset, :sequence_filter)
247+
schema_filter = get_field(changeset, :schema_filter)
248+
249+
cond do
250+
is_nil(sequence_filter) and is_nil(schema_filter) ->
251+
add_error(changeset, :base, "either sequence_filter or schema_filter must be set")
252+
253+
not is_nil(sequence_filter) and not is_nil(schema_filter) ->
254+
add_error(changeset, :base, "cannot set both sequence_filter and schema_filter")
255+
256+
true ->
257+
changeset
258+
end
259+
end
260+
239261
defp put_defaults(changeset) do
240262
changeset
241263
|> put_change(:batch_size, get_field(changeset, :batch_size) || 1)

lib/sequin/runtime/message_handler.ex

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -527,14 +527,17 @@ defmodule Sequin.Runtime.MessageHandler do
527527
defp generate_group_id(consumer, message) do
528528
# This should be way more assertive - we should error if we don't find the source table
529529
# We have a lot of tests that do not line up consumer source_tables with the message table oid
530-
if consumer.sequence_filter.group_column_attnums do
531-
Enum.map_join(consumer.sequence_filter.group_column_attnums, ":", fn attnum ->
532-
fields = if message.action == :delete, do: message.old_fields, else: message.fields
533-
field = Sequin.Enum.find!(fields, &(&1.column_attnum == attnum))
534-
to_string(field.value)
535-
end)
536-
else
537-
Enum.map_join(message.ids, ":", &to_string/1)
530+
case consumer do
531+
%SinkConsumer{sequence_filter: %SequenceFilter{group_column_attnums: group_column_attnums}}
532+
when not is_nil(group_column_attnums) ->
533+
Enum.map_join(consumer.sequence_filter.group_column_attnums, ":", fn attnum ->
534+
fields = if message.action == :delete, do: message.old_fields, else: message.fields
535+
field = Sequin.Enum.find!(fields, &(&1.column_attnum == attnum))
536+
to_string(field.value)
537+
end)
538+
539+
_ ->
540+
Enum.map_join(message.ids, ":", &to_string/1)
538541
end
539542
end
540543

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
defmodule Sequin.Repo.Migrations.AddSchemaFilterToSinkConsumers do
2+
use Ecto.Migration
3+
4+
@config_schema Application.compile_env(:sequin, [Sequin.Repo, :config_schema_prefix])
5+
6+
def change do
7+
alter table(:sink_consumers, prefix: @config_schema) do
8+
add :schema_filter, :jsonb, null: true
9+
end
10+
end
11+
end

test/sequin/message_handler_test.exs

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -165,14 +165,22 @@ defmodule Sequin.MessageHandlerTest do
165165
}
166166
end
167167

168-
@tag skip: "FIXME: This test is broken while we work on table fan-in"
169168
test "fans out messages correctly for mixed message_kind consumers and wal_pipelines" do
170169
account = AccountsFactory.insert_account!()
171170
database = DatabasesFactory.insert_postgres_database!(account_id: account.id)
172171

173172
field = ReplicationFactory.field()
174-
message1 = ReplicationFactory.postgres_message(table_oid: 123, action: :insert, fields: [field])
175-
message2 = ReplicationFactory.postgres_message(table_oid: 456, action: :update, fields: [field])
173+
table_schema1 = Factory.postgres_object()
174+
table_schema2 = Factory.postgres_object()
175+
176+
message1 =
177+
ReplicationFactory.postgres_message(table_oid: 123, action: :insert, fields: [field], table_schema: table_schema1)
178+
179+
message2 =
180+
ReplicationFactory.postgres_message(table_oid: 456, action: :update, fields: [field], table_schema: table_schema2)
181+
182+
message3 =
183+
ReplicationFactory.postgres_message(table_oid: 789, action: :insert, fields: [field], table_schema: table_schema1)
176184

177185
sequence1 =
178186
DatabasesFactory.insert_sequence!(
@@ -218,11 +226,24 @@ defmodule Sequin.MessageHandlerTest do
218226
source_tables: []
219227
)
220228

229+
schema_filter = ConsumersFactory.schema_filter_attrs(schema: table_schema1)
230+
231+
consumer3 =
232+
ConsumersFactory.insert_sink_consumer!(
233+
account_id: account.id,
234+
sequence_id: nil,
235+
sequence_filter: nil,
236+
source_tables: [],
237+
schema_filter: schema_filter
238+
)
239+
221240
start_supervised!({SlotMessageStoreSupervisor, [consumer: consumer1, test_pid: self(), persisted_mode?: false]})
222241
start_supervised!({SlotMessageStoreSupervisor, [consumer: consumer2, test_pid: self(), persisted_mode?: false]})
242+
start_supervised!({SlotMessageStoreSupervisor, [consumer: consumer3, test_pid: self(), persisted_mode?: false]})
223243

224244
consumer1 = Repo.preload(consumer1, [:postgres_database, :sequence, :filter])
225245
consumer2 = Repo.preload(consumer2, [:postgres_database, :sequence, :filter])
246+
consumer3 = Repo.preload(consumer3, [:postgres_database, :sequence, :filter])
226247

227248
database = Repo.preload(database, :sequences)
228249

@@ -236,16 +257,17 @@ defmodule Sequin.MessageHandlerTest do
236257
)
237258

238259
context = %MessageHandler.Context{
239-
consumers: [consumer1, consumer2],
260+
consumers: [consumer1, consumer2, consumer3],
240261
wal_pipelines: [wal_pipeline],
241262
replication_slot_id: UUID.uuid4(),
242263
postgres_database: database
243264
}
244265

245-
{:ok, 4} = MessageHandler.handle_messages(context, [message1, message2])
266+
{:ok, 6} = MessageHandler.handle_messages(context, [message1, message2, message3])
246267

247268
consumer1_messages = list_messages(consumer1)
248269
consumer2_messages = list_messages(consumer2)
270+
consumer3_messages = list_messages(consumer3)
249271
wal_events = Replication.list_wal_events(wal_pipeline.id)
250272

251273
assert length(consumer1_messages) == 1
@@ -254,6 +276,9 @@ defmodule Sequin.MessageHandlerTest do
254276
assert length(consumer2_messages) == 1
255277
assert hd(consumer2_messages).table_oid == 456
256278

279+
assert length(consumer3_messages) == 2
280+
assert Enum.all?(consumer3_messages, &(&1.data.metadata.table_schema == table_schema1))
281+
257282
assert length(wal_events) == 2
258283
end
259284

test/sequin/postgres_replication_test.exs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ defmodule Sequin.PostgresReplicationTest do
1515
import ExUnit.CaptureLog
1616

1717
alias Sequin.Consumers
18+
alias Sequin.Consumers.SchemaFilter
1819
alias Sequin.Consumers.SequenceFilter
1920
alias Sequin.Databases.ConnectionCache
2021
alias Sequin.Databases.DatabaseUpdateWorker
@@ -1492,6 +1493,16 @@ defmodule Sequin.PostgresReplicationTest do
14921493
# Randomly select a consumer
14931494
consumer = Enum.random([event_consumer, record_consumer])
14941495

1496+
# Attach a schema filter to the consumer
1497+
consumer
1498+
|> Ecto.Changeset.cast(%{schema_filter: ConsumersFactory.schema_filter_attrs(schema: "public")}, [])
1499+
|> Ecto.Changeset.cast_embed(:schema_filter, with: &SchemaFilter.create_changeset/2)
1500+
|> Repo.update!()
1501+
1502+
# Restart the consumer to apply the changes
1503+
Consumers.update_sink_consumer(consumer, %{})
1504+
Runtime.Supervisor.refresh_message_handler_ctx(consumer.replication_slot_id)
1505+
14951506
# Verify no consumer messages yet
14961507
assert list_messages(consumer) == []
14971508

test/support/factory/consumers_factory.ex

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ defmodule Sequin.Factory.ConsumersFactory do
1818
alias Sequin.Consumers.RabbitMqSink
1919
alias Sequin.Consumers.RedisStreamSink
2020
alias Sequin.Consumers.RedisStringSink
21+
alias Sequin.Consumers.SchemaFilter
2122
alias Sequin.Consumers.SequenceFilter
2223
alias Sequin.Consumers.SequenceFilter.ColumnFilter
2324
alias Sequin.Consumers.SequinStreamSink
@@ -133,7 +134,6 @@ defmodule Sequin.Factory.ConsumersFactory do
133134
def sink_consumer_attrs(attrs \\ []) do
134135
attrs
135136
|> sink_consumer()
136-
|> Sequin.Map.from_ecto()
137137
|> Map.update!(:sink, &Sequin.Map.from_ecto/1)
138138
|> Map.update!(:source_tables, fn source_tables ->
139139
Enum.map(source_tables, fn source_table ->
@@ -153,6 +153,7 @@ defmodule Sequin.Factory.ConsumersFactory do
153153
end)
154154
end
155155
end)
156+
|> Sequin.Map.from_ecto()
156157
end
157158

158159
def insert_sink_consumer!(attrs \\ []) do
@@ -815,6 +816,23 @@ defmodule Sequin.Factory.ConsumersFactory do
815816
|> Sequin.Map.from_ecto()
816817
end
817818

819+
def schema_filter(attrs \\ []) do
820+
attrs = Map.new(attrs)
821+
822+
merge_attributes(
823+
%SchemaFilter{
824+
schema: Factory.postgres_object()
825+
},
826+
attrs
827+
)
828+
end
829+
830+
def schema_filter_attrs(attrs \\ []) do
831+
attrs
832+
|> schema_filter()
833+
|> Sequin.Map.from_ecto(keep_nils: true)
834+
end
835+
818836
def backfill(attrs \\ []) do
819837
attrs = Map.new(attrs)
820838

0 commit comments

Comments
 (0)