Python SDK
InputLayer ships a Python Object-Logic Mapper (OLM) that lets you define schemas, insert data, run queries, and manage rules using pure Python - no query syntax required. The OLM compiles Python expressions into InputLayer queries over WebSocket.
Installation
pip install inputlayer
# With pandas support
pip install inputlayer[pandas]
Requirements: Python 3.10+, a running InputLayer server.
Connecting
from inputlayer import InputLayer
async def main():
async with InputLayer("ws://localhost:8080/ws", username="admin", password="admin") as il:
print(f"Connected, server version: {il.server_version}")
asyncio.run(main())
Authentication supports username/password or API keys:
# API key auth
async with InputLayer("ws://localhost:8080/ws", api_key="your-key-here") as il:
...
Defining Schemas
Define typed relations as Python classes:
from inputlayer import Relation, Vector, Timestamp
class Employee(Relation):
id: int
name: str
department: str
salary: float
active: bool
class Document(Relation):
id: int
title: str
embedding: Vector[384]
created_at: Timestamp
Deploy to the server:
kg = il.knowledge_graph("myapp")
await kg.define(Employee, Document)
define() is idempotent - calling it again on an existing relation is safe.
Supported Types
| Python Type | InputLayer Type | Description |
|---|---|---|
int | int | 64-bit integer |
float | float | 64-bit floating point |
str | string | UTF-8 string |
bool | bool | Boolean |
Vector[N] | vector(N) | N-dimensional float vector |
VectorInt8[N] | vector_int8(N) | N-dimensional int8 vector |
Timestamp | timestamp | Unix epoch milliseconds |
Inserting Data
# Single fact
await kg.insert(Employee(id=1, name="Alice", department="eng", salary=120000.0, active=True))
# Batch insert
await kg.insert([
Employee(id=2, name="Bob", department="hr", salary=90000.0, active=True),
Employee(id=3, name="Charlie", department="eng", salary=110000.0, active=False),
])
# From a pandas DataFrame
df = pd.DataFrame({"id": [4, 5], "name": ["Dave", "Eve"], ...})
await kg.insert(Employee, data=df)
Querying
Basic Queries
# All employees
result = await kg.query(Employee)
for emp in result:
print(f"{emp.name}: ${emp.salary}")
# With filter
engineers = await kg.query(
Employee,
where=lambda e: (e.department == "eng") & (e.active == True),
)
Column Selection
result = await kg.query(
Employee.name, Employee.salary,
join=[Employee],
where=lambda e: e.department == "eng",
)
Joins
class Department(Relation):
name: str
budget: float
result = await kg.query(
Employee.name, Department.budget,
join=[Employee, Department],
on=lambda e, d: e.department == d.name,
)
Aggregations
from inputlayer import count, sum_, avg, min_, max_
result = await kg.query(
Employee.department,
count(Employee.id),
avg(Employee.salary),
join=[Employee],
)
Ordering and Pagination
result = await kg.query(
Employee,
order_by=Employee.salary.desc(),
limit=10,
offset=20,
)
Derived Relations (Rules)
Define computed views with recursive logic:
from typing import ClassVar
from inputlayer import Derived, From
class Edge(Relation):
src: int
dst: int
class Reachable(Derived):
src: int
dst: int
rules: ClassVar[list] = []
# Base case + recursive case
Reachable.rules = [
From(Edge).select(src=Edge.src, dst=Edge.dst),
From(Reachable, Edge)
.where(lambda r, e: r.dst == e.src)
.select(src=Reachable.src, dst=Edge.dst),
]
# Deploy and query
await kg.define_rules(Reachable)
result = await kg.query(Reachable, where=lambda r: r.src == 1)
Vector Search
from inputlayer import HnswIndex
# Create index
await kg.create_index(HnswIndex(
name="doc_emb_idx",
relation=Document,
column="embedding",
metric="cosine",
m=32,
ef_construction=200,
))
# Search
result = await kg.vector_search(
Document,
query_vec=[0.1, 0.2, ...],
k=10,
metric="cosine",
)
Session Rules
Ephemeral rules scoped to the current WebSocket connection:
await kg.session.define_rules(MyTempView)
result = await kg.query(MyTempView, join=[MyTempView])
await kg.session.clear() # or just disconnect
Notifications
Subscribe to real-time data change events:
.on("persistent_update", relation="sensor_reading")
def on_update(event):
print(f"[{event.relation}] {event.count} rows changed")
Migrations
The migration system provides Django-style schema versioning. See Migrations for full documentation.
Error Handling
from inputlayer import (
InputLayerError,
AuthenticationError,
KnowledgeGraphNotFoundError,
SchemaConflictError,
)
try:
await kg.define(Employee)
except AuthenticationError:
print("Bad credentials")
except SchemaConflictError as e:
print(f"Schema mismatch: {e.conflicts}")
except InputLayerError as e:
print(f"Server error: {e}")
Sync Client
For scripts and non-async contexts:
from inputlayer import InputLayerSync
with InputLayerSync("ws://localhost:8080/ws", username="admin", password="admin") as il:
kg = il.knowledge_graph("demo")
kg.define(Employee)
kg.insert(Employee(id=1, name="Alice", department="eng", salary=120000.0, active=True))
result = kg.query(Employee)