Coverage for stepclient_traefik / discovery.py: 70%
93 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-01-29 02:37 +0000
« prev ^ index » next coverage.py v7.13.2, created at 2026-01-29 02:37 +0000
1import logging
2import re
3import requests_unixsocket
4from typing import Any
5from .config import settings
6from .const import (
7 ROUTER_RULE_KEY, ROUTER_TLS_KEY, ROUTER_CERTRESOLVER_KEY,
8 TLS_DOMAINS_MAIN_KEY, TLS_DOMAINS_SANS_KEY,
9 HOST_CALL_RE, HOSTSNI_CALL_RE, HOSTREGEXP_CALL_RE,
10 BRACED_ALT_RE, ANCHORED_ALT_RE
11)
12from .utils import add_domains, split_sans_value
14logger = logging.getLogger(__name__)
16class DockerDiscovery:
17 def __init__(self):
18 self.session = requests_unixsocket.Session()
19 self.url = "http+unix://" + settings.docker_sock.replace("/", "%2F") + "/services"
21 def get_services(self) -> list[dict[str, Any]]:
22 try:
23 r = self.session.get(self.url, timeout=20)
24 r.raise_for_status()
25 return r.json()
26 except Exception as e:
27 logger.error(f"Failed to fetch docker services: {e}")
28 return []
30 def extract_routers(self) -> tuple[dict[str, dict], list[str]]:
31 """
32 Scan docker services and extract Traefik routers.
33 Returns (routers_dict, warnings_list)
34 """
35 routers: dict[str, dict] = {}
36 warnings = []
38 services = self.get_services()
40 for svc in services:
41 labels = (svc.get("Spec", {}) or {}).get("Labels", {}) or {}
42 if labels.get("traefik.enable") != "true":
43 continue
45 # Build router meta + tls domains from labels
46 for k, v in labels.items():
47 m = ROUTER_RULE_KEY.match(k)
48 if m:
49 routers.setdefault(m.group(1), {}).update({"rule": v})
50 continue
51 m = ROUTER_TLS_KEY.match(k)
52 if m:
53 routers.setdefault(m.group(1), {}).update({"tls": v})
54 continue
55 m = ROUTER_CERTRESOLVER_KEY.match(k)
56 if m:
57 routers.setdefault(m.group(1), {}).update({"certresolver": v})
58 continue
60 m = TLS_DOMAINS_MAIN_KEY.match(k)
61 if m:
62 r = routers.setdefault(m.group(1), {})
63 r.setdefault("tls_domains", [])
64 add_domains(r["tls_domains"], [v])
65 continue
67 m = TLS_DOMAINS_SANS_KEY.match(k)
68 if m:
69 r = routers.setdefault(m.group(1), {})
70 r.setdefault("tls_domains", [])
71 add_domains(r["tls_domains"], split_sans_value(v))
72 continue
74 return routers, warnings
76 def router_should_issue(self, router_meta: dict) -> bool:
77 if settings.ignore_if_certresolver_present and router_meta.get("certresolver"):
78 return False
79 if settings.issue_if_tls_true:
80 tls = (router_meta.get("tls") or "").lower()
81 if tls not in ("true", "1", "yes"):
82 return False
83 return True
85 def extract_from_rules(self, rule: str) -> tuple[list[str], list[str]]:
86 """
87 Returns (domains, warnings)
88 """
89 domains = []
90 warnings = []
92 if not rule:
93 return domains, warnings
95 # Host(`a`) and HostSNI(`a`, `b`)
96 for call_re in (HOST_CALL_RE, HOSTSNI_CALL_RE):
97 for content in call_re.findall(rule):
98 # Extract quoted domains: `example.com`
99 # We use a local regex for this since it's specific to this context
100 found = re.findall(r"`([^`]+)`", content)
101 add_domains(domains, found)
103 # HostRegexp(`...`) -> attempt limited expansion
104 for expr in HOSTREGEXP_CALL_RE.findall(rule):
105 expr = expr.strip()
107 # Example: `{sub:(repos|nexus)}.example.com`
108 m = BRACED_ALT_RE.match(expr)
109 if m:
110 alts = m.group(1).split("|")
111 suffix = m.group(2)
112 for a in alts:
113 a = a.strip()
114 if a:
115 add_domains(domains, [f"{a}{suffix}"])
116 continue
118 # Example: `^(repos|nexus)\.example\.com$`
119 m = ANCHORED_ALT_RE.match(expr)
120 if m:
121 alts = m.group(1).split("|")
122 base = f"{m.group(2)}.{m.group(3)}"
123 for a in alts:
124 a = a.strip()
125 if a:
126 add_domains(domains, [f"{a}.{base}"])
127 continue
129 # If it's not an enumerable pattern, we can't mint a cert for it
130 warnings.append(f"Unexpanded HostRegexp pattern: `{expr}` (cannot derive concrete hostnames)")
131 return domains, warnings