Python SDK

InputLayer's Python SDK is an Object-Logic Mapper (OLM) that lets you work with your knowledge graph using plain Python classes. You define schemas as typed dataclasses, build queries with lambdas and operators, and the SDK compiles everything into InputLayer Query Language (IQL) behind the scenes. You never have to write IQL by hand for normal application code.

The SDK connects over WebSocket, so you get persistent connections, real-time notifications, and session-scoped state out of the box. For LangChain users, see the dedicated LangChain Integration guide.

Installation

pip install inputlayer-client-dev

# With pandas DataFrame support
pip install "inputlayer-client-dev[pandas]"

Requirements: Python 3.10+ and a running InputLayer server.

The package also installs the il CLI for managing schema migrations. See Migrations for the full guide.

Connecting

The InputLayer client manages a WebSocket connection with automatic reconnection. You can authenticate with username/password or API keys.


import asyncio

from inputlayer import InputLayer

async def main():
    async with InputLayer("ws://localhost:8080/ws", username="admin", password="admin") as il:
        print(f"Connected to InputLayer {il.server_version}")

asyncio.run(main())

If you prefer API key authentication:

async with InputLayer("ws://localhost:8080/ws", api_key="il_key_abc123") as il:
    ...

The client accepts a few optional parameters for connection resilience:

il = InputLayer(
    "ws://localhost:8080/ws",
    username="admin",
    password="admin",
    auto_reconnect=True,         # reconnect on disconnect (default: True)
    reconnect_delay=1.0,         # seconds between attempts (default: 1.0)
    max_reconnect_attempts=10,   # give up after N failures (default: 10)
)
await il.connect()
# ... use the client ...
await il.close()
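The three resilience parameters describe a fixed-delay retry policy. As a rough sketch of that policy (not the SDK's actual implementation; `connect_with_retry` and `make_flaky_connect` are hypothetical names used only for illustration), the loop might look like this:

```python
import asyncio


async def connect_with_retry(connect, reconnect_delay=1.0, max_reconnect_attempts=10):
    """Call `connect` until it succeeds or the attempt budget is spent.

    Hypothetical helper: it mirrors the documented parameters, not SDK internals.
    """
    last_error = None
    for attempt in range(1, max_reconnect_attempts + 1):
        try:
            return await connect()
        except OSError as exc:  # network-level failures
            last_error = exc
            if attempt < max_reconnect_attempts:
                await asyncio.sleep(reconnect_delay)
    raise ConnectionError(
        f"gave up after {max_reconnect_attempts} attempts"
    ) from last_error


def make_flaky_connect(failures):
    """Build a fake connect() that fails `failures` times, then succeeds."""
    state = {"calls": 0}

    async def flaky_connect():
        state["calls"] += 1
        if state["calls"] <= failures:
            raise OSError("connection refused")
        return f"connected on attempt {state['calls']}"

    return flaky_connect


result = asyncio.run(
    connect_with_retry(make_flaky_connect(2), reconnect_delay=0.01)
)
print(result)
```

With a fixed delay, the worst-case wait before giving up is roughly `reconnect_delay * (max_reconnect_attempts - 1)`, which is worth keeping in mind when choosing values.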

Sync Client

If you're working in a script, notebook, or anywhere async isn't practical, use InputLayerSync instead. It wraps the async client and exposes the same API, just without await:

from inputlayer import InputLayerSync

with InputLayerSync("ws://localhost:8080/ws", username="admin", password="admin") as il:
    kg = il.knowledge_graph("myapp")
    kg.define(Employee)
    result = kg.query(Employee)
    for emp in result:
        print(emp.name)

Every method shown in this guide works the same way on the sync client - just drop the await.
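
To make the "wraps the async client" idea concrete, here is a minimal sketch of the general pattern: a synchronous facade that owns an event loop and runs each coroutine to completion. `FakeAsyncClient` and `SyncFacade` are hypothetical stand-ins, and the real InputLayerSync may well be implemented differently.

```python
import asyncio


class FakeAsyncClient:
    """Hypothetical async client used only to demonstrate the pattern."""

    async def connect(self):
        self.connected = True

    async def close(self):
        self.connected = False

    async def query(self, name):
        await asyncio.sleep(0)  # stand-in for network I/O
        return [f"{name}-row-1", f"{name}-row-2"]


class SyncFacade:
    """Runs each coroutine of the wrapped client on a private event loop."""

    def __init__(self, async_client):
        self._client = async_client
        self._loop = asyncio.new_event_loop()

    def __enter__(self):
        self._loop.run_until_complete(self._client.connect())
        return self

    def __exit__(self, exc_type, exc, tb):
        try:
            self._loop.run_until_complete(self._client.close())
        finally:
            self._loop.close()

    def query(self, name):
        # Same call as the async API, minus the `await`.
        return self._loop.run_until_complete(self._client.query(name))


with SyncFacade(FakeAsyncClient()) as client:
    rows = client.query("employee")
print(rows)
```

A facade built this way cannot be called from a thread that already has a running event loop, so production wrappers often run the loop on a background thread instead.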

Knowledge Graphs

Once connected, you work within a knowledge graph. Think of it as a namespace or database that holds your relations, rules, and indexes.

# Get or create a knowledge graph
kg = il.knowledge_graph("myapp")

# List all knowledge graphs on the server
graphs = await il.list_knowledge_graphs()

# Drop a knowledge graph and all its data
await il.drop_knowledge_graph("myapp")

Defining Schemas

Schemas are defined as Python classes that extend Relation. Each field becomes a typed column in the knowledge graph.

from inputlayer import Relation, Vector, Timestamp

class Employee(Relation):
    id: int
    name: str
    department: str
    salary: float
    active: bool

class Document(Relation):
    id: int
    title: str
    content: str
    embedding: Vector[384]
    created_at: Timestamp

Deploy your schema to the server with define(). This is idempotent - calling it multiple times is safe and won't duplicate anything.

kg = il.knowledge_graph("myapp")
await kg.define(Employee, Document)

You can inspect what's been deployed:

# List all relations
relations = await kg.relations()
for r in relations:
    print(f"{r.name}: {r.row_count} rows")

# Describe a specific relation's schema
desc = await kg.describe(Employee)
for col in desc.columns:
    print(f"  {col.name}: {col.type}")

Supported Types

Python Type     InputLayer Type   Description
int             int               64-bit integer
float           float             64-bit floating point
str             string            UTF-8 string
bool            bool              Boolean
Vector[N]       vector(N)         N-dimensional float32 vector
VectorInt8[N]   vector_int8(N)    N-dimensional int8 quantized vector
Timestamp       timestamp         Unix epoch milliseconds
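
Because Timestamp values are Unix epoch milliseconds, converting to and from Python datetime objects is plain arithmetic. The sketch below uses only the standard library. `to_epoch_ms` and `from_epoch_ms` are hypothetical helper names, not SDK functions.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_ms(dt):
    """Convert a timezone-aware datetime to Unix epoch milliseconds."""
    if dt.tzinfo is None:
        raise ValueError("use a timezone-aware datetime to avoid local-time ambiguity")
    # Integer division of timedeltas avoids float rounding.
    return (dt - EPOCH) // timedelta(milliseconds=1)


def from_epoch_ms(ms):
    """Convert Unix epoch milliseconds back to a UTC datetime."""
    return EPOCH + timedelta(milliseconds=ms)


moment = datetime(2024, 1, 1, tzinfo=timezone.utc)
ms = to_epoch_ms(moment)
print(ms)  # 1704067200000
print(from_epoch_ms(ms) == moment)  # True
```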

Custom Relation Names

By default, the SDK converts your class name to snake_case (e.g., SensorReading becomes sensor_reading). You can override this:

class SensorReading(Relation):
    __relation_name__ = "readings"
    sensor_id: int
    value: float
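
If you want to predict the default name for a class, the conversion can be approximated in a few lines. This is a sketch of a common snake_case rule, not the SDK's own code: `to_snake_case` and `relation_name` are hypothetical helpers, and the handling of acronyms such as "HTTPRequest" is an assumption.

```python
import re


def to_snake_case(class_name):
    """Approximate CamelCase -> snake_case conversion (assumed rule)."""
    # "SensorReading" -> "Sensor_Reading"
    s = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", class_name)
    # "HTTPRequest" -> "HTTP_Request" (acronym followed by a word)
    s = re.sub(r"(?<=[A-Z])(?=[A-Z][a-z])", "_", s)
    return s.lower()


def relation_name(cls):
    """Prefer an explicit __relation_name__, else derive one from the class name."""
    return getattr(cls, "__relation_name__", None) or to_snake_case(cls.__name__)


class Employee:  # plain stand-in classes, not real Relation subclasses
    pass


class SensorReading:
    __relation_name__ = "readings"


print(to_snake_case("SensorReading"))  # sensor_reading
print(relation_name(Employee))         # employee
print(relation_name(SensorReading))    # readings
```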

Inserting Data

You can insert data in several ways, depending on what's most convenient.

# A single fact
await kg.insert(Employee(id=1, name="Alice", department="eng", salary=120000.0, active=True))

# A batch of facts
await kg.insert([
    Employee(id=2, name="Bob", department="hr", salary=90000.0, active=True),
    Employee(id=3, name="Charlie", department="eng", salary=110000.0, active=False),
])

# From a dictionary
await kg.insert(Employee, data={"id": 4, "name": "Diana", "department": "eng", "salary": 105000.0, "active": True})

# From a list of dictionaries
await kg.insert(Employee, data=[
    {"id": 5, "name": "Eve", "department": "sales", "salary": 95000.0, "active": True},
    {"id": 6, "name": "Frank", "department": "sales", "salary": 88000.0, "active": True},
])

Inserting from pandas DataFrames

If you installed with pip install inputlayer-client-dev[pandas], you can load data directly from DataFrames:



import pandas as pd

df = pd.DataFrame({
    "id": [7, 8, 9],
    "name": ["Grace", "Hank", "Ivy"],
    "department": ["eng", "hr", "eng"],
    "salary": [115000.0, 92000.0, 108000.0],
    "active": [True, True, False],
})
result = await kg.insert(Employee, data=df)
print(f"Inserted {result.count} rows")

Deleting Data

Delete specific facts or use a filter to remove matching rows:

# Delete a specific fact
await kg.delete(Employee(id=1, name="Alice", department="eng", salary=120000.0, active=True))

# Delete by condition
result = await kg.delete(Employee, where=lambda e: e.active == False)
print(f"Deleted {result.count} rows")

Querying

Queries are built with Python expressions. The SDK compiles your lambdas, column references, and aggregation calls into IQL behind the scenes.

Basic Queries

# All rows from a relation
result = await kg.query(Employee)
for emp in result:
    print(f"{emp.name} - {emp.department}")

# With a filter
engineers = await kg.query(
    Employee,
    where=lambda e: (e.department == "eng") & (e.active == True),
)

Selecting Specific Columns

Instead of fetching full rows, you can select just the columns you need:

result = await kg.query(
    Employee.name, Employee.salary,
    join=[Employee],
    where=lambda e: e.department == "eng",
)
for row in result:
    print(f"{row.name}: ${row.salary}")

Joins

When your query spans multiple relations, use join and on to combine them:

class Department(Relation):
    name: str
    budget: float

result = await kg.query(
    Employee.name, Department.budget,
    join=[Employee, Department],
    on=lambda e, d: e.department == d.name,
)

Self-Joins

For queries that need to compare rows within the same relation, use refs() to create aliased references:

e1, e2 = Employee.refs(2)

result = await kg.query(
    e1.name, e2.name,
    join=[e1, e2],
    on=lambda a, b: (a.department == b.department) & (a.id != b.id),
)

Computed Columns

You can define computed values inline using keyword arguments:

result = await kg.query(
    Employee.name,
    join=[Employee],
    bonus=Employee.salary * 0.1,
)
for row in result:
    print(f"{row.name}: bonus = ${row.bonus}")

Ordering and Pagination

# Top 10 highest paid
result = await kg.query(
    Employee,
    order_by=Employee.salary.desc(),
    limit=10,
)

# Second page
result = await kg.query(
    Employee,
    order_by=Employee.name.asc(),
    limit=10,
    offset=10,
)

Aggregations

The SDK includes standard aggregation functions that you can use in queries:

from inputlayer import count, count_distinct, sum_, avg, min_, max_

# Group by department with stats
result = await kg.query(
    Employee.department,
    count(Employee.id),
    avg(Employee.salary),
    max_(Employee.salary),
    join=[Employee],
)
for row in result:
    print(f"{row.department}: {row.count} employees, avg ${row.avg}")

For more specialized aggregation, top_k lets you find the top entries per group:

from inputlayer import top_k

# Top 3 highest-paid employees per department
result = await kg.query(
    Employee.department, Employee.name, Employee.salary,
    # Positional arguments must come before keyword arguments such as join=
    top_k(3, Employee.department, order_by=Employee.salary, desc=True),
    join=[Employee],
)
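
To see what a per-group top-k computes, here is the same idea in plain Python over in-memory rows. It only illustrates the semantics. The server evaluates top_k itself, and `top_k_per_group` is a hypothetical helper.

```python
from collections import defaultdict
from heapq import nlargest

rows = [
    {"department": "eng", "name": "Alice", "salary": 120000.0},
    {"department": "hr", "name": "Bob", "salary": 90000.0},
    {"department": "eng", "name": "Charlie", "salary": 110000.0},
    {"department": "eng", "name": "Grace", "salary": 115000.0},
    {"department": "sales", "name": "Eve", "salary": 95000.0},
    {"department": "sales", "name": "Frank", "salary": 88000.0},
]


def top_k_per_group(rows, k, group_key, order_key):
    """Return the k rows with the largest `order_key` within each group."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[group_key]].append(row)
    return {
        group: nlargest(k, members, key=lambda r: r[order_key])
        for group, members in groups.items()
    }


top = top_k_per_group(rows, 2, "department", "salary")
for department, members in top.items():
    print(department, [m["name"] for m in members])
```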

Working with Results

Every query returns a ResultSet with several ways to access the data:

result = await kg.query(Employee)

# Iterate as typed objects
for emp in result:
    print(emp.name)

# Check result metadata
print(f"Rows: {len(result)}, Total: {result.total_count}")
print(f"Execution time: {result.execution_time_ms}ms")

# Get the first row (or None if empty)
first = result.first()

# Get a single scalar value
total = (await kg.query(count(Employee.id), join=[Employee])).scalar()

# Convert to different formats
dicts = result.to_dicts()     # list[dict]
tuples = result.to_tuples()   # list[tuple]
df = result.to_df()           # pandas DataFrame (requires pandas)

Query Plans

To understand how a query will execute without running it, use debug():

plan = await kg.debug(
    Employee,
    where=lambda e: e.department == "eng",
)
print(plan.iql)   # compiled IQL string
print(plan.plan)  # execution plan

Raw IQL

If you need to drop down to raw InputLayer Query Language for something the OLM doesn't cover:

result = await kg.execute("?employee(Id, Name, D, Salary, A), Salary > 100000")

The query body is whatever follows ?. Variable names must be capitalized (lowercase identifiers parse as constants). For agent-style or chain-style use, prefer the safe parameter binding helpers from the LangChain Integration guide rather than building IQL strings with f"...".

Derived Relations (Rules)

Derived relations are computed views that InputLayer keeps up to date automatically. When the underlying data changes, derived results are recomputed incrementally - you never need to manually refresh them.

Define them using Derived with the From(...).where(...).select(...) builder:

from typing import ClassVar
from inputlayer import Derived, From

class HighEarner(Derived):
    name: str
    salary: float
    rules: ClassVar[list] = [
        From(Employee)
            .where(lambda e: e.salary > 100000)
            .select(name=Employee.name, salary=Employee.salary),
    ]

Recursive Rules

One of InputLayer's most powerful features is native support for recursive logic. You define it naturally - a base case and a recursive case:

class Edge(Relation):
    src: int
    dst: int

class Reachable(Derived):
    src: int
    dst: int
    rules: ClassVar[list] = []

Reachable.rules = [
    # Base case: direct edges are reachable
    From(Edge).select(src=Edge.src, dst=Edge.dst),
    # Recursive case: if A reaches B and B reaches C, then A reaches C
    From(Reachable, Edge)
        .where(lambda r, e: r.dst == e.src)
        .select(src=Reachable.src, dst=Edge.dst),
]

Deploy and query rules just like regular relations:

# Deploy the rule (persistent - survives restarts)
await kg.define_rules(Reachable)

# Query it
result = await kg.query(Reachable, where=lambda r: r.src == 1)
for row in result:
    print(f"1 can reach {row.dst}")
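
If the base-case/recursive-case split is new to you, it can help to see the same two rules evaluated by hand. The sketch below is a naive fixed-point loop in plain Python. InputLayer recomputes incrementally and does not work this way internally, so treat it purely as an illustration of what Reachable contains.

```python
def reachable(edges):
    """Naive fixed-point evaluation of the two Reachable rules."""
    result = set(edges)  # base case: every direct edge is reachable
    while True:
        # Recursive case: Reachable(src, mid) joined with Edge(mid, dst)
        derived = {
            (src, dst)
            for (src, mid) in result
            for (edge_src, dst) in edges
            if mid == edge_src
        }
        new = derived - result
        if not new:  # nothing new was derived: fixed point reached
            return result
        result |= new


edges = {(1, 2), (2, 3), (3, 4)}
closure = reachable(edges)
print(sorted(dst for src, dst in closure if src == 1))  # [2, 3, 4]
```

Because the result is a set, cycles in the edge data do not cause infinite loops: the iteration stops as soon as a pass derives nothing new.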

Managing Rules

# List all deployed rules
rules = await kg.list_rules()
for r in rules:
    print(f"{r.name}: {r.clause_count} clause(s)")

# View a rule's compiled IQL definition
clauses = await kg.rule_definition("reachable")
for clause in clauses:
    print(clause)

# Drop a specific rule
await kg.drop_rule("high_earner")

# Clear a rule's materialized data (rule stays, data recomputes)
await kg.clear_rule("reachable")

Vector Search

InputLayer supports HNSW indexes for approximate nearest-neighbor search over vector columns.

Creating an Index

from inputlayer import HnswIndex

index = HnswIndex(
    name="doc_emb_idx",
    relation=Document,
    column="embedding",
    metric="cosine",       # cosine, euclidean, manhattan, dot_product
    m=32,                  # connections per node (higher = more accurate, more memory)
    ef_construction=200,   # build-time search width
    ef_search=50,          # query-time search width
)
await kg.create_index(index)

Searching

query_embedding = [0.1, 0.2, ...]  # your query vector

# Top-k nearest neighbors
result = await kg.vector_search(
    Document,
    query_vec=query_embedding,
    k=10,
    metric="cosine",
)

# Radius-based search (all vectors within distance)
result = await kg.vector_search(
    Document,
    query_vec=query_embedding,
    radius=0.3,
    metric="cosine",
)

# Combined with a filter
result = await kg.vector_search(
    Document,
    query_vec=query_embedding,
    k=10,
    metric="cosine",
    where=lambda d: d.created_at > Timestamp.from_datetime(cutoff_date),
)
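
The difference between k-based and radius-based search is easiest to see in an exact, brute-force version. The sketch below scans every document in plain Python. An HNSW index returns approximate results far faster, so this is only a reference for the semantics. The helper names are hypothetical, and treating cosine distance as 1 minus cosine similarity is an assumption about the metric's convention.

```python
import heapq
import math


def cosine_distance(a, b):
    """1 - cosine similarity; undefined for zero-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norms = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / norms


def top_k_search(docs, query_vec, k):
    """Return the k documents closest to query_vec."""
    return heapq.nsmallest(
        k, docs, key=lambda d: cosine_distance(d["embedding"], query_vec)
    )


def radius_search(docs, query_vec, radius):
    """Return every document within `radius` of query_vec."""
    return [
        d for d in docs if cosine_distance(d["embedding"], query_vec) <= radius
    ]


docs = [
    {"title": "a", "embedding": [1.0, 0.0]},
    {"title": "b", "embedding": [0.9, 0.1]},
    {"title": "c", "embedding": [0.0, 1.0]},
]
query = [1.0, 0.0]

print([d["title"] for d in top_k_search(docs, query, k=1)])
print([d["title"] for d in radius_search(docs, query, radius=0.3)])
```

A k-based search always returns up to k rows no matter how far away they are, while a radius search may return zero rows or all of them.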

Managing Indexes

# List all indexes
indexes = await kg.list_indexes()
for idx in indexes:
    print(f"{idx.name}: {idx.row_count} vectors, metric={idx.metric}")

# Get detailed stats
stats = await kg.index_stats("doc_emb_idx")
print(f"Layers: {stats.layers}, Memory: {stats.memory_bytes} bytes")

# Rebuild after large data changes
await kg.rebuild_index("doc_emb_idx")

# Drop an index
await kg.drop_index("doc_emb_idx")

Sessions

Sessions let you inject ephemeral facts and rules that exist only for the lifetime of your WebSocket connection. They're useful for user-specific context, A/B testing, or temporary views that shouldn't persist.

# Insert session-scoped facts (only visible to this connection)
await kg.session.insert([
    Employee(id=999, name="Temp", department="eng", salary=0.0, active=True),
])

# Define session-scoped rules
await kg.session.define_rules(MyTempView)

# Query as normal - session facts mix with persistent data
result = await kg.query(MyTempView)

# List session rules
session_rules = await kg.session.list_rules()

# Clean up (or just disconnect - session state is automatically cleared)
await kg.session.clear()

Notifications

Subscribe to real-time events as data changes in the knowledge graph. This is useful for building reactive pipelines, dashboards, or audit logs.

# Register a callback for a specific relation
@il.on("persistent_update", relation="sensor_reading")
def on_sensor_update(event):
    print(f"[{event.relation}] {event.operation}: {event.count} rows")
    print(f"  sequence: {event.seq}, timestamp: {event.timestamp_ms}")

# Listen for any knowledge graph change
@il.on("kg_change")
def on_kg_change(event):
    print(f"KG {event.knowledge_graph} changed")

# You can also iterate over events
async for event in il.notifications():
    print(f"Event: {event.type} seq={event.seq}")

Event types include persistent_update, rule_change, kg_change, and schema_change. You can filter by relation or knowledge graph.

Built-in Functions

The SDK exposes InputLayer's built-in function library for use in queries and rules. Import them from inputlayer.functions:

from inputlayer import functions as fn

Distance Functions

For computing vector similarity outside of index-based search:

result = await kg.query(
    Document.title,
    join=[Document],
    distance=fn.cosine(Document.embedding, query_vec),
)

Available: fn.cosine, fn.euclidean, fn.manhattan, fn.dot

Int8 variants: fn.cosine_int8, fn.euclidean_int8, fn.manhattan_int8, fn.dot_int8
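
For reference, these are the textbook definitions behind the four metric names, written in plain Python. InputLayer's exact conventions (for example, whether cosine is reported as a distance or a similarity) are not spelled out here, so treat the sign and scaling below as assumptions.

```python
import math


def dot(a, b):
    """Dot product: sum of element-wise products."""
    return sum(x * y for x, y in zip(a, b))


def euclidean(a, b):
    """Straight-line (L2) distance."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def manhattan(a, b):
    """Sum of absolute differences (L1 distance)."""
    return sum(abs(x - y) for x, y in zip(a, b))


def cosine(a, b):
    """Cosine distance, assumed here to be 1 - cosine similarity."""
    return 1.0 - dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))


a, b = [3.0, 4.0], [4.0, 3.0]
print(dot(a, b), euclidean(a, b), manhattan(a, b), cosine(a, b))
```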

Vector Operations

fn.normalize, fn.vec_dim, fn.vec_add, fn.vec_scale

Temporal Functions

For working with Timestamp columns:

from inputlayer import Timestamp
from inputlayer import functions as fn

# Rows from the last hour
result = await kg.query(
    SensorReading,
    where=lambda r: fn.within_last(r.timestamp, fn.time_now(), 3600000),
)

# Time-decayed scoring
result = await kg.query(
    Article.title,
    join=[Article],
    score=fn.time_decay(Article.published_at, fn.time_now(), 86400000),
)

Available: fn.time_now, fn.time_diff, fn.time_add, fn.time_sub, fn.time_decay, fn.time_decay_linear, fn.time_before, fn.time_after, fn.time_between, fn.within_last, fn.intervals_overlap, fn.interval_contains, fn.interval_duration, fn.point_in_interval
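
The literals 3600000 and 86400000 in the calls above are one hour and one day in milliseconds. Named constants make that intent easier to read. The sketch below also shows one plausible reading of a "within the last N milliseconds" check. The exact boundary behavior of fn.within_last is an assumption, and `within_last` here is a local Python function, not the SDK's.

```python
MS_PER_SECOND = 1_000
MS_PER_HOUR = 60 * 60 * MS_PER_SECOND  # 3_600_000
MS_PER_DAY = 24 * MS_PER_HOUR          # 86_400_000


def within_last(ts_ms, now_ms, window_ms):
    """True if ts_ms falls in the closed interval [now_ms - window_ms, now_ms]."""
    return now_ms - window_ms <= ts_ms <= now_ms


now = 1_704_067_200_000  # 2024-01-01T00:00:00Z in epoch milliseconds
print(within_last(now - 30 * 60 * MS_PER_SECOND, now, MS_PER_HOUR))  # True
print(within_last(now - 2 * MS_PER_HOUR, now, MS_PER_HOUR))          # False
```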

Math Functions

fn.abs_, fn.sqrt, fn.pow_, fn.log, fn.exp, fn.sin, fn.cos, fn.tan, fn.floor, fn.ceil, fn.sign, fn.min_val, fn.max_val

String Functions

fn.len_, fn.upper, fn.lower, fn.trim, fn.substr, fn.replace, fn.concat

Type Conversion

fn.to_int, fn.to_float

User and Access Management

The SDK provides methods for managing users, API keys, and per-knowledge-graph access control.

User Management

# Create a new user
await il.create_user("alice", "securepassword", role="editor")

# List users
users = await il.list_users()
for u in users:
    print(f"{u.username}: {u.role}")

# Change a user's role
await il.set_role("alice", "admin")

# Change a user's password
await il.set_password("alice", "newpassword")

# Remove a user
await il.drop_user("alice")

API Keys

# Create an API key
key = await il.create_api_key("my-service")
print(f"Store this key securely: {key}")

# List active keys
keys = await il.list_api_keys()
for k in keys:
    print(f"{k.label} (created: {k.created_at})")

# Revoke a key
await il.revoke_api_key("my-service")

Per-Knowledge-Graph Access Control

# Grant a user access to a specific knowledge graph
await kg.grant_access("alice", "editor")

# List access control entries
acl = await kg.list_acl()
for entry in acl:
    print(f"{entry.username}: {entry.role}")

# Revoke access
await kg.revoke_access("alice")

Error Handling

The SDK uses a hierarchy of typed exceptions so you can handle specific failure modes:

from inputlayer import (
    InputLayerError,           # base class for all errors
    ConnectionError,           # network/connection issues (shadows the Python built-in of the same name)
    AuthenticationError,       # bad credentials
    SchemaConflictError,       # schema mismatch on define()
    ValidationError,           # invalid data
    QueryError,                # engine rejected a query (parse/type/unsafe rule)
    QueryTimeoutError,         # query took too long
    PermissionError,           # insufficient permissions (shadows the Python built-in of the same name)
    KnowledgeGraphNotFoundError,
    KnowledgeGraphExistsError,
    RelationNotFoundError,
    RuleNotFoundError,
    IndexNotFoundError,
    InternalError,             # unexpected server error
)

try:
    await kg.define(Employee)
except AuthenticationError:
    print("Check your credentials")
except SchemaConflictError as e:
    print(f"Schema conflict: {e.conflicts}")
    print(f"  Existing: {e.existing_schema}")
    print(f"  Proposed: {e.proposed_schema}")
except QueryError as e:
    # The engine rejected the query - parse error, type error, unsafe
    # rule, etc. The full IQL string is attached for debugging.
    print(f"Query rejected: {e}")
    print(f"  query: {e.query}")
except InputLayerError as e:
    print(f"InputLayer error: {e}")

Migrations

For production deployments, the SDK includes a Django-style migration system that tracks schema changes, generates versioned migration files, and supports rollbacks. The il CLI is installed automatically with the package.

# Generate a migration from your current models
il makemigrations --models myapp.models

# Apply pending migrations to a server
il migrate --url ws://localhost:8080/ws --kg production

# Check what's applied
il showmigrations --url ws://localhost:8080/ws --kg production

This is a large topic with its own dedicated guide. See Migrations for the full walkthrough covering the CLI, operations, workflows, and CI/CD integration.

Known Limitations

A few rough edges to be aware of in the current pre-alpha SDK:

  • Aggregates are session rules under the hood. kg.query(count(Employee.id)) works by registering a temporary session rule and querying it. The SDK drops the rule after the call on a best-effort basis, so if your process dies between rule registration and cleanup, the rule lingers until the WebSocket session ends. This is invisible to application code but worth knowing if you read server logs.

  • order_by and offset are applied client-side for plain queries. IQL only supports ordering inside aggregate heads (top_k, within_radius, etc.), so the SDK sorts and slices the result rows in Python after they come back. This is fine for typical result sizes but is not appropriate for streaming over millions of rows.

  • kg.relations() only sees relations after data exists. The server's .rel listing surfaces a relation only after at least one row has been inserted; defining the schema is not enough. If you need a registry of declared schemas that's independent of data, track them in your application code.
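
To make the second limitation concrete, client-side ordering and pagination amounts to sorting the full result and then slicing it. The sketch below shows that shape in plain Python. `order_and_slice` is a hypothetical helper, not the SDK's internal code.

```python
def order_and_slice(rows, key, descending=False, limit=None, offset=0):
    """Sort all rows in memory, then return the requested page."""
    ordered = sorted(rows, key=key, reverse=descending)
    end = None if limit is None else offset + limit
    return ordered[offset:end]


rows = [
    {"name": "Eve"},
    {"name": "Alice"},
    {"name": "Diana"},
    {"name": "Bob"},
    {"name": "Charlie"},
]

# Equivalent of order_by=name ascending, limit=2, offset=2
page = order_and_slice(rows, key=lambda r: r["name"], limit=2, offset=2)
print([r["name"] for r in page])  # ['Charlie', 'Diana']
```

Every row has to be fetched and sorted before the slice is taken, so the cost depends on the size of the full result rather than on limit.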

Next Steps