#!/usr/bin/env python3
"""
H3R7Tech — LeadHunter CRM (SQLite)
=====================================
SQLite persistence layer for LeadHunter leads.

Schema validated by the CTO (HRT-66):

    CREATE TABLE leads (
        id            INTEGER PRIMARY KEY AUTOINCREMENT,
        source        TEXT NOT NULL,         -- 'google_places' or 'osm'
        name          TEXT NOT NULL,
        address       TEXT,
        phone         TEXT,
        rating        REAL,
        reviews_count INTEGER,
        website       TEXT,
        score         INTEGER,
        rgpd_ok       BOOLEAN DEFAULT 1,
        scraped_at    TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        status        TEXT DEFAULT 'new'     -- legacy name, see below
    );

Status vocabulary: the functions of this module write the 7 Kanban statuses
listed in ``VALID_STATUSES``. The older names ('new', 'contacted', 'closed',
'rejected') are still accepted as input and translated through
``LEGACY_STATUS_MAP``; status filters match a status together with its legacy
aliases so rows that still carry an old name remain visible.

Author: H3R7Tech Backend Engineer
Issue: HRT-66
"""

import sqlite3
import logging
import csv
import io
from contextlib import closing, contextmanager
from datetime import datetime, timezone
from logging.handlers import RotatingFileHandler
from typing import Optional

# ─── Logging ────────────────────────────────────────────────────────────────

LOG_PATH = "/home/h3r7/leadhunter.log"

logger = logging.getLogger("leadhunter.crm")
logger.setLevel(logging.INFO)


def _configure_logging() -> Optional[RotatingFileHandler]:
    """Attach the rotating file handler and a stream handler to ``logger``.

    Does nothing when the logger already has handlers (e.g. module reload),
    so no second file handle is opened. If the log file cannot be opened the
    module still imports and logs to the stream handler only.

    Returns:
        The file handler that was attached, or None.
    """
    if logger.handlers:
        return None

    file_handler = None
    file_error = None
    try:
        file_handler = RotatingFileHandler(
            LOG_PATH,
            maxBytes=5 * 1024 * 1024,
            backupCount=3,
        )
    except OSError as exc:
        # RotatingFileHandler opens the file immediately; a missing or
        # read-only directory must not prevent the module from importing.
        file_error = exc
    else:
        file_handler.setFormatter(
            logging.Formatter("%(asctime)s %(levelname)-8s %(name)s — %(message)s")
        )
        logger.addHandler(file_handler)

    logger.addHandler(logging.StreamHandler())
    if file_error is not None:
        logger.warning("Log fichier indisponible (%s) : %s", LOG_PATH, file_error)
    return file_handler


_handler = _configure_logging()

# ─── DB path ────────────────────────────────────────────────────────────────

DB_PATH = "/home/h3r7/leadhunter.db"

# Valid lead statuses (7 Kanban stages)
VALID_STATUSES = {
    "nouveau",              # NOUVEAU
    "contacte",             # CONTACTÉ
    "interesse",            # INTÉRESSÉ
    "demo_planifiee",       # DÉMO PLANIFIÉE
    "proposition_envoyee",  # PROPOSITION ENVOYÉE
    "negotiation",          # NÉGOCIATION
    "signe_ou_refuse",      # SIGNÉ / REFUSÉ
}

# Mapping from the old statuses to the new ones (for migration)
LEGACY_STATUS_MAP = {
    "new": "nouveau",
    "contacted": "contacte",
    "closed": "signe_ou_refuse",
    "rejected": "signe_ou_refuse",
}

# Columns that update_lead() is allowed to modify. Column names are
# interpolated into the UPDATE statement, so this whitelist is what keeps
# that statement safe.
_UPDATABLE_FIELDS = frozenset(
    {
        "name",
        "address",
        "phone",
        "rating",
        "reviews_count",
        "website",
        "score",
        "rgpd_ok",
        "status",
    }
)


def _normalize_status(status):
    """Translate a legacy status name to its Kanban name.

    Values that are not legacy names (including non-string values) are
    returned unchanged; no validation is performed here.
    """
    if isinstance(status, str):
        return LEGACY_STATUS_MAP.get(status, status)
    return status


def _status_aliases(status: str) -> list:
    """Return the normalized status followed by the legacy names mapping to it."""
    canonical = _normalize_status(status)
    legacy = sorted(
        old for old, new in LEGACY_STATUS_MAP.items() if new == canonical
    )
    return [canonical] + legacy


# ─── Initialisation ─────────────────────────────────────────────────────────


def init_db(db_path: str = DB_PATH) -> None:
    """
    Create the SQLite database and the ``leads`` table if they do not exist.

    Uses ``CREATE TABLE IF NOT EXISTS``, so it can be called again on an
    existing database (e.g. at API start-up).

    Args:
        db_path: path to the SQLite database file.

    Raises:
        sqlite3.Error: if the database cannot be opened or the table created.
    """
    # sqlite3's own context manager only commits/rolls back; closing() is
    # what actually releases the connection.
    with closing(sqlite3.connect(db_path)) as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS leads (
                id            INTEGER PRIMARY KEY AUTOINCREMENT,
                source        TEXT NOT NULL,
                name          TEXT NOT NULL,
                address       TEXT,
                phone         TEXT,
                rating        REAL,
                reviews_count INTEGER,
                website       TEXT,
                score         INTEGER,
                rgpd_ok       BOOLEAN DEFAULT 1,
                scraped_at    TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                status        TEXT DEFAULT 'new'
            )
        """)
        conn.commit()
    logger.info("DB initialisée : %s", db_path)


# ─── Context manager ────────────────────────────────────────────────────────


@contextmanager
def _get_conn(db_path: str = DB_PATH):
    """Yield a SQLite connection whose rows are ``sqlite3.Row`` objects.

    Commits when the ``with`` body finishes normally; on an exception it
    rolls back, logs a warning and re-raises. The connection is closed in
    both cases.
    """
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    try:
        yield conn
        conn.commit()
    except Exception as e:
        conn.rollback()
        logger.warning("DB transaction rollback : %s", e)
        raise
    finally:
        conn.close()


# ─── CRUD ───────────────────────────────────────────────────────────────────


def insert_lead(lead: dict, db_path: str = DB_PATH) -> Optional[int]:
    """
    Insert one normalized lead into the database.

    A legacy status name in ``lead["status"]`` is translated through
    ``LEGACY_STATUS_MAP`` before being stored; a missing status is stored
    as 'nouveau'. Other status values are stored unchanged.

    Args:
        lead: dict with the normalized fields (source, name, address, ...).
        db_path: path to the SQLite database file.

    Returns:
        The SQLite id of the inserted lead, or None if the insert failed
        (the error is logged, not raised).
    """
    try:
        with _get_conn(db_path) as conn:
            cursor = conn.execute(
                """
                INSERT INTO leads
                    (source, name, address, phone, rating, reviews_count,
                     website, score, rgpd_ok, status)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    lead.get("source", "unknown"),
                    lead.get("name", ""),
                    lead.get("address", ""),
                    lead.get("phone", ""),
                    lead.get("rating"),
                    lead.get("reviews_count"),
                    lead.get("website", ""),
                    lead.get("score"),
                    1 if lead.get("rgpd_ok", True) else 0,
                    _normalize_status(lead.get("status", "nouveau")),
                ),
            )
            lead_id = cursor.lastrowid
        logger.info("Lead inséré id=%s : %s", lead_id, lead.get("name"))
        return lead_id
    except Exception as e:
        logger.warning("insert_lead error : %s", e)
        return None


def insert_leads(leads: list[dict], db_path: str = DB_PATH) -> list[int]:
    """
    Insert a list of leads, one ``insert_lead`` call per lead.

    A lead that fails to insert is skipped; the others are still inserted.

    Returns:
        List of the ids that were inserted.
    """
    ids = []
    for lead in leads:
        lead_id = insert_lead(lead, db_path)
        if lead_id is not None:
            ids.append(lead_id)
    logger.info("insert_leads : %s/%s insérés.", len(ids), len(leads))
    return ids


def get_leads(
    status: Optional[str] = None,
    limit: int = 100,
    offset: int = 0,
    db_path: str = DB_PATH,
) -> list[dict]:
    """
    Fetch leads, optionally filtered by status.

    Results are ordered by score (descending), then scraped_at (descending).

    Args:
        status: optional status filter. A Kanban status also matches rows
            stored under its legacy names, and a legacy name matches rows
            stored under the Kanban status it maps to.
        limit: pagination — maximum number of results.
        offset: pagination — number of rows to skip.
        db_path: path to the SQLite database file.

    Returns:
        List of dicts (all columns of the leads table); an empty list if the
        query failed (the error is logged, not raised).
    """
    where_clause = ""
    params: list = []
    if status:
        names = _status_aliases(status)
        placeholders = ", ".join("?" for _ in names)
        where_clause = f"WHERE status IN ({placeholders}) "
        params.extend(names)
    params.extend([limit, offset])

    query = (
        f"SELECT * FROM leads {where_clause}"
        "ORDER BY score DESC, scraped_at DESC LIMIT ? OFFSET ?"
    )
    try:
        with _get_conn(db_path) as conn:
            rows = conn.execute(query, params).fetchall()
        return [dict(r) for r in rows]
    except Exception as e:
        logger.warning("get_leads error : %s", e)
        return []


def get_lead_by_id(lead_id: int, db_path: str = DB_PATH) -> Optional[dict]:
    """Fetch one lead by id; None if it does not exist or the query failed."""
    try:
        with _get_conn(db_path) as conn:
            row = conn.execute(
                "SELECT * FROM leads WHERE id = ?", (lead_id,)
            ).fetchone()
        return dict(row) if row else None
    except Exception as e:
        logger.warning("get_lead_by_id error : %s", e)
        return None


def update_lead(lead_id: int, data: dict, db_path: str = DB_PATH) -> bool:
    """
    Update a lead with the provided fields.

    Keys of ``data`` that are not updatable columns are ignored. A legacy
    status name is translated to its Kanban name; any other status outside
    ``VALID_STATUSES`` rejects the whole update.

    Args:
        lead_id: id of the lead.
        data: dict of fields to update (name, address, phone, etc.).
        db_path: path to the SQLite database file.

    Returns:
        True if a row was updated; False if no valid field was given, the
        status is invalid, no lead has this id, or the query failed.
    """
    fields_to_update = {k: v for k, v in data.items() if k in _UPDATABLE_FIELDS}

    if not fields_to_update:
        logger.warning(
            "update_lead : aucun champ valide fourni pour lead_id=%s", lead_id
        )
        return False

    if "status" in fields_to_update:
        new_status = _normalize_status(fields_to_update["status"])
        if new_status not in VALID_STATUSES:
            logger.warning(
                "update_lead : statut invalide '%s'", fields_to_update["status"]
            )
            return False
        fields_to_update["status"] = new_status

    # Column names come from _UPDATABLE_FIELDS only; values are bound.
    set_clause = ", ".join(f"{k} = ?" for k in fields_to_update)
    values = list(fields_to_update.values()) + [lead_id]
    try:
        with _get_conn(db_path) as conn:
            cursor = conn.execute(
                f"UPDATE leads SET {set_clause} WHERE id = ?", values
            )
            updated = cursor.rowcount > 0
    except Exception as e:
        logger.warning("update_lead error : %s", e)
        return False

    if not updated:
        logger.warning("update_lead : lead_id=%s introuvable", lead_id)
        return False
    logger.info("Lead id=%s mis a jour : %s", lead_id, list(fields_to_update))
    return True


def delete_lead(lead_id: int, db_path: str = DB_PATH) -> bool:
    """
    Physically delete a lead.

    Args:
        lead_id: id of the lead to delete.
        db_path: path to the SQLite database file.

    Returns:
        True if a row was deleted; False if no lead has this id or the
        query failed.
    """
    try:
        with _get_conn(db_path) as conn:
            cursor = conn.execute("DELETE FROM leads WHERE id = ?", (lead_id,))
            deleted = cursor.rowcount > 0
    except Exception as e:
        logger.warning("delete_lead error : %s", e)
        return False

    if not deleted:
        logger.warning("delete_lead : lead_id=%s introuvable", lead_id)
        return False
    logger.info("Lead id=%s supprime", lead_id)
    return True


def update_lead_status(lead_id: int, status: str, db_path: str = DB_PATH) -> bool:
    """
    Update the status of a lead.

    Args:
        lead_id: id of the lead.
        status: new status; one of ``VALID_STATUSES``, or a legacy name from
            ``LEGACY_STATUS_MAP`` (stored under its Kanban name).
        db_path: path to the SQLite database file.

    Returns:
        True if a row was updated; False if the status is invalid, no lead
        has this id, or the query failed.
    """
    new_status = _normalize_status(status)
    if new_status not in VALID_STATUSES:
        logger.warning("update_lead_status : statut invalide '%s'", status)
        return False

    try:
        with _get_conn(db_path) as conn:
            cursor = conn.execute(
                "UPDATE leads SET status = ? WHERE id = ?",
                (new_status, lead_id),
            )
            updated = cursor.rowcount > 0
    except Exception as e:
        logger.warning("update_lead_status error : %s", e)
        return False

    if not updated:
        logger.warning("update_lead_status : lead_id=%s introuvable", lead_id)
        return False
    logger.info("Lead id=%s statut → %s", lead_id, new_status)
    return True


def get_stats(db_path: str = DB_PATH) -> dict:
    """
    Return global CRM statistics.

    Returns:
        Dict with total, by_status, by_source, avg_score, top_leads_count
        and generated_at (UTC, ISO 8601 with a 'Z' suffix); an empty dict
        if a query failed. ``by_status`` is keyed by the status values as
        stored in the table.
    """
    try:
        with _get_conn(db_path) as conn:
            total = conn.execute("SELECT COUNT(*) FROM leads").fetchone()[0]

            by_status_rows = conn.execute(
                "SELECT status, COUNT(*) as cnt FROM leads GROUP BY status"
            ).fetchall()
            by_status = {r["status"]: r["cnt"] for r in by_status_rows}

            by_source_rows = conn.execute(
                "SELECT source, COUNT(*) as cnt FROM leads GROUP BY source"
            ).fetchall()
            by_source = {r["source"]: r["cnt"] for r in by_source_rows}

            avg_score_row = conn.execute(
                "SELECT AVG(score) FROM leads WHERE score IS NOT NULL"
            ).fetchone()
            # AVG() is NULL when no row has a score; report 0 in that case.
            avg_score = round(avg_score_row[0] or 0, 2)

            # "Hot" leads = score >= 5
            top_count = conn.execute(
                "SELECT COUNT(*) FROM leads WHERE score >= 5"
            ).fetchone()[0]

        # Aware UTC timestamp rendered with the same trailing "Z" as before.
        generated_at = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
        return {
            "total": total,
            "by_status": by_status,
            "by_source": by_source,
            "avg_score": avg_score,
            "top_leads_count": top_count,
            "generated_at": generated_at,
        }
    except Exception as e:
        logger.warning("get_stats error : %s", e)
        return {}


def export_csv(
    status: Optional[str] = None,
    db_path: str = DB_PATH,
    limit: int = 10000,
) -> str:
    """
    Export leads as CSV text.

    Args:
        status: optional status filter (same matching as ``get_leads``).
        db_path: path to the SQLite database file.
        limit: maximum number of leads exported.

    Returns:
        CSV content as a string, header row included.
    """
    leads = get_leads(status=status, limit=limit, db_path=db_path)

    output = io.StringIO()
    fieldnames = [
        "id",
        "source",
        "name",
        "address",
        "phone",
        "rating",
        "reviews_count",
        "website",
        "score",
        "rgpd_ok",
        "scraped_at",
        "status",
    ]
    writer = csv.DictWriter(output, fieldnames=fieldnames, extrasaction="ignore")
    writer.writeheader()
    writer.writerows(leads)

    logger.info("export_csv : %s leads exportés.", len(leads))
    return output.getvalue()


# ─── CLI (debug) ────────────────────────────────────────────────────────────

if __name__ == "__main__":
    init_db()

    # Insertion smoke test
    test_lead = {
        "source": "google_places",
        "name": "Restaurant Test",
        "address": "10 rue de la Paix, 59000 Lille",
        "phone": "+33 3 20 00 00 01",
        "rating": 4.5,
        "reviews_count": 120,
        "website": "",
        "score": 8,
        "rgpd_ok": True,
        "status": "new",
    }
    lead_id = insert_lead(test_lead)
    print(f"Lead inséré : id={lead_id}")

    leads = get_leads()
    print(f"Leads en DB : {len(leads)}")

    stats = get_stats()
    print(f"Stats : {stats}")