Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions cmd/whasapo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ func cmdPair() {

func cmdServe() {
dbPath := getDBPath()
releaseLock := acquireServeLock()
defer releaseLock()

var err error
wa, err = whatsapp.NewClient(dbPath)
Expand Down Expand Up @@ -327,6 +329,20 @@ func cmdServe() {
}
}

func acquireServeLock() func() {
lockPath := filepath.Join(dataDir(), "serve.lock")
lock, err := os.OpenFile(lockPath, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0600)
if err != nil {
fmt.Fprintf(os.Stderr, "whasapo: another serve process is already running; close/restart your AI app or remove stale lock %s\n", lockPath)
os.Exit(1)
}
fmt.Fprintf(lock, "%d\n", os.Getpid())
lock.Close()
return func() {
os.Remove(lockPath)
}
}

// --- status command ---

func cmdStatus() {
Expand Down
40 changes: 35 additions & 5 deletions whatsapp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"sync/atomic"
"time"

_ "modernc.org/sqlite"
"go.mau.fi/whatsmeow"
"go.mau.fi/whatsmeow/proto/waE2E"
"go.mau.fi/whatsmeow/proto/waHistorySync"
Expand All @@ -22,6 +21,7 @@ import (
"go.mau.fi/whatsmeow/types/events"
waLog "go.mau.fi/whatsmeow/util/log"
"google.golang.org/protobuf/proto"
_ "modernc.org/sqlite"
)

// StoredMessage holds a message.
Expand Down Expand Up @@ -55,7 +55,7 @@ type ChatDetail struct {
Topic string `json:"topic,omitempty"`
Participants []ParticipantInfo `json:"participants,omitempty"`
CreatedAt string `json:"created_at,omitempty"`
MessageCount int `json:"message_count"`
MessageCount int `json:"message_count"`
}

// ParticipantInfo holds group participant info.
Expand All @@ -67,16 +67,17 @@ type ParticipantInfo struct {

// Client wraps whatsmeow with persistent message storage.
type Client struct {
WM *whatsmeow.Client
Container *sqlstore.Container
db *sql.DB
WM *whatsmeow.Client
Container *sqlstore.Container
db *sql.DB
firstConn chan struct{} // closed on first successful connection
ready atomic.Bool
loggedOut atomic.Bool
disconnectAt atomic.Int64 // unix timestamp of last disconnect, 0 if connected
names map[string]string
namesMu sync.RWMutex
mediaDir string
reconnectMu sync.Mutex
}

const dsn = "file:%s?_pragma=foreign_keys(1)&_pragma=journal_mode(WAL)&_pragma=busy_timeout(5000)"
Expand Down Expand Up @@ -235,6 +236,7 @@ func (c *Client) eventHandler(evt interface{}) {
// Only log on transition from connected → disconnected (debounce)
c.disconnectAt.Store(time.Now().Unix())
fmt.Fprintf(os.Stderr, "whasapo: disconnected, reconnecting...\n")
c.scheduleReconnect()
}
case *events.LoggedOut:
c.ready.Store(false)
Expand All @@ -244,6 +246,7 @@ func (c *Client) eventHandler(evt interface{}) {
c.ready.Store(false)
c.disconnectAt.Store(time.Now().Unix())
fmt.Fprintf(os.Stderr, "whasapo: connection replaced by another client\n")
c.scheduleReconnect()
case *events.HistorySync:
c.handleHistorySync(v.Data)
case *events.Message:
Expand All @@ -260,6 +263,33 @@ func (c *Client) eventHandler(evt interface{}) {
}
}

func (c *Client) scheduleReconnect() {
go func() {
c.reconnectMu.Lock()
defer c.reconnectMu.Unlock()
if c.loggedOut.Load() || c.ready.Load() {
return
}
for attempt := 1; attempt <= 6; attempt++ {
time.Sleep(time.Duration(attempt*2) * time.Second)
if c.loggedOut.Load() || c.ready.Load() {
return
}
if err := c.WM.Connect(); err != nil {
fmt.Fprintf(os.Stderr, "whasapo: reconnect attempt %d failed: %v\n", attempt, err)
continue
}
deadline := time.Now().Add(10 * time.Second)
for time.Now().Before(deadline) {
if c.ready.Load() {
return
}
time.Sleep(250 * time.Millisecond)
}
}
}()
}

func (c *Client) messageToStored(info types.MessageInfo, msg *waE2E.Message) StoredMessage {
text, mediaType := extractTextAndMedia(msg)
sm := StoredMessage{
Expand Down