Implement user event import from BigQuery
Rewrite the existing Python Google Cloud function as a Ruby service class. This receives an event type and a date, and makes the necessary request to Discovery Engine to import BigQuery user event data.

- Add `DiscoveryEngine::UserEvents::Import` service to implement importing of user events from BigQuery along the lines of the original Python code
- Add `GOOGLE_CLOUD_PROJECT_ID` app configuration
- Add a rake task to call the service
- Ensure tests `require` Google API namespace for stubbing

see the `vertex_events_push` function on the infrastructure repo: https://github.com/alphagov/search-v2-infrastructure/blob/2991588b5dae11a20fc80432393fd134c7acb53c/terraform/environment/files/vertex_events_push/main.py

Co-Authored-By: Chae Cramb <[email protected]>
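The configuration change itself is not shown on this page. As a rough sketch only, the project ID could be read from the `GOOGLE_CLOUD_PROJECT_ID` environment variable at boot and fail fast when it is missing. `AppConfig` and `load_app_config` are hypothetical names used for illustration; in the application the value is exposed as `Rails.configuration.google_cloud_project_id`.

```ruby
# Hypothetical stand-in for the Rails configuration object (assumption, not the app's code).
AppConfig = Struct.new(:google_cloud_project_id, keyword_init: true)

# Reads the project ID from the given environment.
# `fetch` raises KeyError if the variable is missing, so misconfiguration shows up at boot.
def load_app_config(env = ENV)
  AppConfig.new(google_cloud_project_id: env.fetch("GOOGLE_CLOUD_PROJECT_ID"))
end

# Usage with an explicit hash instead of the real process environment.
config = load_app_config({ "GOOGLE_CLOUD_PROJECT_ID" => "my-fancy-project" })
puts config.google_cloud_project_id
```

Passing the environment in as an argument keeps the sketch easy to exercise without touching the real `ENV`.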
Showing 6 changed files with 233 additions and 0 deletions.
@@ -0,0 +1,92 @@

```ruby
module DiscoveryEngine::UserEvents
  # Handles importing user events from an analytics dataset in BigQuery into Discovery Engine.
  #
  # This allows Discovery Engine to "learn" from user behaviour (for example, search results clicked
  # or pages viewed) to deliver better results for users.
  #
  # There are several different event types, each of which has two tables in the BigQuery dataset:
  # one for historical data (up until the previous day), and one for "intraday" data (live data from
  # today but not 100% reliable). To ensure the model gets both the latest _and_ the most reliable
  # data, we need to import from both tables (note that event imports are idempotent, so it's okay
  # if the same event gets imported several times). This is done by invoking this service from a
  # scheduled Rake task.
  #
  # Each instance of this class is scoped to an event type and date. If the given date is today, the
  # data will be fetched from the event type's intraday table, otherwise the main table is used.
  #
  # The import process is asynchronous on Discovery Engine, but only takes a couple of minutes for
  # an average day's worth of events. For simplicity, rather than configuring a Cloud Storage bucket
  # for error logs, we just block until the import has completed and raise an exception if it
  # failed. Any failures can be viewed in the Google Cloud Console UI.
  #
  # see https://cloud.google.com/generative-ai-app-builder/docs/import-user-events
  class Import
    # The name of the BigQuery dataset where the analytics events are stored (created through data
    # pipelines defined in `govuk-infrastructure`)
    BIGQUERY_DATASET = "analytics_events_vertex".freeze

    # The event types we can import from the BigQuery dataset
    EVENT_TYPES = %w[search view-item view-item-external-link].freeze

    def self.import_all(date)
      EVENT_TYPES.each do |event_type|
        new(event_type, date:).call
      end
    end

    def initialize(
      event_type,
      date: Time.zone.yesterday,
      client: ::Google::Cloud::DiscoveryEngine.user_event_service(version: :v1)
    )
      raise ArgumentError, "Invalid event type: #{event_type}" unless event_type.in?(EVENT_TYPES)

      @event_type = event_type
      @date = date
      @client = client
    end

    def call
      logger.info("Triggering import_user_events operation")
      operation = client.import_user_events(
        bigquery_source: {
          project_id: Rails.configuration.google_cloud_project_id,
          dataset_id: BIGQUERY_DATASET,
          table_id:,
          partition_date:,
        },
        parent: Rails.configuration.discovery_engine_datastore,
      )

      logger.info("Waiting for import_user_events operation to finish remotely")
      operation.wait_until_done! do |response|
        results = response.results
        raise results.message if response.error?

        count = results.joined_events_count + results.unjoined_events_count

        logger.info("Successfully imported #{count} user events")
      end
    end

  private

    attr_reader :event_type, :date, :client

    def table_id
      if date.today?
        "#{event_type}-intraday-event"
      else
        "#{event_type}-event"
      end
    end

    def partition_date
      Google::Type::Date.new(year: date.year, month: date.month, day: date.day)
    end

    def logger
      @logger ||= Rails.logger.tagged(self.class.name, event_type, date)
    end
  end
end
```
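The table selection and partition date logic described in the class comment can be exercised in isolation. The following is a minimal sketch in plain Ruby rather than the service itself: `user_event_table_id` and `partition_date_fields` are hypothetical helper names, `today` is passed in explicitly instead of being read from `Time.zone`, and a plain hash stands in for `Google::Type::Date`.

```ruby
require "date"

# Event types copied from the service's EVENT_TYPES constant.
EVENT_TYPES = %w[search view-item view-item-external-link].freeze

# Hypothetical stand-alone version of the private `table_id` method.
# `today` is injected so the behaviour can be checked without freezing time.
def user_event_table_id(event_type, date, today: Date.today)
  raise ArgumentError, "Invalid event type: #{event_type}" unless EVENT_TYPES.include?(event_type)

  # Today's events live in the intraday table; any other date uses the main table.
  date == today ? "#{event_type}-intraday-event" : "#{event_type}-event"
end

# Plain-hash stand-in (assumption) for the year/month/day fields of Google::Type::Date.
def partition_date_fields(date)
  { year: date.year, month: date.month, day: date.day }
end

today = Date.new(1989, 12, 13)
puts user_event_table_id("search", today, today: today)          # search-intraday-event
puts user_event_table_id("view-item", today - 1, today: today)   # view-item-event
p partition_date_fields(today - 1)
```

Injecting `today` mirrors what the spec achieves with `Timecop.freeze`: the same date can be treated as "today" or as a past day without depending on the system clock.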
@@ -0,0 +1,17 @@

```ruby
namespace :user_events do
  desc "Import yesterday's user events from the BigQuery analytics dataset"
  task import_yesterdays_events: :environment do
    DiscoveryEngine::UserEvents::Import.import_all(Time.zone.yesterday)
  end

  desc "Import today's (intraday) user events from the BigQuery analytics dataset"
  task import_intraday_events: :environment do
    DiscoveryEngine::UserEvents::Import.import_all(Time.zone.today)
  end

  desc "Import user events from the BigQuery analytics dataset for a specific date"
  task :import_events_for_date, [:date] => :environment do |_, args|
    date = Time.zone.parse(args[:date]).to_date
    DiscoveryEngine::UserEvents::Import.import_all(date)
  end
end
```
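The `import_events_for_date` task takes its date as a string argument, so a missing or malformed value may surface as a less obvious error further down. One possible way to validate the argument up front is sketched below using only the standard library. `parse_import_date` is a hypothetical helper, not part of the commit, and it uses `Date.iso8601` instead of the task's `Time.zone.parse`.

```ruby
require "date"

# Hypothetical helper (assumption): turn the rake argument into a Date,
# failing with a clear message when it is missing or not a valid ISO 8601 date.
def parse_import_date(raw)
  text = raw.to_s.strip
  raise ArgumentError, "A date argument is required (YYYY-MM-DD)" if text.empty?

  begin
    Date.iso8601(text)
  rescue ArgumentError
    # Date::Error is a subclass of ArgumentError, so this covers invalid dates too.
    raise ArgumentError, "Invalid date #{text.inspect}, expected YYYY-MM-DD"
  end
end

puts parse_import_date("2024-01-31")
```

With the task as written in the commit, a specific day would be requested using rake's bracket syntax, for example `bundle exec rake "user_events:import_events_for_date[2024-01-31]"`.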
spec/services/discovery_engine/user_events/import_spec.rb
118 changes: 118 additions & 0 deletions
@@ -0,0 +1,118 @@

```ruby
RSpec.describe DiscoveryEngine::UserEvents::Import do
  subject(:import) { described_class.new(event_type, date:, client:) }

  let(:event_type) { "search" }
  let(:date) { Date.new(2000, 1, 1) }

  let(:client) do
    instance_double(
      ::Google::Cloud::DiscoveryEngine::V1::UserEventService::Client,
      import_user_events: operation,
    )
  end
  let(:operation) { instance_double(Gapic::Operation, wait_until_done!: nil) }

  before do
    allow(Rails.configuration).to receive_messages(
      discovery_engine_datastore: "data/store",
      google_cloud_project_id: "my-fancy-project",
    )
  end

  describe ".import_all" do
    let(:importer) { instance_double(described_class, call: nil) }

    before do
      allow(described_class).to receive(:new).and_return(importer)
    end

    it "triggers an individual import for each event type with the given date" do
      described_class.import_all(date)

      expect(described_class).to have_received(:new).with("search", date:)
      expect(described_class).to have_received(:new).with("view-item", date:)
      expect(described_class).to have_received(:new).with("view-item-external-link", date:)

      expect(importer).to have_received(:call).exactly(3).times
    end
  end

  describe "#initialize" do
    context "with an invalid event type" do
      let(:event_type) { "invalid" }

      it "raises an error" do
        expect { import }.to raise_error("Invalid event type: invalid")
      end
    end
  end

  describe "#call" do
    before do
      Timecop.freeze(Time.zone.local(1989, 12, 13, 1, 2, 3)) do
        import.call
      end
    end

    context "without a date" do
      subject(:import) { described_class.new(event_type, client:) }

      it "triggers an import of yesterday's user events" do
        expect(client).to have_received(:import_user_events).with(
          bigquery_source: {
            project_id: "my-fancy-project",
            dataset_id: "analytics_events_vertex",
            table_id: "search-event",
            partition_date: Google::Type::Date.new(year: 1989, month: 12, day: 12),
          },
          parent: "data/store",
        )
      end
    end

    context "with a specific date" do
      let(:date) { Date.new(2000, 1, 1) }

      it "triggers an import of that day's user events" do
        expect(client).to have_received(:import_user_events).with(
          bigquery_source: {
            project_id: "my-fancy-project",
            dataset_id: "analytics_events_vertex",
            table_id: "search-event",
            partition_date: Google::Type::Date.new(year: 2000, month: 1, day: 1),
          },
          parent: "data/store",
        )
      end
    end

    context "with today's date" do
      let(:date) { Date.new(1989, 12, 13) }

      it "triggers an import of today's intraday user events" do
        expect(client).to have_received(:import_user_events).with(
          bigquery_source: {
            project_id: "my-fancy-project",
            dataset_id: "analytics_events_vertex",
            table_id: "search-intraday-event",
            partition_date: Google::Type::Date.new(year: 1989, month: 12, day: 13),
          },
          parent: "data/store",
        )
      end
    end

    context "when an error occurs during import" do
      let(:date) { Date.new(2000, 1, 1) }
      let(:error_result) { double("error", error?: true, results: double(message: "BROKEN")) }

      before do
        allow(operation).to receive(:wait_until_done!).and_yield(error_result)
      end

      it "raises an error" do
        expect { import.call }.to raise_error("BROKEN")
      end
    end
  end
end
```
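The error-handling spec relies on the operation yielding a finished result to the block passed to `wait_until_done!`. The same flow can be sketched without RSpec or the Google client libraries. Everything below is a hypothetical stand-in written for illustration: `FakeOperation`, `FakeStatus`, `FakeResult` and `await_import` are assumed names, and the fake assumes the operation yields itself once it has finished, as the `and_yield` stub in the spec implies.

```ruby
# Hypothetical stand-ins for the error status and the successful import response.
FakeStatus = Struct.new(:message)
FakeResult = Struct.new(:joined_events_count, :unjoined_events_count)

# Minimal fake of a long-running operation (assumption: it yields itself when done).
class FakeOperation
  attr_reader :results

  def initialize(results:, error:)
    @results = results
    @error = error
  end

  def error?
    @error
  end

  def wait_until_done!
    yield self if block_given?
    self
  end
end

# Mirrors the block in Import#call: raise on failure, otherwise total the imported events.
def await_import(operation)
  count = nil
  operation.wait_until_done! do |response|
    raise response.results.message if response.error?

    count = response.results.joined_events_count + response.results.unjoined_events_count
  end
  count
end

succeeded = FakeOperation.new(results: FakeResult.new(40, 2), error: false)
puts await_import(succeeded) # 42

failed = FakeOperation.new(results: FakeStatus.new("BROKEN"), error: true)
begin
  await_import(failed)
rescue RuntimeError => e
  puts "Import failed: #{e.message}" # Import failed: BROKEN
end
```

Raising inside the block means the exception propagates out of `wait_until_done!` to the caller, which is the behaviour the "raises an error" example checks.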