Coverage for stepclient_traefik / utils.py: 28%
78 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-01-29 02:37 +0000
« prev ^ index » next coverage.py v7.13.2, created at 2026-01-29 02:37 +0000
1import os
2import tempfile
3import logging
4from datetime import datetime, timezone, timedelta
5import subprocess
6from pathlib import Path
8logger = logging.getLogger(__name__)
10import shutil
11import errno
13def atomic_write_text(path: str | Path, text: str, mode: int = 0o644):
14 """Atomically write text to a file."""
15 path = Path(path)
16 os.makedirs(path.parent, exist_ok=True)
17 fd, tmp = tempfile.mkstemp(dir=path.parent)
18 try:
19 os.fchmod(fd, mode)
20 with os.fdopen(fd, "w", encoding="utf-8") as f:
21 f.write(text)
22 f.flush()
23 os.fsync(f.fileno())
24 try:
25 os.replace(tmp, path)
26 except OSError as e:
27 # EBUSY (16) if target is a bind mount (e.g. docker)
28 if e.errno == errno.EBUSY:
29 shutil.copy2(tmp, path)
30 else:
31 raise
32 finally:
33 try:
34 os.unlink(tmp)
35 except FileNotFoundError:
36 pass
38def atomic_write_bytes(path: str | Path, data: bytes, mode: int):
39 """Atomically write bytes to a file."""
40 path = Path(path)
41 os.makedirs(path.parent, exist_ok=True)
42 fd, tmp = tempfile.mkstemp(dir=path.parent)
43 try:
44 os.fchmod(fd, mode)
45 with os.fdopen(fd, "wb") as f:
46 f.write(data)
47 f.flush()
48 os.fsync(f.fileno())
49 try:
50 os.replace(tmp, path)
51 except OSError as e:
52 if e.errno == errno.EBUSY:
53 shutil.copy2(tmp, path)
54 else:
55 raise
56 finally:
57 try:
58 os.unlink(tmp)
59 except FileNotFoundError:
60 pass
62def parse_enddate(cert_path: str | Path) -> datetime | None:
63 """Parse the enddate from a certificate file using openssl."""
64 try:
65 if not os.path.exists(cert_path):
66 return None
67 out = subprocess.check_output(["openssl", "x509", "-in", str(cert_path), "-noout", "-enddate"], text=True).strip()
68 if not out.startswith("notAfter="):
69 return None
70 s = out[len("notAfter="):].strip()
71 dt = datetime.strptime(s, "%b %d %H:%M:%S %Y %Z")
72 return dt.replace(tzinfo=timezone.utc)
73 except Exception:
74 return None
76def needs_renew(cert_path: str | Path, renew_before_hours: int) -> bool:
77 """Check if a certificate needs renewal."""
78 if not os.path.exists(cert_path):
79 return True
80 end = parse_enddate(cert_path)
81 if not end:
82 return True
83 return end <= (datetime.now(timezone.utc) + timedelta(hours=renew_before_hours))
def split_sans_value(v: str) -> list[str]:
    """Split a comma-separated string into a list of SANs.

    Each entry is stripped of surrounding whitespace and empty entries are
    dropped. A falsy ``v`` (empty string or ``None``) yields an empty list.
    """
    if not v:
        return []
    return [name for name in map(str.strip, v.split(",")) if name]
def add_domains(target_list: list[str], domains: list[str]):
    """Add domains to a list, filtering empty ones and preserving order (deduplicating).

    ``target_list`` is modified in place; nothing is returned.

    Args:
        target_list: List that receives the new domains.
        domains: Candidate domains; each is stripped of surrounding
            whitespace, and blank or already-present entries are skipped.
    """
    # Set built once for O(1) membership tests; the list keeps the order.
    seen = set(target_list)
    for d in domains:
        d = d.strip()
        if d and d not in seen:
            seen.add(d)
            target_list.append(d)