Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement user event import from Bigquery #351

Merged
merged 1 commit into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ jobs:
# Rails env rather than `development`
RAILS_ENV: test
# All Google client library calls are mocked, but the application needs this set to boot
GOOGLE_CLOUD_PROJECT_ID: not-used
DISCOVERY_ENGINE_SERVING_CONFIG: not-used
DISCOVERY_ENGINE_DATASTORE: not-used
DISCOVERY_ENGINE_DATASTORE_BRANCH: not-used
Expand Down
90 changes: 90 additions & 0 deletions app/services/discovery_engine/user_events/import.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
module DiscoveryEngine::UserEvents
# Handles importing user events from an analytics dataset in BigQuery into Discovery Engine.
#
# This allows Discovery Engine to "learn" from user behaviour (for example. search results clicked
# or pages viewed) to deliver better results for users.
#
# There are several different event types, each of which have two tables in the BigQuery dataset:
# one for historical data (up until the previous day), and one for "intraday" data (live data from
# today but not 100% reliable). To ensure the model gets both the latest _and_ the most reliable
# data, we need to import from both tables (note that event imports are idempotent, so it's okay
# if the same event gets imported several times). This is done by invoking this service from a
# scheduled Rake task.
#
# Each instance of this class is scoped to an event type and date. If the given date is today, the
# data will be fetched from the event type's intraday table, otherwise the main table is used.
#
# The import process is asynchronous on Discovery Engine, but only takes a couple of minutes for
# an average day's worth of events. For simplicity, rather than configuring a Cloud Storage bucket
# for error logs, we just block until the import has completed and raise an exception if it
# failed. Any failures can be viewed in the Google Cloud Console UI.
#
# see https://cloud.google.com/generative-ai-app-builder/docs/import-user-events
class Import
# The name of the BigQuery dataset where the analytics events are stored (created through data
# pipelines defined in `govuk-infrastructure`)
BIGQUERY_DATASET = "analytics_events_vertex".freeze

# The event types we can import from the BigQuery dataset
EVENT_TYPES = %w[search view-item view-item-external-link].freeze

def self.import_all(date)
EVENT_TYPES.each do |event_type|
new(event_type, date:).call
end
end

def initialize(
event_type,
date:,
client: ::Google::Cloud::DiscoveryEngine.user_event_service(version: :v1)
)
@event_type = event_type
@date = date
@client = client
end

def call
logger.info("Triggering import_user_events operation")
operation = client.import_user_events(
bigquery_source: {
project_id: Rails.configuration.google_cloud_project_id,
dataset_id: BIGQUERY_DATASET,
table_id:,
partition_date:,
},
parent: Rails.configuration.discovery_engine_datastore,
)

logger.info("Waiting for import_user_events operation to finish remotely")
operation.wait_until_done! do |response|
results = response.results
raise results.message if response.error?

count = results.joined_events_count + results.unjoined_events_count

logger.info("Successfully imported #{count} user events")
end
end

private

attr_reader :event_type, :date, :client

def table_id
if date.today?
"#{event_type}-intraday-event"
else
"#{event_type}-event"
end
end

def partition_date
Google::Type::Date.new(year: date.year, month: date.month, day: date.day)
end

def logger
@logger ||= Rails.logger.tagged(self.class.name, event_type, date)
end
end
end
1 change: 1 addition & 0 deletions config/application.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class Application < Rails::Application
config.discovery_engine_serving_config = ENV.fetch("DISCOVERY_ENGINE_SERVING_CONFIG")
config.discovery_engine_datastore = ENV.fetch("DISCOVERY_ENGINE_DATASTORE")
config.discovery_engine_datastore_branch = ENV.fetch("DISCOVERY_ENGINE_DATASTORE_BRANCH")
config.google_cloud_project_id = ENV.fetch("GOOGLE_CLOUD_PROJECT_ID")

# Document sync configuration
config.document_type_ignorelist = config_for(:document_type_ignorelist)
Expand Down
17 changes: 17 additions & 0 deletions lib/tasks/user_events.rake
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace :user_events do
desc "Import yesterday's user events from the BigQuery analytics dataset"
task import_yesterdays_events: :environment do
DiscoveryEngine::UserEvents::Import.import_all(Time.zone.yesterday)
end

desc "Import today's (intraday) user events from the BigQuery analytics dataset"
task import_intraday_events: :environment do
DiscoveryEngine::UserEvents::Import.import_all(Time.zone.today)
end

desc "Import user events from the BigQuery analytics dataset for a specific date"
task :import_events_for_date, [:date] => :environment do |_, args|
date = Time.zone.parse(args[:date]).to_date
DiscoveryEngine::UserEvents::Import.import_all(date)
end
end
92 changes: 92 additions & 0 deletions spec/services/discovery_engine/user_events/import_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
RSpec.describe DiscoveryEngine::UserEvents::Import do
subject(:import) { described_class.new(event_type, date:, client:) }

let(:event_type) { "search" }
let(:date) { Date.new(2000, 1, 1) }

let(:client) do
instance_double(
::Google::Cloud::DiscoveryEngine::V1::UserEventService::Client,
import_user_events: operation,
)
end
let(:operation) { instance_double(Gapic::Operation, wait_until_done!: nil) }

before do
allow(Rails.configuration).to receive_messages(
discovery_engine_datastore: "data/store",
google_cloud_project_id: "my-fancy-project",
)
end

describe ".import_all" do
let(:importer) { instance_double(described_class, call: nil) }

before do
allow(described_class).to receive(:new).and_return(importer)
end

it "triggers an individual import for each event type with the given date" do
described_class.import_all(date)

expect(described_class).to have_received(:new).with("search", date:)
expect(described_class).to have_received(:new).with("view-item", date:)
expect(described_class).to have_received(:new).with("view-item-external-link", date:)

expect(importer).to have_received(:call).exactly(3).times
end
end

describe "#call" do
before do
Timecop.freeze(Time.zone.local(1989, 12, 13, 1, 2, 3)) do
import.call
end
end

context "with a specific date" do
let(:date) { Date.new(2000, 1, 1) }

it "triggers an import of that day's user events" do
expect(client).to have_received(:import_user_events).with(
bigquery_source: {
project_id: "my-fancy-project",
dataset_id: "analytics_events_vertex",
table_id: "search-event",
partition_date: Google::Type::Date.new(year: 2000, month: 1, day: 1),
},
parent: "data/store",
)
end
end

context "with today's date" do
let(:date) { Date.new(1989, 12, 13) }

it "triggers an import of today's intraday user events" do
expect(client).to have_received(:import_user_events).with(
bigquery_source: {
project_id: "my-fancy-project",
dataset_id: "analytics_events_vertex",
table_id: "search-intraday-event",
partition_date: Google::Type::Date.new(year: 1989, month: 12, day: 13),
},
parent: "data/store",
)
end
end

context "when an error occurs during import" do
let(:date) { Date.new(2000, 1, 1) }
let(:error_result) { double("error", error?: true, results: double(message: "BROKEN")) }

before do
allow(operation).to receive(:wait_until_done!).and_yield(error_result)
end

it "raises an error" do
expect { import.call }.to raise_error("BROKEN")
end
end
end
end
4 changes: 4 additions & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@

require "grpc_mock/rspec"

# Required to be able to stub Google classes in tests (as classes from the `v1` namespace are not
# used directly in non-test code, they are not loaded by default)
require "google/cloud/discovery_engine/v1"

Timecop.safe_mode = true

require "redlock/testing"
Expand Down