Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions lib/sequin/consumers/consumers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ defmodule Sequin.Consumers do
Enumerable.t(ConsumerRecord.t() | ConsumerEvent.t())
def stream_consumer_messages_for_consumer(%SinkConsumer{id: consumer_id} = consumer, opts \\ []) do
batch_size = Keyword.get(opts, :batch_size, 1000)
initial_cursor = Keyword.get(opts, :cursor)

module =
case consumer.message_kind do
Expand All @@ -527,6 +528,14 @@ defmodule Sequin.Consumers do
|> order_by([m], asc: m.commit_lsn, asc: m.commit_idx)
|> limit(^batch_size)

initial_query =
if is_nil(initial_cursor) do
initial_query
else
{commit_lsn, commit_idx} = initial_cursor
where(initial_query, [m], {m.commit_lsn, m.commit_idx} > {^commit_lsn, ^commit_idx})
end

# Query, prev_results, last_cursor
# last_cursor is {commit_lsn, commit_idx} of the last record
{initial_query, [], nil}
Expand Down
5 changes: 3 additions & 2 deletions lib/sequin/runtime/slot_message_store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -1006,10 +1006,11 @@ defmodule Sequin.Runtime.SlotMessageStore do
# Stream messages and stop when we reach max_memory_bytes
{time, {persisted_messages, current_size_bytes, message_count, all_loaded?}} =
:timer.tc(fn ->
current_cursor = State.max_wal_cursor(state)

state.consumer
|> Consumers.stream_consumer_messages_for_consumer()
|> Consumers.stream_consumer_messages_for_consumer(cursor: current_cursor)
|> Stream.filter(&(message_partition(&1, state.consumer.partition_count) == state.partition))
|> Stream.reject(&State.message_exists?(state, &1))
|> Enum.reduce_while({[], 0, 0, true}, fn msg, {messages, current_size, message_count, _all_loaded?} ->
msg = %{msg | payload_size_bytes: :erlang.external_size(msg.data)}
new_size = current_size + msg.payload_size_bytes
Expand Down
18 changes: 18 additions & 0 deletions lib/sequin/runtime/slot_message_store_state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,24 @@ defmodule Sequin.Runtime.SlotMessageStore.State do
|> Enum.take(limit)
end

@doc """
Returns the maximum WAL cursor present in the state, or nil if there are no messages.
The cursor is returned as a tuple of {commit_lsn, commit_idx}.
"""
@spec max_wal_cursor(State.t()) :: cursor_tuple() | nil
def max_wal_cursor(%State{messages: messages}) when map_size(messages) == 0, do: nil

def max_wal_cursor(%State{} = state) do
  # The cursors table is ordered by {commit_lsn, commit_idx}, so the last
  # key in the table is the maximum cursor. Guard against an empty table
  # (:ets.last/1 returns :"$end_of_table" in that case).
  case :ets.last(ordered_cursors_table(state)) do
    :"$end_of_table" -> nil
    cursor -> cursor
  end
end

@doc """
Returns messages that are stuck in a delivering state - they are in produced_message_groups
but were delivered over 1 minute ago. This helps recover messages that may have been
Expand Down
57 changes: 57 additions & 0 deletions test/sequin/consumers_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2533,6 +2533,63 @@ defmodule Sequin.ConsumersTest do
streamed_lsns = Enum.map(streamed_messages, & &1.commit_lsn)
assert streamed_lsns == Enum.map(1..10, fn i -> i * 100 end)
end

test "initial cursor filters out messages before the cursor" do
  consumer = ConsumersFactory.insert_sink_consumer!()

  # Insert messages at ascending {commit_lsn, commit_idx} cursors
  cursors = [{100, 1}, {100, 2}, {200, 1}, {300, 1}]

  messages =
    Enum.map(cursors, fn {lsn, idx} ->
      ConsumersFactory.insert_consumer_message!(
        consumer_id: consumer.id,
        message_kind: consumer.message_kind,
        commit_lsn: lsn,
        commit_idx: idx
      )
    end)

  # With the cursor set to {100, 2}, only messages strictly greater than
  # that cursor should be streamed back.
  initial_cursor = {100, 2}

  streamed_messages =
    consumer
    |> Consumers.stream_consumer_messages_for_consumer(cursor: initial_cursor)
    |> Enum.to_list()

  # Only (200, 1) and (300, 1) remain
  assert length(streamed_messages) == 2

  # Every streamed cursor is strictly greater than the initial cursor
  streamed_cursors = Enum.map(streamed_messages, &{&1.commit_lsn, &1.commit_idx})
  assert Enum.all?(streamed_cursors, fn cursor -> cursor > initial_cursor end)

  # And the streamed rows are exactly the last two inserted messages
  expected_ids = messages |> Enum.drop(2) |> Enum.map(& &1.id) |> Enum.sort()
  actual_ids = streamed_messages |> Enum.map(& &1.id) |> Enum.sort()
  assert actual_ids == expected_ids
end
end

describe "consumer_partition_size_bytes/1" do
Expand Down
Loading