Skip to content

CognitiveMemoryPipeline API

cmm.pipeline.conversation.CognitiveMemoryPipeline

The main entry point for the cognitive memory system. Accepts conversation turns, encodes them into gist summaries, stores them in the FAISS-backed memory store, and retrieves relevant memories with decay, spreading activation, priming, and working memory.

Constructor

from cmm.pipeline.conversation import CognitiveMemoryPipeline

pipeline = CognitiveMemoryPipeline(
    # Core
    embedding_model=None,          # EmbeddingModel instance (default: all-mpnet-base-v2)
    gist_encoder=None,             # GistEncoder instance (default: PassthroughGistEncoder)
    store=None,                    # MemoryStore instance (default: new store)
    retriever_top_k=5,             # Number of results to return
    retriever_threshold=0.3,       # Minimum score threshold

    # Decay
    decay_grace_period=14*86400,   # Grace period in seconds (default: 2 weeks)
    decay_rate=5e-7,               # Lambda for exponential decay
    decay_freq_weight=1.0,         # Weight for frequency-based decay adjustment
    decay_lock_frequency=0.5,      # Accesses/week to lock a memory permanently

    # Working Memory
    working_memory_capacity=10,    # Max items in working memory
    working_memory_ttl=5,          # Turns before items expire

    # Spreading Activation
    spread_depth=1,                # Max hops
    spread_factor=0.5,             # Score decay per hop
    spread_top_k=3,                # Neighbors per hop
    entity_linking=True,           # Enable spaCy NER entity linking
    entity_boost=0.8,              # Boost for entity-linked memories

    # Priming
    priming_boost=0.3,             # Boost strength at activation
    priming_decay=0.5,             # Decay rate per turn
    priming_max_turns=10,          # Max turns to keep priming active

    # Consolidation
    consolidation_summarizer=None, # ConsolidationSummarizer instance
    consolidation_threshold=50,    # Turns before auto-consolidation
    cluster_similarity=0.6,        # Similarity threshold for clustering
    min_cluster_size=3,            # Minimum cluster size

    # Importance
    importance_scorer=None,        # ImportanceScorer instance (default: RuleBasedImportanceScorer)

    # Multi-agent
    agent_id=None,                 # Tag memories with this agent ID
    session_id=None,               # Tag memories with this session ID
    default_scope="team",          # Default scope: "team", "shared", or "private"
)

Methods

ingest

results = pipeline.ingest(role: str | Role, content: str) -> list[RetrievalResult]

Convenience method: create a conversation turn and process it. Stores the turn in memory and returns recalled memories relevant to this turn.

The role parameter accepts "user", "assistant", or a Role enum value.

results = pipeline.ingest("user", "I'm allergic to peanuts. I carry an EpiPen.")
results = pipeline.ingest("assistant", "Noted, severe peanut allergy.")

recall

results = pipeline.recall(query: str, top_k: int | None = None) -> list[RetrievalResult]

Query memory without storing anything. Returns the top-k relevant memories.

results = pipeline.recall("ordering food for the team")

recall_with_metamemory

meta = pipeline.recall_with_metamemory(query: str, top_k: int | None = None) -> MetamemoryResult

Query memory with metamemory signals. Returns a MetamemoryResult with confidence level, results, and partial matches ("tip of the tongue" hints).

meta = pipeline.recall_with_metamemory("something about a striped animal")
print(meta.confidence)      # ConfidenceLevel.MODERATE
print(meta.partial_matches) # Low-confidence hints

format_recalled

text = pipeline.format_recalled(results: list[RetrievalResult]) -> str

Format retrieval results as text for LLM context injection. Returns a clearly marked block of recalled memories.

results = pipeline.recall("ordering food")
context = pipeline.format_recalled(results)
# "[Recalled from memory...] I'm allergic to peanuts. [End recalled memories]"

consolidate

new_memories = pipeline.consolidate() -> list[Memory]

Manually trigger memory consolidation. Clusters similar episodic memories and creates semantic memories. Returns the newly created semantic memories.

end_session

summary = pipeline.end_session() -> Memory | None

End the current session and create a session summary. Returns the session summary memory, or None if not enough turns. Resets the session state.

maintain

metrics = pipeline.maintain() -> HealthMetrics

Run memory maintenance: deduplicate and prune. Returns health metrics.

health

metrics = pipeline.health() -> HealthMetrics

Get memory store health metrics without running maintenance.

save

pipeline.save(directory: str) -> None

Save the entire pipeline state to disk. Saves the FAISS index, all memory metadata, and the entity index. Call this to persist memory across restarts.

pipeline.save("./my_memory")

load (classmethod)

pipeline = CognitiveMemoryPipeline.load(directory: str, **kwargs) -> CognitiveMemoryPipeline

Load a pipeline from disk. Restores the FAISS index, memory metadata, and entity index. Additional kwargs are passed to the constructor for configuring decay, spreading, priming, etc.

pipeline = CognitiveMemoryPipeline.load("./my_memory")

Properties

Property Type Description
memory_count int Number of memories in the store
working_memory WorkingMemory The working memory buffer
priming PrimingState The priming state
turn_count int Total turns processed