Skip to content

Commit

Permalink
Merge pull request #85 from Lenart12/main
Browse files Browse the repository at this point in the history
API context propagation
  • Loading branch information
wimaha authored Jan 14, 2025
2 parents 5ce0e24 + 539f228 commit 954dea0
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 56 deletions.
23 changes: 17 additions & 6 deletions internal/api/handlers/tesla.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@ func commonDefer(w http.ResponseWriter, response *models.Response) {
ret.Response = *response

w.Header().Set("Content-Type", "application/json")
if response.Result {
w.WriteHeader(http.StatusOK)
} else {
w.WriteHeader(http.StatusServiceUnavailable)
status := http.StatusOK
if !response.Result {
status = http.StatusServiceUnavailable
}
w.WriteHeader(status)
if err := json.NewEncoder(w).Encode(ret); err != nil {
log.Fatal("failed to send response", "error", err)
}
log.Debug("response", "command", response.Command, "status", status, "result", response.Result, "reason", response.Reason)
}

func checkBleControl(response *models.Response) bool {
Expand All @@ -43,6 +44,7 @@ func checkBleControl(response *models.Response) bool {
}

func Command(w http.ResponseWriter, r *http.Request) {
ShowRequest(r, "Command")
params := mux.Vars(r)
vin := params["vin"]
command := params["command"]
Expand Down Expand Up @@ -78,6 +80,7 @@ func Command(w http.ResponseWriter, r *http.Request) {
var apiResponse models.ApiResponse
wg := sync.WaitGroup{}
apiResponse.Wait = &wg
apiResponse.Ctx = r.Context()

wg.Add(1)
control.BleControlInstance.PushCommand(command, vin, body, &apiResponse)
Expand All @@ -101,6 +104,7 @@ func Command(w http.ResponseWriter, r *http.Request) {
}

func VehicleData(w http.ResponseWriter, r *http.Request) {
ShowRequest(r, "VehicleData")
params := mux.Vars(r)
vin := params["vin"]
command := "vehicle_data"
Expand Down Expand Up @@ -135,6 +139,7 @@ func VehicleData(w http.ResponseWriter, r *http.Request) {
var apiResponse models.ApiResponse
wg := sync.WaitGroup{}
apiResponse.Wait = &wg
apiResponse.Ctx = r.Context()

wg.Add(1)
control.BleControlInstance.PushCommand(command, vin, map[string]interface{}{"endpoints": endpoints}, &apiResponse)
Expand All @@ -154,6 +159,7 @@ func VehicleData(w http.ResponseWriter, r *http.Request) {
}

func BodyControllerState(w http.ResponseWriter, r *http.Request) {
ShowRequest(r, "BodyControllerState")
params := mux.Vars(r)
vin := params["vin"]

Expand All @@ -169,7 +175,8 @@ func BodyControllerState(w http.ResponseWriter, r *http.Request) {

var apiResponse models.ApiResponse

ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
ctx, cancel := context.WithTimeout(r.Context(), 15*time.Second)
apiResponse.Ctx = ctx
defer cancel()
cmd := &commands.Command{
Command: "body-controller-state",
Expand All @@ -185,7 +192,7 @@ func BodyControllerState(w http.ResponseWriter, r *http.Request) {
defer car.Disconnect()
defer log.Debug("disconnect vehicle (A)")

_, err := control.BleControlInstance.ExecuteCommand(car, cmd)
_, err, _ := control.BleControlInstance.ExecuteCommand(car, cmd, context.Background())
if err != nil {
response.Result = false
response.Reason = err.Error()
Expand All @@ -208,6 +215,10 @@ func BodyControllerState(w http.ResponseWriter, r *http.Request) {
}
}

func ShowRequest(r *http.Request, handler string) {
log.Debug("received", "handler", handler, "method", r.Method, "url", r.URL, "from", r.RemoteAddr)
}

func SetCacheControl(w http.ResponseWriter, maxAge int) {
if maxAge > 0 {
w.Header().Set("Cache-Control", fmt.Sprintf("public, max-age=%d, must-revalidate", maxAge))
Expand Down
2 changes: 2 additions & 0 deletions internal/api/models/apiResponse.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package models

import (
"context"
"encoding/json"
"sync"
)
Expand All @@ -10,4 +11,5 @@ type ApiResponse struct {
Result bool
Error string
Response json.RawMessage
Ctx context.Context
}
157 changes: 108 additions & 49 deletions internal/ble/control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,10 @@ func (bc *BleControl) Loop() {
for {
time.Sleep(1 * time.Second)
if retryCommand != nil {
log.Debug("retrying command from loop", "command", retryCommand.Command, "body", retryCommand.Body)
retryCommand = bc.connectToVehicleAndOperateConnection(retryCommand)
} else {
log.Debug("waiting for command")
// Wait for the next command
select {
case command, ok := <-bc.providerStack:
Expand All @@ -89,20 +91,50 @@ func (bc *BleControl) PushCommand(command string, vin string, body map[string]in

func (bc *BleControl) connectToVehicleAndOperateConnection(firstCommand *commands.Command) *commands.Command {
log.Info("connecting to Vehicle ...")
defer log.Debug("connecting to Vehicle done")

var sleep = 3 * time.Second
var retryCount = 3
var lastErr error

commandError := func(err error) *commands.Command {
log.Error("can't connect to vehicle", "error", err)
if firstCommand.Response != nil {
firstCommand.Response.Error = err.Error()
firstCommand.Response.Result = false
if firstCommand.Response.Wait != nil {
firstCommand.Response.Wait.Done()
}
}
return nil
}

var parentCtx context.Context
if firstCommand.Response != nil && firstCommand.Response.Ctx != nil {
parentCtx = firstCommand.Response.Ctx
if parentCtx.Err() != nil {
return commandError(parentCtx.Err())
}
} else {
if firstCommand.Response != nil {
log.Warn("no context provided, using default", "command", firstCommand.Command, "body", firstCommand.Body)
}
parentCtx = context.Background()
}

for i := 0; i < retryCount; i++ {
if i > 0 {
log.Warn(lastErr)
log.Info(fmt.Sprintf("retrying in %d seconds", sleep/time.Second))
time.Sleep(sleep)
select {
case <-time.After(sleep):
case <-parentCtx.Done():
return commandError(parentCtx.Err())
}
sleep *= 2
}
log.Debug("trying connecting to vehicle", "attempt", i+1)
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
ctx, cancel := context.WithTimeout(parentCtx, 15*time.Second)
defer cancel()
conn, car, retry, err := bc.TryConnectToVehicle(ctx, firstCommand)
if err == nil {
Expand All @@ -113,30 +145,15 @@ func (bc *BleControl) connectToVehicleAndOperateConnection(firstCommand *command
defer log.Debug("disconnect vehicle (A)")
cmd := bc.operateConnection(car, firstCommand)
return cmd
} else if !retry {
} else if !retry || parentCtx.Err() != nil {
//Failed but no retry possible
log.Error("can't connect to vehicle", "error", err)
if firstCommand.Response != nil {
firstCommand.Response.Error = err.Error()
firstCommand.Response.Result = false
if firstCommand.Response.Wait != nil {
firstCommand.Response.Wait.Done()
}
}
return nil
return commandError(err)
} else {
lastErr = err
}
}
log.Error(fmt.Sprintf("stop retrying after %d attempts", retryCount), "error", lastErr)
if firstCommand.Response != nil {
firstCommand.Response.Error = lastErr.Error()
firstCommand.Response.Result = false
if firstCommand.Response.Wait != nil {
firstCommand.Response.Wait.Done()
}
}
return nil
return commandError(lastErr)
}

func (bc *BleControl) TryConnectToVehicle(ctx context.Context, firstCommand *commands.Command) (*ble.Connection, *vehicle.Vehicle, bool, error) {
Expand Down Expand Up @@ -220,57 +237,79 @@ func (bc *BleControl) TryConnectToVehicle(ctx context.Context, firstCommand *com
}

func (bc *BleControl) operateConnection(car *vehicle.Vehicle, firstCommand *commands.Command) *commands.Command {
log.Debug("operating connection ...")
defer log.Debug("operating connection done")
connectionCtx, cancel := context.WithTimeout(context.Background(), 29*time.Second)
defer cancel()

if firstCommand.Command != "wake_up" {
cmd, err := bc.ExecuteCommand(car, firstCommand)
cmd, err, _ := bc.ExecuteCommand(car, firstCommand, connectionCtx)
if err != nil {
return cmd
}
}

timeout := time.After(29 * time.Second)
handleCommand := func(command *commands.Command) (doReturn bool, retryCommand *commands.Command) {
//If new VIN, close connection
if command.Vin != firstCommand.Vin {
log.Debug("new VIN, so close connection")
return true, command
}

cmd, err, ctx := bc.ExecuteCommand(car, command, connectionCtx)

// If the connection context is done, return to reoperate the connection
if connectionCtx.Err() != nil {
return true, cmd
}
// If the context is not done, return to retry the command
if err != nil && ctx.Err() == nil {
return true, cmd
}

// Successful or api context done so no retry
return false, nil
}

for {
select {
case <-timeout:
case <-connectionCtx.Done():
log.Debug("connection Timeout")
return nil
case command, ok := <-bc.providerStack:
if !ok {
return nil
}

//If new VIN, close connection
if command.Vin != firstCommand.Vin {
log.Debug("new VIN, so close connection")
return &command
}

cmd, err := bc.ExecuteCommand(car, &command)
if err != nil {
return cmd
doReturn, retryCommand := handleCommand(&command)
if doReturn {
return retryCommand
}
case command, ok := <-bc.commandStack:
if !ok {
return nil
}

//If new VIN, close connection
if command.Vin != firstCommand.Vin {
log.Debug("new VIN, so close connection")
return &command
}

cmd, err := bc.ExecuteCommand(car, &command)
if err != nil {
return cmd
doReturn, retryCommand := handleCommand(&command)
if doReturn {
return retryCommand
}
}
}
}

func (bc *BleControl) ExecuteCommand(car *vehicle.Vehicle, command *commands.Command) (retryCommand *commands.Command, retErr error) {
func (bc *BleControl) ExecuteCommand(car *vehicle.Vehicle, command *commands.Command, connectionCtx context.Context) (retryCommand *commands.Command, retErr error, ctx context.Context) {
log.Info("sending", "command", command.Command, "body", command.Body)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if command.Response != nil && command.Response.Ctx != nil {
ctx = command.Response.Ctx
} else {
if command.Response != nil {
log.Debug("no context provided, using default", "command", command.Command, "body", command.Body)
}
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
}

var sleep = 3 * time.Second
var retryCount = 3
Expand All @@ -290,30 +329,50 @@ func (bc *BleControl) ExecuteCommand(car *vehicle.Vehicle, command *commands.Com
}
}()

// If the context is already done, return immediately
if ctx.Err() != nil {
return nil, ctx.Err(), ctx
}

// Wrap ctx with connectionCtx
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
select {
case <-connectionCtx.Done():
cancel()
case <-ctx.Done():
}
}()

for i := 0; i < retryCount; i++ {
if i > 0 {
log.Warn(lastErr)
log.Info(fmt.Sprintf("retrying in %d seconds", sleep/time.Second))
time.Sleep(sleep)
select {
case <-time.After(sleep):
case <-ctx.Done():
return nil, ctx.Err(), ctx
}
sleep *= 2
}

retry, err := command.Send(ctx, car)
if err == nil {
//Successful
log.Info("successfully executed", "command", command.Command, "body", command.Body)
return nil, nil
return nil, nil, ctx
} else if !retry {
return nil, nil
return nil, nil, ctx
} else {
//closed pipe
if strings.Contains(err.Error(), "closed pipe") {
//connection lost, returning the command so it can be executed again
return command, err
return command, err, ctx
}
lastErr = err
}
}
log.Error("canceled", "command", command.Command, "body", command.Body, "err", lastErr)
return nil, lastErr
return nil, lastErr, ctx
}
2 changes: 1 addition & 1 deletion internal/ble/control/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func SendKeysToVehicle(vin string) error {
defer car.Disconnect()
defer log.Debug("disconnect vehicle (A)")

_, err := tempBleControl.ExecuteCommand(car, cmd)
_, err, _ := tempBleControl.ExecuteCommand(car, cmd, context.Background())
if err != nil {
return err
}
Expand Down

0 comments on commit 954dea0

Please sign in to comment.