Skip to content

Commit

Permalink
bugfix: avoid logging non-200 response and don't publish mesasge when…
Browse files Browse the repository at this point in the history
… there is error (#573)

<!--  Thanks for sending a pull request!  Here are some tips for you:

1. Run unit tests and ensure that they are passing
2. If your change introduces any API changes, make sure to update the
e2e tests
3. Make sure documentation is updated for your PR!

-->
# Description
<!-- Briefly describe the motivation for the change. Please include
illustrations where appropriate. -->
Currently, empty message might be published when there is non-200
response, or when there is error building the Kafka log. The observation
publisher will crash as a result because the consumer currently doesn't
handle the scenario when the Kafka message value is null.

# Modifications
<!-- Summarize the key code changes. -->
- Prevent producing Kafka message when the response code is not 200, or
when there is error building the Kafka log.
- Skip message with null message for the observation publisher

# Tests
<!-- Besides the existing / updated automated tests, what specific
scenarios should be tested? Consider the backward compatibility of the
changes, whether corner cases are covered, etc. Please describe the
tests and check the ones that have been completed. Eg:
- [x] Deploying new and existing standard models
- [ ] Deploying PyFunc models
-->

# Checklist
- [ ] Added PR label
- [ ] Added unit test, integration, and/or e2e tests
- [ ] Tested locally
- [ ] Updated documentation
- [ ] Update Swagger spec if the PR introduce API changes
- [ ] Regenerated Golang and Python client if the PR introduces API
changes

# Release Notes
<!--
Does this PR introduce a user-facing change?
If no, just write "NONE" in the release-note block below.
If yes, a release note is required. Enter your extended release note in
the block below.
If the PR requires additional action from users switching to the new
release, include the string "action required".

For more information about release notes, see kubernetes' guide here:
http://git.k8s.io/community/contributors/guide/release-notes.md
-->

```release-note

```
  • Loading branch information
khorshuheng committed Apr 18, 2024
1 parent 5623e85 commit 7ad999b
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 2 deletions.
5 changes: 4 additions & 1 deletion api/pkg/inference-logger/logger/mlobs_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"math/rand"
"net/http"

"google.golang.org/protobuf/types/known/timestamppb"

Expand Down Expand Up @@ -168,16 +169,18 @@ func (m *MLObsSink) buildNewKafkaMessage(predictionLog *upiv1.PredictionLog) (*k

func (m *MLObsSink) Sink(rawLogEntries []*LogEntry) error {
for _, rawLogEntry := range rawLogEntries {
if rand.Float64() >= SamplingRate {
if rawLogEntry.ResponsePayload.StatusCode != http.StatusOK || rand.Float64() >= SamplingRate {
continue
}
predictionLog, err := m.newPredictionLog(rawLogEntry)
if err != nil {
m.logger.Errorf("unable to convert log entry: %v", err)
continue
}
kafkaMessage, err := m.buildNewKafkaMessage(predictionLog)
if err != nil {
m.logger.Errorf("unable to build kafka message: %v", err)
continue
}
err = m.producer.Produce(kafkaMessage, m.producer.Events())
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def poll_new_logs(self) -> List[PredictionLog]:
return [
parse_message_to_prediction_log(msg.value())
for msg in messages
if (msg is not None and msg.error() is None)
if (msg is not None and msg.error() is None and msg.value() is not None)
]

def commit(self):
Expand Down

0 comments on commit 7ad999b

Please sign in to comment.