Skip to content

ctrlplanedev/relay

Repository files navigation

Relay

A transport-agnostic relay server for secure tunneling between clients and agents. The relay enables PTY sessions, TCP/UDP tunnels with pluggable middleware, authorization, and storage backends. Designed for both single-node and distributed deployments.

System Overview

                                    ┌─────────────────────────────────────────────────────────────┐
                                    │                         Relay Server                        │
                                    │                                                             │
┌─────────┐                         │  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────┐  │                         ┌─────────┐
│         │  WebSocket/gRPC/etc     │  │   Agent     │  │   Client    │  │   Session           │  │  WebSocket/gRPC/etc     │         │
│ Clients ├────────────────────────►│  │  Registry   │  │  Registry   │  │   Manager           │  │◄────────────────────────┤ Agents  │
│         │                         │  └──────┬──────┘  └──────┬──────┘  └──────────┬──────────┘  │                         │         │
└─────────┘                         │         │                │                    │             │                         └─────────┘
                                    │         └────────────────┼────────────────────┘             │
                                    │                          │                                  │
                                    │                          ▼                                  │
                                    │  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────┐  │
                                    │  │ Authorizer  │  │ Middleware  │  │   Message Broker    │  │
                                    │  │ (optional)  │  │   Chain     │  │   + State Store     │  │
                                    │  └─────────────┘  └─────────────┘  └─────────────────────┘  │
                                    │                                                             │
                                    └─────────────────────────────────────────────────────────────┘

How It Works

  1. Agents connect to the relay and register their capabilities (PTY, TCP tunnels, UDP tunnels)
  2. Clients connect and request sessions with specific agents
  3. The Hub orchestrates session creation, routing messages between clients and agents
  4. Sessions are bidirectional byte streams multiplexed over a single agent connection
  5. All data flows through optional Middleware for logging, filtering, or transformation

Core Concepts

Identity Types

Type Description
AgentID Uniquely identifies an agent (the remote endpoint providing resources)
ClientID Uniquely identifies a client (the caller requesting sessions)
SessionID Uniquely identifies an active session between a client and agent

Session Types

Type Capability Description
pty CapabilityPTY Interactive terminal (SSH-like shell access)
tunnel:tcp CapabilityTCPTunnel TCP port forwarding
tunnel:udp CapabilityUDPTunnel UDP port forwarding

Message Protocol

All communication uses a unified Message envelope:

type Message struct {
    Type      MessageType       // What kind of message (register, session:data, etc.)
    SessionID SessionID         // Which session (empty for control messages)
    Payload   []byte            // Raw data or JSON-encoded metadata
    Metadata  map[string]string // Optional key-value pairs
}

Message types include:

  • Control: register, heartbeat
  • Session lifecycle: session:open, session:close, session:ready
  • Data: session:data, pty:resize, error

Interfaces

The relay is built on a set of composable interfaces. Implement these to customize behavior or integrate with different backends.

Connection Interfaces

┌─────────────────────────────────────────────────────────────────────────┐
│                           MessageConn                                   │
│  Base interface for bidirectional message passing                       │
│  ├── Send(ctx, msg) error                                               │
│  ├── Receive(ctx) (*Message, error)                                     │
│  └── Close() error                                                      │
└─────────────────────────────────────────────────────────────────────────┘
           │                    │                         │
           ▼                    ▼                         ▼
    ┌─────────────┐      ┌─────────────┐          ┌─────────────┐
    │  AgentConn  │      │ ClientConn  │          │ ServerConn  │
    │  (server    │      │ (server     │          │ (outbound   │
    │   side)     │      │  side)      │          │  from agent │
    │             │      │             │          │  or client) │
    │ + ID()      │      │ + ID()      │          │ + Connect() │
    │ + Info()    │      └─────────────┘          └─────────────┘
    └─────────────┘

MessageConn

The base interface all connections implement:

type MessageConn interface {
    Send(ctx context.Context, msg *Message) error
    Receive(ctx context.Context) (*Message, error)
    Close() error
}

AgentConn

Server-side view of an agent connection:

type AgentConn interface {
    MessageConn
    ID() AgentID
    Info() *AgentInfo
}

ClientConn

Server-side view of a client connection:

type ClientConn interface {
    MessageConn
    ID() ClientID
}

ServerConn

Outbound connection from an agent or client to the server:

type ServerConn interface {
    MessageConn
    Connect(ctx context.Context, url string) error
}

Registry Interfaces

┌─────────────────────────────────────────────────────────────────────────┐
│                          AgentRegistry                                  │
│  Manages agent connections and discovery                                │
│  ├── RegisterAgent(ctx, conn) error                                     │
│  ├── UnregisterAgent(ctx, id) error                                     │
│  ├── GetAgent(ctx, id) (*AgentInfo, error)                              │
│  ├── GetAgentConn(ctx, id) (AgentConn, error)                           │
│  ├── ListAgents(ctx, opts) ([]*AgentInfo, error)                        │
│  └── HandleHeartbeat(ctx, agentID) error                                │
└─────────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────────┐
│                          ClientRegistry                                 │
│  Manages client connections                                             │
│  ├── RegisterClient(ctx, conn) error                                    │
│  ├── UnregisterClient(ctx, id) error                                    │
│  └── GetClientConn(ctx, id) (ClientConn, error)                         │
└─────────────────────────────────────────────────────────────────────────┘

Registries store connection state. For distributed deployments, implement with Redis or similar to share state across nodes.


Session Interface

┌─────────────────────────────────────────────────────────────────────────┐
│                             Session                                     │
│  Bidirectional communication channel between client and agent           │
│  ├── ID() SessionID                                                     │
│  ├── Type() SessionType                                                 │
│  ├── AgentID() AgentID                                                  │
│  ├── ClientID() ClientID                                                │
│  ├── Write(ctx, data) error              // Client → Agent              │
│  ├── DeliverOutput(ctx, data) error      // Agent → Client              │
│  ├── NotifyClose(ctx) error                                             │
│  ├── SetOnClose(callback)                                               │
│  ├── Close(ctx) error                                                   │
│  └── Resize(ctx, rows, cols) error       // PTY only                    │
└─────────────────────────────────────────────────────────────────────────┘

Sessions are the fundamental data path. They're bidirectional byte streams that:

  • Route input from clients to agents via Write()
  • Deliver output from agents to clients via DeliverOutput()
  • Support PTY resize events (returns error for non-PTY sessions)

SessionManager Interface

┌─────────────────────────────────────────────────────────────────────────┐
│                          SessionManager                                 │
│  Manages session lifecycle and message routing                          │
│  ├── OpenSession(ctx, clientID, agentID, type, opts) (SessionID, error) │
│  ├── CloseSession(ctx, sessionID) error                                 │
│  ├── GetSession(ctx, sessionID) (Session, error)                        │
│  ├── HandleSessionMessage(ctx, agentID, msg) error                      │
│  ├── CloseAgentSessions(ctx, agentID) error                             │
│  ├── CloseClientSessions(ctx, clientID) error                           │
│  └── AdoptAgentSessions(ctx, agentID) error   // For reconnection       │
└─────────────────────────────────────────────────────────────────────────┘

The SessionManager coordinates session creation, tracks state, and handles agent reconnection in distributed deployments.


Broker Interfaces (Distributed)

For multi-node deployments, the broker handles cross-node communication:

┌─────────────────────────────────────────────────────────────────────────┐
│                          MessageBroker                                  │
│  Routes messages between nodes                                          │
│  ├── Publish(ctx, envelope) error                                       │
│  ├── Subscribe(ctx, targetType, handler) error                          │
│  └── Close() error                                                      │
└─────────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────────┐
│                           StateStore                                    │
│  Generic key-value storage for distributed state                        │
│  ├── Get(ctx, key) ([]byte, error)                                      │
│  ├── Set(ctx, key, value) error                                         │
│  ├── Delete(ctx, key) error                                             │
│  └── List(ctx, prefix) ([][]byte, error)                                │
└─────────────────────────────────────────────────────────────────────────┘

The default channel-based implementations work for single-node. For horizontal scaling, implement with Redis, NATS, or similar.


Authorization Interface

type Authorizer interface {
    Authorize(ctx context.Context, req *AuthRequest) (*AuthResponse, error)
}

type AuthRequest struct {
    Subject  string            // Who (user ID, token)
    Action   AuthAction        // What (agent:register, session:open)
    Resource string            // Target (agent ID)
    Context  map[string]string // Additional context
}

Actions: agent:register, session:open, session:tunnel


Middleware Interfaces

// Intercepts data flowing between client and agent
type DataMiddleware interface {
    ProcessClientToAgent(ctx context.Context, sessionID SessionID, data []byte) ([]byte, error)
    ProcessAgentToClient(ctx context.Context, sessionID SessionID, data []byte) ([]byte, error)
}

// Receives session lifecycle events
type SessionMiddleware interface {
    OnSessionOpen(ctx context.Context, agentID AgentID, sessionType SessionType, opts *SessionOptions) error
    OnSessionClose(ctx context.Context, sessionID SessionID) error
}

Use cases:

  • Logging: Record sessions for audit
  • Filtering: Block commands or patterns
  • Transformation: Inject environment variables
  • Rate limiting: Throttle throughput

Directory Structure

.
├── *.go                    # Core interfaces (types, session, auth, conn, etc.)
├── agent/                  # Agent implementation
│   ├── agent.go            # Agent logic
│   ├── pty/                # PTY session handler
│   ├── tcp/                # TCP tunnel handler
│   └── udp/                # UDP tunnel handler
├── auth/                   # Authorization implementations
│   └── webhook.go          # Webhook-based authorizer
├── broker/                 # Message broker abstractions
│   ├── broker.go           # MessageBroker and StateStore interfaces
│   ├── store.go            # Type-safe helper functions for StateStore
│   └── channel/            # In-memory channel-based implementation
├── cmd/                    # CLI binaries
│   ├── server/             # Relay server
│   ├── agent/              # Agent binary
│   └── client/             # Client CLI
├── conn/                   # Connection implementations
│   └── ws/                 # WebSocket connections
├── internal/               # Internal utilities
│   └── wsutil/             # WebSocket helpers
└── server/                 # Server-side implementations
    ├── hub/                # Central Hub coordinator
    ├── handler/            # HTTP handlers
    ├── middleware/         # Middleware interfaces
    ├── proxy/              # Session proxy implementation
    └── store/distributed/  # Distributed registry and session storage

Usage

Start the Server

go run ./cmd/server -addr :8080 -log-level debug

Start an Agent

go run ./cmd/agent -server ws://localhost:8080/agent/connect -id my-agent

Connect as Client

# Interactive PTY session
go run ./cmd/client -server ws://localhost:8080 -agent my-agent

# List agents via API
curl http://localhost:8080/api/agents

Endpoints

Endpoint Description
GET /api/agents List connected agents
WS /agent/connect Agent WebSocket connection
WS /client/connect?agent=<id> Client PTY session
WS /tunnel/connect?agent=<id>&host=<host>&port=<port> TCP tunnel

Extending

Custom Transport (e.g., gRPC)

type GRPCServer struct {
    hub *hub.Hub
}

func (s *GRPCServer) HandleAgentStream(stream pb.Relay_AgentStreamServer) {
    agentConn := NewGRPCAgentConn(stream)
    s.hub.RegisterAgent(ctx, agentConn)
    s.hub.RunAgentLoop(ctx, agentConn)
    s.hub.UnregisterAgent(ctx, agentConn.ID())
}

Custom Storage (e.g., Redis)

Implement StateStore for distributed state:

type RedisStateStore struct {
    client *redis.Client
}

func (r *RedisStateStore) Get(ctx context.Context, key string) ([]byte, error) {
    return r.client.Get(ctx, key).Bytes()
}

func (r *RedisStateStore) Set(ctx context.Context, key string, value []byte) error {
    return r.client.Set(ctx, key, value, 0).Err()
}

// ... implement Delete and List

Custom Middleware

type AuditLogger struct {
    db *sql.DB
}

func (a *AuditLogger) ProcessClientToAgent(ctx context.Context, sessionID relay.SessionID, data []byte) ([]byte, error) {
    a.db.Exec("INSERT INTO audit_log (session_id, direction, data) VALUES (?, 'c2a', ?)", sessionID, data)
    return data, nil
}

func (a *AuditLogger) ProcessAgentToClient(ctx context.Context, sessionID relay.SessionID, data []byte) ([]byte, error) {
    a.db.Exec("INSERT INTO audit_log (session_id, direction, data) VALUES (?, 'a2c', ?)", sessionID, data)
    return data, nil
}

Design Principles

  1. Transport-agnostic: Core logic is interface-driven; transports just handle wire protocols
  2. Pluggable components: Registry, SessionManager, Authorizer, Middleware are all interfaces
  3. Sessions are tunnels: All session types share the same interface (bidirectional byte streams)
  4. Single-node and distributed: Channel broker for simple deployments, Redis/NATS for scaling
  5. Reconnection support: Sessions can survive agent reconnection via AdoptAgentSessions
  6. Middleware for cross-cutting concerns: Logging, filtering, transformation without modifying core logic

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages