A transport-agnostic relay server for secure tunneling between clients and agents. The relay enables PTY sessions, TCP/UDP tunnels with pluggable middleware, authorization, and storage backends. Designed for both single-node and distributed deployments.
┌─────────────────────────────────────────────────────────────┐
│ Relay Server │
│ │
┌─────────┐ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │ ┌─────────┐
│ │ WebSocket/gRPC/etc │ │ Agent │ │ Client │ │ Session │ │ WebSocket/gRPC/etc │ │
│ Clients ├────────────────────────►│ │ Registry │ │ Registry │ │ Manager │ │◄────────────────────────┤ Agents │
│ │ │ └──────┬──────┘ └──────┬──────┘ └──────────┬──────────┘ │ │ │
└─────────┘ │ │ │ │ │ └─────────┘
│ └────────────────┼────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │
│ │ Authorizer │ │ Middleware │ │ Message Broker │ │
│ │ (optional) │ │ Chain │ │ + State Store │ │
│ └─────────────┘ └─────────────┘ └─────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
- Agents connect to the relay and register their capabilities (PTY, TCP tunnels, UDP tunnels)
- Clients connect and request sessions with specific agents
- The Hub orchestrates session creation, routing messages between clients and agents
- Sessions are bidirectional byte streams multiplexed over a single agent connection
- All data flows through optional Middleware for logging, filtering, or transformation
| Type | Description |
|---|---|
AgentID |
Uniquely identifies an agent (the remote endpoint providing resources) |
ClientID |
Uniquely identifies a client (the caller requesting sessions) |
SessionID |
Uniquely identifies an active session between a client and agent |
| Type | Capability | Description |
|---|---|---|
pty |
CapabilityPTY |
Interactive terminal (SSH-like shell access) |
tunnel:tcp |
CapabilityTCPTunnel |
TCP port forwarding |
tunnel:udp |
CapabilityUDPTunnel |
UDP port forwarding |
All communication uses a unified Message envelope:
type Message struct {
Type MessageType // What kind of message (register, session:data, etc.)
SessionID SessionID // Which session (empty for control messages)
Payload []byte // Raw data or JSON-encoded metadata
Metadata map[string]string // Optional key-value pairs
}Message types include:
- Control:
register,heartbeat - Session lifecycle:
session:open,session:close,session:ready - Data:
session:data,pty:resize,error
The relay is built on a set of composable interfaces. Implement these to customize behavior or integrate with different backends.
┌─────────────────────────────────────────────────────────────────────────┐
│ MessageConn │
│ Base interface for bidirectional message passing │
│ ├── Send(ctx, msg) error │
│ ├── Receive(ctx) (*Message, error) │
│ └── Close() error │
└─────────────────────────────────────────────────────────────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ AgentConn │ │ ClientConn │ │ ServerConn │
│ (server │ │ (server │ │ (outbound │
│ side) │ │ side) │ │ from agent │
│ │ │ │ │ or client) │
│ + ID() │ │ + ID() │ │ + Connect() │
│ + Info() │ └─────────────┘ └─────────────┘
└─────────────┘
The base interface all connections implement:
type MessageConn interface {
Send(ctx context.Context, msg *Message) error
Receive(ctx context.Context) (*Message, error)
Close() error
}Server-side view of an agent connection:
type AgentConn interface {
MessageConn
ID() AgentID
Info() *AgentInfo
}Server-side view of a client connection:
type ClientConn interface {
MessageConn
ID() ClientID
}Outbound connection from an agent or client to the server:
type ServerConn interface {
MessageConn
Connect(ctx context.Context, url string) error
}┌─────────────────────────────────────────────────────────────────────────┐
│ AgentRegistry │
│ Manages agent connections and discovery │
│ ├── RegisterAgent(ctx, conn) error │
│ ├── UnregisterAgent(ctx, id) error │
│ ├── GetAgent(ctx, id) (*AgentInfo, error) │
│ ├── GetAgentConn(ctx, id) (AgentConn, error) │
│ ├── ListAgents(ctx, opts) ([]*AgentInfo, error) │
│ └── HandleHeartbeat(ctx, agentID) error │
└─────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────┐
│ ClientRegistry │
│ Manages client connections │
│ ├── RegisterClient(ctx, conn) error │
│ ├── UnregisterClient(ctx, id) error │
│ └── GetClientConn(ctx, id) (ClientConn, error) │
└─────────────────────────────────────────────────────────────────────────┘
Registries store connection state. For distributed deployments, implement with Redis or similar to share state across nodes.
┌─────────────────────────────────────────────────────────────────────────┐
│ Session │
│ Bidirectional communication channel between client and agent │
│ ├── ID() SessionID │
│ ├── Type() SessionType │
│ ├── AgentID() AgentID │
│ ├── ClientID() ClientID │
│ ├── Write(ctx, data) error // Client → Agent │
│ ├── DeliverOutput(ctx, data) error // Agent → Client │
│ ├── NotifyClose(ctx) error │
│ ├── SetOnClose(callback) │
│ ├── Close(ctx) error │
│ └── Resize(ctx, rows, cols) error // PTY only │
└─────────────────────────────────────────────────────────────────────────┘
Sessions are the fundamental data path. They're bidirectional byte streams that:
- Route input from clients to agents via
Write() - Deliver output from agents to clients via
DeliverOutput() - Support PTY resize events (returns error for non-PTY sessions)
┌─────────────────────────────────────────────────────────────────────────┐
│ SessionManager │
│ Manages session lifecycle and message routing │
│ ├── OpenSession(ctx, clientID, agentID, type, opts) (SessionID, error) │
│ ├── CloseSession(ctx, sessionID) error │
│ ├── GetSession(ctx, sessionID) (Session, error) │
│ ├── HandleSessionMessage(ctx, agentID, msg) error │
│ ├── CloseAgentSessions(ctx, agentID) error │
│ ├── CloseClientSessions(ctx, clientID) error │
│ └── AdoptAgentSessions(ctx, agentID) error // For reconnection │
└─────────────────────────────────────────────────────────────────────────┘
The SessionManager coordinates session creation, tracks state, and handles agent reconnection in distributed deployments.
For multi-node deployments, the broker handles cross-node communication:
┌─────────────────────────────────────────────────────────────────────────┐
│ MessageBroker │
│ Routes messages between nodes │
│ ├── Publish(ctx, envelope) error │
│ ├── Subscribe(ctx, targetType, handler) error │
│ └── Close() error │
└─────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────┐
│ StateStore │
│ Generic key-value storage for distributed state │
│ ├── Get(ctx, key) ([]byte, error) │
│ ├── Set(ctx, key, value) error │
│ ├── Delete(ctx, key) error │
│ └── List(ctx, prefix) ([][]byte, error) │
└─────────────────────────────────────────────────────────────────────────┘
The default channel-based implementations work for single-node. For horizontal scaling, implement with Redis, NATS, or similar.
type Authorizer interface {
Authorize(ctx context.Context, req *AuthRequest) (*AuthResponse, error)
}
type AuthRequest struct {
Subject string // Who (user ID, token)
Action AuthAction // What (agent:register, session:open)
Resource string // Target (agent ID)
Context map[string]string // Additional context
}Actions: agent:register, session:open, session:tunnel
// Intercepts data flowing between client and agent
type DataMiddleware interface {
ProcessClientToAgent(ctx context.Context, sessionID SessionID, data []byte) ([]byte, error)
ProcessAgentToClient(ctx context.Context, sessionID SessionID, data []byte) ([]byte, error)
}
// Receives session lifecycle events
type SessionMiddleware interface {
OnSessionOpen(ctx context.Context, agentID AgentID, sessionType SessionType, opts *SessionOptions) error
OnSessionClose(ctx context.Context, sessionID SessionID) error
}Use cases:
- Logging: Record sessions for audit
- Filtering: Block commands or patterns
- Transformation: Inject environment variables
- Rate limiting: Throttle throughput
.
├── *.go # Core interfaces (types, session, auth, conn, etc.)
├── agent/ # Agent implementation
│ ├── agent.go # Agent logic
│ ├── pty/ # PTY session handler
│ ├── tcp/ # TCP tunnel handler
│ └── udp/ # UDP tunnel handler
├── auth/ # Authorization implementations
│ └── webhook.go # Webhook-based authorizer
├── broker/ # Message broker abstractions
│ ├── broker.go # MessageBroker and StateStore interfaces
│ ├── store.go # Type-safe helper functions for StateStore
│ └── channel/ # In-memory channel-based implementation
├── cmd/ # CLI binaries
│ ├── server/ # Relay server
│ ├── agent/ # Agent binary
│ └── client/ # Client CLI
├── conn/ # Connection implementations
│ └── ws/ # WebSocket connections
├── internal/ # Internal utilities
│ └── wsutil/ # WebSocket helpers
└── server/ # Server-side implementations
├── hub/ # Central Hub coordinator
├── handler/ # HTTP handlers
├── middleware/ # Middleware interfaces
├── proxy/ # Session proxy implementation
└── store/distributed/ # Distributed registry and session storage
go run ./cmd/server -addr :8080 -log-level debuggo run ./cmd/agent -server ws://localhost:8080/agent/connect -id my-agent# Interactive PTY session
go run ./cmd/client -server ws://localhost:8080 -agent my-agent
# List agents via API
curl http://localhost:8080/api/agents| Endpoint | Description |
|---|---|
GET /api/agents |
List connected agents |
WS /agent/connect |
Agent WebSocket connection |
WS /client/connect?agent=<id> |
Client PTY session |
WS /tunnel/connect?agent=<id>&host=<host>&port=<port> |
TCP tunnel |
type GRPCServer struct {
hub *hub.Hub
}
func (s *GRPCServer) HandleAgentStream(stream pb.Relay_AgentStreamServer) {
agentConn := NewGRPCAgentConn(stream)
s.hub.RegisterAgent(ctx, agentConn)
s.hub.RunAgentLoop(ctx, agentConn)
s.hub.UnregisterAgent(ctx, agentConn.ID())
}Implement StateStore for distributed state:
type RedisStateStore struct {
client *redis.Client
}
func (r *RedisStateStore) Get(ctx context.Context, key string) ([]byte, error) {
return r.client.Get(ctx, key).Bytes()
}
func (r *RedisStateStore) Set(ctx context.Context, key string, value []byte) error {
return r.client.Set(ctx, key, value, 0).Err()
}
// ... implement Delete and Listtype AuditLogger struct {
db *sql.DB
}
func (a *AuditLogger) ProcessClientToAgent(ctx context.Context, sessionID relay.SessionID, data []byte) ([]byte, error) {
a.db.Exec("INSERT INTO audit_log (session_id, direction, data) VALUES (?, 'c2a', ?)", sessionID, data)
return data, nil
}
func (a *AuditLogger) ProcessAgentToClient(ctx context.Context, sessionID relay.SessionID, data []byte) ([]byte, error) {
a.db.Exec("INSERT INTO audit_log (session_id, direction, data) VALUES (?, 'a2c', ?)", sessionID, data)
return data, nil
}- Transport-agnostic: Core logic is interface-driven; transports just handle wire protocols
- Pluggable components: Registry, SessionManager, Authorizer, Middleware are all interfaces
- Sessions are tunnels: All session types share the same interface (bidirectional byte streams)
- Single-node and distributed: Channel broker for simple deployments, Redis/NATS for scaling
- Reconnection support: Sessions can survive agent reconnection via
AdoptAgentSessions - Middleware for cross-cutting concerns: Logging, filtering, transformation without modifying core logic