Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timeout options #207

Merged
merged 5 commits into from
Mar 8, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 13 additions & 9 deletions cmd/tesla-control/main.go
Original file line number Diff line number Diff line change
@@ -56,8 +56,8 @@ func Usage() {
}
}

func runCommand(acct *account.Account, car *vehicle.Vehicle, args []string) int {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
func runCommand(acct *account.Account, car *vehicle.Vehicle, args []string, timeout time.Duration) int {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

if err := execute(ctx, acct, car, args); err != nil {
@@ -73,7 +73,7 @@ func runCommand(acct *account.Account, car *vehicle.Vehicle, args []string) int
return 0
}

func runInteractiveShell(acct *account.Account, car *vehicle.Vehicle) int {
func runInteractiveShell(acct *account.Account, car *vehicle.Vehicle, timeout time.Duration) int {
scanner := bufio.NewScanner(os.Stdin)
for fmt.Printf("> "); scanner.Scan(); fmt.Printf("> ") {
args, err := shlex.Split(scanner.Text())
@@ -87,7 +87,7 @@ func runInteractiveShell(acct *account.Account, car *vehicle.Vehicle) int {
writeErr("Invalid command: %s", err)
continue
}
runCommand(acct, car, args)
runCommand(acct, car, args, timeout)
}
if err := scanner.Err(); err != nil {
writeErr("Error reading command: %s", err)
@@ -103,8 +103,10 @@ func main() {
}()

var (
debug bool
forceBLE bool
debug bool
forceBLE bool
commandTimeout time.Duration
connTimeout time.Duration
)
config, err := cli.NewConfig(cli.FlagAll)
if err != nil {
@@ -114,6 +116,8 @@ func main() {
flag.Usage = Usage
flag.BoolVar(&debug, "debug", false, "Enable verbose debugging messages")
flag.BoolVar(&forceBLE, "ble", false, "Force BLE connection even if OAuth environment variables are defined")
flag.DurationVar(&commandTimeout, "command-timeout", 5*time.Second, "Set timeout for commands sent to the vehicle.")
flag.DurationVar(&connTimeout, "connect-timeout", 20*time.Second, "Set timeout for establishing initial connection.")

config.RegisterCommandLineFlags()
flag.Parse()
@@ -150,7 +154,7 @@ func main() {
return
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), connTimeout)
defer cancel()

acct, car, err := config.Connect(ctx)
@@ -171,8 +175,8 @@ func main() {
}

if flag.NArg() > 0 {
status = runCommand(acct, car, flag.Args())
status = runCommand(acct, car, flag.Args(), commandTimeout)
} else {
status = runInteractiveShell(acct, car)
status = runInteractiveShell(acct, car, commandTimeout)
}
}
4 changes: 4 additions & 0 deletions cmd/tesla-http-proxy/main.go
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@ import (
"fmt"
"net/http"
"os"
"time"

"github.com/teslamotors/vehicle-command/internal/log"
"github.com/teslamotors/vehicle-command/pkg/cli"
@@ -43,6 +44,7 @@ func main() {
verbose bool
host string
port int
timeout time.Duration
)

config, err := cli.NewConfig(cli.FlagPrivateKey)
@@ -64,6 +66,7 @@ func main() {
flag.BoolVar(&verbose, "verbose", false, "Enable verbose logging")
flag.StringVar(&host, "host", "localhost", "Proxy server `hostname`")
flag.IntVar(&port, "port", defaultPort, "`Port` to listen on")
flag.DurationVar(&timeout, "timeout", proxy.DefaultTimeout, "Timeout interval when sending commands")
flag.Usage = Usage
config.RegisterCommandLineFlags()
flag.Parse()
@@ -97,6 +100,7 @@ func main() {
if err != nil {
return
}
p.Timeout = timeout
addr := fmt.Sprintf("%s:%d", host, port)
log.Info("Listening on %s", addr)

84 changes: 55 additions & 29 deletions internal/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
@@ -18,16 +18,16 @@ import (
universal "github.com/teslamotors/vehicle-command/pkg/protocol/protobuf/universalmessage"
)

var sessionInfoRequestTimeout = 5 * time.Second
var commandTimeout = 5 * time.Second

// Dispatcher objects send (encrypted) messages to a vehicle and route incoming messages to the
// appropriate receiver object.
type Dispatcher struct {
conn connector.Connector
privateKey authentication.ECDHPrivateKey
address []byte

latencyLock sync.Mutex
maxLatency time.Duration

doneLock sync.Mutex
terminate chan struct{}
done chan bool
@@ -43,6 +43,7 @@ type Dispatcher struct {
func New(conn connector.Connector, privateKey authentication.ECDHPrivateKey) (*Dispatcher, error) {
dispatcher := Dispatcher{
conn: conn,
maxLatency: conn.AllowedLatency(),
address: make([]byte, addressLength),
sessions: make(map[universal.Domain]*session),
handlers: make(map[receiverKey]*receiver),
@@ -52,12 +53,17 @@ func New(conn connector.Connector, privateKey authentication.ECDHPrivateKey) (*D
if _, err := rand.Read(dispatcher.address); err != nil {
return nil, err
}
// Only connections to these domains will be allowed
dispatcher.sessions[universal.Domain_DOMAIN_VEHICLE_SECURITY] = nil
dispatcher.sessions[universal.Domain_DOMAIN_INFOTAINMENT] = nil
return &dispatcher, nil
}

func (d *Dispatcher) SetMaxLatency(latency time.Duration) {
if latency > 0 {
d.latencyLock.Lock()
d.maxLatency = latency
d.latencyLock.Unlock()
}
}

// RetryInterval fetches the transport-layer dependent recommended delay between retry attempts.
func (d *Dispatcher) RetryInterval() time.Duration {
return d.conn.RetryInterval()
@@ -71,6 +77,7 @@ func (d *Dispatcher) StartSession(ctx context.Context, domain universal.Domain)
s, ok := d.sessions[domain]
if !ok {
d.sessions[domain], err = NewSession(d.privateKey, d.conn.VIN())
s = d.sessions[domain]
} else if s != nil && s.ctx != nil {
log.Info("Session for %s loaded from cache", domain)
sessionReady = true
@@ -79,16 +86,27 @@ func (d *Dispatcher) StartSession(ctx context.Context, domain universal.Domain)
if err != nil || sessionReady {
return err
}
recv, err := d.RequestSessionInfo(ctx, domain)
if err != nil {
return err
}
defer recv.Close()
select {
case reply := <-recv.Recv():
return protocol.GetError(reply)
case <-ctx.Done():
return ctx.Err()
for {
recv, err := d.RequestSessionInfo(ctx, domain)
if err != nil {
return err
}
defer recv.Close()
select {
case reply := <-recv.Recv():
if err = protocol.GetError(reply); err != nil {
return err
}
case <-ctx.Done():
return ctx.Err()
case <-s.readySignal:
return nil
}
select {
case <-time.After(d.conn.RetryInterval()):
case <-ctx.Done():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you for not doing a sleep

return ctx.Err()
}
}
}

@@ -153,6 +171,20 @@ func (d *Dispatcher) checkForSessionUpdate(message *universal.RoutableMessage, h
return
}

if d.privateKey == nil {
log.Warning("[%02x] Discarding session info because client does not have a private key", message.GetRequestUuid())
return
}

d.latencyLock.Lock()
maxLatency := d.maxLatency
d.latencyLock.Unlock()

if handler.expired(maxLatency) {
log.Warning("[%02x] Discarding session info because it was received more than %s after request", message.GetRequestUuid(), maxLatency)
return
}

tag := message.GetSignatureData().GetSessionInfoTag().GetTag()
if tag == nil {
log.Warning("[%02x] Discarding unauthenticated session info", message.GetRequestUuid())
@@ -168,17 +200,8 @@ func (d *Dispatcher) checkForSessionUpdate(message *universal.RoutableMessage, h
return
}

if session == nil {
if session, err = NewSession(d.privateKey, d.conn.VIN()); err != nil {
log.Error("[%02x] Error creating new session: %s", message.GetRequestUuid(), err)
return
}
d.sessions[domain] = session
}

if err = session.ProcessHello(message.GetRequestUuid(), sessionInfo, tag); err != nil {
log.Warning("[%02x] Session info error: %s", message.GetRequestUuid(), err)
d.sessions[domain] = nil
return
}
log.Info("[%02x] Updated session info for %s", message.GetRequestUuid(), domain)
@@ -238,9 +261,7 @@ func (d *Dispatcher) process(message *universal.RoutableMessage) {
// have been a desync. This typically accompanies an error message, and so
// the reply still needs to be passed down to the handler after updating
// session info.
if !handler.expired() && d.privateKey != nil {
d.checkForSessionUpdate(message, handler)
}
d.checkForSessionUpdate(message, handler)

select {
case handler.ch <- message:
@@ -350,8 +371,13 @@ func (d *Dispatcher) Send(ctx context.Context, message *universal.RoutableMessag
if auth != connector.AuthMethodNone {
d.sessionLock.Lock()
session, ok := d.sessions[message.GetToDestination().GetDomain()]
if ok {
session.lock.Lock()
ok = session.ready
session.lock.Unlock()
}
d.sessionLock.Unlock()
if !ok || session == nil {
if !ok {
log.Warning("No session available for %s", message.GetToDestination().GetDomain())
return nil, protocol.ErrNoSession
}
Loading