#!/usr/bin/env python3
"""
Tests — Multi-account / Pro organisations
Sprint: HRT-82

Covers:
  - DB migration (organizations + org_members tables)
  - POST   /api/v1/org
  - GET    /api/v1/org
  - DELETE /api/v1/org
  - POST   /api/v1/org/invite
  - GET    /api/v1/org/members
  - DELETE /api/v1/org/members/{user_id}
  - Plan enforcement (plan != pro → 403)
  - Business constraints (1 org/owner, max 5 members, duplicates, etc.)

The test classes share module-scoped fixtures and one database, and several
tests rely on state left behind by earlier ones (e.g. the org created in
TestCreateOrg is reused by the invite/list/remove/delete classes), so they
are meant to run in file order.

Run:
    ./venv/bin/pytest tests/test_org.py -v --tb=short
"""

import atexit
import contextlib
import os
import secrets
import sqlite3
import sys
import tempfile

import pytest

# ─── Isolated temp DB ────────────────────────────────────────────────────────
# The env var must be set before the project modules are imported below.
_tmp_db = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
_tmp_db.close()
os.environ["TURF_SAAS_DB"] = _tmp_db.name


def _remove_tmp_db() -> None:
    """Delete the temporary database file, ignoring it if gone or still locked."""
    with contextlib.suppress(OSError):
        os.unlink(_tmp_db.name)


# The file is created with delete=False, so it has to be removed explicitly.
atexit.register(_remove_tmp_db)

sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))

# ─── App import (after env configuration) ────────────────────────────────────
from org_db import get_db, migrate_org_tables  # noqa: E402
from saas_auth import (  # noqa: E402
    generate_token,
    get_db as auth_get_db,
    init_users_table,
)


# ─── Helpers ─────────────────────────────────────────────────────────────────


def _create_user(email: str, plan: str = "free") -> dict:
    """Insert a user row directly in the DB and return its id, email, token and plan.

    Args:
        email: Email stored in the ``saas_users`` row.
        plan: Plan stored in the ``saas_users`` row (defaults to ``"free"``).

    Returns:
        A dict with the keys ``id``, ``email``, ``token`` and ``plan``.
    """
    init_users_table()
    uid = secrets.token_hex(16)
    pw_hash = "hashed"
    # closing() guarantees the connection is released even if the insert fails.
    with contextlib.closing(auth_get_db()) as conn:
        # NOTE(review): with OR IGNORE, a row that conflicts with an existing
        # one is silently skipped and the returned id would then not exist in
        # the DB — presumably never the case on the fresh temp DB; confirm.
        conn.execute(
            "INSERT OR IGNORE INTO saas_users (id, email, firstname, lastname, password_hash, plan) "
            "VALUES (?,?,?,?,?,?)",
            (uid, email, "Test", "User", pw_hash, plan),
        )
        conn.commit()
    token = generate_token(uid)
    return {"id": uid, "email": email, "token": token, "plan": plan}


def _auth_header(token: str) -> dict:
    """Return the ``Authorization: Bearer`` header dict for ``token``."""
    return {"Authorization": f"Bearer {token}"}


# ─── Flask app fixture ───────────────────────────────────────────────────────


@pytest.fixture(scope="module")
def app():
    """Create the Flask app with the auth and org blueprints registered."""
    from flask import Flask
    from flask_cors import CORS

    from saas_auth import auth_bp
    from api_v1.routes.org import org_bp

    application = Flask(__name__)
    CORS(application)
    application.config["TESTING"] = True

    # Make sure the migration has run
    migrate_org_tables()

    application.register_blueprint(auth_bp)
    application.register_blueprint(org_bp)

    yield application


@pytest.fixture(scope="module")
def client(app):
    """Flask test client bound to the module-scoped app."""
    return app.test_client()


# ─── Users fixtures ──────────────────────────────────────────────────────────


@pytest.fixture(scope="module")
def pro_owner(app):
    """A Pro user who will create an org."""
    with app.app_context():
        return _create_user("owner_pro@test.com", plan="pro")


@pytest.fixture(scope="module")
def pro_user2(app):
    """A 2nd Pro user, to be invited."""
    with app.app_context():
        return _create_user("member2_pro@test.com", plan="pro")


@pytest.fixture(scope="module")
def pro_user3(app):
    """A 3rd Pro user, to be invited."""
    with app.app_context():
        return _create_user("member3_pro@test.com", plan="pro")


@pytest.fixture(scope="module")
def pro_user4(app):
    """A 4th Pro user, to be invited."""
    with app.app_context():
        return _create_user("member4_pro@test.com", plan="pro")


@pytest.fixture(scope="module")
def pro_user5(app):
    """A 5th Pro user, to be invited."""
    with app.app_context():
        return _create_user("member5_pro@test.com", plan="pro")


@pytest.fixture(scope="module")
def pro_user6(app):
    """A 6th user, used to test the MAX_MEMBERS limit."""
    with app.app_context():
        return _create_user("member6_pro@test.com", plan="pro")


@pytest.fixture(scope="module")
def free_user(app):
    """A user on the free plan."""
    with app.app_context():
        return _create_user("free_user@test.com", plan="free")


@pytest.fixture(scope="module")
def other_pro_owner(app):
    """A 2nd Pro owner (to test cross-org conflicts)."""
    with app.app_context():
        return _create_user("other_owner@test.com", plan="pro")


# ═══════════════════════════════════════════════════════════════════════════════
# DB migration tests
# ═══════════════════════════════════════════════════════════════════════════════


class TestOrgDbMigration:
    def test_tables_exist(self):
        """The organizations and org_members tables must exist."""
        with contextlib.closing(get_db()) as conn:
            tables = {
                row[0]
                for row in conn.execute(
                    "SELECT name FROM sqlite_master WHERE type='table'"
                )
            }
        assert "organizations" in tables, "Table organizations manquante"
        assert "org_members" in tables, "Table org_members manquante"

    def test_migration_idempotent(self):
        """Calling migrate_org_tables() twice must not raise."""
        migrate_org_tables()  # 2nd call — must be silent
        self.test_tables_exist()

    def test_org_members_unique_constraint(self):
        """UNIQUE(org_id, user_id) must be present."""
        oid = "test_org_unique"
        uid = "test_uid_unique"
        insert_member_sql = (
            "INSERT INTO org_members (org_id, user_id, role, invited_at, joined_at) "
            "VALUES (?,?,'member',datetime('now'),datetime('now'))"
        )

        with contextlib.closing(get_db()) as conn:
            # PRAGMA index_list rows are (seq, name, unique, origin, partial);
            # column 2 is 1 for indexes that enforce uniqueness.
            has_unique_index = any(
                row[2] for row in conn.execute("PRAGMA index_list(org_members)")
            )
            assert has_unique_index, "Aucun index UNIQUE sur org_members"

            # Also verify the constraint behaviourally with a duplicate insert.
            try:
                conn.execute(
                    "INSERT OR IGNORE INTO organizations (id, owner_id, name) VALUES (?,?,?)",
                    (oid, uid, "TestOrg"),
                )
                conn.execute(insert_member_sql, (oid, uid))
                conn.commit()
                # The 2nd insertion must raise IntegrityError
                with pytest.raises(sqlite3.IntegrityError):
                    conn.execute(insert_member_sql, (oid, uid))
                    conn.commit()
            finally:
                # Remove the scratch rows so later tests see a clean DB.
                conn.execute("DELETE FROM org_members WHERE org_id=?", (oid,))
                conn.execute("DELETE FROM organizations WHERE id=?", (oid,))
                conn.commit()


# ═══════════════════════════════════════════════════════════════════════════════
# Plan enforcement tests
# ═══════════════════════════════════════════════════════════════════════════════


class TestPlanEnforcement:
    def test_create_org_free_plan_403(self, client, free_user):
        """A free user cannot create an org."""
        resp = client.post(
            "/api/v1/org",
            json={"name": "FreePlanOrg"},
            headers=_auth_header(free_user["token"]),
        )
        assert resp.status_code == 403
        data = resp.get_json()
        assert data["required"] == "pro"

    def test_get_org_free_plan_403(self, client, free_user):
        """A free user cannot read an org."""
        resp = client.get("/api/v1/org", headers=_auth_header(free_user["token"]))
        assert resp.status_code == 403

    def test_invite_free_plan_403(self, client, free_user):
        """A free user cannot invite members."""
        resp = client.post(
            "/api/v1/org/invite",
            json={"email": "someone@test.com"},
            headers=_auth_header(free_user["token"]),
        )
        assert resp.status_code == 403

    def test_members_free_plan_403(self, client, free_user):
        """A free user cannot list members."""
        resp = client.get(
            "/api/v1/org/members", headers=_auth_header(free_user["token"])
        )
        assert resp.status_code == 403

    def test_no_token_401(self, client):
        """A request without an Authorization header gets 401."""
        resp = client.get("/api/v1/org")
        assert resp.status_code == 401


# ═══════════════════════════════════════════════════════════════════════════════
# Organisation creation tests
# ═══════════════════════════════════════════════════════════════════════════════


class TestCreateOrg:
    def test_create_org_success(self, client, pro_owner):
        """A Pro user can create an organisation."""
        resp = client.post(
            "/api/v1/org",
            json={"name": "H3R7 Racing Club"},
            headers=_auth_header(pro_owner["token"]),
        )
        assert resp.status_code == 201
        data = resp.get_json()
        assert "org" in data
        assert data["org"]["name"] == "H3R7 Racing Club"
        assert data["org"]["owner_id"] == pro_owner["id"]
        assert data["org"]["max_members"] == 5

    def test_create_org_duplicate_409(self, client, pro_owner):
        """A Pro user cannot create 2 organisations."""
        resp = client.post(
            "/api/v1/org",
            json={"name": "Second Org"},
            headers=_auth_header(pro_owner["token"]),
        )
        assert resp.status_code == 409
        data = resp.get_json()
        assert "org_id" in data

    def test_create_org_missing_name_400(self, client, pro_owner):
        """The name is mandatory."""
        resp = client.post(
            "/api/v1/org",
            json={},
            headers=_auth_header(pro_owner["token"]),
        )
        assert resp.status_code == 400

    def test_create_org_empty_name_400(self, client, pro_owner):
        """A whitespace-only name is rejected."""
        resp = client.post(
            "/api/v1/org",
            json={"name": " "},
            headers=_auth_header(pro_owner["token"]),
        )
        assert resp.status_code == 400

    def test_create_org_name_too_long_400(self, client, pro_owner):
        """A 101-character name is rejected."""
        resp = client.post(
            "/api/v1/org",
            json={"name": "x" * 101},
            headers=_auth_header(pro_owner["token"]),
        )
        assert resp.status_code == 400


# ═══════════════════════════════════════════════════════════════════════════════
# Organisation read tests
# ═══════════════════════════════════════════════════════════════════════════════


class TestGetOrg:
    def test_get_org_as_owner(self, client, pro_owner):
        """The owner can read their org."""
        resp = client.get("/api/v1/org", headers=_auth_header(pro_owner["token"]))
        assert resp.status_code == 200
        data = resp.get_json()
        assert data["org"]["owner_id"] == pro_owner["id"]
        assert data["org"]["member_count"] >= 1  # at least the owner

    def test_get_org_not_found_404(self, client, other_pro_owner):
        """A Pro user without an org gets 404 before creating one."""
        # other_pro_owner has no org yet in this test
        resp = client.get("/api/v1/org", headers=_auth_header(other_pro_owner["token"]))
        # May be 404 or 200 depending on execution order; both are accepted here
        assert resp.status_code in (200, 404)


# ═══════════════════════════════════════════════════════════════════════════════
# Member invitation tests
# ═══════════════════════════════════════════════════════════════════════════════


class TestInviteMember:
    def test_invite_member_success(self, client, pro_owner, pro_user2):
        """The owner can invite an existing user."""
        resp = client.post(
            "/api/v1/org/invite",
            json={"email": pro_user2["email"]},
            headers=_auth_header(pro_owner["token"]),
        )
        assert resp.status_code == 201
        data = resp.get_json()
        assert data["member"]["user_id"] == pro_user2["id"]
        assert data["member"]["role"] == "member"

    def test_invite_member_duplicate_409(self, client, pro_owner, pro_user2):
        """Inviting the same user twice → 409."""
        resp = client.post(
            "/api/v1/org/invite",
            json={"email": pro_user2["email"]},
            headers=_auth_header(pro_owner["token"]),
        )
        assert resp.status_code == 409

    def test_invite_unknown_email_404(self, client, pro_owner):
        """Inviting an email with no matching user → 404."""
        resp = client.post(
            "/api/v1/org/invite",
            json={"email": "nobody@nowhere.com"},
            headers=_auth_header(pro_owner["token"]),
        )
        assert resp.status_code == 404

    def test_invite_invalid_email_400(self, client, pro_owner):
        """Inviting a malformed email → 400."""
        resp = client.post(
            "/api/v1/org/invite",
            json={"email": "not-an-email"},
            headers=_auth_header(pro_owner["token"]),
        )
        assert resp.status_code == 400

    def test_invite_non_owner_403(self, client, pro_user2):
        """A plain member cannot invite."""
        resp = client.post(
            "/api/v1/org/invite",
            json={"email": "anyone@test.com"},
            headers=_auth_header(pro_user2["token"]),
        )
        assert resp.status_code == 403

    def test_invite_fill_to_max(
        self, client, pro_owner, pro_user3, pro_user4, pro_user5
    ):
        """Fill up to 5 members (owner + 4 invitees)."""
        for u in (pro_user3, pro_user4, pro_user5):
            resp = client.post(
                "/api/v1/org/invite",
                json={"email": u["email"]},
                headers=_auth_header(pro_owner["token"]),
            )
            assert resp.status_code == 201, (
                f"Invitation de {u['email']} échouée: {resp.get_json()}"
            )

    def test_invite_exceeds_max_403(self, client, pro_owner, pro_user6):
        """The 6th member must be refused (max 5)."""
        resp = client.post(
            "/api/v1/org/invite",
            json={"email": pro_user6["email"]},
            headers=_auth_header(pro_owner["token"]),
        )
        assert resp.status_code == 403
        data = resp.get_json()
        # Case-insensitive match; this also covers a capitalised "Limite".
        assert "limite" in data["error"].lower()


# ═══════════════════════════════════════════════════════════════════════════════
# Member listing tests
# ═══════════════════════════════════════════════════════════════════════════════


class TestListMembers:
    def test_list_members_as_owner(self, client, pro_owner):
        """The owner sees the full member list."""
        resp = client.get(
            "/api/v1/org/members", headers=_auth_header(pro_owner["token"])
        )
        assert resp.status_code == 200
        data = resp.get_json()
        assert "members" in data
        assert data["count"] == 5  # owner + 4 invitees (pro_user2..5)
        assert data["max_members"] == 5

    def test_list_members_as_member(self, client, pro_user2):
        """A member can also view the list."""
        resp = client.get(
            "/api/v1/org/members", headers=_auth_header(pro_user2["token"])
        )
        assert resp.status_code == 200
        data = resp.get_json()
        assert data["count"] >= 1

    def test_list_members_includes_email(self, client, pro_owner, pro_user2):
        """Each listed member exposes its email."""
        resp = client.get(
            "/api/v1/org/members", headers=_auth_header(pro_owner["token"])
        )
        # Check the status first so an error response fails with a clear
        # message instead of a KeyError on the JSON body.
        assert resp.status_code == 200
        data = resp.get_json()
        emails = [m["email"] for m in data["members"]]
        assert pro_user2["email"] in emails

    def test_list_members_no_org_404(self, client, pro_user6):
        """A Pro user without an org gets 404."""
        resp = client.get(
            "/api/v1/org/members", headers=_auth_header(pro_user6["token"])
        )
        assert resp.status_code == 404


# ═══════════════════════════════════════════════════════════════════════════════
# Member removal tests
# ═══════════════════════════════════════════════════════════════════════════════


class TestRemoveMember:
    def test_remove_member_success(self, client, pro_owner, pro_user5):
        """The owner can remove a member."""
        resp = client.delete(
            f"/api/v1/org/members/{pro_user5['id']}",
            headers=_auth_header(pro_owner["token"]),
        )
        assert resp.status_code == 200
        data = resp.get_json()
        assert data["removed_user_id"] == pro_user5["id"]

    def test_remove_self_as_owner_400(self, client, pro_owner):
        """The owner cannot remove themselves."""
        resp = client.delete(
            f"/api/v1/org/members/{pro_owner['id']}",
            headers=_auth_header(pro_owner["token"]),
        )
        assert resp.status_code == 400

    def test_remove_nonexistent_member_404(self, client, pro_owner):
        """Removing an unknown user id → 404."""
        resp = client.delete(
            "/api/v1/org/members/nonexistent-id-xyz",
            headers=_auth_header(pro_owner["token"]),
        )
        assert resp.status_code == 404

    def test_remove_member_non_owner_403(self, client, pro_user2, pro_user3):
        """A plain member cannot remove another member."""
        resp = client.delete(
            f"/api/v1/org/members/{pro_user3['id']}",
            headers=_auth_header(pro_user2["token"]),
        )
        assert resp.status_code == 403

    def test_can_invite_again_after_removal(self, client, pro_owner, pro_user5):
        """After removal, the user can be re-invited (slot freed)."""
        resp = client.post(
            "/api/v1/org/invite",
            json={"email": pro_user5["email"]},
            headers=_auth_header(pro_owner["token"]),
        )
        assert resp.status_code == 201


# ═══════════════════════════════════════════════════════════════════════════════
# Organisation deletion tests
# ═══════════════════════════════════════════════════════════════════════════════


class TestDeleteOrg:
    def test_delete_org_non_owner_403(self, client, pro_user2):
        """A plain member cannot delete the org."""
        resp = client.delete("/api/v1/org", headers=_auth_header(pro_user2["token"]))
        assert resp.status_code == 403

    def test_delete_org_success(self, client, pro_owner):
        """The owner can delete the organisation."""
        resp = client.delete("/api/v1/org", headers=_auth_header(pro_owner["token"]))
        assert resp.status_code == 200
        data = resp.get_json()
        assert data["ok"] is True

    def test_get_org_after_delete_404(self, client, pro_owner):
        """After deletion, GET /org returns 404."""
        resp = client.get("/api/v1/org", headers=_auth_header(pro_owner["token"]))
        assert resp.status_code == 404

    def test_delete_org_no_org_403(self, client, pro_owner):
        """Deleting an org that no longer exists → 403."""
        resp = client.delete("/api/v1/org", headers=_auth_header(pro_owner["token"]))
        assert resp.status_code == 403

    def test_members_cascade_deleted(self, client, pro_user2):
        """After the org is deleted, members no longer find an org."""
        resp = client.get(
            "/api/v1/org/members", headers=_auth_header(pro_user2["token"])
        )
        assert resp.status_code == 404