Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Message deduplication #854

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open

Message deduplication #854

wants to merge 12 commits into from

Conversation

snichme
Copy link
Member

@snichme snichme commented Nov 19, 2024

WHAT is this pull request doing?

Implementation for message deduplication on exchanges.
It works for any exchange type and is enabled by setting the argument x-message-deduplication = true on any exchange.

Features

  • Deduplication on Exchanges
  • Deduplication cache storage in memory
  • TTL for each message
  • Default TTL
  • Deduplication on queues

Fixes #833

HOW can this pull request be tested?

Specs.

@snichme snichme requested a review from a team as a code owner November 19, 2024 21:22
@snichme
Copy link
Member Author

snichme commented Nov 19, 2024

TODO: Expose the dedup_count as prometheus metric as well.

@snichme
Copy link
Member Author

snichme commented Nov 20, 2024

Documentation for the website: https://gist.github.com/snichme/40a33df7ea9a02c9a7d77a5c81d1f52d

src/lavinmq/amqp/queue/queue.cr Show resolved Hide resolved
src/lavinmq/exchange/dedup_ext.cr Outdated Show resolved Hide resolved
src/lavinmq/exchange/dedup_ext.cr Outdated Show resolved Hide resolved
src/lavinmq/exchange/dedup_ext.cr Outdated Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this file really be called dedup_ext.cr and be in the exchange dir? I'm thinking it doesn't seem to have much in common with other exchange types, and doesn't inherit Exchange. So maybe this should be called something like deduplication.cr instead and be in another dir?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thats why I called it _ext for extension. There are mixed files in different places (like in queue/ there are are queues, stores and events) so that is why I did like this.

It's also used for both queues and exchanges so have it somewhere else makes sense. directly under /src/lavinmq maybe?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thats why I called it _ext for extension.

Okay 👍

There are mixed files in different places (like in queue/ there are are queues, stores and events) so that is why I did like this.

Hmm, yeah, you're right. Maybe we should give our overall file structure some more thought! 🙂

It's also used for both queues and exchanges so have it somewhere else makes sense. directly under /src/lavinmq maybe?

Yeah, I think directly under /src/lavinmq makes more sense! And if we add more "extensions" in the future we could move those to a separate extensions dir?

Comment on lines +8 to +11
class MemoryCache(T) < Cache(T)
def initialize(@size : UInt32)
@store = Hash(T, Int64?).new
end
Copy link
Member

@carlhoerberg carlhoerberg Dec 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't the purpose of the TTL is to keep the size of the cache reasonable small? So I think you should have a fiber that checks the TTL of each key and evict the ones that has expired. Not check the TTL in the contains? method.

monotonic is a better alternative when you only are interested in the timespan, not the exact point of time.

Suggested change
class MemoryCache(T) < Cache(T)
def initialize(@size : UInt32)
@store = Hash(T, Int64?).new
end
class MemoryCache(T) < Cache(T)
def initialize(@size : UInt32)
@store = Hash(T, Int64?).new
spawn(name: "memorycache expiry") do
loop do
sleep 60.seconds
@store.reject! { |_, ttd| tdd && RoughTime.monotonic > ttd }
end
end

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thinking more about it, maybe i'm wrong about the purpose of the TTL here.

Comment on lines +14 to +19
return false unless @store.has_key?(key)
ttd = @store[key]
return true unless ttd
return true if ttd > RoughTime.unix_ms
@store.delete(key)
false
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return false unless @store.has_key?(key)
ttd = @store[key]
return true unless ttd
return true if ttd > RoughTime.unix_ms
@store.delete(key)
false
@store.has_key?(key)


def insert(key : T, ttl : UInt32? = nil)
@store.shift if @store.size >= @size
@store[key] = ttl ? RoughTime.unix_ms + ttl : nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
@store[key] = ttl ? RoughTime.unix_ms + ttl : nil
@store[key] = ttl ? RoughTime.monotonic + ttl : nil

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Message deduplication
4 participants