Skip to content

Add worker queue to enable sending message from S -> C #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions lib/ruby_mcp/capabilities/prompts.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
module RubyMCP::Capabilities::Prompts
def add_prompt(...)
@prompts.add(...)

RubyMCP.logger.info(@transport)
send_prompts_list_changed if @transport
end

private

def send_prompts_list_changed
@transport.enqueue(jsonrpc: "2.0", method: "notifications/prompts/list_changed")
end
end
7 changes: 2 additions & 5 deletions lib/ruby_mcp/server.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
module RubyMCP
class Server
include Capabilities::Logging
include Capabilities::Prompts

attr_reader :lifecycle, :prompts, :resources

Expand All @@ -21,17 +22,13 @@ def connect(transport)
start_transport
end

def add_prompt(...)
@prompts.add(...)
end

def add_resource(...)
@resources.add(...)
end

def send_message(message)
RubyMCP.logger.debug "S -> C : #{message}"
@transport.send(message)
@transport.enqueue(message)
end

def answer(request, result)
Expand Down
31 changes: 27 additions & 4 deletions lib/ruby_mcp/transport/stdio.rb
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
module RubyMCP
class Transport
class Stdio < Transport
def initialize
@queue = Queue.new
end

def start
@running = true
start_message_worker

while @running
begin
line = $stdin.gets

break if line.nil?

@on_message.call(line.strip)
@queue << [ :incoming, line.strip ]
rescue StandardError => e
RubyMCP.logger.error("Exception: #{e}")
end
Expand All @@ -19,15 +24,33 @@ def start
@on_close.call
end

def send(message)
$stdout.puts(JSON.generate(message))
$stdout.flush
def enqueue(message)
@queue << [ :outgoing, JSON.generate(message) ]
end


def on_close(&block)
@on_close = block
end

private

def start_message_worker
sleep 0.2
RubyMCP.logger.info("Starting worker thread")
@worker = Thread.new do
while @running
type, message = @queue.pop

if type == :incoming
@on_message.call(message)
else
$stdout.puts(message)
$stdout.flush
end
end
end
end
end
end
end
2 changes: 1 addition & 1 deletion lib/ruby_mcp/transport/test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def start
@running = true
end

def send(message)
def enqueue(message)
@responses << JSON.generate(message)
end

Expand Down
39 changes: 39 additions & 0 deletions test/capabilities/test_prompts_capability.rb
Original file line number Diff line number Diff line change
Expand Up @@ -329,4 +329,43 @@ def test_prompt_get_missing_required_argument_with_multiple_required_args
}
)
end

def test_adding_prompt_sends_prompts_list_changed
@server.add_prompt(
name: "refactor",
description: "Review this code",
arguments: [
{
name: "code",
description: "code to review",
required: true,
completions: ->(*) { [ "some", "completion", "value" ] }
},
{
name: "language",
description: "Programming language",
required: true,
completions: ->(*) { [ "some", "completion", "value" ] }
}
],
result: ->() {
{
description: "demo",
messages: [
{
role: "user",
content: {
type: "text",
text: "demo"
}
}
]
}
},
)

assert_last_response(
jsonrpc: "2.0", method: "notifications/prompts/list_changed"
)
end
end