feat: update blockbuilder to use scheduler for fetching jobs #15224
Conversation
Left a few things to fix, but giving approval to unblock you.
@@ -29,6 +29,7 @@ message GetJobResponse {
message CompleteJobRequest {
  string builder_id = 1;
  Job job = 2;
  int64 LastConsumedOffset = 3;
Why do we need this? Jobs are completed all-or-nothing because TSDBs are built at the end, and the job itself already contains the offset range.
lastConsumedOffset, err := i.processJob(ctx, job, logger)
// TODO: pass lastConsumedOffset as a separate field
job.Offsets.Max = lastConsumedOffset
I think it's much simpler if the jobs are predetermined at the scheduler. This was the initial design, and although it introduces a bit of lag (we only process offsets known at the time of job creation), I think the simplicity and separation of concerns are more beneficial, at least for now.
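For illustration, a minimal sketch of that predetermined-offsets flow. The types and helpers below are made up for the example (not Loki's actual API): the scheduler pins the offset range when it creates the job, and the builder either finishes the whole range or fails it, so there is no partial progress to report back.

```go
package blockbuilder

import "context"

// Offsets is the Kafka offset range a job covers.
type Offsets struct {
	Min, Max int64
}

// Job is fully determined by the scheduler; the builder never widens it.
type Job struct {
	Partition int32
	Offsets   Offsets
}

// planJob pins the range using only offsets known when the job is created,
// which is where the "bit of lag" mentioned above comes from.
func planJob(partition int32, committed, highWatermark int64) Job {
	return Job{Partition: partition, Offsets: Offsets{Min: committed, Max: highWatermark}}
}

// runJob consumes exactly job.Offsets and reports success or failure for the
// whole range; TSDBs are built at the end, so completion is all-or-nothing.
func runJob(ctx context.Context, job Job, process func(context.Context, Job) error, complete func(Job, bool)) error {
	err := process(ctx, job)
	complete(job, err == nil)
	return err
}
```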
exists, job, err := i.jobController.LoadJob(ctx)
func (i *BlockBuilder) runOne(ctx context.Context, workerID string) error {
  // assuming GetJob blocks/polls until a job is available
I suspect we'll need to retry when there are no jobs here, but as you said, it's also possible the transport handles this.
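A rough sketch of what such a retry loop could look like, assuming GetJob can report "no job available" rather than blocking; the names and the 5s backoff below are placeholders, not the actual transport behaviour.

```go
package blockbuilder

import (
	"context"
	"time"
)

type builderJob struct{}

type builder struct {
	getJob             func(ctx context.Context, workerID string) (*builderJob, bool, error)
	processAndComplete func(ctx context.Context, j *builderJob, workerID string) error
}

// runOne polls the scheduler until it receives a job, backing off when none
// is available so the builder does not hot-loop against the scheduler.
func (b *builder) runOne(ctx context.Context, workerID string) error {
	for {
		j, ok, err := b.getJob(ctx, workerID)
		if err != nil {
			return err
		}
		if ok {
			return b.processAndComplete(ctx, j, workerID)
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(5 * time.Second):
		}
	}
}
```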
if err != nil {
  return nil, err
readerFactory := func(partitionID int32) (partition.Reader, error) {
  return partition.NewKafkaReader(
This will panic because it's creating new clients each time, each of which uses the same metrics namespacing internally. Instead, we could create a single client which creates cheap copies via a WithPartition(x) -> Self or similar.
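Roughly what that could look like; kafkaReader, newKafkaReader, and WithPartition below are illustrative names, not the actual partition package API. The idea is to register the collectors once on a base reader and have the per-partition copies share them, so repeated construction no longer re-registers the same metrics.

```go
package blockbuilder

import "github.com/prometheus/client_golang/prometheus"

type readerMetrics struct {
	recordsRead prometheus.Counter
}

type kafkaReader struct {
	partitionID int32
	metrics     *readerMetrics // shared by all per-partition copies
}

func newKafkaReader(reg prometheus.Registerer) *kafkaReader {
	m := &readerMetrics{
		recordsRead: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: "loki",
			Name:      "block_builder_records_read_total",
			Help:      "Records read by the block builder Kafka reader.",
		}),
	}
	reg.MustRegister(m.recordsRead) // registering the same collector twice panics
	return &kafkaReader{metrics: m}
}

// WithPartition returns a shallow copy bound to one partition; the metrics
// (and, in a real implementation, the underlying client) are reused.
func (r *kafkaReader) WithPartition(partitionID int32) *kafkaReader {
	cp := *r
	cp.partitionID = partitionID
	return &cp
}
```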
Not sure if it is safe to make copies of kgo.Client; we'd need separate instances of it since we mutate the client while setting the offset for consumption. Working around this by registering metrics only once: 786186a.
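A sketch of that workaround as I read it, under the same assumption of a per-partition reader factory: each reader still gets its own Kafka client, but collector registration is guarded so it only happens once. The names below are illustrative, not the code in 786186a.

```go
package blockbuilder

import (
	"sync"

	"github.com/prometheus/client_golang/prometheus"
)

var (
	sharedMetricsOnce sync.Once
	sharedCounter     prometheus.Counter
)

// sharedReaderMetrics registers the collector exactly once, no matter how many
// readers (and therefore Kafka clients) the block builder creates per job.
func sharedReaderMetrics(reg prometheus.Registerer) prometheus.Counter {
	sharedMetricsOnce.Do(func() {
		sharedCounter = prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: "loki",
			Name:      "block_builder_fetched_bytes_total",
			Help:      "Bytes fetched from Kafka by the block builder.",
		})
		reg.MustRegister(sharedCounter)
	})
	return sharedCounter
}
```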
What this PR does / why we need it:
- adds syncJob to update the status of inflight jobs
- renames slimgester to blockbuilder
- appender.go
Special notes for your reviewer:
I renamed slimgester.go to blockbuilder.go in the last commit, so GitHub does not show the diff anymore and treats it as a new file. Please check the first two commits to view the new changes made in builder.go.

Checklist
- CONTRIBUTING.md guide (required)
- feat PRs are unlikely to be accepted unless a case can be made for the feature actually being a bug fix to existing behavior.
- docs/sources/setup/upgrade/_index.md
- deprecated-config.yaml and deleted-config.yaml files respectively in the tools/deprecated-config-checker directory. Example PR