JIMS Core
JIMS (Just an Integrated Multiagent System) is a framework for building conversational AI with persistent thread management. Vedana is built on top of JIMS, but JIMS can also be used without Vedana for other scenarios.
Key concepts
- Thread — a conversation between a user (or several users) and the agent system.
- Event — anything that happens in a thread: a message, an action, a state change.
- Pipeline — an async function that processes a ThreadContext and produces events.
- ThreadContext — the object the ThreadController hands to the pipeline. Holds history, events, and the LLM provider.
- ThreadController — manages the thread lifecycle, event storage, and pipeline execution.
Storage
JIMS stores threads and events in Postgres (with SQLite compatibility for tests). The schema:
erDiagram
THREADS ||--o{ THREAD_EVENTS : has
THREADS {
UUID thread_id PK
string contact_id "indexed"
datetime created_at
JSONB thread_config
}
THREAD_EVENTS {
UUID thread_id PK,FK
UUID event_id PK
datetime created_at
string event_type "comm.user_message, rag.query_processed, ..."
string event_domain "comm, rag, state, ..."
string event_name
JSON event_data
}
class ThreadDB(Base):
    __tablename__ = "threads"
    thread_id: UUID  # primary key
    contact_id: str  # indexed
    created_at: datetime
    thread_config: JSONB  # arbitrary thread configuration

class ThreadEventDB(Base):
    __tablename__ = "thread_events"
    thread_id: UUID  # primary key (together with event_id); foreign key to threads
    event_id: UUID  # primary key (together with thread_id)
    created_at: datetime  # server_default=now()
    event_type: str  # full type, e.g. "comm.user_message.user1"
    event_domain: str  # "comm" (nullable; currently NOT populated by ThreadController)
    event_name: str  # "user_message" (nullable; currently NOT populated by ThreadController)
    event_data: JSON  # payload (Postgres JSON, see note below)
Notes:
- event_data is stored as Postgres JSON (not JSONB). See jims_core/db.py:46.
- The event_domain and event_name columns exist in the schema but are not populated by ThreadController.store_event_dict / store_user_message / store_assistant_message; only event_type is written. They remain NULL.
- created_at is set by the server.
- One thread = one chain of events ordered by created_at.
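Because event_domain and event_name stay NULL, a consumer that needs them has to derive them from event_type. The following is a minimal sketch, assuming the domain is the first dot-separated segment and the name is the second (which matches the column comments above). split_event_type is a hypothetical helper and is not part of jims_core.

```python
from typing import Optional


def split_event_type(event_type: str) -> tuple[Optional[str], Optional[str]]:
    """Derive (domain, name) from a dotted event type.

    Assumption: the domain is the first segment and the name is the second;
    any further segments (for example a user suffix) are ignored.
    """
    parts = event_type.split(".")
    domain = parts[0] or None
    name = parts[1] if len(parts) > 1 and parts[1] else None
    return domain, name


print(split_event_type("comm.user_message"))
print(split_event_type("comm.user_message.user1"))
```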
ThreadController.make_context() reads every thread_events row for the thread, puts the comm.* events into history, and puts the rest into events.
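The split described above can be illustrated with a small in-memory sketch. StoredEvent and partition_events are hypothetical names used only for illustration; the real make_context() works on database rows and may differ in detail.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class StoredEvent:
    """Hypothetical stand-in for one row of thread_events."""
    event_type: str
    event_data: dict
    created_at: datetime


def partition_events(rows: list[StoredEvent]) -> tuple[list[StoredEvent], list[StoredEvent]]:
    """Return (history, events): comm.* rows go to history, the rest to events."""
    ordered = sorted(rows, key=lambda r: r.created_at)
    history = [r for r in ordered if r.event_type.startswith("comm.")]
    events = [r for r in ordered if not r.event_type.startswith("comm.")]
    return history, events


t0 = datetime(2024, 1, 1, 12, 0, 0)
rows = [
    StoredEvent("jims.lifecycle.thread_created", {}, t0),
    StoredEvent("comm.user_message", {"role": "user", "content": "Hello!"}, t0 + timedelta(seconds=1)),
    StoredEvent("rag.query_processed", {}, t0 + timedelta(seconds=2)),
    StoredEvent("comm.assistant_message", {"role": "assistant", "content": "Hi!"}, t0 + timedelta(seconds=3)),
]
history, events = partition_events(rows)
print([e.event_type for e in history])
print([e.event_type for e in events])
```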
Pipeline execution flow
sequenceDiagram
participant U as User / Interface
participant TC as ThreadController
participant DB as Postgres
participant CTX as ThreadContext
participant P as Pipeline
participant LLM as LLMProvider
U->>TC: store_event_dict("comm.user_message", {role:"user", content})
TC->>DB: INSERT thread_events (comm.user_message)
U->>TC: run_pipeline_with_context(pipeline)
TC->>DB: SELECT thread_events
TC->>CTX: build ThreadContext (history, events, llm)
TC->>P: pipeline(ctx)
P->>LLM: chat_completion / structured / tools
LLM-->>P: completion
P->>CTX: send_message / send_event
P-->>TC: return
TC->>DB: INSERT outgoing events
TC-->>U: outgoing_events
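The same sequence can be traced with a toy, in-memory version. This is only a sketch of the flow: ToyController and ToyContext are hypothetical stand-ins, a Python list replaces Postgres, and no LLM is called.

```python
import asyncio
import uuid
from dataclasses import dataclass, field


@dataclass
class ToyContext:
    """Hypothetical stand-in for ThreadContext."""
    history: list[dict]
    outgoing_events: list[dict] = field(default_factory=list)

    def send_message(self, text: str) -> None:
        # Queue an assistant message; nothing is persisted yet.
        self.outgoing_events.append(
            {"event_type": "comm.assistant_message",
             "event_data": {"role": "assistant", "content": text}}
        )


class ToyController:
    """Hypothetical stand-in for ThreadController; a list replaces the DB."""

    def __init__(self) -> None:
        self.thread_id = uuid.uuid4()
        self.rows: list[dict] = []

    async def store_event_dict(self, event_type: str, event_data: dict) -> None:
        self.rows.append({"event_type": event_type, "event_data": event_data})

    async def run_pipeline(self, pipeline) -> list[dict]:
        # "SELECT": rebuild the context from everything stored so far.
        history = [r for r in self.rows if r["event_type"].startswith("comm.")]
        ctx = ToyContext(history=history)
        await pipeline(ctx)
        # "INSERT": persist whatever the pipeline queued up.
        self.rows.extend(ctx.outgoing_events)
        return ctx.outgoing_events


async def echo_pipeline(ctx: ToyContext) -> None:
    last = ctx.history[-1]["event_data"]["content"]
    ctx.send_message(f"You said: {last}")


async def main() -> list[dict]:
    ctl = ToyController()
    await ctl.store_event_dict("comm.user_message", {"role": "user", "content": "Hello!"})
    return await ctl.run_pipeline(echo_pipeline)


outgoing = asyncio.run(main())
print(outgoing)
```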
ThreadController
jims_core.thread.thread_controller.ThreadController is the entry point to threads.
Creating / finding a thread
ctl = await ThreadController.new_thread(
    sessionmaker=app.sessionmaker,
    contact_id="user-42",
    thread_id=uuid7(),
    thread_config={"interface": "api"},
)
# or
ctl = await ThreadController.from_thread_id(sessionmaker, thread_id)
ctl = await ThreadController.latest_thread_from_contact_id(sessionmaker, from_contact_id=contact_id)
When a new thread is created, a jims.lifecycle.thread_created event is automatically recorded.
Recording messages and events
await ctl.store_user_message(event_id=uuid7(), content="Hello!")
await ctl.store_assistant_message(event_id=uuid7(), content="Hi!")
await ctl.store_event_dict(event_id=uuid7(), event_type="custom.event", event_data={...})
User and assistant messages are stored as events with types comm.user_message / comm.assistant_message, with payloads of the form {"role": "user|assistant", "content": "..."}.
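To make the payload shape concrete, here is a small sketch that builds such records as plain dicts and turns a history back into a role/content message list. comm_event and to_chat_messages are hypothetical helpers, not jims_core functions.

```python
def comm_event(role: str, content: str) -> dict:
    """Build a record with the event type and payload shape described above."""
    if role not in ("user", "assistant"):
        raise ValueError(f"unsupported role: {role!r}")
    return {
        "event_type": f"comm.{role}_message",
        "event_data": {"role": role, "content": content},
    }


def to_chat_messages(history: list[dict]) -> list[dict]:
    """Extract the role/content payloads of comm.* records, in order."""
    return [dict(e["event_data"]) for e in history if e["event_type"].startswith("comm.")]


history = [comm_event("user", "Hello!"), comm_event("assistant", "Hi!")]
print(to_chat_messages(history))
```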
Running a pipeline
ctx = await ctl.make_context() # reads every thread event from the DB
outgoing = await ctl.run_pipeline_with_context(my_pipeline, ctx)
run_pipeline_with_context wraps execution in an OpenTelemetry span, measures the run duration, and updates the jims_pipeline_runs_total{status,pipeline} and jims_pipeline_run_duration_seconds metrics. After completion, every outgoing event produced by the pipeline is written to the DB.
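The instrumentation pattern can be sketched with the standard library only. In this sketch a Counter and a list stand in for the Prometheus metrics, the OpenTelemetry span is omitted, and the status values "success" and "error" are assumptions rather than the labels JIMS actually emits.

```python
import asyncio
import time
from collections import Counter

# Stand-ins for the metrics named above (assumed shapes).
runs_total: Counter = Counter()               # keyed by (status, pipeline)
run_durations: list[tuple[str, float]] = []   # (pipeline, seconds)


async def run_instrumented(pipeline, ctx):
    """Run a pipeline, recording its duration and a per-status run count."""
    name = getattr(pipeline, "__name__", type(pipeline).__name__)
    start = time.perf_counter()
    status = "success"
    try:
        return await pipeline(ctx)
    except Exception:
        status = "error"
        raise
    finally:
        # Runs on both paths, so failed runs are measured too.
        run_durations.append((name, time.perf_counter() - start))
        runs_total[(status, name)] += 1


async def ok_pipeline(ctx):
    return "done"


async def failing_pipeline(ctx):
    raise RuntimeError("boom")


async def demo() -> None:
    await run_instrumented(ok_pipeline, None)
    try:
        await run_instrumented(failing_pipeline, None)
    except RuntimeError:
        pass


asyncio.run(demo())
print(dict(runs_total))
```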
ThreadContext
jims_core.thread.thread_context.ThreadContext is what the pipeline sees.
@dataclass
class ThreadContext:
    thread_id: UUID
    history: list[CommunicationEvent]  # filtered by comm.*
    events: list[EventEnvelope]  # every event
    llm: LLMProvider
    outgoing_events: list[EventEnvelope]  # what the pipeline plans to record
    thread_config: dict
    status_updater: StatusUpdater | None
Reading
- ctx.get_last_user_message() — the last user message.
- ctx.get_last_user_action() — the last user action in the comm.* domain (including buttons, commands).
- ctx.context(conversation_length=20) — enriched history: the last N comm.* messages, interleaved (by created_at) with any context.* events that fall into the same window (for example, reasoning from data model filtering). Note: the current implementation collects all context.* events from the window in reverse order; the linkage to a specific comm.* message is by time-ordering, not by an explicit relation. Vedana uses this method to feed the LLM with context.
- ctx.get_state(state_name, state_type) — the latest state of type state_type.
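The windowing behind ctx.context can be approximated as follows. This is a sketch under stated assumptions, not the library's implementation: Ev and enriched_history are hypothetical names, created_at is a plain number, the event type context.reasoning is made up for the example, and the result is sorted in ascending order rather than reproducing the reverse-order collection noted above.

```python
from dataclasses import dataclass


@dataclass
class Ev:
    """Hypothetical event record; created_at is a plain number for brevity."""
    event_type: str
    created_at: float
    text: str = ""


def enriched_history(events: list[Ev], conversation_length: int = 20) -> list[Ev]:
    """Last N comm.* events plus the context.* events from the same time window."""
    ordered = sorted(events, key=lambda e: e.created_at)
    comm = [e for e in ordered if e.event_type.startswith("comm.")]
    # Guard against comm[-0:], which would return the whole list.
    window = comm[-conversation_length:] if conversation_length > 0 else []
    if not window:
        return []
    start = window[0].created_at
    extra = [
        e for e in ordered
        if e.event_type.startswith("context.") and e.created_at >= start
    ]
    # Interleave purely by timestamp; there is no explicit link to a message.
    return sorted(window + extra, key=lambda e: e.created_at)


events = [
    Ev("comm.user_message", 1, "old question"),
    Ev("context.reasoning", 2, "old reasoning"),
    Ev("comm.user_message", 3, "new question"),
    Ev("context.reasoning", 4, "data model filtering notes"),
    Ev("rag.query_processed", 5),
    Ev("comm.assistant_message", 6, "answer"),
]
result = enriched_history(events, conversation_length=2)
print([(e.event_type, e.created_at) for e in result])
```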
Writing
- ctx.send_message(text) — send an assistant message into the thread.
- ctx.send_event(event_type, data) — an arbitrary event (e.g. rag.query_processed).
- ctx.set_state(state_name, state) — record a state.set.{state_name} event.
- await ctx.update_agent_status(status) — update the status (passed to the StatusUpdater if one is set by the interface).
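The set_state / get_state pair can be read as an event-sourced key-value store: each write appends a state.set.{state_name} event, and a read returns the most recent one. The sketch below illustrates that idea with hypothetical classes (ToyStateContext, Cart). It assumes states are dataclasses serialized with asdict; the real ThreadContext may serialize and store states differently.

```python
from dataclasses import asdict, dataclass, field
from typing import Optional, Type, TypeVar

T = TypeVar("T")


@dataclass
class Cart:
    """Hypothetical state object."""
    items: list[str] = field(default_factory=list)


@dataclass
class ToyStateContext:
    """Hypothetical stand-in for ThreadContext holding (event_type, data) pairs."""
    events: list[tuple[str, dict]] = field(default_factory=list)

    def set_state(self, state_name: str, state) -> None:
        # Each write is a new event; earlier states are never overwritten.
        self.events.append((f"state.set.{state_name}", asdict(state)))

    def get_state(self, state_name: str, state_type: Type[T]) -> Optional[T]:
        # Walk backwards so the most recent state.set event wins.
        for event_type, data in reversed(self.events):
            if event_type == f"state.set.{state_name}":
                return state_type(**data)
        return None


ctx = ToyStateContext()
ctx.set_state("cart", Cart(items=["apple"]))
ctx.set_state("cart", Cart(items=["apple", "pear"]))
print(ctx.get_state("cart", Cart))
```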
StatusUpdater is an interface hook: in the Telegram bot, for example, it shows “is typing…”/“analysing query…” in the chat window; in the Reflex chat it updates the UI.
Pipeline protocol
from jims_core.schema import Pipeline
class Pipeline(Protocol):
    async def __call__(self, ctx: ThreadContext) -> Any: ...
Any class or function with this interface is a valid pipeline. In Vedana the pipelines are instances of RagPipeline and StartPipeline (see vedana_core.rag_pipeline).
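As an illustration of that structural typing, the sketch below declares a local copy of the protocol (with ctx typed as Any so the example stays self-contained) and runs both a plain async function and a callable class through the same code path. greet and Shout are hypothetical pipelines.

```python
import asyncio
from typing import Any, Protocol


class Pipeline(Protocol):
    """Local copy of the protocol; ctx is Any to avoid importing jims_core."""
    async def __call__(self, ctx: Any) -> Any: ...


async def greet(ctx: Any) -> Any:
    """A plain async function used as a pipeline."""
    return f"hello, {ctx}"


class Shout:
    """A class-based pipeline; configuration lives on the instance."""

    def __init__(self, suffix: str) -> None:
        self.suffix = suffix

    async def __call__(self, ctx: Any) -> Any:
        return f"{ctx}{self.suffix}".upper()


async def run_all(pipelines: list[Pipeline], ctx: Any) -> list[Any]:
    return [await p(ctx) for p in pipelines]


results = asyncio.run(run_all([greet, Shout("!")], "world"))
print(results)
```

A class-based pipeline is convenient when the pipeline needs configuration or dependencies, since they can be passed to the constructor once and reused on every call.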
JimsApp
jims_core.app.JimsApp is the application-level DI container.
@dataclass
class JimsApp:
    sessionmaker: async_sessionmaker
    pipeline: Pipeline  # main pipeline
    conversation_start_pipeline: Pipeline | None = None  # runs on /start
JimsApp.new_thread(contact_id, thread_id, thread_config) is a shortcut for ThreadController.new_thread.
In Vedana the instance is assembled in vedana_core.app.make_jims_app and bound to a global variable app (the entry point for CLI scripts).
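How an interface might choose between the two pipelines can be sketched like this. ToyApp mirrors only the two pipeline fields of JimsApp, and pick_pipeline is a hypothetical helper; the routing rule (exact match on "/start", falling back to the main pipeline) is an assumption, since the actual dispatch lives in each interface.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Optional

PipelineFn = Callable[[Any], Awaitable[Any]]


@dataclass
class ToyApp:
    """Hypothetical stand-in for JimsApp without the sessionmaker."""
    pipeline: PipelineFn
    conversation_start_pipeline: Optional[PipelineFn] = None


def pick_pipeline(app: ToyApp, text: str) -> PipelineFn:
    """Assumed routing rule: /start uses the start pipeline when one is set."""
    if text.strip() == "/start" and app.conversation_start_pipeline is not None:
        return app.conversation_start_pipeline
    return app.pipeline


async def main_pipeline(ctx: Any) -> str:
    return "main"


async def start_pipeline(ctx: Any) -> str:
    return "welcome"


app = ToyApp(pipeline=main_pipeline, conversation_start_pipeline=start_pipeline)
print(asyncio.run(pick_pipeline(app, "/start")(None)))
print(asyncio.run(pick_pipeline(app, "What is JIMS?")(None)))
```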
LLMProvider
jims_core.llms.llm_provider.LLMProvider is a wrapper over LiteLLM that:
- manages the model name and API key (mutable at runtime via set_model);
- collects usage metrics (Prometheus + a local usage: dict[str, ModelUsage]);
- supports:
  - chat_completion_plain(messages) — a regular chat completion;
  - chat_completion_with_tools(messages, tools) — function calling;
  - chat_completion_structured(messages, response_format) — structured output (used in data model filtering);
  - create_embedding(text), create_embeddings(texts) — embeddings with automatic batching by EMBEDDINGS_MAX_BATCH_SIZE and EMBEDDINGS_MAX_TOKENS_PER_BATCH;
- retries calls via backoff (3 attempts, exponential back-off).
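The embedding batching mentioned in the list can be pictured as a greedy split under two limits, an item count and a token budget. The sketch below is one plausible way to do it, not the LLMProvider code: batch_texts is a hypothetical helper, and tokens are approximated by whitespace-separated words, whereas a real implementation would use the model's tokenizer.

```python
from typing import Callable, Optional


def batch_texts(
    texts: list[str],
    max_batch_size: int,
    max_tokens_per_batch: int,
    count_tokens: Optional[Callable[[str], int]] = None,
) -> list[list[str]]:
    """Greedily group texts so each batch respects both limits.

    A single text that exceeds the token budget still gets a batch of its own.
    """
    if max_batch_size < 1:
        raise ValueError("max_batch_size must be at least 1")
    # Assumption: a whitespace word count stands in for a real tokenizer.
    count = count_tokens or (lambda t: len(t.split()))
    batches: list[list[str]] = []
    current: list[str] = []
    current_tokens = 0
    for text in texts:
        n = count(text)
        full = len(current) >= max_batch_size
        too_big = bool(current) and current_tokens + n > max_tokens_per_batch
        if full or too_big:
            batches.append(current)
            current, current_tokens = [], 0
        current.append(text)
        current_tokens += n
    if current:
        batches.append(current)
    return batches


texts = ["a b", "c d", "e", "f g h i"]
print(batch_texts(texts, max_batch_size=3, max_tokens_per_batch=5))
print(batch_texts(texts, max_batch_size=10, max_tokens_per_batch=3))
```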
Metrics:
- llm_calls_total{model}
- llm_usage_prompt_tokens_total{model}
- llm_usage_completion_tokens_total{model}
cached_tokens (if the provider reports them) and requests_cost (via LiteLLM’s _hidden_params.response_cost) are also captured on ModelUsage.
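A per-model usage ledger of this kind can be sketched as below. The field names on this ModelUsage dataclass and the record_usage helper are assumptions made for illustration; the real ModelUsage may have a different shape.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ModelUsage:
    """Hypothetical shape; the real ModelUsage fields may differ."""
    calls: int = 0
    prompt_tokens: int = 0
    completion_tokens: int = 0
    cached_tokens: int = 0
    requests_cost: float = 0.0


usage: dict[str, ModelUsage] = {}


def record_usage(
    model: str,
    prompt_tokens: int,
    completion_tokens: int,
    cached_tokens: Optional[int] = None,
    cost: Optional[float] = None,
) -> ModelUsage:
    """Add one call's numbers to the running totals for the given model."""
    entry = usage.setdefault(model, ModelUsage())
    entry.calls += 1
    entry.prompt_tokens += prompt_tokens
    entry.completion_tokens += completion_tokens
    # Optional fields are only counted when the provider reports them.
    entry.cached_tokens += cached_tokens or 0
    entry.requests_cost += cost or 0.0
    return entry


record_usage("example-model", 10, 5, cost=0.5)
record_usage("example-model", 3, 2, cached_tokens=1)
print(usage["example-model"])
```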
Metrics and tracing
ThreadController.run_pipeline_with_context opens an OTel span jims.run_pipeline_with_context with the attributes jims.thread.id and jims.pipeline. Below it, spans are created for Memgraph queries (memgraph.execute_ro_cypher_query, memgraph.run_cypher, memgraph.text_search, memgraph.vector_search) and for pgvector (pgvector.vector_search).
For details, see Observability.