Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Message deduplication #854

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
150 changes: 150 additions & 0 deletions spec/deduper_spec.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
require "./spec_helper"
require "../src/lavinmq/deduplication.cr"

# Specs for the in-memory deduplication cache.
describe LavinMQ::Deduplication do
  describe LavinMQ::Deduplication::MemoryCache do
    # With room for two entries, the third insert pushes out the oldest one.
    it "should have a max size" do
      store = LavinMQ::Deduplication::MemoryCache(String).new(2)
      %w(item1 item2 item3).each { |item| store.insert(item) }
      store.contains?("item1").should be_false
      store.contains?("item2").should be_true
      store.contains?("item3").should be_true
    end

    # Entries inserted without a ttl are found; unknown keys are not.
    it "should store item without ttl" do
      store = LavinMQ::Deduplication::MemoryCache(String).new(10)
      store.insert("item1")
      store.contains?("item1").should be_true
      store.contains?("item2").should be_false
    end

    # After the pause only the entry with the short ttl (1) must be gone;
    # the long-ttl entry and the entry without ttl remain.
    it "should respect ttl" do
      store = LavinMQ::Deduplication::MemoryCache(String).new(3)
      store.insert("item1", 1)
      store.insert("item2", 300)
      store.insert("item3")
      sleep 0.2.seconds
      store.contains?("item1").should be_false
      store.contains?("item2").should be_true
      store.contains?("item3").should be_true
    end
  end
end

# Test double for a deduplication cache. It stores nothing: every lookup is
# reported as a miss, and each call is recorded so specs can inspect it.
class MockCache < LavinMQ::Deduplication::Cache(AMQ::Protocol::Field)
  # Recorded calls grouped by method name; each record is {key, ttl}.
  # Unknown method names lazily get an empty list.
  @counter = Hash(String, Array({String, UInt32?})).new do |log, method_name|
    log[method_name] = Array({String, UInt32?}).new
  end

  # Records the lookup (ttl slot is nil) and always answers false.
  def contains?(key) : Bool
    @counter["contains?"].push({key.as(String), nil})
    false
  end

  # Records the insert together with the ttl it was given.
  def insert(key, ttl = nil)
    @counter["insert"].push({key.as(String), ttl})
  end

  # Returns the records collected for the method named *key*
  # ("contains?" or "insert").
  def calls(key : String)
    @counter[key]
  end
end

# Specs for Deduper, run against MockCache so that every cache interaction
# can be inspected. Examples are grouped by the Deduper method they exercise.
describe LavinMQ::Deduplication::Deduper do
  describe "duplicate?" do
    it "should return false if \"x-deduplication-header\" is missing (no identifier, always unique)" do
      mock = MockCache.new
      deduper = LavinMQ::Deduplication::Deduper.new(mock)
      props = LavinMQ::AMQP::Properties.new
      msg = LavinMQ::Message.new("ex", "rk", "body", props)
      res = deduper.duplicate?(msg)
      res.should be_false
    end

    it "should check cache if entry exists" do
      mock = MockCache.new
      deduper = LavinMQ::Deduplication::Deduper.new(mock)
      props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({
        "x-deduplication-header" => "msg1",
      }))
      msg = LavinMQ::Message.new("ex", "rk", "body", props)
      deduper.duplicate?(msg)
      mock.calls("contains?").size.should eq 1
    end
  end

  describe "add" do
    # Empty header table: there is no deduplication key, so nothing is cached.
    it "should not insert into cache if header is missing" do
      mock = MockCache.new
      deduper = LavinMQ::Deduplication::Deduper.new(mock)
      props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new)
      msg = LavinMQ::Message.new("ex", "rk", "body", props)
      deduper.add(msg)
      mock.calls("insert").size.should eq 0
    end

    it "should only insert into cache if header has a value" do
      mock = MockCache.new
      deduper = LavinMQ::Deduplication::Deduper.new(mock)
      props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({
        "x-deduplication-header" => "msg1",
      }))
      msg = LavinMQ::Message.new("ex", "rk", "body", props)
      deduper.add(msg)
      mock.calls("insert").size.should eq 1
    end

    it "should respect x-cache-ttl on message" do
      mock = MockCache.new
      deduper = LavinMQ::Deduplication::Deduper.new(mock)
      props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({
        "x-deduplication-header" => "msg1",
        "x-cache-ttl"            => 10,
      }))
      msg = LavinMQ::Message.new("ex", "rk", "body", props)
      deduper.add(msg)
      calls = mock.calls("insert")
      calls.first[0].should eq "msg1"
      calls.first[1].should eq 10
    end

    it "should fallback to default ttl" do
      mock = MockCache.new
      deduper = LavinMQ::Deduplication::Deduper.new(mock, 12)
      props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({
        "x-deduplication-header" => "msg1",
      }))
      msg = LavinMQ::Message.new("ex", "rk", "body", props)
      deduper.add(msg)
      calls = mock.calls("insert")
      calls.first[0].should eq "msg1"
      calls.first[1].should eq 12
    end

    it "should prio message ttl over default ttl" do
      mock = MockCache.new
      deduper = LavinMQ::Deduplication::Deduper.new(mock, 12)
      props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({
        "x-deduplication-header" => "msg1",
        "x-cache-ttl"            => 10,
      }))
      msg = LavinMQ::Message.new("ex", "rk", "body", props)
      deduper.add(msg)
      calls = mock.calls("insert")
      calls.first[0].should eq "msg1"
      calls.first[1].should eq 10
    end

    # The third constructor argument replaces the default header name.
    it "should allow checking any header for dedups" do
      mock = MockCache.new
      deduper = LavinMQ::Deduplication::Deduper.new(mock, 10, "custom")
      props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({
        "custom" => "msg1",
      }))
      msg = LavinMQ::Message.new("ex", "rk", "body", props)
      deduper.add(msg)
      calls = mock.calls("insert")
      calls.first[0].should eq "msg1"
      calls.first[1].should eq 10
    end
  end
end
60 changes: 60 additions & 0 deletions spec/exchange_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -170,4 +170,64 @@ describe LavinMQ::Exchange do
end
end
end
# Exchange-level deduplication: publishing the same deduplication key twice
# must route the message once and count the second publish as a duplicate.
describe "message deduplication" do
  it "should handle message deduplication" do
    with_amqp_server do |server|
      with_channel(server) do |channel|
        exchange_args = AMQP::Client::Arguments.new({
          "x-message-deduplication" => true,
          "x-cache-size"            => 10,
        })
        channel.exchange("test", "topic", args: exchange_args)
        channel.queue.bind("test", "#")
        exchange = server.vhosts["/"].exchanges["test"]
        queue = server.vhosts["/"].queues.first_value

        # First publish: key not seen before, so it is routed to one queue.
        first_props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({
          "x-deduplication-header" => "msg1",
        }))
        first_msg = LavinMQ::Message.new("ex", "rk", "body", first_props)
        exchange.publish(first_msg, false).should eq 1
        exchange.dedup_count.should eq 0

        # Second publish with the same key: dropped and counted.
        repeat_props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({
          "x-deduplication-header" => "msg1",
        }))
        repeat_msg = LavinMQ::Message.new("ex", "rk", "body", repeat_props)
        exchange.publish(repeat_msg, false).should eq 0
        exchange.dedup_count.should eq 1

        queue.message_count.should eq 1
      end
    end
  end

  it "should handle message deduplication, on custom header" do
    with_amqp_server do |server|
      with_channel(server) do |channel|
        # "x-deduplication-header" names the message header holding the key.
        exchange_args = AMQP::Client::Arguments.new({
          "x-message-deduplication" => true,
          "x-cache-size"            => 10,
          "x-deduplication-header"  => "custom",
        })
        channel.exchange("test", "topic", args: exchange_args)
        channel.queue.bind("test", "#")
        exchange = server.vhosts["/"].exchanges["test"]
        queue = server.vhosts["/"].queues.first_value

        first_props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({
          "custom" => "msg1",
        }))
        first_msg = LavinMQ::Message.new("ex", "rk", "body", first_props)
        exchange.publish(first_msg, false).should eq 1
        exchange.dedup_count.should eq 0

        repeat_props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({
          "custom" => "msg1",
        }))
        repeat_msg = LavinMQ::Message.new("ex", "rk", "body", repeat_props)
        exchange.publish(repeat_msg, false).should eq 0
        exchange.dedup_count.should eq 1

        queue.message_count.should eq 1
      end
    end
  end
end
end
21 changes: 21 additions & 0 deletions spec/queue_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -412,4 +412,25 @@ describe LavinMQ::AMQP::Queue do
FileUtils.rm_rf tmpdir if tmpdir
end
end

# Queue-level deduplication: a second publish carrying the same
# deduplication key must not be stored in the queue.
describe "deduplication" do
  it "should not accept message if it's a duplicate" do
    with_amqp_server do |s|
      with_channel(s) do |ch|
        queue_name = "dedup-queue"
        q1 = ch.queue(queue_name, args: AMQP::Client::Arguments.new({
          "x-message-deduplication" => true,
          "x-cache-size"            => 10,
        }))
        props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({
          "x-deduplication-header" => "msg1",
        }))
        # Publish the same message twice; only the first may reach the queue.
        ch.default_exchange.publish_confirm("body", queue_name, props: props)
        ch.default_exchange.publish_confirm("body", queue_name, props: props)
        q1.get(no_ack: false).not_nil!.body_io.to_s.should eq "body"
        q1.get(no_ack: false).should be_nil
      end
    end
  end
end
end
19 changes: 18 additions & 1 deletion src/lavinmq/amqp/queue/queue.cr
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require "./state"
require "./event"
require "./message_store"
require "../../unacked_message"
require "../../deduplication"

module LavinMQ::AMQP
class Queue < LavinMQ::Queue
Expand Down Expand Up @@ -111,7 +112,7 @@ module LavinMQ::AMQP

# Creates @[x]_count and @[x]_rate and @[y]_log
rate_stats(
{"ack", "deliver", "deliver_get", "confirm", "get", "get_no_ack", "publish", "redeliver", "reject", "return_unroutable"},
{"ack", "deliver", "deliver_get", "confirm", "get", "get_no_ack", "publish", "redeliver", "reject", "return_unroutable", "dedup"},
{"message_count", "unacked_count"})

getter name, arguments, vhost, consumers, last_get_time
Expand All @@ -127,6 +128,7 @@ module LavinMQ::AMQP
@data_dir : String
Log = LavinMQ::Log.for "queue"
@metadata : ::Log::Metadata
@deduper : Deduplication::Deduper?

def initialize(@vhost : VHost, @name : String,
@exclusive = false, @auto_delete = false,
Expand Down Expand Up @@ -264,6 +266,14 @@ module LavinMQ::AMQP
@single_active_consumer_queue = parse_header("x-single-active-consumer", Bool) == true
@consumer_timeout = parse_header("x-consumer-timeout", Int).try &.to_u64
validate_positive("x-consumer-timeout", @consumer_timeout)
if parse_header("x-message-deduplication", Bool)
size = parse_header("x-cache-size", Int).try(&.to_u32)
raise LavinMQ::Error::PreconditionFailed.new("Invalid x-cache-size for message deduplication") unless size
ttl = parse_header("x-cache-ttl", Int).try(&.to_u32)
header_key = parse_header("x-deduplication-header", String)
cache = Deduplication::MemoryCache(AMQ::Protocol::Field).new(size)
@deduper = Deduplication::Deduper.new(cache, ttl, header_key)
end
end

private macro parse_header(header, type)
Expand Down Expand Up @@ -414,6 +424,13 @@ module LavinMQ::AMQP

def publish(msg : Message) : Bool
return false if @deleted || @state.closed?
if d = @deduper
if d.duplicate?(msg)
@dedup_count += 1
return false
end
d.add(msg)
end
kickster97 marked this conversation as resolved.
Show resolved Hide resolved
reject_on_overflow(msg)
@msg_store_lock.synchronize do
@msg_store.push(msg)
Expand Down
66 changes: 66 additions & 0 deletions src/lavinmq/deduplication.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
module LavinMQ
module Deduplication
# Interface for a store of deduplication keys of type T.
abstract class Cache(T)
# Returns whether *key* is currently held by the cache.
abstract def contains?(key : T) : Bool
# Stores *key*. *ttl* is optional (nil when none is given); its unit is
# left to the implementation.
abstract def insert(key : T, ttl : UInt32?)
end

class MemoryCache(T) < Cache(T)
# Creates an empty cache. *size* is the entry count at which `insert`
# starts shifting out the oldest entry before adding a new one.
def initialize(@size : UInt32)
  @store = {} of T => Int64?
end
Comment on lines +8 to +11
Copy link
Member

@carlhoerberg carlhoerberg Dec 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't the purpose of the TTL to keep the size of the cache reasonably small? So I think you should have a fiber that checks the TTL of each key and evicts the ones that have expired, rather than checking the TTL in the contains? method.

monotonic is a better alternative when you are only interested in the timespan, not the exact point in time.

Suggested change
class MemoryCache(T) < Cache(T)
def initialize(@size : UInt32)
@store = Hash(T, Int64?).new
end
class MemoryCache(T) < Cache(T)
def initialize(@size : UInt32)
@store = Hash(T, Int64?).new
spawn(name: "memorycache expiry") do
loop do
sleep 60.seconds
@store.reject! { |_, ttd| tdd && RoughTime.monotonic > ttd }
end
end

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking more about it, maybe I'm wrong about the purpose of the TTL here.


def contains?(key : T) : Bool
return false unless @store.has_key?(key)
ttd = @store[key]
return true unless ttd
return true if ttd > RoughTime.unix_ms
@store.delete(key)
false
Comment on lines +14 to +19
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return false unless @store.has_key?(key)
ttd = @store[key]
return true unless ttd
return true if ttd > RoughTime.unix_ms
@store.delete(key)
false
@store.has_key?(key)

end

def insert(key : T, ttl : UInt32? = nil)
@store.shift if @store.size >= @size
@store[key] = ttl ? RoughTime.unix_ms + ttl : nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
@store[key] = ttl ? RoughTime.unix_ms + ttl : nil
@store[key] = ttl ? RoughTime.monotonic + ttl : nil

end
end

# Decides whether a message is a duplicate, based on a key taken from one
# of its headers and a Cache of keys already seen.
class Deduper
  # Header consulted for the deduplication key when none is configured.
  DEFAULT_HEADER_KEY = "x-deduplication-header"

  # *cache* stores the keys already seen.
  # *default_ttl* is used when a message carries no usable "x-cache-ttl".
  # *header_key* overrides DEFAULT_HEADER_KEY as the header holding the key.
  def initialize(@cache : Cache(AMQ::Protocol::Field), @default_ttl : UInt32? = nil,
                 @header_key : String? = nil)
  end

  # Remembers the message's deduplication key. Does nothing when the
  # message has no headers or the key header is absent.
  def add(msg : Message)
    key = dedup_key(msg)
    return unless key
    @cache.insert(key, dedup_ttl(msg))
  end

  # Returns true when the message's deduplication key is already cached.
  # A message without a key is never a duplicate.
  def duplicate?(msg : Message) : Bool
    key = dedup_key(msg)
    return false unless key
    @cache.contains?(key)
  end

  # Value of the configured (or default) key header, or nil if absent.
  private def dedup_key(msg)
    headers = msg.properties.headers
    return unless headers
    headers[@header_key || DEFAULT_HEADER_KEY]?
  end

  # TTL for this message's cache entry: the message's "x-cache-ttl" header
  # when it is an integer that fits in UInt32, otherwise the default TTL.
  # The value is handed to the cache unchanged; the cache defines its unit.
  private def dedup_ttl(msg) : UInt32?
    headers = msg.properties.headers
    return @default_ttl unless headers
    value = headers["x-cache-ttl"]?.as?(Int)
    return @default_ttl unless value
    # `to_u32` raises OverflowError outside this range; a bad header on a
    # single message should fall back to the default rather than raise.
    return @default_ttl unless 0 <= value <= UInt32::MAX
    value.to_u32
  end
end
end
end
Loading
Loading