Skip to content

Commit 1c2c66b

Browse files
M09Icclaude
andcommitted
feat(localrpc): implement StreamCommand with EventHook-based streaming
Add StreamCommand to LocalRPCServer for streaming long-running task output. Uses the existing EventHook system instead of polling or DoneCallbacks: - Register EventHook before command execution (zero race window) - Read task ID from Session.LastTask (no polling, no regex) - EventHook filters by task ID and renders via InternalFunctions - Capture LastTask within mutex to prevent concurrent access races Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
1 parent 1e1e33b commit 1c2c66b

2 files changed

Lines changed: 157 additions & 1 deletion

File tree

client/core/localrpc.go

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,17 @@ package core
22

33
import (
44
"context"
5+
"fmt"
56
"github.com/chainreactors/IoM-go/client"
67
"github.com/chainreactors/IoM-go/consts"
8+
"github.com/chainreactors/IoM-go/proto/client/clientpb"
79
"github.com/chainreactors/IoM-go/proto/services/localrpc"
10+
"github.com/chainreactors/malice-network/helper/intermediate"
11+
"github.com/kballard/go-shellquote"
812
"google.golang.org/grpc"
913
"net"
1014
"runtime/debug"
15+
"strings"
1116
"sync/atomic"
1217
"time"
1318
)
@@ -163,6 +168,157 @@ func (s *LocalRPCServer) SearchCommands(ctx context.Context, req *localrpc.Searc
163168
}, nil
164169
}
165170

171+
// StreamCommand executes a command and continuously streams back task event output.
172+
// It is a general-purpose streaming RPC: any command that produces persistent EventTaskDone
173+
// events (tapping, poison, etc.) will have its rendered output streamed to the caller.
174+
//
175+
// Design:
176+
// 1. Register an EventHook BEFORE executing the command (no race window).
177+
// 2. Execute the command via cobra; read Session.LastTask for the task ID (no polling).
178+
// 3. EventHook filters events by task ID, renders via InternalFunctions, writes to channel.
179+
// 4. Main loop reads channel and streams to gRPC client.
180+
// 5. On context cancel: remove EventHook, return.
181+
func (s *LocalRPCServer) StreamCommand(req *localrpc.ExecuteCommandRequest, stream localrpc.CommandService_StreamCommandServer) error {
182+
reqID := atomic.AddUint64(&localRPCRequestSeq, 1)
183+
client.Log.Infof("LocalRPC[%d]: StreamCommand start (session=%s, command=%q)\n", reqID, req.SessionId, req.Command)
184+
185+
ch := make(chan string, 128)
186+
ctx := stream.Context()
187+
188+
// taskID is written after command execution, read by the EventHook goroutine.
189+
var taskID atomic.Uint32
190+
191+
// 1. Register EventHook BEFORE executing the command.
192+
// This ensures zero race window — events are captured from the moment the task is created.
193+
// The hook matches all task-done events; filtering by taskID happens inside.
194+
hookCondition := client.EventCondition{
195+
Type: consts.EventTask,
196+
Op: consts.CtrlTaskCallback,
197+
}
198+
hookFn := client.OnEventFunc(func(event *clientpb.Event) (bool, error) {
199+
task := event.GetTask()
200+
if task == nil {
201+
return false, nil
202+
}
203+
204+
// Filter: only forward events for our task on our session.
205+
tid := taskID.Load()
206+
if tid == 0 || task.TaskId != tid || task.SessionId != req.SessionId {
207+
return false, nil
208+
}
209+
210+
tctx := wrapToTaskContext(event)
211+
fn, ok := intermediate.InternalFunctions[task.Type]
212+
if !ok || fn.DoneCallback == nil {
213+
return false, nil
214+
}
215+
formatted, err := fn.DoneCallback(tctx)
216+
if err != nil || formatted == "" {
217+
return false, nil
218+
}
219+
220+
select {
221+
case ch <- formatted:
222+
default:
223+
// Drop if consumer is slow — never block the event dispatch goroutine.
224+
}
225+
return false, nil
226+
})
227+
s.console.AddEventHook(hookCondition, hookFn)
228+
defer s.console.removeEventHook(hookCondition, hookFn)
229+
230+
// 2. Execute the command; LastTask is returned from within the lock (no race).
231+
syncOutput, lastTask, err := executeStreamCommand(s.console, req.Command, req.SessionId)
232+
if err != nil {
233+
client.Log.Errorf("LocalRPC[%d]: StreamCommand exec failed: %v\n", reqID, err)
234+
return stream.Send(&localrpc.ExecuteCommandResponse{
235+
Output: syncOutput,
236+
Error: err.Error(),
237+
Success: false,
238+
})
239+
}
240+
241+
if lastTask == nil {
242+
client.Log.Debugf("LocalRPC[%d]: StreamCommand no task created, returning sync output\n", reqID)
243+
return stream.Send(&localrpc.ExecuteCommandResponse{
244+
Output: syncOutput,
245+
Success: true,
246+
})
247+
}
248+
taskID.Store(lastTask.TaskId)
249+
client.Log.Infof("LocalRPC[%d]: StreamCommand streaming task %d (session=%s)\n",
250+
reqID, lastTask.TaskId, req.SessionId)
251+
252+
// 3. Send initial ACK with sync output.
253+
if err := stream.Send(&localrpc.ExecuteCommandResponse{
254+
Output: syncOutput + "\n",
255+
Success: true,
256+
}); err != nil {
257+
return err
258+
}
259+
260+
// 4. Stream events until the client cancels.
261+
for {
262+
select {
263+
case <-ctx.Done():
264+
client.Log.Infof("LocalRPC[%d]: StreamCommand context cancelled\n", reqID)
265+
return nil
266+
case msg := <-ch:
267+
if err := stream.Send(&localrpc.ExecuteCommandResponse{
268+
Output: msg + "\n",
269+
Success: true,
270+
}); err != nil {
271+
return err
272+
}
273+
}
274+
}
275+
}
276+
277+
// executeStreamCommand runs a cobra command for StreamCommand.
278+
// It acquires commandExecMu only for the duration of command execution (no polling).
279+
// Returns the sync console output and the task created by the command (nil if none).
280+
func executeStreamCommand(con *Console, command, sessionID string) (string, *clientpb.Task, error) {
281+
if command == "" {
282+
return "", nil, fmt.Errorf("command is required")
283+
}
284+
285+
commandExecMu.Lock()
286+
defer commandExecMu.Unlock()
287+
288+
restore := con.WithNonInteractiveExecution(true)
289+
defer restore()
290+
291+
if err := switchSessionWithCallee(con, sessionID, consts.CalleeRPC); err != nil {
292+
return "", nil, err
293+
}
294+
295+
// Clear LastTask so we can detect whether the command created a new one.
296+
sess := con.GetInteractive()
297+
if sess != nil {
298+
sess.LastTask = nil
299+
}
300+
301+
args, err := shellquote.Split(command)
302+
if err != nil {
303+
return "", nil, err
304+
}
305+
args = stripWaitFlag(args)
306+
307+
start := time.Now()
308+
if err := con.App.Execute(con.Context(), con.App.ActiveMenu(), args, false); err != nil {
309+
return "", nil, err
310+
}
311+
312+
syncOutput := strings.TrimSpace(client.RemoveANSI(client.Stdout.Range(start, time.Now())))
313+
314+
// Capture LastTask while still holding the lock.
315+
var task *clientpb.Task
316+
if sess != nil {
317+
task = sess.LastTask
318+
}
319+
return syncOutput, task, nil
320+
}
321+
166322
// LocalRPC wraps the gRPC server instance
167323
type LocalRPC struct {
168324
server *grpc.Server

external/IoM-go

0 commit comments

Comments
 (0)