Skip to content

Commit

Permalink
Merge pull request #237 from alphagov/redlock
Browse files Browse the repository at this point in the history
Lock documents during updates/deletes
  • Loading branch information
csutter authored Apr 5, 2024
2 parents 81fe476 + d43da1f commit 9731550
Show file tree
Hide file tree
Showing 10 changed files with 316 additions and 18 deletions.
8 changes: 6 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
contents: read
security-events: write
actions: read

codeql-sast:
name: CodeQL SAST scan
uses: alphagov/govuk-infrastructure/.github/workflows/codeql-analysis.yml@main
Expand All @@ -31,7 +31,7 @@ jobs:
dependency-review:
name: Dependency Review scan
uses: alphagov/govuk-infrastructure/.github/workflows/dependency-review.yml@main

lint-ruby:
name: Lint Ruby
uses: alphagov/govuk-infrastructure/.github/workflows/rubocop.yml@main
Expand All @@ -50,11 +50,15 @@ jobs:
# All Google client library calls are mocked, but the application needs this set to boot
DISCOVERY_ENGINE_SERVING_CONFIG: not-used
DISCOVERY_ENGINE_DATASTORE_BRANCH: not-used
# Redis running through govuk-infrastructure action
REDIS_URL: redis://localhost:6379
steps:
- uses: actions/checkout@v3
with:
ref: ${{ inputs.ref || github.ref }}
- uses: ruby/setup-ruby@v1
with:
bundler-cache: true
- name: Setup Redis
uses: alphagov/govuk-infrastructure/.github/actions/setup-redis@main
- run: bin/rake
3 changes: 3 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@ gem "activesupport", RAILS_GEMS_VERSION
gem "railties", RAILS_GEMS_VERSION

gem "bootsnap", require: false
gem "connection_pool"
gem "google-cloud-discovery_engine"
gem "govuk_app_config"
gem "govuk_message_queue_consumer"
gem "jsonpath"
gem "loofah"
gem "prometheus-client"
gem "redis"
gem "redlock"

group :test do
gem "grpc_mock"
Expand Down
9 changes: 9 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,12 @@ GEM
rbtree (0.4.6)
rdoc (6.6.3.1)
psych (>= 4.0.0)
redis (5.1.0)
redis-client (>= 0.17.0)
redis-client (0.21.0)
connection_pool
redlock (2.0.6)
redis-client (>= 0.14.1, < 1.0.0)
regexp_parser (2.9.0)
reline (0.5.0)
io-console (~> 0.5)
Expand Down Expand Up @@ -551,6 +557,7 @@ DEPENDENCIES
activesupport (~> 7.1.3)
bootsnap
brakeman
connection_pool
debug
google-cloud-discovery_engine
govuk_app_config
Expand All @@ -562,6 +569,8 @@ DEPENDENCIES
loofah
prometheus-client
railties (~> 7.1.3)
redis
redlock
rspec-rails
rubocop-govuk
simplecov
Expand Down
19 changes: 18 additions & 1 deletion app/services/discovery_engine/sync/delete.rb
Original file line number Diff line number Diff line change
@@ -1,14 +1,31 @@
module DiscoveryEngine::Sync
class Delete
include DocumentName
include Locking
include Logging

def initialize(client: ::Google::Cloud::DiscoveryEngine.document_service(version: :v1))
@client = client
end

def call(content_id, payload_version: nil)
client.delete_document(name: document_name(content_id))
with_locked_document(content_id, payload_version:) do
if payload_newer_than_remote?(content_id, payload_version:)
log(
Logger::Severity::INFO,
"Ignored as newer version (#{latest_synced_version(content_id)}) already synced",
content_id:, payload_version:,
)
Metrics::Exported.increment_counter(
:discovery_engine_requests, type: "delete", status: "ignored_outdated"
)
return
end

client.delete_document(name: document_name(content_id))

set_latest_synced_version(content_id, payload_version)
end

log(Logger::Severity::INFO, "Successfully deleted", content_id:, payload_version:)
Metrics::Exported.increment_counter(
Expand Down
90 changes: 90 additions & 0 deletions app/services/discovery_engine/sync/locking.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
module DiscoveryEngine::Sync
# Mixin providing a mechanism for sync operations (`Put`, `Delete`) to lock documents while they
# are being operated on so that other workers cannot simultaneously update the same document.
module Locking
# Redis key prefix for locks and versions
LOCK_KEY_PREFIX = "search_api_v2:sync_lock".freeze
VERSION_KEY_PREFIX = "search_api_v2:latest_synced_version".freeze

# Time-to-live for document locks if not explicitly released
DOCUMENT_LOCK_TTL = 30.seconds.in_milliseconds

# Options for Redlock client around handling retries
RETRY_COUNT = 10
RETRY_DELAY = 5.seconds.in_milliseconds
RETRY_JITTER = 5.seconds.in_milliseconds

# Locks a document while a critical section block is executed to avoid multiple workers
# competing to update the same document.
def with_locked_document(content_id, payload_version:, &critical_section)
redlock_client.lock!(
"#{LOCK_KEY_PREFIX}:#{content_id}",
DOCUMENT_LOCK_TTL,
) do
Rails.logger.add(
Logger::Severity::INFO,
"[#{self.class.name}] Lock acquired for document: #{content_id}, " \
"payload_version: #{payload_version}",
)

critical_section.call

Rails.logger.add(
Logger::Severity::INFO,
"[#{self.class.name}] Releasing lock for document: #{content_id}, " \
"payload_version: #{payload_version}",
)
end
rescue Redlock::LockError => e
# This should be a very rare occurrence (for example, if/when Redis is down). Our "least
# worst" fallback option is to perform the action anyway without the lock (which is the
# previous behaviour from before we had this locking mechanism).
Rails.logger.add(
Logger::Severity::ERROR,
"[#{self.class.name}] Failed to acquire lock for document: #{content_id}, " \
"payload_version: #{payload_version}. Continuing without lock.",
)
GovukError.notify(e)

critical_section.call
end

def payload_newer_than_remote?(content_id, payload_version:)
# Sense check: This shouldn't ever come through as nil from Publishing API, but if it does,
# the only really useful thing we can do is ignore this check entirely because we can't
# meaningfully make a comparison.
return true if payload_version.nil?

# If there is no remote version yet, our version is always newer by definition
remote_version = latest_synced_version(content_id)
return true if remote_version.nil?

remote_version.to_i >= payload_version.to_i
end

# Gets the latest synced version for a document from Redis
def latest_synced_version(content_id)
Rails.application.config.redis_pool.with do |redis|
redis.get("#{VERSION_KEY_PREFIX}:#{content_id}")&.to_i
end
end

# Sets the latest synced version for a document in Redis
def set_latest_synced_version(content_id, version)
Rails.application.config.redis_pool.with do |redis|
redis.set("#{VERSION_KEY_PREFIX}:#{content_id}", version)
end
end

private

def redlock_client
@redlock_client ||= Redlock::Client.new(
Rails.configuration.redlock_redis_instances,
retry_count: RETRY_COUNT,
retry_delay: RETRY_DELAY,
retry_jitter: RETRY_JITTER,
)
end
end
end
41 changes: 29 additions & 12 deletions app/services/discovery_engine/sync/put.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,43 @@ class Put
MIME_TYPE = "text/html".freeze

include DocumentName
include Locking
include Logging

def initialize(client: ::Google::Cloud::DiscoveryEngine.document_service(version: :v1))
@client = client
end

def call(content_id, metadata, content: "", payload_version: nil)
client.update_document(
document: {
id: content_id,
name: document_name(content_id),
json_data: metadata.merge(payload_version:).to_json,
content: {
mime_type: MIME_TYPE,
# The Google client expects an IO object to extract raw byte content from
raw_bytes: StringIO.new(content),
with_locked_document(content_id, payload_version:) do
if payload_newer_than_remote?(content_id, payload_version:)
log(
Logger::Severity::INFO,
"Ignored as newer version (#{latest_synced_version(content_id)}) already synced",
content_id:, payload_version:,
)
Metrics::Exported.increment_counter(
:discovery_engine_requests, type: "put", status: "ignored_outdated"
)
return
end

client.update_document(
document: {
id: content_id,
name: document_name(content_id),
json_data: metadata.merge(payload_version:).to_json,
content: {
mime_type: MIME_TYPE,
# The Google client expects an IO object to extract raw byte content from
raw_bytes: StringIO.new(content),
},
},
},
allow_missing: true,
)
allow_missing: true,
)

set_latest_synced_version(content_id, payload_version)
end

log(Logger::Severity::INFO, "Successfully added/updated", content_id:, payload_version:)
Metrics::Exported.increment_counter(
Expand Down
10 changes: 10 additions & 0 deletions config/application.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,15 @@ class Application < Rails::Application

# Query configuration
config.best_bets = config_for(:best_bets)

# Redis configuration
config.redis_url = ENV.fetch("REDIS_URL")
config.redis_pool = ConnectionPool.new(size: 5, timeout: 5) { Redis.new(url: config.redis_url) }

# Redlock configuration
## Note: Redlock allows us to specify multiple Redis URLs for distributed locking, but we're
## currently only using a single instance (the Publishing "shared" Redis). If we ever need to
## use multiple Redis instances, this is the only place that needs updating.
config.redlock_redis_instances = [config.redis_url]
end
end
67 changes: 65 additions & 2 deletions spec/services/discovery_engine/sync/delete_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,24 @@

let(:client) { double("DocumentService::Client", delete_document: nil) }
let(:logger) { double("Logger", add: nil) }
let(:redlock_client) { instance_double(Redlock::Client) }
let(:redis_client) { instance_double(Redis, get: "0", set: nil) }

before do
allow(Rails).to receive(:logger).and_return(logger)
allow(Rails.configuration).to receive(:discovery_engine_datastore_branch).and_return("branch")
allow(GovukError).to receive(:notify)

allow(Redlock::Client).to receive(:new).and_return(redlock_client)
allow(redlock_client).to receive(:lock!)
.with("search_api_v2:sync_lock:some_content_id", anything)
.and_yield

allow(Rails.application.config.redis_pool).to receive(:with).and_yield(redis_client)
end

context "when the delete succeeds" do
before do
allow(client).to receive(:delete_document)

delete.call("some_content_id", payload_version: "1")
end

Expand All @@ -22,6 +29,13 @@
.with(name: "branch/documents/some_content_id")
end

it "sets the new latest remote version" do
expect(redis_client).to have_received(:set).with(
"search_api_v2:latest_synced_version:some_content_id",
"1",
)
end

it "logs the delete operation" do
expect(logger).to have_received(:add).with(
Logger::Severity::INFO,
Expand All @@ -30,6 +44,55 @@
end
end

context "when the incoming document is older than the remote version" do
before do
allow(redis_client).to receive(:get)
.with("search_api_v2:latest_synced_version:some_content_id").and_return("42")

delete.call("some_content_id", payload_version: "1")
end

it "does not delete the document" do
expect(client).not_to have_received(:delete_document)
end

it "does not set the remote version" do
expect(redis_client).not_to have_received(:set)
end

it "logs that the document hasn't been deleted" do
expect(logger).to have_received(:add).with(
Logger::Severity::INFO,
"[DiscoveryEngine::Sync::Delete] Ignored as newer version (42) already synced content_id:some_content_id payload_version:1",
)
end
end

context "when locking the document fails" do
let(:error) { Redlock::LockError.new("resource") }

before do
allow(redlock_client).to receive(:lock!).and_raise(error)

delete.call("some_content_id", payload_version: "1")
end

it "deletes the document regardless" do
expect(client).to have_received(:delete_document)
end

it "logs the failure" do
expect(logger).to have_received(:add).with(
Logger::Severity::ERROR,
"[DiscoveryEngine::Sync::Delete] Failed to acquire lock for document: some_content_id, payload_version: 1. Continuing without lock.",
)
end

it "sends the error to Sentry" do
expect(GovukError).to have_received(:notify).with(error)
end
end

context "when the delete fails because the document doesn't exist" do
let(:err) { Google::Cloud::NotFoundError.new("It got lost") }

Expand Down
Loading

0 comments on commit 9731550

Please sign in to comment.