WebSocket API
The WebSocket API provides real-time, bidirectional communication with InputLayer. It supports authentication, query execution, streaming results, session rules, and push notifications.
Connection
Connect to the global WebSocket endpoint:
ws://host:port/ws
With TLS (via reverse proxy):
wss://host:port/ws
Optionally specify a knowledge graph via query parameter:
ws://host:port/ws?kg=mydb
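The three URL forms above can be produced by one small helper. This is a minimal sketch: the function name `build_ws_url` and its parameters are illustrative and not part of InputLayer, and percent-encoding the knowledge graph name is an assumption about what the server accepts.

```python
from urllib.parse import quote, urlencode


def build_ws_url(host, port, kg=None, tls=False):
    """Return the WebSocket endpoint URL, optionally selecting a knowledge graph."""
    scheme = "wss" if tls else "ws"
    url = f"{scheme}://{host}:{port}/ws"
    if kg is not None:
        # Percent-encode the name so unusual characters survive the query string.
        url += "?" + urlencode({"kg": kg}, quote_via=quote)
    return url


print(build_ws_url("localhost", 8080, kg="mydb"))
```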
Authentication
The first message sent after connecting must be an authentication message. There are two methods:
Username/Password
{
"type": "login",
"username": "admin",
"password": "admin"
}
API Key
{
"type": "authenticate",
"api_key": "your-api-key"
}
Auth Response
On success:
{
"type": "authenticated",
"session_id": "a1b2c3d4",
"knowledge_graph": "default",
"version": "0.1.0",
"role": "admin"
}
On failure:
{
"type": "auth_error",
"message": "Invalid credentials"
}
The server allows 30 seconds for authentication before closing the connection.
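The handshake above can be wrapped in a few pure functions, which keeps the message shapes in one place. This is a sketch, not part of any InputLayer client library: the helper names are hypothetical, and mapping `auth_error` to `PermissionError` is just one possible choice.

```python
import json


def login_message(username, password):
    """Build the username/password authentication message."""
    return json.dumps({"type": "login", "username": username, "password": password})


def api_key_message(api_key):
    """Build the API key authentication message."""
    return json.dumps({"type": "authenticate", "api_key": api_key})


def parse_auth_response(raw):
    """Return the decoded 'authenticated' message or raise on failure."""
    msg = json.loads(raw)
    if msg.get("type") == "authenticated":
        return msg
    if msg.get("type") == "auth_error":
        raise PermissionError(msg.get("message", "authentication failed"))
    raise ValueError(f"unexpected message type: {msg.get('type')!r}")
```

Since the server closes unauthenticated connections after 30 seconds, a client would typically send one of these messages immediately after the connection opens.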
Executing Statements
Send any InputLayer statement or meta command:
{
"type": "execute",
"program": "?edge(X, Y)"
}
The program field accepts any valid InputLayer statement: queries, inserts, rule definitions, meta commands, etc.
Result Response
{
"type": "result",
"columns": ["X", "Y"],
"rows": [[1, 2], [2, 3], [3, 4]],
"row_count": 3,
"total_count": 3,
"truncated": false,
"execution_time_ms": 2
}
Error Response
{
"type": "error",
"message": "Unknown relation 'edge'"
}
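A client usually wants rows keyed by column name and errors surfaced as exceptions. The sketch below shows one way to do that for the two response shapes above; `ExecuteError` and `rows_as_dicts` are hypothetical names, and the code assumes the message passed in is the reply to an `execute` request rather than a notification.

```python
import json


class ExecuteError(Exception):
    """Raised when the server answers an execute request with an error message."""


def rows_as_dicts(raw):
    """Decode a 'result' message into a list of {column: value} dicts."""
    msg = json.loads(raw)
    if msg["type"] == "error":
        raise ExecuteError(msg["message"])
    if msg["type"] != "result":
        raise ValueError(f"not a result message: {msg['type']!r}")
    # Pair each row's values with the column names, in order.
    return [dict(zip(msg["columns"], row)) for row in msg["rows"]]
```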
Knowledge Graph Operations
Use meta commands via execute:
{"type": "execute", "program": ".kg create mydb"}
{"type": "execute", "program": ".kg use mydb"}
{"type": "execute", "program": ".kg list"}
{"type": "execute", "program": ".kg drop mydb"}
When .kg use switches knowledge graphs, the result includes a switched_kg field.
Streaming Results
For large result sets (> 1MB serialized), the server automatically streams results in chunks:
Stream Start
{
"type": "result_start",
"columns": ["X", "Y"],
"total_count": 50000,
"truncated": false,
"execution_time_ms": 245
}
Stream Chunk
{
"type": "result_chunk",
"rows": [[1, 2], [2, 3]],
"chunk_index": 0
}
Each chunk contains up to 500 rows. Chunks are sent sequentially.
Stream End
{
"type": "result_end",
"row_count": 50000,
"chunk_count": 100
}
Small results (< 1MB) use the single result message, maintaining backward compatibility.
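Because a reply may arrive either as one `result` message or as a `result_start` / `result_chunk` / `result_end` sequence, client code can hide the difference behind a small accumulator. The class below is a sketch under two assumptions that the text above does not state outright: `chunk_index` starts at 0 and increases by 1, and `row_count` in `result_end` equals the number of rows actually delivered. The name `StreamAssembler` is hypothetical.

```python
class StreamAssembler:
    """Collects rows from either a single result or a chunked stream."""

    def __init__(self):
        self.columns = None
        self.rows = []
        self.next_chunk = 0
        self.done = False

    def feed(self, msg):
        """Consume one decoded message; return True once the result is complete."""
        kind = msg["type"]
        if kind == "result":
            self.columns, self.rows, self.done = msg["columns"], list(msg["rows"]), True
        elif kind == "result_start":
            self.columns, self.rows, self.next_chunk = msg["columns"], [], 0
        elif kind == "result_chunk":
            if msg["chunk_index"] != self.next_chunk:
                raise ValueError(f"expected chunk {self.next_chunk}, got {msg['chunk_index']}")
            self.rows.extend(msg["rows"])
            self.next_chunk += 1
        elif kind == "result_end":
            # Assumed consistency checks; relax them if the server counts differently.
            if msg["row_count"] != len(self.rows) or msg["chunk_count"] != self.next_chunk:
                raise ValueError("stream ended with unexpected row or chunk count")
            self.done = True
        # Any other message type (for example a notification) is ignored here.
        return self.done
```

Feeding every incoming message to `feed` until it returns `True` yields the same `columns` and `rows` regardless of which path the server chose.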
Session Rules
Session rules are ephemeral rules scoped to the current WebSocket connection. They are automatically cleared when the connection closes.
Define Session Rule
Session rules omit the + prefix:
{
"type": "execute",
"program": "temp(X, Y) <- edge(X, Y), X < Y"
}
Session Facts
Insert transient facts scoped to the session:
{
"type": "execute",
"program": "context(\"user\", \"alice\")"
}
Query with Session Rules
Session rules are automatically included when querying:
{
"type": "execute",
"program": "?temp(X, Y)"
}
Clear Session
{
"type": "execute",
"program": ".session clear"
}
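Since session rules and facts disappear when the connection closes, a client that depends on them may want to remember what it defined so the same statements can be sent again on a new connection. The following is a minimal sketch of that bookkeeping; `SessionLog` is a hypothetical helper, and it assumes the caller passes only session-scoped statements (or `.session clear`) to `record`.

```python
import json


class SessionLog:
    """Remembers session-scoped statements so they can be sent again later."""

    def __init__(self):
        self.statements = []

    def record(self, program):
        """Track a session statement and return the execute message to send."""
        if program.strip() == ".session clear":
            # The server forgets everything, so the local log does too.
            self.statements.clear()
        else:
            self.statements.append(program)
        return json.dumps({"type": "execute", "program": program})

    def replay(self):
        """Return execute messages that re-create the recorded session state."""
        return [json.dumps({"type": "execute", "program": p}) for p in self.statements]
```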
Notifications
The server pushes notifications when persistent data changes in the session's knowledge graph.
Persistent Update
Sent when base facts in a relation change:
{
"type": "persistent_update",
"knowledge_graph": "default",
"relation": "edge",
"operation": "insert",
"count": 5,
"timestamp_ms": 1700000000000,
"seq": 42
}
Rule Change
Sent when a rule is registered, removed, or dropped:
{
"type": "rule_change",
"knowledge_graph": "default",
"rule_name": "reachable",
"operation": "registered",
"timestamp_ms": 1700000000000,
"seq": 43
}
KG Change
Sent when a knowledge graph is created or dropped:
{
"type": "kg_change",
"knowledge_graph": "mydb",
"operation": "created",
"timestamp_ms": 1700000000000,
"seq": 44
}
Schema Change
Sent when a relation is dropped:
{
"type": "schema_change",
"knowledge_graph": "default",
"entity": "edge",
"operation": "dropped",
"timestamp_ms": 1700000000000,
"seq": 45
}
The seq field is a monotonic sequence number for deduplication on reconnect.
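One way to use `seq` on the client side is to remember the highest value seen and classify each incoming notification against it. The sketch below assumes that consecutive notifications delivered to a client carry consecutive `seq` values, so a jump indicates missed events; the text above only guarantees that `seq` is monotonic, so treat the gap check as an assumption. `SeqTracker` is a hypothetical name.

```python
class SeqTracker:
    """Classifies notifications as new, duplicate, or following a gap."""

    def __init__(self):
        self.last_seq = None

    def observe(self, msg):
        """Return 'new', 'duplicate', or 'gap' for a decoded notification."""
        seq = msg["seq"]
        if self.last_seq is None:
            self.last_seq = seq
            return "new"
        if seq <= self.last_seq:
            # Already processed, for example re-delivered after a reconnect.
            return "duplicate"
        status = "new" if seq == self.last_seq + 1 else "gap"
        self.last_seq = seq
        return status
```

Keeping the tracker alive across reconnects lets the client skip duplicates and decide whether to re-query after a gap.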
Keep-Alive
Send a ping to keep the connection alive:
{"type": "ping"}
Response:
{"type": "pong"}
The server also sends WebSocket-level pings every 30 seconds to detect dead connections.
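An application-level ping can be sent from a background task. The coroutine below is a sketch: `keep_alive`, its parameters, and the choice of interval are all assumptions, since the text above does not specify how often a client should ping. It takes any async `send` callable, for example the `send` method of a connection object.

```python
import asyncio
import json


async def keep_alive(send, interval_s, stop):
    """Send {"type": "ping"} every interval_s seconds until stop is set."""
    while not stop.is_set():
        await send(json.dumps({"type": "ping"}))
        try:
            # Sleep for the interval, but wake up early if stop is set.
            await asyncio.wait_for(stop.wait(), timeout=interval_s)
        except asyncio.TimeoutError:
            pass
```

The matching `{"type": "pong"}` replies arrive on the same connection as results and notifications, so the receive loop needs to recognize and skip them.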
Connection Lifecycle
- Connect to ws://host:port/ws
- Authenticate with login or authenticate
- Execute statements and queries
- Receive notifications for data changes
- Close gracefully by sending a WebSocket Close frame
Graceful Close
Always send a Close frame before disconnecting to avoid server warnings:
# Python
await websocket.close()
// JavaScript
ws.close(1000, "Normal closure");
Reconnection
If the connection drops:
- Wait with exponential backoff (1s, 2s, 4s, 8s, max 30s)
- Reconnect and re-authenticate
- Session rules and facts are lost — re-define them after reconnecting
- Persistent data (facts, rules, indexes) is unaffected
- Use the seq field from notifications to detect missed events
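The backoff schedule in the first step can be expressed as a generator. This is a minimal sketch; the name `backoff_delays` and its parameters are illustrative, and it adds no jitter because the steps above do not mention any.

```python
from itertools import islice


def backoff_delays(base=1.0, factor=2.0, cap=30.0):
    """Yield reconnect delays in seconds: base, base*factor, ... capped at cap."""
    delay = base
    while True:
        yield min(delay, cap)
        delay = min(delay * factor, cap)


# First seven delays of the default schedule.
print(list(islice(backoff_delays(), 7)))  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```

A reconnect loop would sleep for the next delay after each failed attempt and create a fresh generator once a connection succeeds.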
Rate Limiting
The server applies per-connection rate limiting (default: 1000 messages/sec, configurable). Exceeding the limit returns:
{
"type": "error",
"message": "Rate limit exceeded (1000 msgs/sec)"
}
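A client that sends bursts of messages can avoid this error by pacing itself. The sketch below spaces sends evenly; `Pacer` is a hypothetical helper, and because the text above does not say how the server measures its window, even spacing is a conservative assumption rather than a mirror of the server's algorithm.

```python
import time


class Pacer:
    """Spaces out sends so that roughly `limit` messages go out per second."""

    def __init__(self, limit=1000, clock=time.monotonic):
        self.interval = 1.0 / limit
        self.clock = clock
        self.next_ok = clock()

    def delay(self):
        """Return seconds to wait before the next send, and reserve that slot."""
        now = self.clock()
        wait = max(0.0, self.next_ok - now)
        # The following send may not start before this slot has passed.
        self.next_ok = max(now, self.next_ok) + self.interval
        return wait
```

Before each send, the caller sleeps for `pacer.delay()` seconds (for example with `await asyncio.sleep(...)`); a limit below the configured server value leaves some headroom.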
Example: Python Client
import asyncio
import json

import websockets  # third-party package: pip install websockets


async def main():
    async with websockets.connect("ws://localhost:8080/ws") as ws:
        # Authenticate (must be the first message on the connection)
        await ws.send(json.dumps({
            "type": "login",
            "username": "admin",
            "password": "admin"
        }))
        auth_resp = json.loads(await ws.recv())
        assert auth_resp["type"] == "authenticated"
        print(f"Session: {auth_resp['session_id']}")

        # Insert data
        await ws.send(json.dumps({
            "type": "execute",
            "program": "+edge[(1, 2), (2, 3), (3, 4)]"
        }))
        result = json.loads(await ws.recv())

        # Query
        await ws.send(json.dumps({
            "type": "execute",
            "program": "?edge(X, Y)"
        }))
        result = json.loads(await ws.recv())
        print(f"Got {result['row_count']} rows")

        # Close gracefully
        await ws.close()


asyncio.run(main())
Example: JavaScript Client
const ws = new WebSocket("ws://localhost:8080/ws");

ws.onopen = () => {
  ws.send(JSON.stringify({
    type: "login",
    username: "admin",
    password: "admin"
  }));
};

ws.onmessage = (event) => {
  const msg = JSON.parse(event.data);

  if (msg.type === "authenticated") {
    // Authenticated: start querying
    ws.send(JSON.stringify({
      type: "execute",
      program: "?edge(X, Y)"
    }));
  }

  if (msg.type === "result") {
    console.log(`Query returned ${msg.row_count} rows`);
  }

  // Handle streaming results
  if (msg.type === "result_start") {
    console.log(`Streaming ${msg.total_count} rows...`);
  }
  if (msg.type === "result_chunk") {
    console.log(`Received chunk ${msg.chunk_index}`);
  }
  if (msg.type === "result_end") {
    console.log(`Stream complete: ${msg.row_count} rows`);
  }

  // Handle notifications
  if (msg.type === "persistent_update") {
    console.log(`${msg.relation}: ${msg.operation} ${msg.count} rows`);
  }
};