rpc over websockets made easy, robust, and production ready
a fast and durable bidirectional json rpc channel over websockets. the easiest way to create a live async channel between two nodes via python (or other clients - for example, you can run the websocket server in docker or with Node.js, Ruby on Rails, or any other platform, and they will communicate seamlessly with each other).
- both server and clients can easily expose python methods that can be called by the other side. method return values are sent back as rpc responses, which the other side can wait on.
- remote methods are easily called via the `.other.method()` wrapper
- connections are kept alive with a configurable retry mechanism (using tenacity)
- built-in support for binary data transfer and custom serialization
- automatic connection recovery and heartbeat monitoring
- support for multiple concurrent connections and load balancing
supported and tested on python >= 3.9
uv pip install fasterpc
for development with additional features:
pip install fasterpc[websocket-client,dev]
note: the examples below use print for simplicity; in production code, prefer the logging helpers described later in this document.
say the server exposes an "add" method:
class rpc_calculator(rpc_methods_base):
async def add(self, a, b):
return a + b

calling it is as easy as calling the method under the client's "other" property:
response = await client.other.add(a=1, b=2)
print(response.result)  # 3

from pydantic import BaseModel
from typing import List, Dict, Any
class user_data(BaseModel):
    """Pydantic model describing a user payload exchanged over RPC."""

    name: str  # display name of the user
    age: int  # age in years
    preferences: Dict[str, Any]  # free-form preference map; keys/values are application-defined
class data_processor(rpc_methods_base):
async def process_users(self, users: List[user_data]) -> Dict[str, int]:
    """Map each user's name to their age.

    A later entry with a duplicate name overwrites an earlier one.
    """
    return {user.name: user.age for user in users}
async def get_statistics(self, data: Dict[str, Any]) -> Dict[str, float]:
values = list(data.values())
return {
"mean": sum(values) / len(values),
"max": max(values),
"min": min(values)
}

import uvicorn
from fastapi import fastapi
from fasterpc import rpc_methods_base, websocket_rpc_endpoint
# methods to expose to the clients
class concat_server(rpc_methods_base):
    """Example RPC method set the server exposes to connected clients."""

    async def concat(self, a="", b=""):
        """Concatenate two strings; defaults make both arguments optional."""
        return a + b

    async def multiply(self, x: int, y: int) -> int:
        """Multiply two integers."""
        return x * y

    async def get_server_info(self) -> dict:
        """Return static demo metadata about the server."""
        return {
            "version": "1.0.0",
            "status": "running",
            "uptime": "2h 15m",
        }
# init the fast-api app
app = fastapi()
# create an endpoint and load it with the methods to expose
endpoint = websocket_rpc_endpoint(concat_server())
# add the endpoint to the app
endpoint.register_route(app, "/ws")
# start the server
uvicorn.run(app, host="0.0.0.0", port=9000)

import asyncio
from fasterpc import rpc_methods_base, websocket_rpc_client
async def run_client(uri):
    """Connect to the RPC server at *uri* and demonstrate a few remote calls.

    The context manager opens the websocket connection and closes it on exit;
    each `client.other.<name>()` call awaits the remote method's response.
    """
    async with websocket_rpc_client(uri, rpc_methods_base()) as client:
        # call concat on the other side
        response = await client.other.concat(a="hello", b=" world")
        print(response.result)  # will print "hello world"
        # call multiply method
        result = await client.other.multiply(x=5, y=3)
        print(result.result)  # will print 15
        # get server info
        info = await client.other.get_server_info()
        print(info.result)  # will print server info dict
# run the client
asyncio.get_event_loop().run_until_complete(
run_client("ws://localhost:9000/ws")
)

class client_methods(rpc_methods_base):
async def notify_event(self, event_type: str, data: dict):
    """Handle a server-pushed event: log it and acknowledge receipt."""
    print(f"received event: {event_type} with data: {data}")
    return {"status": "acknowledged"}
class server_methods(rpc_methods_base):
    """RPC methods the server exposes for broadcast-style calls."""

    async def broadcast_message(self, message: str):
        # this will be called by the server to notify all clients
        return {"broadcast": message}
# client with bidirectional support
async def bidirectional_client():
async with websocket_rpc_client(
"ws://localhost:9000/ws",
client_methods()
) as client:
# client can call server methods
response = await client.other.get_data()
# server can call client methods via channel
await client.channel.other.notify_event("update", {"version": "2.0"})

see the examples and tests folders for more server and client examples
from fasterpc import websocket_rpc_client, rpc_methods_base
from tenacity import retry, stop_after_attempt, wait_exponential
# custom retry configuration
retry_config = {
"retry": retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=4, max=10)
)
}
async with websocket_rpc_client(
"ws://localhost:9000/ws",
rpc_methods_base(),
retry_config=retry_config
) as client:
# connection will automatically retry on failure
pass

from fasterpc import websocket_frame_type
class binary_handler(rpc_methods_base):
async def send_file(self, filename: str, data: bytes):
    """Persist binary *data* received over RPC to a local file.

    NOTE(review): the original target was the garbled f"received_(unknown)"
    — the placeholder was lost in extraction; restored to interpolate the
    caller-supplied filename. Sanitize *filename* before trusting it as a
    path in real deployments (it comes from the remote peer).
    """
    with open(f"received_{filename}", "wb") as f:
        f.write(data)
    return {"status": "file_saved", "size": len(data)}
async def get_large_data(self) -> bytes:
# return binary data
return b"large_binary_data_here"

import json
from datetime import datetime
class custom_serializer:
    """Example serializer that round-trips datetime values through JSON.

    Datetimes are encoded as a single-key tagged dict so they survive a
    plain-JSON channel; all other values pass through unchanged.
    """

    def serialize(self, obj):
        """Encode *obj* for transmission; datetimes become tagged dicts."""
        if isinstance(obj, datetime):
            return {"__datetime__": obj.isoformat()}
        return obj

    def deserialize(self, obj):
        """Reverse of serialize: detect the tag and restore the datetime."""
        if isinstance(obj, dict) and "__datetime__" in obj:
            return datetime.fromisoformat(obj["__datetime__"])
        return obj
- clients can call `client.other.method()` - which is a shortcut for `channel.other.method()`
- servers also get the channel object and can call remote methods via `channel.other.method()` - see the bidirectional call example for calling client from server and server events (e.g. `on_connect`).
websockets are ideal to create bi-directional realtime connections over the web:
- push updates and notifications
- remote control mechanism
- pub / sub patterns
- trigger events and workflows (see "tests/trigger_flow_test.py")
- node negotiations and distributed systems (see "tests/advanced_rpc_test.py :: test_recursive_rpc_calls")
- real-time dashboards and monitoring
- multiplayer game state synchronization
- iot device communication
- microservices inter-service communication
- multi-agent frameworks and ai agent orchestration
fasterpc excels at building distributed multi-agent systems where agents can communicate and collaborate in real-time. this is perfect for ai agent orchestration, microservices, and distributed computing.
agents can directly call methods on other agents using rpc:
# agent a calls agent b
async with websocket_rpc_client("ws://agent-b:9001/ws", rpc_methods_base()) as client:
result = await client.other.process_data(data=my_data)
print(f"agent b processed: {result.result}")

see the agents directory for a full working example with 5 specialized agents:
class research_agent_methods(rpc_methods_base):
async def search_topic(self, topic: str) -> Dict[str, Any]:
    """Return stubbed search results for *topic* (demo agent method)."""
    # search for information on a topic
    return {"topic": topic, "sources": ["source1", "source2"]}
async def validate_source(self, url: str) -> Dict[str, Any]:
# validate source credibility
return {"url": url, "credibility_score": 0.85}

class analysis_agent_methods(rpc_methods_base):
async def analyze_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
    """Return stubbed pattern-analysis results for *data* (demo agent method)."""
    # analyze provided data
    return {"patterns_found": ["pattern1", "pattern2"]}
async def generate_insights(self, text: str) -> Dict[str, Any]:
# extract insights from text
return {"key_terms": ["term1", "term2"], "sentiment": "positive"}

class writing_agent_methods(rpc_methods_base):
async def write_summary(self, data: Dict[str, Any]) -> Dict[str, Any]:
    """Return a stubbed summary for *data* (demo agent method)."""
    # create summaries from data
    return {"content": "summary text", "word_count": 50}
async def generate_report(self, topic: str, data: Dict[str, Any]) -> Dict[str, Any]:
# generate comprehensive reports
return {"content": "full report", "sections": ["intro", "body", "conclusion"]}

class storage_agent_methods(rpc_methods_base):
async def save_data(self, key: str, data: Dict[str, Any]) -> Dict[str, Any]:
    """Acknowledge storing *data* under *key* (demo agent method — no real persistence)."""
    # save data with key
    return {"key": key, "status": "saved"}
async def retrieve_data(self, key: str) -> Dict[str, Any]:
# retrieve data by key
return {"key": key, "data": stored_data}

class coordination_agent_methods(rpc_methods_base):
async def coordinate_workflow(self, workflow: Dict[str, Any]) -> Dict[str, Any]:
    """Return a stubbed completion record for *workflow* (demo agent method)."""
    # orchestrate multi-step workflows
    return {"workflow_id": "workflow_123", "status": "completed"}
async def delegate_task(self, task: str, agent_type: str) -> Dict[str, Any]:
# delegate tasks to specific agents
return {"task": task, "agent": agent_type, "status": "delegated"}

cd tests/agents
python start_all_agents.py

python agent_orchestrator.py

async def research_workflow():
# step 1: research agent searches for information
async with websocket_rpc_client("ws://localhost:9001/ws", rpc_methods_base()) as research:
search_result = await research.other.search_topic(topic="python websockets")
# step 2: analysis agent processes the results
async with websocket_rpc_client("ws://localhost:9002/ws", rpc_methods_base()) as analysis:
analysis_result = await analysis.other.analyze_data(data=search_result.result)
# step 3: writing agent creates a summary
async with websocket_rpc_client("ws://localhost:9003/ws", rpc_methods_base()) as writing:
summary = await writing.other.write_summary(data={
"topic": search_result.result["topic"],
"insights": analysis_result.result["patterns_found"]
})
# step 4: storage agent saves everything
async with websocket_rpc_client("ws://localhost:9004/ws", rpc_methods_base()) as storage:
await storage.other.save_data(key="workflow_result", data={
"search": search_result.result,
"analysis": analysis_result.result,
"summary": summary.result
})

fasterpc can communicate with agents written in any language or any service that supports websockets:
const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 9005 });
wss.on('connection', (ws) => {
ws.on('message', (data) => {
const request = JSON.parse(data);
// handle rpc calls
if (request.method === 'process_data') {
const result = processData(request.params);
ws.send(JSON.stringify({
id: request.id,
result: result
}));
}
});
});

package main
import (
"github.com/gorilla/websocket"
"net/http"
)
func handleWebSocket(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
for {
messageType, message, err := conn.ReadMessage()
if err != nil {
break
}
// handle rpc calls
response := handleRpcCall(message)
conn.WriteMessage(messageType, response)
}
}

# coordination agent manages other agents
async def hierarchical_workflow():
async with websocket_rpc_client("ws://localhost:9000/ws", rpc_methods_base()) as coordinator:
workflow = {
"name": "research_analysis_workflow",
"steps": [
{"action": "search", "agent": "research"},
{"action": "analyze", "agent": "analysis"},
{"action": "write", "agent": "writing"},
{"action": "store", "agent": "storage"}
]
}
result = await coordinator.other.coordinate_workflow(workflow=workflow)

# agents communicate directly with each other
async def peer_communication():
# agent a calls agent b
async with websocket_rpc_client("ws://agent-b:9001/ws", rpc_methods_base()) as client:
result = await client.other.process_request(data=my_data)
# agent b can call back to agent a
await client.channel.other.notify_completion(result=result.result)

# multiple instances of the same agent type
agent_instances = [
"ws://agent-1:9001/ws",
"ws://agent-2:9001/ws",
"ws://agent-3:9001/ws"
]
async def load_balanced_call():
# round-robin or random selection
selected_agent = random.choice(agent_instances)
async with websocket_rpc_client(selected_agent, rpc_methods_base()) as client:
result = await client.other.process_task(task=my_task)

the multi-agent system demonstrates excellent performance:
- 19+ calls/second with 100% success rate
- real-time communication between agents
- automatic reconnection and fault tolerance
- concurrent agent communication with asyncio
- ai research assistants - specialized agents for different research tasks
- content creation pipelines - research → analysis → writing → publishing
- data processing workflows - collection → processing → analysis → storage
- microservices architecture - each service as a specialized agent
- distributed computing - agents running on different machines
- real-time monitoring - agents monitoring different systems
- automated testing - agents for different types of testing
- rpc_channel - implements the rpc-protocol over the websocket
- sending rpc_requests per method call
- creating promises to track them (via unique call ids), and allow waiting for responses
- executing methods on the remote side and serializing return values
- receiving rpc_responses and delivering them to waiting callers
- handling connection state and reconnection logic
- rpc_methods - classes passed to both client and server-endpoint inits to expose callable methods to the other side.
- simply derive from rpc_methods_base and add your own async methods
- note currently only key-word arguments are supported
- checkout rpc_utility_methods for example methods, which are also useful debugging utilities
- support for method validation and error handling
- automatic reconnection with exponential backoff
- heartbeat monitoring and connection health checks
- connection pooling for multiple clients
- graceful shutdown and cleanup
- asyncio-based: built on python's asyncio for high-performance async operations
- fastapi integration: leverages fastapi's asgi platform for web server capabilities
- pydantic serialization: uses pydantic for robust data validation and serialization
- tenacity retry logic: configurable retry mechanisms for network resilience
- websockets library: comprehensive websocket client implementation
- type hints: full type annotation support for better development experience
fasterpc provides a helper logging module to control how it produces logs for you.
see fasterpc/logger.py.
use logging_config.set_mode or the 'ws_rpc_logging' environment variable to choose the logging method you prefer or override completely via default logging config.
example:
# set rpc to log like uvicorn
from fasterpc.logger import logging_config, logging_modes
logging_config.set_mode(logging_modes.uvicorn)
# custom logging configuration
import logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("fasterpc")

by default, fasterpc uses websockets module as websocket client handler. this does not support http(s) proxy, see python-websockets/websockets#364 . if the ability to use a proxy is important to you, another websocket client implementation can be used, e.g. websocket-client (https://websocket-client.readthedocs.io). here is how to use it:
installation:
pip install fasterpc[websocket-client]
usage:
import asyncio
from fasterpc import rpc_methods_base, websocket_rpc_client, proxy_enabled_websocket_client_handler
async def run_client(uri):
async with websocket_rpc_client(
uri,
rpc_methods_base(),
websocket_client_handler_cls=proxy_enabled_websocket_client_handler
) as client:
# your rpc calls here
pass

just set standard environment variables (lowercase and uppercase works): http_proxy, https_proxy, and no_proxy before running python script.
- use connection pooling for multiple clients
- implement proper error handling and retry logic
- monitor memory usage with large data transfers
- use binary data for large payloads
- implement rate limiting for high-frequency calls
import time
import asyncio
async def benchmark_rpc_calls(client, iterations=1000):
start_time = time.time()
for i in range(iterations):
await client.other.echo(data=f"test_{i}")
end_time = time.time()
duration = end_time - start_time
calls_per_second = iterations / duration
print(f"completed {iterations} calls in {duration:.2f}s")
print(f"rate: {calls_per_second:.2f} calls/second")

from fastapi import depends, header
async def verify_token(authorization: str = header(...)):
    """FastAPI dependency that extracts a bearer token from the Authorization header.

    Rejects the request with a 401 when the header does not start with the
    bearer scheme, otherwise returns the raw token string.
    NOTE(review): this document appears case-folded — FastAPI's real names
    are `Header`/`HTTPException`, and the conventional scheme prefix is
    "Bearer "; confirm against the actual source.
    """
    if not authorization.startswith("bearer "):
        raise http_exception(status_code=401, detail="invalid token")
    # implement your token verification logic here
    return authorization.split(" ")[1]
# use in your endpoint
endpoint = websocket_rpc_endpoint(
concat_server(),
dependencies=[depends(verify_token)]
)

from pydantic import validator
class secure_data_processor(rpc_methods_base):
@validator('input_data')
def validate_input(cls, v):
    """Pydantic validator: reject oversized inputs before processing.

    NOTE(review): restored the builtin `ValueError` (the document is
    case-folded; lowercase `valueerror` is not defined).
    """
    if len(v) > 1000000:  # 1 MB limit
        raise ValueError("input too large")
    return v
async def process_data(self, input_data: str):
# your processing logic here
pass

- please include tests for new features
- follow the existing code style and patterns
- add documentation for new functionality
- ensure backward compatibility when possible