-
Notifications
You must be signed in to change notification settings - Fork 633
feat(go/plugins/firebase): added Firestore stream manager #4047
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Summary of ChangesHello @apascal07, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the Genkit Go framework by introducing a durable streaming mechanism. This feature allows applications to maintain stream continuity even in the face of network interruptions or server restarts, as clients can re-subscribe to ongoing or completed streams using a unique identifier. The changes include a new generic Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a durable streaming capability, backed by an in-memory implementation and an experimental Firestore implementation. The changes are well-structured, introducing a StreamManager interface and integrating it into the HTTP server handler. I've identified some critical race conditions in both the in-memory and Firestore stream writer implementations that need to be addressed. Additionally, I have a suggestion to improve error status propagation in streaming responses and a comment on context propagation best practices. Overall, this is a great feature addition with a solid foundation.
I am having trouble creating individual review comments. Click here to see my feedback.
go/core/streaming.go (272-300)
This method has a race condition. It checks s.closed under s.mu, but then releases the lock before modifying s.state. Another goroutine can call Close() in the window between s.mu.Unlock() and s.state.mu.Lock(), causing a write to occur on a closed writer.
To fix this, the check for s.closed and the write operation need to be atomic. You can achieve this by holding s.mu for the duration of the method. This same race condition exists in Done() and Error() and should be fixed there as well.
func (s *inMemoryStreamInput) Write(chunk json.RawMessage) error {
s.mu.Lock()
defer s.mu.Unlock()
if s.closed {
return NewPublicError(FAILED_PRECONDITION, "stream writer is closed", nil)
}
s.state.mu.Lock()
defer s.state.mu.Unlock()
if s.state.status != streamStatusOpen {
return NewPublicError(FAILED_PRECONDITION, "stream has already completed", nil)
}
s.state.chunks = append(s.state.chunks, chunk)
s.state.lastTouched = time.Now()
event := StreamEvent{Type: StreamEventChunk, Chunk: chunk}
for _, ch := range s.state.subscribers {
select {
case ch <- event:
default:
// Channel full, skip (subscriber is slow)
}
}
return nil
}go/plugins/firebase/x/stream_manager.go (361-385)
This method has a race condition, similar to the one in inMemoryStreamInput.Write. It checks s.closed under s.mu, but then releases the lock before performing the Firestore update. A concurrent call to Close() can lead to a write operation on a closed writer.
To ensure atomicity, s.mu should be held for the entire duration of the method. This will serialize operations on the writer and prevent the race. This fix should also be applied to the Done() and Error() methods.
func (s *firestoreStreamInput) Write(chunk json.RawMessage) error {
s.mu.Lock()
defer s.mu.Unlock()
if s.closed {
return core.NewPublicError(core.FAILED_PRECONDITION, "stream writer is closed", nil)
}
ctx := context.Background()
_, err := s.docRef.Update(ctx, []firestore.Update{
{
Path: "stream",
Value: firestore.ArrayUnion(streamEntry{
Type: streamEventChunk,
Chunk: chunk,
UUID: uuid.New().String(),
}),
},
{
Path: "updatedAt",
Value: firestore.ServerTimestamp,
},
})
return err
}go/genkit/servers.go (371-377)
The error status is hardcoded to core.INTERNAL. It would be more informative for clients if the status from the original error was propagated. You can check if flowErr is a core.UserFacingError or core.GenkitError and use its status.
errStatus := core.INTERNAL
var ufErr *core.UserFacingError
var gkErr *core.GenkitError
if errors.As(flowErr, &ufErr) {
errStatus = ufErr.Status
} else if errors.As(flowErr, &gkErr) {
errStatus = gkErr.Status
}
resp := flowErrorResponse{
Error: &flowError{
Status: errStatus,
Message: "stream flow error",
Details: flowErr.Error(),
},
}go/plugins/firebase/x/stream_manager.go (369)
Using context.Background() for Firestore operations prevents cancellation and trace propagation from the calling context. The core.ActionStreamInput interface should be updated to accept a context.Context in its methods (Write, Done, Error) to follow Go best practices for context propagation.
While changing the interface is a larger task, it's a significant improvement for robustness and observability. For now, this implementation will not propagate cancellation or traces for stream write operations.
Added support for durable streaming using Firestore as the storage. This implementation is added to
firebase/xas an experimental feature, same ascore/x/streamingfor the core interfaces andInMemoryStreamManager.Usage:
Checklist (if applicable):