Skip to content

Commit a100af4

Browse files
Updates
1 parent 557d7e6 commit a100af4

File tree

12 files changed

+182
-114
lines changed

12 files changed

+182
-114
lines changed

sdk/action/client.go

Lines changed: 58 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -12,24 +12,29 @@ import (
1212
"github.com/cosmos/cosmos-sdk/crypto/keyring"
1313
)
1414

15+
// Client defines the interface for action operations
1516
type Client interface {
1617
StartCascade(ctx context.Context, fileHash string, actionID string, filePath string, signedData string) (string, error)
17-
18+
DeleteTask(ctx context.Context, taskID string) error
1819
GetTask(ctx context.Context, taskID string) (*task.TaskEntry, bool)
19-
20-
SubscribeToEvents(eventType event.EventType, handler event.Handler)
21-
22-
SubscribeToAllEvents(handler event.Handler)
20+
SubscribeToEvents(ctx context.Context, eventType event.EventType, handler event.Handler)
21+
SubscribeToAllEvents(ctx context.Context, handler event.Handler)
2322
}
2423

24+
// ClientImpl implements the Client interface
2525
type ClientImpl struct {
2626
config config.Config
2727
taskManager task.Manager
2828
logger log.Logger
2929
keyring keyring.Keyring
3030
}
3131

32+
// Verify interface compliance at compile time
33+
var _ Client = (*ClientImpl)(nil)
34+
35+
// NewClient creates a new action client
3236
func NewClient(
37+
ctx context.Context,
3338
config config.Config,
3439
logger log.Logger,
3540
keyring keyring.Keyring,
@@ -38,7 +43,7 @@ func NewClient(
3843
logger = log.NewNoopLogger()
3944
}
4045

41-
taskManager, err := task.NewManager(config, logger, keyring)
46+
taskManager, err := task.NewManager(ctx, config, logger, keyring)
4247
if err != nil {
4348
return nil, fmt.Errorf("failed to create task manager: %w", err)
4449
}
@@ -51,69 +56,89 @@ func NewClient(
5156
}, nil
5257
}
5358

54-
func (ac *ClientImpl) StartCascade(
59+
// StartCascade initiates a cascade operation
60+
func (c *ClientImpl) StartCascade(
5561
ctx context.Context,
56-
fileHash string,
57-
actionID string,
58-
filePath string,
59-
signedData string,
62+
fileHash string, // Hash of the file to process
63+
actionID string, // ID of the action to perform
64+
filePath string, // Path to the file on disk
65+
signedData string, // Optional signed authorization data
6066
) (string, error) {
61-
ac.logger.Debug(ctx, "Starting cascade operation",
67+
c.logger.Debug(ctx, "Starting cascade operation",
6268
"fileHash", fileHash,
6369
"actionID", actionID,
6470
"filePath", filePath,
6571
)
6672

6773
if fileHash == "" {
68-
ac.logger.Error(ctx, "Empty file hash provided")
74+
c.logger.Error(ctx, "Empty file hash provided")
6975
return "", ErrEmptyFileHash
7076
}
7177
if actionID == "" {
72-
ac.logger.Error(ctx, "Empty action ID provided")
78+
c.logger.Error(ctx, "Empty action ID provided")
7379
return "", ErrEmptyActionID
7480
}
7581
if filePath == "" {
76-
ac.logger.Error(ctx, "Empty file path provided")
82+
c.logger.Error(ctx, "Empty file path provided")
7783
return "", ErrEmptyFilePath
7884
}
7985

80-
taskID, err := ac.taskManager.CreateCascadeTask(ctx, fileHash, actionID, filePath, signedData)
86+
taskID, err := c.taskManager.CreateCascadeTask(ctx, fileHash, actionID, filePath, signedData)
8187
if err != nil {
82-
ac.logger.Error(ctx, "Failed to create cascade task", "error", err)
88+
c.logger.Error(ctx, "Failed to create cascade task", "error", err)
8389
return "", fmt.Errorf("failed to create cascade task: %w", err)
8490
}
8591

86-
ac.logger.Info(ctx, "Cascade task created successfully", "taskID", taskID)
92+
c.logger.Info(ctx, "Cascade task created successfully", "taskID", taskID)
8793
return taskID, nil
8894
}
8995

90-
func (ac *ClientImpl) GetTask(ctx context.Context, taskID string) (*task.TaskEntry, bool) {
91-
ac.logger.Debug(ctx, "Getting task", "taskID", taskID)
92-
task, found := ac.taskManager.GetTask(ctx, taskID)
96+
// GetTask retrieves a task by its ID
97+
func (c *ClientImpl) GetTask(ctx context.Context, taskID string) (*task.TaskEntry, bool) {
98+
c.logger.Debug(ctx, "Getting task", "taskID", taskID)
99+
task, found := c.taskManager.GetTask(ctx, taskID)
93100
if !found {
94-
ac.logger.Debug(ctx, "Task not found", "taskID", taskID)
101+
c.logger.Debug(ctx, "Task not found", "taskID", taskID)
95102
} else {
96-
ac.logger.Debug(ctx, "Task found", "taskID", taskID, "status", task.Status)
103+
c.logger.Debug(ctx, "Task found", "taskID", taskID, "status", task.Status)
97104
}
98105
return task, found
99106
}
100107

108+
// DeleteTask removes a task by its ID
109+
func (c *ClientImpl) DeleteTask(ctx context.Context, taskID string) error {
110+
c.logger.Debug(ctx, "Deleting task", "taskID", taskID)
111+
if taskID == "" {
112+
c.logger.Error(ctx, "Empty task ID provided")
113+
return fmt.Errorf("task ID cannot be empty")
114+
}
115+
116+
err := c.taskManager.DeleteTask(ctx, taskID)
117+
if err != nil {
118+
c.logger.Error(ctx, "Failed to delete task", "taskID", taskID, "error", err)
119+
return fmt.Errorf("failed to delete task: %w", err)
120+
}
121+
122+
c.logger.Info(ctx, "Task deleted successfully", "taskID", taskID)
123+
return nil
124+
}
125+
101126
// SubscribeToEvents registers a handler for specific event types
102-
func (ac *ClientImpl) SubscribeToEvents(eventType event.EventType, handler event.Handler) {
103-
ac.logger.Debug(context.Background(), "Subscribing to events via task manager", "eventType", eventType)
104-
if ac.taskManager != nil {
105-
ac.taskManager.SubscribeToEvents(eventType, handler)
127+
func (c *ClientImpl) SubscribeToEvents(ctx context.Context, eventType event.EventType, handler event.Handler) {
128+
c.logger.Debug(ctx, "Subscribing to events via task manager", "eventType", eventType)
129+
if c.taskManager != nil {
130+
c.taskManager.SubscribeToEvents(ctx, eventType, handler)
106131
} else {
107-
ac.logger.Warn(context.Background(), "TaskManager is nil, cannot subscribe to events")
132+
c.logger.Warn(ctx, "TaskManager is nil, cannot subscribe to events")
108133
}
109134
}
110135

111136
// SubscribeToAllEvents registers a handler for all events
112-
func (ac *ClientImpl) SubscribeToAllEvents(handler event.Handler) {
113-
ac.logger.Debug(context.Background(), "Subscribing to all events via task manager")
114-
if ac.taskManager != nil {
115-
ac.taskManager.SubscribeToAllEvents(handler)
137+
func (c *ClientImpl) SubscribeToAllEvents(ctx context.Context, handler event.Handler) {
138+
c.logger.Debug(ctx, "Subscribing to all events via task manager")
139+
if c.taskManager != nil {
140+
c.taskManager.SubscribeToAllEvents(ctx, handler)
116141
} else {
117-
ac.logger.Warn(context.Background(), "TaskManager is nil, cannot subscribe to all events")
142+
c.logger.Warn(ctx, "TaskManager is nil, cannot subscribe to all events")
118143
}
119144
}

sdk/adapters/lumera/adapter.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ func NewAdapter(
4141
logger = log.NewNoopLogger()
4242
}
4343

44+
logger.Debug(ctx, "Creating Lumera adapter",
45+
"grpcAddr", config.GRPCAddr,
46+
"chainID", config.ChainID,
47+
"timeout", config.Timeout)
48+
4449
// Create client options from the config
4550
options := []lumeraclient.Option{
4651
lumeraclient.WithGRPCAddr(config.GRPCAddr),
@@ -61,9 +66,12 @@ func NewAdapter(
6166
// Initialize the client
6267
client, err := lumeraclient.NewClient(ctx, options...)
6368
if err != nil {
69+
logger.Error(ctx, "Failed to initialize Lumera client", "error", err)
6470
return nil, fmt.Errorf("failed to initialize Lumera client: %w", err)
6571
}
6672

73+
logger.Info(ctx, "Lumera adapter created successfully")
74+
6775
return &Adapter{
6876
client: client,
6977
logger: logger,

sdk/adapters/supernodeservice/adapter.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,13 @@ type cascadeAdapter struct {
1414
logger log.Logger
1515
}
1616

17-
func NewCascadeAdapter(client cascade.CascadeServiceClient, logger log.Logger) CascadeServiceClient {
17+
func NewCascadeAdapter(ctx context.Context, client cascade.CascadeServiceClient, logger log.Logger) CascadeServiceClient {
1818
if logger == nil {
1919
logger = log.NewNoopLogger()
2020
}
2121

22+
logger.Debug(ctx, "Creating cascade service adapter")
23+
2224
return &cascadeAdapter{
2325
client: client,
2426
logger: logger,

sdk/event/bus.go

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
)
99

1010
// Handler is a function that processes events
11-
type Handler func(Event)
11+
type Handler func(ctx context.Context, e Event)
1212

1313
// Bus manages event subscriptions and dispatching
1414
type Bus struct {
@@ -21,7 +21,7 @@ type Bus struct {
2121
}
2222

2323
// NewBus creates a new event bus
24-
func NewBus(logger log.Logger, maxWorkers int) *Bus {
24+
func NewBus(ctx context.Context, logger log.Logger, maxWorkers int) *Bus {
2525
// Use default logger if nil is provided
2626
if logger == nil {
2727
logger = log.NewNoopLogger()
@@ -32,6 +32,8 @@ func NewBus(logger log.Logger, maxWorkers int) *Bus {
3232
maxWorkers = 50
3333
}
3434

35+
logger.Debug(ctx, "Creating new event bus", "maxWorkers", maxWorkers)
36+
3537
return &Bus{
3638
subscribers: make(map[EventType][]Handler),
3739
logger: logger,
@@ -41,11 +43,10 @@ func NewBus(logger log.Logger, maxWorkers int) *Bus {
4143
}
4244

4345
// Subscribe registers a handler for a specific event type
44-
func (b *Bus) Subscribe(eventType EventType, handler Handler) {
46+
func (b *Bus) Subscribe(ctx context.Context, eventType EventType, handler Handler) {
4547
b.mu.Lock()
4648
defer b.mu.Unlock()
4749

48-
ctx := context.Background()
4950
b.logger.Debug(ctx, "Subscribing handler to event type", "eventType", eventType)
5051

5152
if _, exists := b.subscribers[eventType]; !exists {
@@ -55,17 +56,16 @@ func (b *Bus) Subscribe(eventType EventType, handler Handler) {
5556
}
5657

5758
// SubscribeAll registers a handler for all event types
58-
func (b *Bus) SubscribeAll(handler Handler) {
59+
func (b *Bus) SubscribeAll(ctx context.Context, handler Handler) {
5960
b.mu.Lock()
6061
defer b.mu.Unlock()
6162

62-
ctx := context.Background()
6363
b.logger.Debug(ctx, "Subscribing handler to all event types")
6464
b.wildcardHandlers = append(b.wildcardHandlers, handler)
6565
}
6666

6767
// safelyCallHandler executes a handler with proper panic recovery
68-
func (b *Bus) safelyCallHandler(handler Handler, event Event) {
68+
func (b *Bus) safelyCallHandler(ctx context.Context, handler Handler, event Event) {
6969
// Acquire a worker slot
7070
b.workerPool <- struct{}{}
7171

@@ -77,7 +77,6 @@ func (b *Bus) safelyCallHandler(handler Handler, event Event) {
7777
// Recover from panics
7878
if r := recover(); r != nil {
7979
stackTrace := debug.Stack()
80-
ctx := context.Background()
8180
b.logger.Error(ctx,
8281
"Event handler panicked",
8382
"error", r,
@@ -90,8 +89,8 @@ func (b *Bus) safelyCallHandler(handler Handler, event Event) {
9089
// Create a deep copy of the event to avoid race conditions
9190
eventCopy := copyEvent(event)
9291

93-
// Execute the handler
94-
handler(eventCopy)
92+
// Execute the handler with the provided context
93+
handler(ctx, eventCopy)
9594
}()
9695
}
9796

@@ -115,11 +114,10 @@ func copyEvent(e Event) Event {
115114
}
116115

117116
// Publish sends an event to all relevant subscribers
118-
func (b *Bus) Publish(event Event) {
117+
func (b *Bus) Publish(ctx context.Context, event Event) {
119118
b.mu.RLock()
120119
defer b.mu.RUnlock()
121120

122-
ctx := context.Background()
123121
b.logger.Debug(ctx, "Publishing event",
124122
"type", event.Type,
125123
"taskID", event.TaskID,
@@ -132,7 +130,7 @@ func (b *Bus) Publish(event Event) {
132130
"handlerCount", len(handlers))
133131

134132
for _, handler := range handlers {
135-
b.safelyCallHandler(handler, event)
133+
b.safelyCallHandler(ctx, handler, event)
136134
}
137135
}
138136

@@ -142,13 +140,15 @@ func (b *Bus) Publish(event Event) {
142140
"handlerCount", len(b.wildcardHandlers))
143141

144142
for _, handler := range b.wildcardHandlers {
145-
b.safelyCallHandler(handler, event)
143+
b.safelyCallHandler(ctx, handler, event)
146144
}
147145
}
148146
}
149147

150148
// WaitForHandlers waits for all event handlers to complete
151-
func (b *Bus) WaitForHandlers() {
149+
func (b *Bus) WaitForHandlers(ctx context.Context) {
150+
b.logger.Debug(ctx, "Waiting for all handlers to complete")
151+
152152
// Fill the worker pool to capacity (blocks until all workers are free)
153153
for i := 0; i < b.maxWorkers; i++ {
154154
b.workerPool <- struct{}{}
@@ -161,6 +161,7 @@ func (b *Bus) WaitForHandlers() {
161161
}
162162

163163
// Close releases resources used by the event bus
164-
func (b *Bus) Close() {
165-
b.WaitForHandlers()
164+
func (b *Bus) Close(ctx context.Context) {
165+
b.logger.Debug(ctx, "Closing event bus")
166+
b.WaitForHandlers(ctx)
166167
}

sdk/event/types.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package event
22

33
import (
44
"action/adapters/lumera"
5+
"context"
56
"time"
67
)
78

@@ -41,7 +42,7 @@ type SupernodeData struct {
4142
Error string // Error message if applicable
4243
}
4344

44-
func NewEvent(eventType EventType, taskID, taskType string, data map[string]interface{}) Event {
45+
func NewEvent(ctx context.Context, eventType EventType, taskID, taskType string, data map[string]interface{}) Event {
4546
if data == nil {
4647
data = make(map[string]interface{})
4748
}

sdk/net/factory.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,19 @@ type ClientFactory struct {
2727

2828
// NewClientFactory creates a new client factory with the provided dependencies
2929
func NewClientFactory(
30+
ctx context.Context,
3031
logger log.Logger,
3132
keyring keyring.Keyring,
3233
config FactoryConfig,
3334
) *ClientFactory {
35+
if logger == nil {
36+
logger = log.NewNoopLogger()
37+
}
38+
39+
logger.Debug(ctx, "Creating supernode client factory",
40+
"localAddress", config.LocalCosmosAddress,
41+
"defaultPort", config.DefaultSupernodePort)
42+
3443
return &ClientFactory{
3544
logger: logger,
3645
keyring: keyring,

sdk/net/impl.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,23 @@ import (
1717
"google.golang.org/grpc/health/grpc_health_v1"
1818
)
1919

20+
// supernodeClient implements the SupernodeClient interface
2021
type supernodeClient struct {
2122
cascadeClient supernodeservice.CascadeServiceClient
2223
healthClient grpc_health_v1.HealthClient
2324
conn *grpc.ClientConn
2425
logger log.Logger
2526
}
2627

28+
// Verify interface compliance at compile time
29+
var _ SupernodeClient = (*supernodeClient)(nil)
30+
2731
// NewSupernodeClient creates a new supernode client
2832
func NewSupernodeClient(
2933
ctx context.Context,
3034
logger log.Logger,
3135
keyring keyring.Keyring,
3236
localCosmosAddress string,
33-
3437
targetSupernode lumera.Supernode,
3538
clientOptions *client.ClientOptions,
3639
) (SupernodeClient, error) {
@@ -80,6 +83,7 @@ func NewSupernodeClient(
8083

8184
// Create service clients
8285
cascadeClient := supernodeservice.NewCascadeAdapter(
86+
ctx,
8387
cascade.NewCascadeServiceClient(conn),
8488
logger,
8589
)

0 commit comments

Comments
 (0)