289 lines
9.0 KiB
Python
Executable File
289 lines
9.0 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
H3R7Tech - Web scraper for tradespeople (artisans)
==================================================
Extracts data from Pages Jaunes / Google Maps.
Stores the results in the CRM + export for Google Sheets.

Author: H3R7Tech
Date: 25/02/2026
"""
|
|
|
|
import requests
|
|
from bs4 import BeautifulSoup
|
|
import json
|
|
import csv
|
|
import os
|
|
from datetime import datetime
|
|
import urllib.parse
|
|
|
|
# Configuration: on-disk locations of the CRM store and of the generated exports
CRM_FILE = '/home/h3r7/turf_scraper/crm_prospects.json'
EXPORT_DIR = '/home/h3r7/turf_scraper/exports/'

# Create the export directory up front (no error if it already exists)
os.makedirs(EXPORT_DIR, exist_ok=True)
|
|
|
|
class ScraperArtisans:
    """Scrape tradesperson listings from Pages Jaunes and Google search.

    Both public search methods return a list of dicts with the keys
    ``nom``, ``adresse``, ``telephone``, ``website``, ``note`` and ``avis``
    (all string values), or an empty list when the request fails.
    """

    def __init__(self):
        # Browser-like headers sent with every request.
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept-Language': 'fr-FR,fr;q=0.9,en;q=0.8',
        }

    @staticmethod
    def _make_result(nom, adresse='', telephone=''):
        """Build one result record with the common set of keys."""
        return {
            'nom': nom,
            'adresse': adresse,
            'telephone': telephone,
            'website': '',
            'note': '',
            'avis': '',
        }

    @staticmethod
    def _pagesjaunes_url(profession, ville, cp):
        """Build the Pages Jaunes directory URL for a profession and a city.

        Each component is lower-cased (city, profession) and percent-encoded
        so that characters such as ``/``, ``?`` or ``#`` in the inputs cannot
        alter the structure of the URL.
        """
        quote = urllib.parse.quote
        ville_part = quote(str(ville).lower(), safe='')
        cp_part = quote(str(cp), safe='')
        profession_part = quote(str(profession).lower(), safe='')
        return f"https://www.pagesjaunes.fr/annuaire/{ville_part}-{cp_part}/{profession_part}"

    @staticmethod
    def _extract_phone(text):
        """Return a 10-digit phone number found in ``text``, or ``''``.

        The text is only scanned when it contains ``06`` or ``07``. A token
        qualifies when it is exactly 10 ASCII digits and starts with ``0``;
        if several tokens qualify, the last one is returned.
        """
        if '06' not in text and '07' not in text:
            return ''
        phone = ''
        for word in text.split():
            if (
                len(word) == 10
                and word.startswith('0')
                and all(ch in '0123456789' for ch in word)
            ):
                phone = word
        return phone

    def _fetch_html(self, url, timeout, source):
        """GET ``url`` and return the response body, or ``None`` on failure.

        A network error or a non-200 status is reported on stdout, prefixed
        with ``source``, instead of being raised.
        """
        try:
            response = requests.get(url, headers=self.headers, timeout=timeout)
        except requests.RequestException as e:
            print(f"❌ {source} bloqué: {e}")
            return None
        if response.status_code != 200:
            # Anti-bot blocks typically show up as a non-200 status rather
            # than as an exception, so report them as well.
            print(f"❌ {source} bloqué: HTTP {response.status_code}")
            return None
        return response.text

    def search_pagesjaunes(self, profession, ville, cp):
        """Search Pages Jaunes for ``profession`` in ``ville`` (postcode ``cp``).

        Note: often blocked by Cloudflare.

        Returns:
            A list of result dicts; empty when the request fails.
        """
        url = self._pagesjaunes_url(profession, ville, cp)
        html = self._fetch_html(url, timeout=10, source='PagesJaunes')
        if html is None:
            return []
        return self._parse_pagesjaunes(html)

    def _parse_pagesjaunes(self, html):
        """Parse Pages Jaunes result HTML into result dicts (at most 20)."""
        soup = BeautifulSoup(html, 'html.parser')
        results = []

        for item in soup.select('.bi-thu__ItemSearchResult')[:20]:
            try:
                name = item.select_one('.bi-thu__Title')
                if not name:
                    continue
                addr = item.select_one('.bi-thu__Address')
                phone = item.select_one('.bi-thu__PhoneNumber')
                results.append(self._make_result(
                    name.get_text(strip=True),
                    adresse=addr.get_text(strip=True) if addr else '',
                    telephone=phone.get_text(strip=True) if phone else '',
                ))
            except Exception:
                # Best effort: skip a malformed entry, but let
                # KeyboardInterrupt / SystemExit propagate.
                continue

        return results

    def search_google_maps(self, profession, ville, cp):
        """Alternative search that goes through a regular Google web search.

        Returns:
            A list of result dicts; empty when the request fails.
        """
        query = f"{profession} {ville} {cp}"
        url = f"https://www.google.com/search?q={urllib.parse.quote(query)}+annuaire"
        html = self._fetch_html(url, timeout=15, source='Google')
        if html is None:
            return []
        return self._parse_google_results(html)

    def _parse_google_results(self, html):
        """Parse Google result HTML into result dicts (at most 15 items)."""
        soup = BeautifulSoup(html, 'html.parser')
        results = []

        # Each organic result is expected under a ``.g`` element with an <h3> title.
        for item in soup.select('.g')[:15]:
            try:
                title = item.select_one('h3')
                if not title:
                    continue
                text = title.get_text(strip=True)
                results.append(self._make_result(
                    text[:100],
                    telephone=self._extract_phone(text),
                ))
            except Exception:
                # Best effort: skip a malformed entry, but let
                # KeyboardInterrupt / SystemExit propagate.
                continue

        return results
|
|
|
|
|
|
class CRMManager:
    """Manage the local CRM, stored as a single JSON file.

    The store is a dict with two keys: ``prospects`` (list of prospect
    dicts) and ``last_id`` (the highest id handed out so far).
    """

    def __init__(self):
        # Path of the JSON store.
        self.file = CRM_FILE

    def load(self):
        """Read the store from disk.

        Returns:
            The stored dict, or a fresh empty store when the file does not
            exist. Missing ``prospects`` / ``last_id`` keys are filled in
            with their empty defaults.

        Raises:
            json.JSONDecodeError: if the file exists but is not valid JSON.
        """
        if not os.path.exists(self.file):
            return {"prospects": [], "last_id": 0}
        with open(self.file, 'r', encoding='utf-8') as f:
            crm = json.load(f)
        crm.setdefault('prospects', [])
        crm.setdefault('last_id', 0)
        return crm

    def save(self, data):
        """Write ``data`` to the store.

        The data is serialised before any file is opened, written to a
        sibling temporary file and then moved into place, so a failure
        never leaves a truncated store behind.

        Raises:
            TypeError: if ``data`` is not JSON-serialisable.
        """
        payload = json.dumps(data, indent=2)
        tmp_path = f"{self.file}.tmp"
        with open(tmp_path, 'w', encoding='utf-8') as f:
            f.write(payload)
        os.replace(tmp_path, self.file)

    def add_prospect(self, data):
        """Add a prospect to the CRM and persist it.

        Args:
            data: dict that may contain ``nom``, ``entreprise``,
                ``telephone``, ``profession``, ``adresse``, ``note`` and
                ``avis``; every key is optional.

        Returns:
            The id assigned to the new prospect.
        """
        crm = self.load()
        crm['last_id'] += 1

        nom = data.get('nom', '')
        # ``telephone`` may be present but None; treat that as empty.
        telephone = data.get('telephone') or ''
        # One timestamp so that ``created`` and ``updated`` match exactly.
        now = datetime.now().isoformat()

        prospect = {
            'id': crm['last_id'],
            'nom': nom,
            'entreprise': data.get('entreprise', nom),
            # Strip the separators commonly used in French phone numbers.
            'tel': telephone.replace(' ', '').replace('.', ''),
            'email': '',
            'secteur': data.get('profession', 'Artisan'),
            'statut': 'nouveau',
            'score': self._calculate_score(data),
            'notes': f"Adresse: {data.get('adresse', '')} | Note: {data.get('note', '')}",
            'source': 'Scraping',
            'created': now,
            'updated': now,
        }

        crm['prospects'].append(prospect)
        self.save(crm)

        return prospect['id']

    def _calculate_score(self, data):
        """Compute the qualification score of a prospect.

        Starts at 1 and adds one point for each of ``telephone``, ``note``
        and ``avis`` that is present and non-empty; capped at 5.
        """
        score = 1 + sum(1 for key in ('telephone', 'note', 'avis') if data.get(key))
        return min(score, 5)

    def export_csv(self, filename=None):
        """Export every prospect to a CSV file inside ``EXPORT_DIR``.

        Args:
            filename: name of the file to create; defaults to a
                timestamped ``prospects_YYYYMMDD_HHMMSS.csv``.

        Returns:
            The path of the file written.
        """
        if not filename:
            filename = f"prospects_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"

        crm = self.load()
        filepath = os.path.join(EXPORT_DIR, filename)

        with open(filepath, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerow(['ID', 'Nom', 'Entreprise', 'Téléphone', 'Email', 'Secteur', 'Statut', 'Score', 'Adresse', 'Source', 'Date création'])

            for p in crm['prospects']:
                writer.writerow([
                    p.get('id', ''),
                    p.get('nom', ''),
                    p.get('entreprise', ''),
                    p.get('tel', ''),
                    p.get('email', ''),
                    p.get('secteur', ''),
                    p.get('statut', ''),
                    p.get('score', ''),
                    # NOTE(review): the 'Adresse' column is filled from the
                    # combined 'notes' field ("Adresse: ... | Note: ...").
                    p.get('notes', ''),
                    p.get('source', ''),
                    # Keep only the date part of the ISO timestamp.
                    p.get('created', '')[:10],
                ])

        return filepath

    def get_stats(self):
        """Compute summary statistics over the stored prospects.

        Returns:
            A dict with ``total``, ``par_statut`` (count per status),
            ``par_secteur`` (count per sector) and ``score_moyen`` (mean
            score rounded to one decimal, 0 when there are no prospects).
        """
        prospects = self.load()['prospects']

        par_statut = {}
        par_secteur = {}
        total_score = 0
        for p in prospects:
            statut = p.get('statut', 'nouveau')
            par_statut[statut] = par_statut.get(statut, 0) + 1

            secteur = p.get('secteur', 'Autre')
            par_secteur[secteur] = par_secteur.get(secteur, 0) + 1

            total_score += p.get('score', 0)

        return {
            'total': len(prospects),
            'par_statut': par_statut,
            'par_secteur': par_secteur,
            'score_moyen': round(total_score / len(prospects), 1) if prospects else 0,
        }
|
|
|
|
|
|
def main():
    """Demo entry point: seed the CRM with sample prospects, export, show stats."""
    separator = "=" * 60

    print(separator)
    print("🏢 H3R7Tech - Web Scraper pour Artisans")
    print(separator)

    # Demo: simulated prospects to add
    dupont = {
        'nom': 'Dupont Philippe',
        'entreprise': 'Dupont Cordonnerie',
        'telephone': '0320123456',
        'adresse': '45 Rue Nationale, Lille 59000',
        'profession': 'Cordonnier',
        'note': '4.5',
        'avis': '120',
    }
    martin = {
        'nom': 'Martin Jean',
        'entreprise': 'Martin Réparation',
        'telephone': '0320987654',
        'adresse': '12 Avenue de la République, Lille 59000',
        'profession': 'Cordonnier',
        'note': '4.2',
        'avis': '85',
    }

    # Add each sample prospect to the CRM
    manager = CRMManager()
    for entry in (dupont, martin):
        new_id = manager.add_prospect(entry)
        print(f"✅ Ajouté: {entry['nom']} (ID: {new_id})")

    # Export to CSV
    exported_path = manager.export_csv()
    print(f"\n💾 Export CSV: {exported_path}")

    # Display the statistics
    summary = manager.get_stats()
    print("\n📊 Statistiques CRM:")
    print(f" Total prospects: {summary['total']}")
    print(f" Score moyen: {summary['score_moyen']}")

    print("\n" + separator)
    print("🚀 Pour lancer une recherche réelle:")
    print(" python3 scraper_artisans.py --profession 'cordonnier' --ville 'lille' --cp '59000'")
    print(separator)
|
|
|
|
|
|
# Run the demo only when the file is executed as a script, not when imported
if __name__ == '__main__':
    main()
|