WebSocket API

The WebSocket API provides real-time, bidirectional communication with InputLayer. It supports authentication, query execution, streaming results, session rules, and push notifications.

Connection

Connect to the global WebSocket endpoint:

ws://host:port/ws

With TLS (via reverse proxy):

wss://host:port/ws

Optionally specify a knowledge graph via query parameter:

ws://host:port/ws?kg=mydb
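
The three URL forms above can be produced by one small helper. This is a minimal sketch: `build_ws_url` and its parameters are hypothetical names for illustration, not part of InputLayer.

```python
from typing import Optional
from urllib.parse import urlencode


def build_ws_url(host: str, port: int, kg: Optional[str] = None, tls: bool = False) -> str:
    """Return the WebSocket endpoint URL, optionally selecting a knowledge graph."""
    scheme = "wss" if tls else "ws"
    url = f"{scheme}://{host}:{port}/ws"
    if kg is not None:
        # urlencode escapes characters that are not safe in a query string.
        url += "?" + urlencode({"kg": kg})
    return url
```

For example, `build_ws_url("localhost", 8080, kg="mydb")` yields `ws://localhost:8080/ws?kg=mydb`.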

Authentication

The first message sent after connecting must be an authentication message. There are two methods:

Username/Password

{
  "type": "login",
  "username": "admin",
  "password": "admin"
}

API Key

{
  "type": "authenticate",
  "api_key": "your-api-key"
}

Auth Response

On success:

{
  "type": "authenticated",
  "session_id": "a1b2c3d4",
  "knowledge_graph": "default",
  "version": "0.1.0",
  "role": "admin"
}

On failure:

{
  "type": "auth_error",
  "message": "Invalid credentials"
}

The server allows 30 seconds for authentication before closing the connection.
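
A client might wrap the two authentication messages and the response check as follows. This is a sketch only: the helper names and the `AuthError` exception are hypothetical, and only the JSON fields come from the messages shown above.

```python
import json


class AuthError(Exception):
    """Raised when the server does not confirm authentication."""


def login_message(username: str, password: str) -> str:
    """Serialize a username/password login message."""
    return json.dumps({"type": "login", "username": username, "password": password})


def api_key_message(api_key: str) -> str:
    """Serialize an API-key authentication message."""
    return json.dumps({"type": "authenticate", "api_key": api_key})


def parse_auth_response(raw: str) -> dict:
    """Return the decoded 'authenticated' message or raise AuthError."""
    msg = json.loads(raw)
    if msg.get("type") == "authenticated":
        return msg
    if msg.get("type") == "auth_error":
        raise AuthError(msg.get("message", "authentication failed"))
    raise AuthError(f"unexpected message type: {msg.get('type')!r}")
```

Send the result of `login_message(...)` or `api_key_message(...)` as the first frame, then pass the first reply to `parse_auth_response`.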

Executing Statements

Send any InputLayer statement or meta command:

{
  "type": "execute",
  "program": "?edge(X, Y)"
}

The program field accepts any valid InputLayer statement: queries, inserts, rule definitions, meta commands, etc.

Result Response

{
  "type": "result",
  "columns": ["X", "Y"],
  "rows": [[1, 2], [2, 3], [3, 4]],
  "row_count": 3,
  "total_count": 3,
  "truncated": false,
  "execution_time_ms": 2
}

Error Response

{
  "type": "error",
  "message": "Unknown relation 'edge'"
}
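
Because a result carries column names and rows separately, it is often convenient to zip them into one dictionary per row. The sketch below assumes only the result and error shapes shown above; `execute_message`, `rows_as_dicts`, and `ExecuteError` are hypothetical names.

```python
import json


class ExecuteError(Exception):
    """Raised when the server answers an execute message with an error."""


def execute_message(program: str) -> str:
    """Serialize an execute message for one statement or meta command."""
    return json.dumps({"type": "execute", "program": program})


def rows_as_dicts(raw: str) -> list:
    """Decode a single (non-streamed) result into a list of column->value dicts."""
    msg = json.loads(raw)
    if msg.get("type") == "error":
        raise ExecuteError(msg.get("message", "unknown error"))
    if msg.get("type") != "result":
        raise ValueError(f"not a result message: {msg.get('type')!r}")
    # Pair each row's values with the column names, in order.
    return [dict(zip(msg["columns"], row)) for row in msg["rows"]]
```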

Knowledge Graph Operations

Use meta commands via execute:

{"type": "execute", "program": ".kg create mydb"}
{"type": "execute", "program": ".kg use mydb"}
{"type": "execute", "program": ".kg list"}
{"type": "execute", "program": ".kg drop mydb"}

When .kg use switches knowledge graphs, the result includes a switched_kg field.

Streaming Results

When a serialized result set exceeds 1 MB, the server automatically streams it in chunks:

Stream Start

{
  "type": "result_start",
  "columns": ["X", "Y"],
  "total_count": 50000,
  "truncated": false,
  "execution_time_ms": 245
}

Stream Chunk

{
  "type": "result_chunk",
  "rows": [[1, 2], [2, 3]],
  "chunk_index": 0
}

Each chunk contains up to 500 rows. Chunks are sent sequentially.

Stream End

{
  "type": "result_end",
  "row_count": 50000,
  "chunk_count": 100
}

Small results (< 1MB) use the single result message, maintaining backward compatibility.
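
A client that wants one code path for both cases can reassemble a stream into the same shape as a single result message. The following is a sketch under two assumptions drawn from the examples above rather than from a stated guarantee: `chunk_index` starts at 0 and increases by one, and `row_count` in `result_end` equals the number of rows delivered across all chunks. `StreamAssembler` is a hypothetical name.

```python
from typing import Optional


class StreamAssembler:
    """Collects result_start / result_chunk / result_end into one result dict."""

    def __init__(self) -> None:
        self._header: Optional[dict] = None
        self._rows: list = []
        self._next_chunk = 0

    def feed(self, msg: dict) -> Optional[dict]:
        """Consume one decoded message; return the full result once complete."""
        kind = msg.get("type")
        if kind == "result":
            return msg  # small result, already complete
        if kind == "result_start":
            self._header, self._rows, self._next_chunk = msg, [], 0
            return None
        if kind == "result_chunk":
            if self._header is None or msg["chunk_index"] != self._next_chunk:
                raise ValueError("unexpected or out-of-order chunk")
            self._rows.extend(msg["rows"])
            self._next_chunk += 1
            return None
        if kind == "result_end":
            if self._header is None:
                raise ValueError("result_end without result_start")
            if msg["chunk_count"] != self._next_chunk or msg["row_count"] != len(self._rows):
                raise ValueError("stream totals do not match received chunks")
            header, self._header = self._header, None
            return {
                "type": "result",
                "columns": header["columns"],
                "rows": self._rows,
                "row_count": msg["row_count"],
                "total_count": header["total_count"],
                "truncated": header["truncated"],
                "execution_time_ms": header["execution_time_ms"],
            }
        return None  # notifications, pong, etc. are not part of a result
```

Call `feed` with every decoded message; it returns `None` until a complete result is available.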

Session Rules

Session rules are ephemeral rules scoped to the current WebSocket connection. They are automatically cleared when the connection closes.

Define Session Rule

Session rules omit the + prefix:

{
  "type": "execute",
  "program": "temp(X, Y) <- edge(X, Y), X < Y"
}

Session Facts

Insert transient facts scoped to the session:

{
  "type": "execute",
  "program": "context(\"user\", \"alice\")"
}

Query with Session Rules

Session rules are automatically included when querying:

{
  "type": "execute",
  "program": "?temp(X, Y)"
}

Clear Session

{
  "type": "execute",
  "program": ".session clear"
}

Notifications

The server pushes notifications when persistent data changes in the session's knowledge graph.

Persistent Update

Sent when base facts in a relation change:

{
  "type": "persistent_update",
  "knowledge_graph": "default",
  "relation": "edge",
  "operation": "insert",
  "count": 5,
  "timestamp_ms": 1700000000000,
  "seq": 42
}

Rule Change

Sent when a rule is registered, removed, or dropped:

{
  "type": "rule_change",
  "knowledge_graph": "default",
  "rule_name": "reachable",
  "operation": "registered",
  "timestamp_ms": 1700000000000,
  "seq": 43
}

KG Change

Sent when a knowledge graph is created or dropped:

{
  "type": "kg_change",
  "knowledge_graph": "mydb",
  "operation": "created",
  "timestamp_ms": 1700000000000,
  "seq": 44
}

Schema Change

Sent when a relation is dropped:

{
  "type": "schema_change",
  "knowledge_graph": "default",
  "entity": "edge",
  "operation": "dropped",
  "timestamp_ms": 1700000000000,
  "seq": 45
}

The seq field is a monotonic sequence number for deduplication on reconnect.
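
One way to use seq on the client is to remember the highest value seen and classify each incoming notification. This is a sketch, not documented behavior: it treats any seq at or below the last seen value as a duplicate, and it flags a gap only under the assumption that sequence numbers are contiguous, which the examples suggest but the text above does not state. `SeqTracker` is a hypothetical name.

```python
from typing import Optional


class SeqTracker:
    """Tracks the last notification seq to spot duplicates and possible gaps."""

    def __init__(self, last_seq: Optional[int] = None) -> None:
        self.last_seq = last_seq

    def observe(self, msg: dict) -> str:
        """Return 'new', 'duplicate', or 'gap' for a decoded notification."""
        seq = msg["seq"]
        if self.last_seq is not None and seq <= self.last_seq:
            return "duplicate"  # already processed before the reconnect
        # Assumes contiguous numbering: a jump larger than 1 means missed events.
        gap = self.last_seq is not None and seq > self.last_seq + 1
        self.last_seq = seq
        return "gap" if gap else "new"
```

Keeping the tracker alive across reconnects lets the client skip replayed notifications and refresh its state when a gap is reported.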

Keep-Alive

Send a ping to keep the connection alive:

{"type": "ping"}

Response:

{"type": "pong"}

The server also sends WebSocket-level pings every 30 seconds to detect dead connections.

Connection Lifecycle

  1. Connect to ws://host:port/ws
  2. Authenticate with login or authenticate
  3. Execute statements and queries
  4. Receive notifications for data changes
  5. Close gracefully by sending a WebSocket Close frame

Graceful Close

Always send a Close frame before disconnecting to avoid server warnings:

# Python
await websocket.close()

// JavaScript
ws.close(1000, "Normal closure");

Reconnection

If the connection drops:

  1. Wait with exponential backoff (1s, 2s, 4s, 8s, max 30s)
  2. Reconnect and re-authenticate
  3. Session rules and facts are lost, so redefine them after reconnecting
  4. Persistent data (facts, rules, indexes) is unaffected
  5. Use the seq field from notifications to detect missed events
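
The backoff schedule in step 1 can be expressed as a generator. This is a minimal sketch; `backoff_delays` and its parameters are hypothetical names, and adding random jitter is a common refinement that is left out here.

```python
from typing import Iterator


def backoff_delays(initial: float = 1.0, cap: float = 30.0) -> Iterator[float]:
    """Yield reconnect delays in seconds: doubling from `initial`, capped at `cap`."""
    delay = initial
    while True:
        yield min(delay, cap)
        delay = min(delay * 2, cap)
```

A reconnect loop would sleep for each yielded delay before the next attempt and create a fresh generator after a successful connection, so the next outage starts again at 1 second.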

Rate Limiting

The server applies per-connection rate limiting (default: 1000 messages/sec, configurable). Exceeding the limit returns:

{
  "type": "error",
  "message": "Rate limit exceeded (1000 msgs/sec)"
}
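
A client that sends in bursts can throttle itself rather than wait for this error. The sketch below is a simple fixed-window counter with an injectable clock. How the server measures its own window is not documented here, so it is safer to configure a budget somewhat below the server limit. `SendBudget` is a hypothetical name.

```python
import time
from typing import Callable


class SendBudget:
    """Counts messages per one-second window so a client can stay under a limit."""

    def __init__(self, limit: int = 1000, clock: Callable[[], float] = time.monotonic) -> None:
        self.limit = limit
        self._clock = clock
        self._window_start = clock()
        self._sent = 0

    def try_acquire(self) -> bool:
        """Return True if one more message may be sent in the current window."""
        now = self._clock()
        if now - self._window_start >= 1.0:
            # A new one-second window begins; reset the counter.
            self._window_start = now
            self._sent = 0
        if self._sent < self.limit:
            self._sent += 1
            return True
        return False
```

Call `try_acquire()` before each send and delay the message briefly whenever it returns `False`.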

Example: Python Client

import asyncio
import json

import websockets


async def main():
    async with websockets.connect("ws://localhost:8080/ws") as ws:
        # Authenticate
        await ws.send(json.dumps({
            "type": "login",
            "username": "admin",
            "password": "admin"
        }))
        auth_resp = json.loads(await ws.recv())
        assert auth_resp["type"] == "authenticated"
        print(f"Session: {auth_resp['session_id']}")

        # Insert data
        await ws.send(json.dumps({
            "type": "execute",
            "program": "+edge[(1, 2), (2, 3), (3, 4)]"
        }))
        result = json.loads(await ws.recv())

        # Query
        await ws.send(json.dumps({
            "type": "execute",
            "program": "?edge(X, Y)"
        }))
        result = json.loads(await ws.recv())
        print(f"Got {result['row_count']} rows")

        # Close gracefully
        await ws.close()

asyncio.run(main())

Example: JavaScript Client

const ws = new WebSocket("ws://localhost:8080/ws");

ws.onopen = () => {
  ws.send(JSON.stringify({
    type: "login",
    username: "admin",
    password: "admin"
  }));
};

ws.onmessage = (event) => {
  const msg = JSON.parse(event.data);

  if (msg.type === "authenticated") {
    // Authenticated — start querying
    ws.send(JSON.stringify({
      type: "execute",
      program: "?edge(X, Y)"
    }));
  }

  if (msg.type === "result") {
    console.log(`Query returned ${msg.row_count} rows`);
  }

  // Handle streaming results
  if (msg.type === "result_start") {
    console.log(`Streaming ${msg.total_count} rows...`);
  }
  if (msg.type === "result_chunk") {
    console.log(`Received chunk ${msg.chunk_index}`);
  }
  if (msg.type === "result_end") {
    console.log(`Stream complete: ${msg.row_count} rows`);
  }

  // Handle notifications
  if (msg.type === "persistent_update") {
    console.log(`${msg.relation}: ${msg.operation} ${msg.count} rows`);
  }
};