140 lines
5.1 KiB
Python
140 lines
5.1 KiB
Python
#!/usr/bin/env python3
|
|
"""Module PostgreSQL pour la persistance des conversations Agent IA."""
|
|
import psycopg2
|
|
import psycopg2.extras
|
|
from contextlib import contextmanager
|
|
|
|
PG_HOST = "10.0.3.3"
|
|
PG_PORT = 5432
|
|
PG_DB = "n8n"
|
|
PG_USER = "fpNKWWEZnfaVjvWS"
|
|
PG_PASS = "MiGvyqnsKWUD7SzKgOoSnUAedUqX3US6"
|
|
|
|
|
|
def _connect():
|
|
return psycopg2.connect(host=PG_HOST, port=PG_PORT, dbname=PG_DB, user=PG_USER, password=PG_PASS)
|
|
|
|
|
|
@contextmanager
|
|
def get_db():
|
|
conn = _connect()
|
|
try:
|
|
yield conn
|
|
conn.commit()
|
|
except Exception:
|
|
conn.rollback()
|
|
raise
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def get_workflows():
|
|
with get_db() as conn:
|
|
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
|
|
cur.execute("SELECT id, slug, name, webhook_url, description, is_active, mode FROM agent_chat_workflows WHERE is_active = true ORDER BY id;")
|
|
return cur.fetchall()
|
|
|
|
|
|
def get_sessions(workflow_slug):
|
|
with get_db() as conn:
|
|
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
|
|
cur.execute("""
|
|
SELECT s.id, s.session_id, s.title, s.created_at, s.updated_at
|
|
FROM agent_chat_sessions s
|
|
JOIN agent_chat_workflows w ON s.workflow_id = w.id
|
|
WHERE w.slug = %s
|
|
ORDER BY s.updated_at DESC;
|
|
""", (workflow_slug,))
|
|
return cur.fetchall()
|
|
|
|
|
|
def get_messages(session_id, workflow_slug):
|
|
with get_db() as conn:
|
|
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
|
|
cur.execute("""
|
|
SELECT m.id, m.role, m.content, m.created_at
|
|
FROM agent_chat_messages m
|
|
JOIN agent_chat_workflows w ON m.workflow_id = w.id
|
|
WHERE m.session_id = %s AND w.slug = %s
|
|
ORDER BY m.created_at ASC;
|
|
""", (session_id, workflow_slug))
|
|
return cur.fetchall()
|
|
|
|
|
|
def create_session(session_id, workflow_id, title=None):
|
|
with get_db() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute("""
|
|
INSERT INTO agent_chat_sessions (session_id, workflow_id, title)
|
|
VALUES (%s, %s, %s)
|
|
ON CONFLICT (session_id, workflow_id) DO NOTHING;
|
|
""", (session_id, workflow_id, title))
|
|
|
|
|
|
def save_message(session_id, workflow_id, role, content):
|
|
with get_db() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute("""
|
|
INSERT INTO agent_chat_messages (session_id, workflow_id, role, content)
|
|
VALUES (%s, %s, %s, %s)
|
|
RETURNING id;
|
|
""", (session_id, workflow_id, role, content))
|
|
msg_id = cur.fetchone()[0]
|
|
cur.execute("""
|
|
UPDATE agent_chat_sessions SET updated_at = NOW()
|
|
WHERE session_id = %s AND workflow_id = %s;
|
|
""", (session_id, workflow_id))
|
|
return msg_id
|
|
|
|
|
|
def delete_session(session_id, workflow_slug):
|
|
with get_db() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute("""
|
|
DELETE FROM agent_chat_sessions
|
|
WHERE session_id = %s AND workflow_id = (SELECT id FROM agent_chat_workflows WHERE slug = %s);
|
|
""", (session_id, workflow_slug))
|
|
return cur.rowcount
|
|
|
|
|
|
def rename_session(session_id, workflow_slug, new_title):
|
|
with get_db() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute("""
|
|
UPDATE agent_chat_sessions
|
|
SET title = %s, updated_at = NOW()
|
|
WHERE session_id = %s AND workflow_id = (SELECT id FROM agent_chat_workflows WHERE slug = %s);
|
|
""", (new_title, session_id, workflow_slug))
|
|
return cur.rowcount
|
|
|
|
|
|
def delete_messages_before(days, workflow_slug):
|
|
with get_db() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute("""
|
|
DELETE FROM agent_chat_messages
|
|
WHERE workflow_id = (SELECT id FROM agent_chat_workflows WHERE slug = %s)
|
|
AND created_at < NOW() - INTERVAL '%s days';
|
|
""", (workflow_slug, days))
|
|
deleted = cur.rowcount
|
|
cur.execute("""
|
|
DELETE FROM agent_chat_sessions s
|
|
WHERE s.workflow_id = (SELECT id FROM agent_chat_workflows WHERE slug = %s)
|
|
AND NOT EXISTS (SELECT 1 FROM agent_chat_messages m WHERE m.session_id = s.session_id AND m.workflow_id = s.workflow_id);
|
|
""", (workflow_slug,))
|
|
return deleted
|
|
|
|
|
|
def search_messages(query, workflow_slug, limit=20):
|
|
with get_db() as conn:
|
|
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
|
|
cur.execute("""
|
|
SELECT DISTINCT m.session_id, m.content, m.role, m.created_at
|
|
FROM agent_chat_messages m
|
|
WHERE m.workflow_id = (SELECT id FROM agent_chat_workflows WHERE slug = %s)
|
|
AND m.content ILIKE %s
|
|
ORDER BY m.created_at DESC
|
|
LIMIT %s;
|
|
""", (workflow_slug, f'%{query}%', limit))
|
|
return cur.fetchall()
|