Implement user event import from BigQuery
Rewrite the existing Python Google Cloud function as a Ruby service
class. This receives an event type and a date, and makes the necessary
request to Discovery Engine to import BigQuery user event data.

- Add `DiscoveryEngine::UserEvents::Import` service to implement
  importing of user events from BigQuery along the lines of the original
  Python code
- Add `GOOGLE_CLOUD_PROJECT_ID` app configuration
- Add a rake task to call the service
- Ensure tests `require` Google API namespace for stubbing

See the `vertex_events_push` function in the infrastructure repo:
https://github.com/alphagov/search-v2-infrastructure/blob/2991588b5dae11a20fc80432393fd134c7acb53c/terraform/environment/files/vertex_events_push/main.py

Co-Authored-By: Chae Cramb <[email protected]>
csutter and chaecramb committed Nov 27, 2024
1 parent 1e6f24c commit 5cb1cba
Showing 5 changed files with 226 additions and 0 deletions.
92 changes: 92 additions & 0 deletions app/services/discovery_engine/user_events/import.rb
@@ -0,0 +1,92 @@
module DiscoveryEngine::UserEvents
# Handles importing user events from an analytics dataset in BigQuery into Discovery Engine.
#
# This allows Discovery Engine to "learn" from user behaviour (for example, search results clicked
# or pages viewed) to deliver better results for users.
#
# There are several different event types, each of which has two tables in the BigQuery dataset:
# one for historical data (up until the previous day), and one for "intraday" data (live data from
# today but not 100% reliable). To ensure the model gets both the latest _and_ the most reliable
# data, we need to import from both tables (note that event imports are idempotent, so it's okay
# if the same event gets imported several times). This is done by invoking this service from a
# scheduled Rake task.
#
# Each instance of this class is scoped to an event type and date. If the given date is today, the
# data will be fetched from the event type's intraday table, otherwise the main table is used.
#
# The import process is asynchronous on Discovery Engine, but only takes a couple of minutes for
# an average day's worth of events. For simplicity, rather than configuring a Cloud Storage bucket
# for error logs, we just block until the import has completed and raise an exception if it
# failed. Any failures can be viewed in the Google Cloud Console UI.
#
# see https://cloud.google.com/generative-ai-app-builder/docs/import-user-events
class Import
# The name of the BigQuery dataset where the analytics events are stored (created through data
# pipelines defined in `govuk-infrastructure`)
BIGQUERY_DATASET = "analytics_events_vertex".freeze

# The event types we can import from the BigQuery dataset
EVENT_TYPES = %w[search view-item view-item-external-link].freeze

def self.import_all(date)
EVENT_TYPES.each do |event_type|
new(event_type, date:).call
end
end

def initialize(
event_type,
date: Time.zone.yesterday,
client: ::Google::Cloud::DiscoveryEngine.user_event_service(version: :v1)
)
raise ArgumentError, "Invalid event type: #{event_type}" unless event_type.in?(EVENT_TYPES)

@event_type = event_type
@date = date
@client = client
end

def call
logger.info("Triggering import_user_events operation")
operation = client.import_user_events(
bigquery_source: {
project_id: Rails.configuration.google_cloud_project_id,
dataset_id: BIGQUERY_DATASET,
table_id:,
partition_date:,
},
parent: Rails.configuration.discovery_engine_datastore,
)

logger.info("Waiting for import_user_events operation to finish remotely")
operation.wait_until_done! do |response|
results = response.results
raise results.message if response.error?

count = results.joined_events_count + results.unjoined_events_count

logger.info("Successfully imported #{count} user events")
end
end

private

attr_reader :event_type, :date, :client

def table_id
if date.today?
"#{event_type}-intraday-event"
else
"#{event_type}-event"
end
end

def partition_date
Google::Type::Date.new(year: date.year, month: date.month, day: date.day)
end

def logger
@logger ||= Rails.logger.tagged(self.class.name, event_type, date)
end
end
end
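The date-to-table mapping implemented in `#table_id` can be sketched in isolation. The `table_id_for` helper below is hypothetical (not part of the service) and exists only to illustrate the naming convention:

```ruby
require "date"

# Hypothetical helper illustrating the service's table-naming rule: today's
# events come from the live "intraday" table, earlier dates from the main table.
def table_id_for(event_type, date, today: Date.today)
  suffix = date == today ? "intraday-event" : "event"
  "#{event_type}-#{suffix}"
end

table_id_for("search", Date.today)     # today   => "search-intraday-event"
table_id_for("search", Date.today - 1) # earlier => "search-event"
```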
1 change: 1 addition & 0 deletions config/application.rb
@@ -20,6 +20,7 @@ class Application < Rails::Application
config.discovery_engine_serving_config = ENV.fetch("DISCOVERY_ENGINE_SERVING_CONFIG")
config.discovery_engine_datastore = ENV.fetch("DISCOVERY_ENGINE_DATASTORE")
config.discovery_engine_datastore_branch = ENV.fetch("DISCOVERY_ENGINE_DATASTORE_BRANCH")
config.google_cloud_project_id = ENV.fetch("GOOGLE_CLOUD_PROJECT_ID")

# Document sync configuration
config.document_type_ignorelist = config_for(:document_type_ignorelist)
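Using `ENV.fetch` without a default means a missing variable raises `KeyError` at boot, so a misconfigured environment fails fast rather than surfacing later as a nil project ID during an import. A minimal sketch of that behaviour:

```ruby
ENV.delete("GOOGLE_CLOUD_PROJECT_ID")

begin
  ENV.fetch("GOOGLE_CLOUD_PROJECT_ID")
rescue KeyError
  puts "GOOGLE_CLOUD_PROJECT_ID must be set"
end

ENV["GOOGLE_CLOUD_PROJECT_ID"] = "my-fancy-project"
ENV.fetch("GOOGLE_CLOUD_PROJECT_ID") # => "my-fancy-project"
```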
11 changes: 11 additions & 0 deletions lib/tasks/user_events.rake
@@ -0,0 +1,11 @@
namespace :user_events do
desc "Import yesterday's user events from the BigQuery analytics dataset"
task import_yesterdays_events: :environment do
DiscoveryEngine::UserEvents::Import.import_all(Time.zone.yesterday)
end

desc "Import today's (intraday) user events from the BigQuery analytics dataset"
task import_intraday_events: :environment do
DiscoveryEngine::UserEvents::Import.import_all(Time.zone.today)
end
end
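In a deployed environment these tasks would typically run on a schedule; locally they can be invoked directly (standard Rails rake invocation, assuming Bundler):

```shell
bundle exec rake user_events:import_yesterdays_events
bundle exec rake user_events:import_intraday_events
```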
118 changes: 118 additions & 0 deletions spec/services/discovery_engine/user_events/import_spec.rb
@@ -0,0 +1,118 @@
RSpec.describe DiscoveryEngine::UserEvents::Import do
subject(:import) { described_class.new(event_type, date:, client:) }

let(:event_type) { "search" }
let(:date) { Date.new(2000, 1, 1) }

let(:client) do
instance_double(
::Google::Cloud::DiscoveryEngine::V1::UserEventService::Client,
import_user_events: operation,
)
end
let(:operation) { instance_double(Gapic::Operation, wait_until_done!: nil) }

before do
allow(Rails.configuration).to receive_messages(
discovery_engine_datastore: "data/store",
google_cloud_project_id: "my-fancy-project",
)
end

describe ".import_all" do
let(:importer) { instance_double(described_class, call: nil) }

before do
allow(described_class).to receive(:new).and_return(importer)
end

it "triggers an individual import for each event type with the given date" do
described_class.import_all(date)

expect(described_class).to have_received(:new).with("search", date:)
expect(described_class).to have_received(:new).with("view-item", date:)
expect(described_class).to have_received(:new).with("view-item-external-link", date:)

expect(importer).to have_received(:call).exactly(3).times
end
end

describe "#initialize" do
context "with an invalid event type" do
let(:event_type) { "invalid" }

it "raises an error" do
expect { import }.to raise_error("Invalid event type: invalid")
end
end
end

describe "#call" do
before do
Timecop.freeze(Time.zone.local(1989, 12, 13, 1, 2, 3)) do
import.call
end
end

context "without a date" do
subject(:import) { described_class.new(event_type, client:) }

it "triggers an import of yesterday's user events" do
expect(client).to have_received(:import_user_events).with(
bigquery_source: {
project_id: "my-fancy-project",
dataset_id: "analytics_events_vertex",
table_id: "search-event",
partition_date: Google::Type::Date.new(year: 1989, month: 12, day: 12),
},
parent: "data/store",
)
end
end

context "with a specific date" do
let(:date) { Date.new(2000, 1, 1) }

it "triggers an import of that day's user events" do
expect(client).to have_received(:import_user_events).with(
bigquery_source: {
project_id: "my-fancy-project",
dataset_id: "analytics_events_vertex",
table_id: "search-event",
partition_date: Google::Type::Date.new(year: 2000, month: 1, day: 1),
},
parent: "data/store",
)
end
end

context "with today's date" do
let(:date) { Date.new(1989, 12, 13) }

it "triggers an import of today's intraday user events" do
expect(client).to have_received(:import_user_events).with(
bigquery_source: {
project_id: "my-fancy-project",
dataset_id: "analytics_events_vertex",
table_id: "search-intraday-event",
partition_date: Google::Type::Date.new(year: 1989, month: 12, day: 13),
},
parent: "data/store",
)
end
end

context "when an error occurs during import" do
let(:date) { Date.new(2000, 1, 1) }
let(:error_result) { double("error", error?: true, results: double(message: "BROKEN")) }

before do
allow(operation).to receive(:wait_until_done!).and_yield(error_result)
end

it "raises an error" do
expect { import.call }.to raise_error("BROKEN")
end
end
end
end
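The spec above stubs `wait_until_done!` to yield a finished operation. The stand-in below is hypothetical (not the real `Gapic::Operation`) and shows only the slice of that contract the service relies on: `#results` holds the response on success and the error on failure, and the operation yields itself to the caller's block once done:

```ruby
FakeResults = Struct.new(:joined_events_count, :unjoined_events_count)
FakeError = Struct.new(:message)

# Stand-in mimicking the assumed Gapic::Operation behaviour: block until the
# remote operation finishes, then yield the finished operation to the block.
FakeOperation = Struct.new(:failed, :results) do
  def error?
    failed
  end

  def wait_until_done!
    yield self
  end
end

success = FakeOperation.new(false, FakeResults.new(10, 2))
success.wait_until_done! do |response|
  raise response.results.message if response.error?
  puts response.results.joined_events_count + response.results.unjoined_events_count
end
# prints 12

# A failed operation surfaces its error message via #results:
# FakeOperation.new(true, FakeError.new("BROKEN")).wait_until_done! do |r|
#   raise r.results.message if r.error?  # => raises "BROKEN"
# end
```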
4 changes: 4 additions & 0 deletions spec/spec_helper.rb
@@ -11,6 +11,10 @@

require "grpc_mock/rspec"

# Required to be able to stub Google classes (the actual code doesn't use the `v1` namespace
# directly, but rather sets up clients from the main namespace, which requires it internally)
require "google/cloud/discovery_engine/v1"

Timecop.safe_mode = true

require "redlock/testing"