Coverage for stepclient_traefik / utils.py: 28%

78 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-01-29 02:37 +0000

1import os 

2import tempfile 

3import logging 

4from datetime import datetime, timezone, timedelta 

5import subprocess 

6from pathlib import Path 

7 

8logger = logging.getLogger(__name__) 

9 

10import shutil 

11import errno 

12 

13def atomic_write_text(path: str | Path, text: str, mode: int = 0o644): 

14 """Atomically write text to a file.""" 

15 path = Path(path) 

16 os.makedirs(path.parent, exist_ok=True) 

17 fd, tmp = tempfile.mkstemp(dir=path.parent) 

18 try: 

19 os.fchmod(fd, mode) 

20 with os.fdopen(fd, "w", encoding="utf-8") as f: 

21 f.write(text) 

22 f.flush() 

23 os.fsync(f.fileno()) 

24 try: 

25 os.replace(tmp, path) 

26 except OSError as e: 

27 # EBUSY (16) if target is a bind mount (e.g. docker) 

28 if e.errno == errno.EBUSY: 

29 shutil.copy2(tmp, path) 

30 else: 

31 raise 

32 finally: 

33 try: 

34 os.unlink(tmp) 

35 except FileNotFoundError: 

36 pass 

37 

38def atomic_write_bytes(path: str | Path, data: bytes, mode: int): 

39 """Atomically write bytes to a file.""" 

40 path = Path(path) 

41 os.makedirs(path.parent, exist_ok=True) 

42 fd, tmp = tempfile.mkstemp(dir=path.parent) 

43 try: 

44 os.fchmod(fd, mode) 

45 with os.fdopen(fd, "wb") as f: 

46 f.write(data) 

47 f.flush() 

48 os.fsync(f.fileno()) 

49 try: 

50 os.replace(tmp, path) 

51 except OSError as e: 

52 if e.errno == errno.EBUSY: 

53 shutil.copy2(tmp, path) 

54 else: 

55 raise 

56 finally: 

57 try: 

58 os.unlink(tmp) 

59 except FileNotFoundError: 

60 pass 

61 

62def parse_enddate(cert_path: str | Path) -> datetime | None: 

63 """Parse the enddate from a certificate file using openssl.""" 

64 try: 

65 if not os.path.exists(cert_path): 

66 return None 

67 out = subprocess.check_output(["openssl", "x509", "-in", str(cert_path), "-noout", "-enddate"], text=True).strip() 

68 if not out.startswith("notAfter="): 

69 return None 

70 s = out[len("notAfter="):].strip() 

71 dt = datetime.strptime(s, "%b %d %H:%M:%S %Y %Z") 

72 return dt.replace(tzinfo=timezone.utc) 

73 except Exception: 

74 return None 

75 

76def needs_renew(cert_path: str | Path, renew_before_hours: int) -> bool: 

77 """Check if a certificate needs renewal.""" 

78 if not os.path.exists(cert_path): 

79 return True 

80 end = parse_enddate(cert_path) 

81 if not end: 

82 return True 

83 return end <= (datetime.now(timezone.utc) + timedelta(hours=renew_before_hours)) 

84 

def split_sans_value(v: str) -> list[str]:
    """Turn a comma-separated SAN string into a list of names.

    Each piece is stripped of surrounding whitespace and empty pieces are
    dropped. A falsy input (empty string, None) gives an empty list.
    """
    if not v:
        return []
    return [name for piece in v.split(",") if (name := piece.strip())]

91 

def add_domains(target_list: list[str], domains: list[str]):
    """Append new domains to ``target_list`` in place.

    Each entry of ``domains`` is stripped of surrounding whitespace; blank
    entries and entries already present in ``target_list`` are skipped, so
    the existing order is kept and no duplicates are introduced.
    """
    for raw in domains:
        name = raw.strip()
        if not name or name in target_list:
            continue
        target_list.append(name)