- PostgreSQL-based service discovery for rpcx by @smallnest
- Real-time service updates using PostgreSQL NOTIFY/LISTEN
- Automatic cleanup of stale services
- PostgreSQL 13 or higher
The TABLE, FUNCTION and TRIGGER will be automatically created on start. The plugin can be configured with the following options:
type PostgresRegisterPlugin struct {
Table string // Table name for service registration (Default: "rpcx_services")
ServicePath string // Service registration path
ServiceAddress string // Service address (e.g., "tcp@localhost:8972")
UpdateInterval time.Duration // Interval for updating service TTL
type PostgresDiscoveryOption struct {
Table string // Table name for service discovery (Default: "rpcx_services")
RetryCount int // Retry count for watch operations (-1 for infinite)
Filter client.ServiceDiscoveryFilter // Optional filter for services
Here's a complete example showing both server and client:
package main
import (
pgregistry "github.com/dapona/rpcx-postgres"
type Arith struct{}
func (t *Arith) Mul(ctx context.Context, args *Args, reply *Reply) error {
reply.C = args.A * args.B
return nil
type Args struct {
A int `msg:"a"`
B int `msg:"b"`
type Reply struct {
C int `msg:"c"`
// Server example
func runServer(pool *pgxpool.Pool) error {
s := server.NewServer()
// Create the plugin
plugin, err := pgregistry.NewPostgresRegisterPlugin(
"tcp@localhost:8972", // service address
"examples/arith", // service path
"rpcx_services", // table name
30*time.Second, // update interval
if err != nil {
return err
err = plugin.Start(context.Background())
if err != nil {
return err
defer plugin.Stop()
// Add plugin to server
// Register service
s.RegisterName("Arith", new(Arith), "")
// Start server
return s.Serve("tcp", ":8972")
// Client example
func runClient(pool *pgxpool.Pool) error {
// Create service discovery
discovery, err := pgregistry.NewPostgresDiscoveryWithPool(
"examples/arith", // service path
"tcp@localhost:8973", // client address
RetryCount: -1, // infinite retries
"rpcx_services", // table name
if err != nil {
return err
defer discovery.Close()
// Create rpcx client
xclient := client.NewXClient("Arith", client.Failtry, client.RoundRobin, discovery, client.DefaultOption)
defer xclient.Close()
// Call service
args := &Args{A: 10, B: 20}
reply := &Reply{}
err = xclient.Call(context.Background(), "Mul", args, reply)
if err != nil {
return err
log.Printf("%d * %d = %d", args.A, args.B, reply.C)
return nil
func main() {
var postgresURL = flag.String("postgres-url", "postgres://localhost:5432/mydb", "PostgreSQL connection URL")
// Create PostgreSQL connection pool
pool, err := pgxpool.New(context.Background(), *postgresURL)
if err != nil {
log.Fatalf("Failed to create pool: %v", err)
defer pool.Close()
// Run server in a goroutine
go func() {
if err := runServer(pool); err != nil {
log.Printf("Server error: %v", err)
// Wait for server to start
// Run client
if err := runClient(pool); err != nil {
log.Printf("Client error: %v", err)
MIT License