Skip to content

Commit b6e4de7

Browse files
committed
[locker] add Close method
1 parent 60310b5 commit b6e4de7

File tree

3 files changed

+38
-3
lines changed

3 files changed

+38
-3
lines changed

internal/worker/locker/locker.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,17 @@ import (
55
"errors"
66
)
77

8+
// ErrLockNotAcquired is returned when a lock cannot be acquired within the configured timeout.
89
var ErrLockNotAcquired = errors.New("lock not acquired")
910

1011
type Locker interface {
12+
// AcquireLock attempts to acquire a lock for the given key.
13+
// Returns ErrLockNotAcquired if the lock cannot be acquired within the configured timeout.
1114
AcquireLock(ctx context.Context, key string) error
15+
// ReleaseLock releases a previously acquired lock for the given key.
16+
// Returns an error if the lock was not held by this instance.
1217
ReleaseLock(ctx context.Context, key string) error
18+
19+
// Close releases any held locks.
20+
Close() error
1321
}

internal/worker/locker/module.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package locker
22

33
import (
4+
"context"
45
"database/sql"
56

67
"github.com/go-core-fx/logger"
@@ -16,5 +17,15 @@ func Module() fx.Option {
1617
fx.Provide(func(db *sql.DB) Locker {
1718
return NewMySQLLocker(db, "worker:", timeoutSeconds)
1819
}),
20+
fx.Invoke(func(locker Locker, lc fx.Lifecycle) {
21+
lc.Append(fx.Hook{
22+
OnStart: func(_ context.Context) error {
23+
return nil
24+
},
25+
OnStop: func(_ context.Context) error {
26+
return locker.Close()
27+
},
28+
})
29+
}),
1930
)
2031
}

internal/worker/locker/mysql.go

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,21 @@ import (
55
"database/sql"
66
"fmt"
77
"sync"
8+
"time"
89
)
910

1011
type mySQLLocker struct {
1112
db *sql.DB
1213

1314
prefix string
14-
timeout uint
15+
timeout time.Duration
1516

1617
mu sync.Mutex
1718
conns map[string]*sql.Conn
1819
}
1920

20-
func NewMySQLLocker(db *sql.DB, prefix string, timeout uint) Locker {
21+
// NewMySQLLocker creates a new MySQL-based distributed locker.
22+
func NewMySQLLocker(db *sql.DB, prefix string, timeout time.Duration) Locker {
2123
return &mySQLLocker{
2224
db: db,
2325

@@ -39,7 +41,7 @@ func (m *mySQLLocker) AcquireLock(ctx context.Context, key string) error {
3941
}
4042

4143
var res sql.NullInt64
42-
if err := conn.QueryRowContext(ctx, "SELECT GET_LOCK(?, ?)", name, m.timeout).Scan(&res); err != nil {
44+
if err := conn.QueryRowContext(ctx, "SELECT GET_LOCK(?, ?)", name, m.timeout.Seconds()).Scan(&res); err != nil {
4345
_ = conn.Close()
4446
return fmt.Errorf("failed to get lock: %w", err)
4547
}
@@ -85,4 +87,18 @@ func (m *mySQLLocker) ReleaseLock(ctx context.Context, key string) error {
8587
return nil
8688
}
8789

90+
// Close closes all remaining pinned connections.
91+
// Should be called during shutdown to clean up resources.
92+
func (m *mySQLLocker) Close() error {
93+
m.mu.Lock()
94+
defer m.mu.Unlock()
95+
for key, conn := range m.conns {
96+
if conn != nil {
97+
_ = conn.Close()
98+
}
99+
delete(m.conns, key)
100+
}
101+
return nil
102+
}
103+
88104
var _ Locker = (*mySQLLocker)(nil)

0 commit comments

Comments
 (0)