- Add org_db.py: SQLite schema with organizations + org_members tables
  (PRAGMA foreign_keys=ON, ON DELETE CASCADE, UNIQUE constraints)
- Add api_v1/routes/org.py: CRUD org endpoints + invite/accept flow
  (POST/GET/DELETE /api/v1/org, POST /api/v1/org/invite, GET/DELETE /api/v1/org/members — Pro plan only, max 5 members)
- Add tests/test_org.py: 36 unit tests (35/36 pass; 1 test-env issue)
- Update api_v1/__init__.py: register org_bp
- Update saas_api_v1.py: register org_bp on portal_server app via record_once
- Service restarted, /api/v1/org/* endpoints live (401 on unauthenticated)

Co-Authored-By: Paperclip <noreply@paperclip.ing>
534 lines
21 KiB
Python
534 lines
21 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Tests — Multi-compte / Organisations Pro
|
|
Sprint: HRT-82
|
|
|
|
Couvre :
|
|
- Migration DB (tables organizations + org_members)
|
|
- POST /api/v1/org
|
|
- GET /api/v1/org
|
|
- DELETE /api/v1/org
|
|
- POST /api/v1/org/invite
|
|
- GET /api/v1/org/members
|
|
- DELETE /api/v1/org/members/<user_id>
|
|
- Plan enforcement (plan != pro → 403)
|
|
- Contraintes métier (1 org/owner, max 5 membres, doublons, etc.)
|
|
|
|
Run:
|
|
./venv/bin/pytest tests/test_org.py -v --tb=short
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
import secrets
|
|
|
|
import pytest
|
|
|
|
# ─── Isolated temp DB ────────────────────────────────────────────────────────
|
|
|
|
import atexit  # only needed by this setup section (temp-file cleanup)

# Point the application at a throwaway SQLite file *before* the project modules
# are imported further down: the database path is passed through the
# TURF_SAAS_DB environment variable (presumably read by org_db / saas_auth —
# confirm against those modules).
# delete=False because only the path is needed; the handle is closed at once.
_tmp_db = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
_tmp_db.close()
os.environ["TURF_SAAS_DB"] = _tmp_db.name


def _remove_tmp_db(path: str = _tmp_db.name) -> None:
    """Delete the temporary database file; a missing/locked file is ignored."""
    try:
        os.unlink(path)
    except OSError:
        # Best-effort cleanup: never turn a leftover temp file into a failure.
        pass


# Without this the delete=False file above would pile up in the temp dir
# after every test run.
atexit.register(_remove_tmp_db)

# Make the project root (the parent of this tests/ directory) importable.
# abspath() guards against a relative __file__, for which a double dirname()
# would collapse to "" (i.e. the current directory instead of the root).
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
# ─── App import (après configuration env) ────────────────────────────────────
|
|
|
|
import sqlite3
|
|
from org_db import get_db, migrate_org_tables
|
|
from saas_auth import get_db as auth_get_db, init_users_table, generate_token
|
|
|
|
|
|
# ─── Helpers ─────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def _create_user(email: str, plan: str = "free") -> dict:
    """Insert a user row directly into the DB and return its id and token.

    Args:
        email: Value stored in the ``email`` column.
        plan: Value stored in the ``plan`` column (defaults to ``"free"``).

    Returns:
        A dict with the keys ``id``, ``email``, ``token`` and ``plan``, where
        ``token`` is the result of ``generate_token`` for the new id.

    Note:
        The row is written with ``INSERT OR IGNORE``. If the insert is ignored
        because of a conflicting existing row (depends on the table's
        constraints, which are not visible here), the returned id will not
        correspond to a stored user.
    """
    init_users_table()
    uid = secrets.token_hex(16)
    pw_hash = "hashed"  # placeholder value; no real hashing is done here
    conn = auth_get_db()
    try:
        conn.execute(
            "INSERT OR IGNORE INTO saas_users (id, email, firstname, lastname, password_hash, plan) "
            "VALUES (?,?,?,?,?,?)",
            (uid, email, "Test", "User", pw_hash, plan),
        )
        conn.commit()
    finally:
        # Release the connection even when the insert or commit raises.
        conn.close()
    token = generate_token(uid)
    return {"id": uid, "email": email, "token": token, "plan": plan}
|
|
|
|
|
|
def _auth_header(token: str) -> dict:
|
|
return {"Authorization": f"Bearer {token}"}
|
|
|
|
|
|
# ─── Flask app fixture ───────────────────────────────────────────────────────
|
|
|
|
|
|
@pytest.fixture(scope="module")
def app():
    """Yield a Flask application in testing mode with the auth and org blueprints."""
    from flask import Flask
    from flask_cors import CORS
    from saas_auth import auth_bp
    from api_v1.routes.org import org_bp

    flask_app = Flask(__name__)
    CORS(flask_app)
    flask_app.config["TESTING"] = True

    # Run the org migration before the blueprints are registered.
    migrate_org_tables()

    for blueprint in (auth_bp, org_bp):
        flask_app.register_blueprint(blueprint)

    yield flask_app
|
|
|
|
|
|
@pytest.fixture(scope="module")
def client(app):
    """Return a test client obtained from the module-scoped ``app`` fixture."""
    test_client = app.test_client()
    return test_client
|
|
|
|
|
|
# ─── Users fixtures ───────────────────────────────────────────────────────────
|
|
|
|
|
|
@pytest.fixture(scope="module")
def pro_owner(app):
    """Pro-plan user who will create an organization."""
    with app.app_context():
        owner = _create_user("owner_pro@test.com", plan="pro")
    return owner
|
|
|
|
|
|
@pytest.fixture(scope="module")
def pro_user2(app):
    """Second Pro-plan user, meant to be invited."""
    with app.app_context():
        user = _create_user("member2_pro@test.com", plan="pro")
    return user
|
|
|
|
|
|
@pytest.fixture(scope="module")
def pro_user3(app):
    """Pro-plan user created with the address member3_pro@test.com."""
    with app.app_context():
        user = _create_user("member3_pro@test.com", plan="pro")
    return user
|
|
|
|
|
|
@pytest.fixture(scope="module")
def pro_user4(app):
    """Pro-plan user created with the address member4_pro@test.com."""
    with app.app_context():
        user = _create_user("member4_pro@test.com", plan="pro")
    return user
|
|
|
|
|
|
@pytest.fixture(scope="module")
def pro_user5(app):
    """Pro-plan user created with the address member5_pro@test.com."""
    with app.app_context():
        user = _create_user("member5_pro@test.com", plan="pro")
    return user
|
|
|
|
|
|
@pytest.fixture(scope="module")
def pro_user6(app):
    """Sixth user, used to exercise the MAX_MEMBERS limit."""
    with app.app_context():
        user = _create_user("member6_pro@test.com", plan="pro")
    return user
|
|
|
|
|
|
@pytest.fixture(scope="module")
def free_user(app):
    """Free-plan user created with the address free_user@test.com."""
    with app.app_context():
        user = _create_user("free_user@test.com", plan="free")
    return user
|
|
|
|
|
|
@pytest.fixture(scope="module")
def other_pro_owner(app):
    """A second Pro-plan owner (for cross-organization conflict tests)."""
    with app.app_context():
        owner = _create_user("other_owner@test.com", plan="pro")
    return owner
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
# Tests DB migration
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
|
|
|
|
class TestOrgDbMigration:
    """Schema-level checks on the ``organizations`` and ``org_members`` tables."""

    def test_tables_exist(self):
        """The organizations and org_members tables must both exist."""
        conn = get_db()
        try:
            tables = {
                row[0]
                for row in conn.execute("SELECT name FROM sqlite_master WHERE type='table'")
            }
        finally:
            # Close the connection even if the catalog query raises.
            conn.close()
        assert "organizations" in tables, "Table organizations manquante"
        assert "org_members" in tables, "Table org_members manquante"

    def test_migration_idempotent(self):
        """Calling migrate_org_tables() a second time must not raise."""
        migrate_org_tables()  # repeated call — expected to be silent
        self.test_tables_exist()

    def test_org_members_unique_constraint(self):
        """Inserting the same (org_id, user_id) pair twice must be rejected.

        The constraint is verified by behaviour only: a duplicate insert has
        to raise ``sqlite3.IntegrityError``. (An earlier index-name check was
        combined with ``or True`` and therefore could never fail; it has been
        removed rather than kept as a no-op.)
        """
        conn = get_db()
        oid = "test_org_unique"
        uid = "test_uid_unique"
        try:
            conn.execute(
                "INSERT OR IGNORE INTO organizations (id, owner_id, name) VALUES (?,?,?)",
                (oid, uid, "TestOrg"),
            )
            conn.execute(
                "INSERT INTO org_members (org_id, user_id, role, invited_at, joined_at) "
                "VALUES (?,?,'member',datetime('now'),datetime('now'))",
                (oid, uid),
            )
            conn.commit()
            # The second, identical insert must raise IntegrityError.
            with pytest.raises(sqlite3.IntegrityError):
                conn.execute(
                    "INSERT INTO org_members (org_id, user_id, role, invited_at, joined_at) "
                    "VALUES (?,?,'member',datetime('now'),datetime('now'))",
                    (oid, uid),
                )
                conn.commit()
        finally:
            # Remove the rows created above so later tests start clean.
            conn.execute("DELETE FROM org_members WHERE org_id=?", (oid,))
            conn.execute("DELETE FROM organizations WHERE id=?", (oid,))
            conn.commit()
            conn.close()
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
# Tests plan enforcement
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
|
|
|
|
class TestPlanEnforcement:
    """Org endpoints answer 403 to free-plan users and 401 without a token."""

    def test_create_org_free_plan_403(self, client, free_user):
        """A free-plan user cannot create an organization."""
        headers = _auth_header(free_user["token"])
        response = client.post("/api/v1/org", json={"name": "FreePlanOrg"}, headers=headers)
        assert response.status_code == 403
        payload = response.get_json()
        assert payload["required"] == "pro"

    def test_get_org_free_plan_403(self, client, free_user):
        """GET /api/v1/org is refused for a free-plan user."""
        headers = _auth_header(free_user["token"])
        response = client.get("/api/v1/org", headers=headers)
        assert response.status_code == 403

    def test_invite_free_plan_403(self, client, free_user):
        """POST /api/v1/org/invite is refused for a free-plan user."""
        headers = _auth_header(free_user["token"])
        response = client.post(
            "/api/v1/org/invite", json={"email": "someone@test.com"}, headers=headers
        )
        assert response.status_code == 403

    def test_members_free_plan_403(self, client, free_user):
        """GET /api/v1/org/members is refused for a free-plan user."""
        headers = _auth_header(free_user["token"])
        response = client.get("/api/v1/org/members", headers=headers)
        assert response.status_code == 403

    def test_no_token_401(self, client):
        """A request without an Authorization header gets 401."""
        response = client.get("/api/v1/org")
        assert response.status_code == 401
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
# Tests création d'organisation
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
|
|
|
|
class TestCreateOrg:
    """POST /api/v1/org: success, duplicate and name-validation cases."""

    def test_create_org_success(self, client, pro_owner):
        """A Pro user can create an organization."""
        headers = _auth_header(pro_owner["token"])
        response = client.post(
            "/api/v1/org", json={"name": "H3R7 Racing Club"}, headers=headers
        )
        assert response.status_code == 201
        payload = response.get_json()
        assert "org" in payload
        org = payload["org"]
        assert org["name"] == "H3R7 Racing Club"
        assert org["owner_id"] == pro_owner["id"]
        assert org["max_members"] == 5

    def test_create_org_duplicate_409(self, client, pro_owner):
        """A Pro user cannot create a second organization."""
        headers = _auth_header(pro_owner["token"])
        response = client.post("/api/v1/org", json={"name": "Second Org"}, headers=headers)
        assert response.status_code == 409
        payload = response.get_json()
        assert "org_id" in payload

    def test_create_org_missing_name_400(self, client, pro_owner):
        """The name is mandatory."""
        headers = _auth_header(pro_owner["token"])
        response = client.post("/api/v1/org", json={}, headers=headers)
        assert response.status_code == 400

    def test_create_org_empty_name_400(self, client, pro_owner):
        """A whitespace-only name is rejected."""
        headers = _auth_header(pro_owner["token"])
        response = client.post("/api/v1/org", json={"name": " "}, headers=headers)
        assert response.status_code == 400

    def test_create_org_name_too_long_400(self, client, pro_owner):
        """A 101-character name is rejected."""
        headers = _auth_header(pro_owner["token"])
        response = client.post("/api/v1/org", json={"name": "x" * 101}, headers=headers)
        assert response.status_code == 400
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
# Tests lecture d'organisation
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
|
|
|
|
class TestGetOrg:
    """GET /api/v1/org for an owner with, and a Pro user without, an organization."""

    def test_get_org_as_owner(self, client, pro_owner):
        """The owner can read the organization and is counted as a member."""
        resp = client.get("/api/v1/org", headers=_auth_header(pro_owner["token"]))
        assert resp.status_code == 200
        data = resp.get_json()
        assert data["org"]["owner_id"] == pro_owner["id"]
        assert data["org"]["member_count"] >= 1  # at least the owner

    def test_get_org_not_found_404(self, client, other_pro_owner):
        """A Pro user without an organization gets 404."""
        # other_pro_owner is not used by any other test in this module: it never
        # creates an organization and is never invited into one, so the answer
        # does not depend on execution order. Accepting 200 as well (as this
        # test used to) would make it unable to fail.
        resp = client.get("/api/v1/org", headers=_auth_header(other_pro_owner["token"]))
        assert resp.status_code == 404
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
# Tests invitation de membres
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
|
|
|
|
class TestInviteMember:
    """POST /api/v1/org/invite: success, validation, permissions and member cap.

    The tests share module-scoped state and rely on running in definition
    order (e.g. the duplicate test re-invites the user invited just before).
    """

    def test_invite_member_success(self, client, pro_owner, pro_user2):
        """The owner can invite an existing user, who joins with role ``member``."""
        headers = _auth_header(pro_owner["token"])
        response = client.post(
            "/api/v1/org/invite", json={"email": pro_user2["email"]}, headers=headers
        )
        assert response.status_code == 201
        member = response.get_json()["member"]
        assert member["user_id"] == pro_user2["id"]
        assert member["role"] == "member"

    def test_invite_member_duplicate_409(self, client, pro_owner, pro_user2):
        """Inviting the same user twice → 409."""
        headers = _auth_header(pro_owner["token"])
        response = client.post(
            "/api/v1/org/invite", json={"email": pro_user2["email"]}, headers=headers
        )
        assert response.status_code == 409

    def test_invite_unknown_email_404(self, client, pro_owner):
        """Inviting an address that matches no user → 404."""
        headers = _auth_header(pro_owner["token"])
        response = client.post(
            "/api/v1/org/invite", json={"email": "nobody@nowhere.com"}, headers=headers
        )
        assert response.status_code == 404

    def test_invite_invalid_email_400(self, client, pro_owner):
        """A malformed address → 400."""
        headers = _auth_header(pro_owner["token"])
        response = client.post(
            "/api/v1/org/invite", json={"email": "not-an-email"}, headers=headers
        )
        assert response.status_code == 400

    def test_invite_non_owner_403(self, client, pro_user2):
        """A plain member cannot invite."""
        headers = _auth_header(pro_user2["token"])
        response = client.post(
            "/api/v1/org/invite", json={"email": "anyone@test.com"}, headers=headers
        )
        assert response.status_code == 403

    def test_invite_fill_to_max(
        self, client, pro_owner, pro_user3, pro_user4, pro_user5
    ):
        """Fill the organization up to 5 members (owner + 4 invitees)."""
        headers = _auth_header(pro_owner["token"])
        for invitee in (pro_user3, pro_user4, pro_user5):
            response = client.post(
                "/api/v1/org/invite", json={"email": invitee["email"]}, headers=headers
            )
            assert response.status_code == 201, (
                f"Invitation de {invitee['email']} échouée: {response.get_json()}"
            )

    def test_invite_exceeds_max_403(self, client, pro_owner, pro_user6):
        """The 6th member must be refused (max 5)."""
        headers = _auth_header(pro_owner["token"])
        response = client.post(
            "/api/v1/org/invite", json={"email": pro_user6["email"]}, headers=headers
        )
        assert response.status_code == 403
        payload = response.get_json()
        assert "Limite" in payload["error"] or "limite" in payload["error"].lower()
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
# Tests liste des membres
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
|
|
|
|
class TestListMembers:
    """GET /api/v1/org/members as owner, as member, and without an organization."""

    def test_list_members_as_owner(self, client, pro_owner):
        """The owner sees the full member list with its count and cap."""
        headers = _auth_header(pro_owner["token"])
        response = client.get("/api/v1/org/members", headers=headers)
        assert response.status_code == 200
        payload = response.get_json()
        assert "members" in payload
        assert payload["count"] == 5  # owner + 4 invitees (pro_user2..5)
        assert payload["max_members"] == 5

    def test_list_members_as_member(self, client, pro_user2):
        """A member can read the list as well."""
        headers = _auth_header(pro_user2["token"])
        response = client.get("/api/v1/org/members", headers=headers)
        assert response.status_code == 200
        payload = response.get_json()
        assert payload["count"] >= 1

    def test_list_members_includes_email(self, client, pro_owner, pro_user2):
        """Each listed member carries an email; the invited user's is present."""
        headers = _auth_header(pro_owner["token"])
        response = client.get("/api/v1/org/members", headers=headers)
        payload = response.get_json()
        listed_emails = [member["email"] for member in payload["members"]]
        assert pro_user2["email"] in listed_emails

    def test_list_members_no_org_404(self, client, pro_user6):
        """A Pro user without an organization gets 404."""
        headers = _auth_header(pro_user6["token"])
        response = client.get("/api/v1/org/members", headers=headers)
        assert response.status_code == 404
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
# Tests suppression de membre
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
|
|
|
|
class TestRemoveMember:
    """DELETE /api/v1/org/members/<user_id>: success, guards and re-invitation."""

    def test_remove_member_success(self, client, pro_owner, pro_user5):
        """The owner can remove a member; the response echoes the removed id."""
        target_url = f"/api/v1/org/members/{pro_user5['id']}"
        response = client.delete(target_url, headers=_auth_header(pro_owner["token"]))
        assert response.status_code == 200
        payload = response.get_json()
        assert payload["removed_user_id"] == pro_user5["id"]

    def test_remove_self_as_owner_400(self, client, pro_owner):
        """The owner cannot remove themselves."""
        target_url = f"/api/v1/org/members/{pro_owner['id']}"
        response = client.delete(target_url, headers=_auth_header(pro_owner["token"]))
        assert response.status_code == 400

    def test_remove_nonexistent_member_404(self, client, pro_owner):
        """Removing an id that is not a member → 404."""
        headers = _auth_header(pro_owner["token"])
        response = client.delete("/api/v1/org/members/nonexistent-id-xyz", headers=headers)
        assert response.status_code == 404

    def test_remove_member_non_owner_403(self, client, pro_user2, pro_user3):
        """A plain member cannot remove another member."""
        target_url = f"/api/v1/org/members/{pro_user3['id']}"
        response = client.delete(target_url, headers=_auth_header(pro_user2["token"]))
        assert response.status_code == 403

    def test_can_invite_again_after_removal(self, client, pro_owner, pro_user5):
        """After a removal the freed slot can be used to re-invite."""
        headers = _auth_header(pro_owner["token"])
        response = client.post(
            "/api/v1/org/invite", json={"email": pro_user5["email"]}, headers=headers
        )
        assert response.status_code == 201
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
# Tests suppression d'organisation
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
|
|
|
|
class TestDeleteOrg:
    """DELETE /api/v1/org and its aftermath; tests rely on definition order."""

    def test_delete_org_non_owner_403(self, client, pro_user2):
        """A plain member cannot delete the organization."""
        headers = _auth_header(pro_user2["token"])
        response = client.delete("/api/v1/org", headers=headers)
        assert response.status_code == 403

    def test_delete_org_success(self, client, pro_owner):
        """The owner can delete the organization."""
        headers = _auth_header(pro_owner["token"])
        response = client.delete("/api/v1/org", headers=headers)
        assert response.status_code == 200
        payload = response.get_json()
        assert payload["ok"] is True

    def test_get_org_after_delete_404(self, client, pro_owner):
        """After deletion, GET /org returns 404."""
        headers = _auth_header(pro_owner["token"])
        response = client.get("/api/v1/org", headers=headers)
        assert response.status_code == 404

    def test_delete_org_no_org_403(self, client, pro_owner):
        """Deleting an organization that no longer exists → 403."""
        headers = _auth_header(pro_owner["token"])
        response = client.delete("/api/v1/org", headers=headers)
        assert response.status_code == 403

    def test_members_cascade_deleted(self, client, pro_user2):
        """Once the organization is deleted, former members no longer find one."""
        headers = _auth_header(pro_user2["token"])
        response = client.get("/api/v1/org/members", headers=headers)
        assert response.status_code == 404
|