Files
turf_saas/leadhunter_crm.py
DevOps Engineer f9a45e6deb feat(HRT-66): LeadHunter S1 — core scraping, scoring, CRM SQLite et API Flask
- leadhunter_scraper.py : Google Places Nearby Search + Place Details
  avec compteur quota daily_quota.json (limite 900/jour),
  sleep(0.5) entre requêtes, fallback Overpass OSM boundary MEL,
  filtre website absent, déduplication, rgpd_ok=True

- leadhunter_scorer.py : moteur de scoring 0-8 pts
  critère n°1 = site web absent (+3), avis ≥50 (+2),
  note ≥4.0 (+2), téléphone (+1), note <3.0 (-1)

- leadhunter_crm.py : CRM SQLite schéma validé CTO
  (id, source, name, address, phone, rating, reviews_count,
   website, score, rgpd_ok, scraped_at, status)
  CRUD : insert_lead, get_leads, update_lead_status, get_stats, export_csv

- leadhunter_api.py : Flask service port 8769
  GET /api/leads, POST /api/leads/scrape, GET /api/leads/stats,
  GET /api/leads/export, PATCH /api/leads/<id>/status, GET /health
  assert GOOGLE_PLACES_API_KEY au démarrage
  scraping asynchrone (thread) avec status endpoint

- infra/turf-saas-leadhunter.service : service systemd
  EnvironmentFile=/home/h3r7/.env pour GOOGLE_PLACES_API_KEY

Tests : py_compile OK, scorer testé, CRM SQLite testé

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-27 16:33:30 +02:00

350 lines
11 KiB
Python

#!/usr/bin/env python3
"""
H3R7Tech — LeadHunter CRM (SQLite)
=====================================
Couche de persistance SQLite pour les leads LeadHunter.
Schéma validé CTO (HRT-66) :
CREATE TABLE leads (
id INTEGER PRIMARY KEY AUTOINCREMENT,
source TEXT NOT NULL, -- 'google_places' ou 'osm'
name TEXT NOT NULL,
address TEXT,
phone TEXT,
rating REAL,
reviews_count INTEGER,
website TEXT,
score INTEGER,
rgpd_ok BOOLEAN DEFAULT 1,
scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
status TEXT DEFAULT 'new' -- new, contacted, closed, rejected
);
Auteur: H3R7Tech Backend Engineer
Issue: HRT-66
"""
import sqlite3
import logging
import csv
import io
from contextlib import contextmanager
from datetime import datetime
from logging.handlers import RotatingFileHandler
from typing import Optional
# ─── Logging ────────────────────────────────────────────────────────────────
logger = logging.getLogger("leadhunter.crm")
logger.setLevel(logging.INFO)

# Rotating file handler (5 MiB per file, 3 backups). Stays None when the log
# file cannot be opened, so importing this module never fails because of
# logging configuration.
_handler = None

# Attach handlers only once so that a re-import does not duplicate output.
if not logger.handlers:
    _log_file_error = None
    try:
        _handler = RotatingFileHandler(
            "/home/h3r7/leadhunter.log",
            maxBytes=5 * 1024 * 1024,
            backupCount=3,
        )
    except OSError as _exc:
        # Missing directory, read-only filesystem, permissions, ...
        _log_file_error = _exc
    else:
        _handler.setFormatter(
            # Explicit separator between the logger name and the message.
            logging.Formatter(
                "%(asctime)s %(levelname)-8s %(name)s - %(message)s"
            )
        )
        logger.addHandler(_handler)
    # Console output is always available.
    logger.addHandler(logging.StreamHandler())
    if _log_file_error is not None:
        logger.warning(
            f"log file unavailable, console logging only : {_log_file_error}"
        )

# ─── DB path ─────────────────────────────────────────────────────────────────
DB_PATH = "/home/h3r7/leadhunter.db"

# Valid statuses for a lead
VALID_STATUSES = {"new", "contacted", "closed", "rejected"}
# ─── Initialisation ──────────────────────────────────────────────────────────
def init_db(db_path: str = DB_PATH) -> None:
    """
    Create the SQLite database and the ``leads`` table if they do not exist.

    Idempotent (``CREATE TABLE IF NOT EXISTS``), so it can safely be called
    every time the API starts.

    Args:
        db_path: path to the SQLite database file.
    """
    conn = sqlite3.connect(db_path)
    try:
        # ``with conn`` only commits (or rolls back) the transaction; it does
        # not close the connection, hence the explicit close() below.
        with conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS leads (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    source TEXT NOT NULL,
                    name TEXT NOT NULL,
                    address TEXT,
                    phone TEXT,
                    rating REAL,
                    reviews_count INTEGER,
                    website TEXT,
                    score INTEGER,
                    rgpd_ok BOOLEAN DEFAULT 1,
                    scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    status TEXT DEFAULT 'new'
                )
            """)
    finally:
        conn.close()
    logger.info(f"DB initialisée : {db_path}")
# ─── Context manager ─────────────────────────────────────────────────────────
@contextmanager
def _get_conn(db_path: str = DB_PATH):
    """
    Yield a SQLite connection whose rows are ``sqlite3.Row`` objects.

    The transaction is committed when the ``with`` body completes normally.
    If the body (or the commit) raises an ``Exception``, the transaction is
    rolled back, a warning is logged and the exception is re-raised. The
    connection is closed in every case.
    """
    connection = sqlite3.connect(db_path)
    connection.row_factory = sqlite3.Row
    try:
        try:
            yield connection
            connection.commit()
        except Exception as exc:
            connection.rollback()
            logger.warning(f"DB transaction rollback : {exc}")
            raise
    finally:
        connection.close()
# ─── CRUD ────────────────────────────────────────────────────────────────────
def insert_lead(lead: dict, db_path: str = DB_PATH) -> Optional[int]:
    """
    Insert one normalized lead into the database.

    Keys read from ``lead``: source, name, address, phone, rating,
    reviews_count, website, score, rgpd_ok, status. Missing text fields
    default to an empty string ("unknown" for source), ``rgpd_ok`` defaults
    to True, and a missing or empty ``status`` defaults to "new".

    Args:
        lead: dict holding the normalized lead fields.
        db_path: path to the SQLite database file.

    Returns:
        The SQLite row id of the inserted lead, or None when the status is
        not one of VALID_STATUSES or when the insertion fails (the reason is
        logged as a warning).
    """
    try:
        # Same status rule as update_lead_status: only known statuses may be
        # stored. ``or "new"`` also covers an explicit None / empty value,
        # which would otherwise bypass the column default.
        status = lead.get("status") or "new"
        if status not in VALID_STATUSES:
            logger.warning(f"insert_lead : statut invalide '{status}'")
            return None

        with _get_conn(db_path) as conn:
            cursor = conn.execute(
                """
                INSERT INTO leads
                    (source, name, address, phone, rating, reviews_count,
                     website, score, rgpd_ok, status)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    lead.get("source", "unknown"),
                    lead.get("name", ""),
                    lead.get("address", ""),
                    lead.get("phone", ""),
                    lead.get("rating"),
                    lead.get("reviews_count"),
                    lead.get("website", ""),
                    lead.get("score"),
                    # Stored as 0/1 in the BOOLEAN column.
                    1 if lead.get("rgpd_ok", True) else 0,
                    status,
                ),
            )
            lead_id = cursor.lastrowid
        # Reached only after the transaction has been committed.
        logger.info(f"Lead inséré id={lead_id} : {lead.get('name')}")
        return lead_id
    except Exception as e:
        # Best-effort API: callers get None instead of an exception.
        logger.warning(f"insert_lead error : {e}")
        return None
def insert_leads(leads: list[dict], db_path: str = DB_PATH) -> list[int]:
    """
    Insert several leads, one ``insert_lead`` call per item, in order.

    Leads whose insertion fails (``insert_lead`` returned None) are skipped.

    Args:
        leads: list of normalized lead dicts.
        db_path: path to the SQLite database file.

    Returns:
        The ids of the leads that were actually inserted.
    """
    # The inner generator is lazy, so inserts still happen one by one in
    # list order.
    attempted = (insert_lead(item, db_path) for item in leads)
    inserted = [new_id for new_id in attempted if new_id is not None]
    logger.info(f"insert_leads : {len(inserted)}/{len(leads)} insérés.")
    return inserted
def get_leads(
    status: Optional[str] = None,
    limit: int = 100,
    offset: int = 0,
    db_path: str = DB_PATH,
) -> list[dict]:
    """
    Fetch leads, best score first, with an optional status filter.

    Args:
        status: when truthy, keep only leads whose 'status' equals this value
            (new, contacted, closed, rejected). None or "" means no filter.
        limit: pagination, maximum number of rows. Passed straight to
            SQLite, where a negative LIMIT means "no upper bound".
        offset: pagination, number of rows to skip.
        db_path: path to the SQLite database file.

    Returns:
        List of dicts (all columns of the leads table); an empty list when
        the query fails (the error is logged as a warning).
    """
    query = "SELECT * FROM leads"
    params: list = []
    if status:
        query += " WHERE status = ?"
        params.append(status)
    # ``id`` is the final tie-breaker: score and scraped_at (one-second
    # resolution) tie frequently, and LIMIT/OFFSET paging over an ordering
    # with ties can repeat or skip rows between pages.
    query += " ORDER BY score DESC, scraped_at DESC, id DESC LIMIT ? OFFSET ?"
    params.extend((limit, offset))

    try:
        with _get_conn(db_path) as conn:
            rows = conn.execute(query, params).fetchall()
            return [dict(r) for r in rows]
    except Exception as e:
        logger.warning(f"get_leads error : {e}")
        return []
def get_lead_by_id(lead_id: int, db_path: str = DB_PATH) -> Optional[dict]:
    """
    Fetch a single lead by its id.

    Args:
        lead_id: value of the ``id`` column to look up.
        db_path: path to the SQLite database file.

    Returns:
        The lead as a dict, or None when no row has this id or when the
        lookup fails (the error is logged as a warning).
    """
    try:
        with _get_conn(db_path) as conn:
            lookup = conn.execute(
                "SELECT * FROM leads WHERE id = ?", (lead_id,)
            )
            record = lookup.fetchone()
            if not record:
                return None
            return dict(record)
    except Exception as exc:
        logger.warning(f"get_lead_by_id error : {exc}")
        return None
def update_lead_status(lead_id: int, status: str, db_path: str = DB_PATH) -> bool:
    """
    Update the status of one lead.

    Args:
        lead_id: id of the lead.
        status: new status ('new', 'contacted', 'closed', 'rejected').
        db_path: path to the SQLite database file.

    Returns:
        True when a row was updated; False when the status is invalid, when
        no lead has this id, or when the database operation fails.
    """
    if status not in VALID_STATUSES:
        logger.warning(f"update_lead_status : statut invalide '{status}'")
        return False
    try:
        with _get_conn(db_path) as conn:
            cursor = conn.execute(
                "UPDATE leads SET status = ? WHERE id = ?",
                (status, lead_id),
            )
            # Number of rows matched by the WHERE clause (0 if the id is
            # unknown). An UPDATE on a missing id is not an SQL error, so it
            # has to be detected here.
            updated_rows = cursor.rowcount
    except Exception as e:
        logger.warning(f"update_lead_status error : {e}")
        return False

    if updated_rows == 0:
        logger.warning(f"update_lead_status : lead id={lead_id} introuvable")
        return False
    logger.info(f"Lead id={lead_id} statut → {status}")
    return True
def get_stats(db_path: str = DB_PATH) -> dict:
    """
    Return global statistics about the CRM.

    Args:
        db_path: path to the SQLite database file.

    Returns:
        Dict with the keys total, by_status, by_source, avg_score,
        top_leads_count and generated_at (UTC, ISO 8601 with a "Z" suffix).
        An empty dict when the queries fail (the error is logged as a
        warning).
    """
    # Local import: the module only imports ``datetime`` from ``datetime``.
    from datetime import timezone

    try:
        with _get_conn(db_path) as conn:
            total = conn.execute("SELECT COUNT(*) FROM leads").fetchone()[0]

            by_status_rows = conn.execute(
                "SELECT status, COUNT(*) as cnt FROM leads GROUP BY status"
            ).fetchall()
            by_status = {r["status"]: r["cnt"] for r in by_status_rows}

            by_source_rows = conn.execute(
                "SELECT source, COUNT(*) as cnt FROM leads GROUP BY source"
            ).fetchall()
            by_source = {r["source"]: r["cnt"] for r in by_source_rows}

            avg_score_row = conn.execute(
                "SELECT AVG(score) FROM leads WHERE score IS NOT NULL"
            ).fetchone()
            # AVG() yields NULL when no lead has a score; report 0 then.
            avg_score = round(avg_score_row[0] or 0, 2)

            # "Hot" leads = score >= 5
            top_count = conn.execute(
                "SELECT COUNT(*) FROM leads WHERE score >= 5"
            ).fetchone()[0]

        # datetime.utcnow() is deprecated; take an aware UTC timestamp and
        # drop the tzinfo so the rendered string keeps the "...Z" format.
        now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
        return {
            "total": total,
            "by_status": by_status,
            "by_source": by_source,
            "avg_score": avg_score,
            "top_leads_count": top_count,
            "generated_at": now_utc.isoformat() + "Z",
        }
    except Exception as e:
        logger.warning(f"get_stats error : {e}")
        return {}
def export_csv(
    status: Optional[str] = None,
    db_path: str = DB_PATH,
) -> str:
    """
    Export the leads as CSV text.

    Rows come from ``get_leads`` (same ordering and status filter). One
    header line is always written, followed by one line per lead.

    Args:
        status: optional filter on the lead status.
        db_path: path to the SQLite database file.

    Returns:
        The CSV content as a string.
    """
    # A negative LIMIT means "no upper bound" in SQLite, so the export is
    # never silently cut off at an arbitrary row count.
    leads = get_leads(status=status, limit=-1, db_path=db_path)

    output = io.StringIO()
    fieldnames = [
        "id",
        "source",
        "name",
        "address",
        "phone",
        "rating",
        "reviews_count",
        "website",
        "score",
        "rgpd_ok",
        "scraped_at",
        "status",
    ]
    # extrasaction="ignore": keys outside ``fieldnames`` are dropped instead
    # of raising ValueError.
    writer = csv.DictWriter(output, fieldnames=fieldnames, extrasaction="ignore")
    writer.writeheader()
    writer.writerows(leads)
    logger.info(f"export_csv : {len(leads)} leads exportés.")
    return output.getvalue()
# ─── CLI (debug) ─────────────────────────────────────────────────────────────
def _debug_smoke_run() -> None:
    """
    Manual debug run against the default database.

    Initializes the DB, inserts one sample lead, then prints the inserted
    id, the number of leads returned by ``get_leads()`` and the stats dict.
    """
    init_db()
    # Sample insertion
    sample = dict(
        source="google_places",
        name="Restaurant Test",
        address="10 rue de la Paix, 59000 Lille",
        phone="+33 3 20 00 00 01",
        rating=4.5,
        reviews_count=120,
        website="",
        score=8,
        rgpd_ok=True,
        status="new",
    )
    new_id = insert_lead(sample)
    print(f"Lead inséré : id={new_id}")
    print(f"Leads en DB : {len(get_leads())}")
    print(f"Stats : {get_stats()}")


if __name__ == "__main__":
    _debug_smoke_run()