Python SDK

InputLayer ships a Python Object-Logic Mapper (OLM) that lets you define schemas, insert data, run queries, and manage rules using pure Python - no query syntax required. The OLM compiles Python expressions into InputLayer queries over WebSocket.

Installation

pip install inputlayer

# With pandas support
pip install inputlayer[pandas]

Requirements: Python 3.10+, a running InputLayer server.

Connecting


from inputlayer import InputLayer

async def main():
    async with InputLayer("ws://localhost:8080/ws", username="admin", password="admin") as il:
        print(f"Connected, server version: {il.server_version}")

asyncio.run(main())

Authentication supports username/password or API keys:

# API key auth
async with InputLayer("ws://localhost:8080/ws", api_key="your-key-here") as il:
    ...

Defining Schemas

Define typed relations as Python classes:

from inputlayer import Relation, Vector, Timestamp

class Employee(Relation):
    id: int
    name: str
    department: str
    salary: float
    active: bool

class Document(Relation):
    id: int
    title: str
    embedding: Vector[384]
    created_at: Timestamp

Deploy to the server:

kg = il.knowledge_graph("myapp")
await kg.define(Employee, Document)

define() is idempotent - calling it again on an existing relation is safe.

Supported Types

Python TypeInputLayer TypeDescription
intint64-bit integer
floatfloat64-bit floating point
strstringUTF-8 string
boolboolBoolean
Vector[N]vector(N)N-dimensional float vector
VectorInt8[N]vector_int8(N)N-dimensional int8 vector
TimestamptimestampUnix epoch milliseconds

Inserting Data

# Single fact
await kg.insert(Employee(id=1, name="Alice", department="eng", salary=120000.0, active=True))

# Batch insert
await kg.insert([
    Employee(id=2, name="Bob", department="hr", salary=90000.0, active=True),
    Employee(id=3, name="Charlie", department="eng", salary=110000.0, active=False),
])

# From a pandas DataFrame

df = pd.DataFrame({"id": [4, 5], "name": ["Dave", "Eve"], ...})
await kg.insert(Employee, data=df)

Querying

Basic Queries

# All employees
result = await kg.query(Employee)
for emp in result:
    print(f"{emp.name}: ${emp.salary}")

# With filter
engineers = await kg.query(
    Employee,
    where=lambda e: (e.department == "eng") & (e.active == True),
)

Column Selection

result = await kg.query(
    Employee.name, Employee.salary,
    join=[Employee],
    where=lambda e: e.department == "eng",
)

Joins

class Department(Relation):
    name: str
    budget: float

result = await kg.query(
    Employee.name, Department.budget,
    join=[Employee, Department],
    on=lambda e, d: e.department == d.name,
)

Aggregations

from inputlayer import count, sum_, avg, min_, max_

result = await kg.query(
    Employee.department,
    count(Employee.id),
    avg(Employee.salary),
    join=[Employee],
)

Ordering and Pagination

result = await kg.query(
    Employee,
    order_by=Employee.salary.desc(),
    limit=10,
    offset=20,
)

Derived Relations (Rules)

Define computed views with recursive logic:

from typing import ClassVar
from inputlayer import Derived, From

class Edge(Relation):
    src: int
    dst: int

class Reachable(Derived):
    src: int
    dst: int
    rules: ClassVar[list] = []

# Base case + recursive case
Reachable.rules = [
    From(Edge).select(src=Edge.src, dst=Edge.dst),
    From(Reachable, Edge)
        .where(lambda r, e: r.dst == e.src)
        .select(src=Reachable.src, dst=Edge.dst),
]

# Deploy and query
await kg.define_rules(Reachable)
result = await kg.query(Reachable, where=lambda r: r.src == 1)
from inputlayer import HnswIndex

# Create index
await kg.create_index(HnswIndex(
    name="doc_emb_idx",
    relation=Document,
    column="embedding",
    metric="cosine",
    m=32,
    ef_construction=200,
))

# Search
result = await kg.vector_search(
    Document,
    query_vec=[0.1, 0.2, ...],
    k=10,
    metric="cosine",
)

Session Rules

Ephemeral rules scoped to the current WebSocket connection:

await kg.session.define_rules(MyTempView)
result = await kg.query(MyTempView, join=[MyTempView])
await kg.session.clear()  # or just disconnect

Notifications

Subscribe to real-time data change events:

@il.on("persistent_update", relation="sensor_reading")
def on_update(event):
    print(f"[{event.relation}] {event.count} rows changed")

Migrations

The migration system provides Django-style schema versioning. See Migrations for full documentation.

Error Handling

from inputlayer import (
    InputLayerError,
    AuthenticationError,
    KnowledgeGraphNotFoundError,
    SchemaConflictError,
)

try:
    await kg.define(Employee)
except AuthenticationError:
    print("Bad credentials")
except SchemaConflictError as e:
    print(f"Schema mismatch: {e.conflicts}")
except InputLayerError as e:
    print(f"Server error: {e}")

Sync Client

For scripts and non-async contexts:

from inputlayer import InputLayerSync

with InputLayerSync("ws://localhost:8080/ws", username="admin", password="admin") as il:
    kg = il.knowledge_graph("demo")
    kg.define(Employee)
    kg.insert(Employee(id=1, name="Alice", department="eng", salary=120000.0, active=True))
    result = kg.query(Employee)