Coverage for stepclient_traefik / discovery.py: 70%

93 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-01-29 02:37 +0000

1import logging 

2import re 

3import requests_unixsocket 

4from typing import Any 

5from .config import settings 

6from .const import ( 

7 ROUTER_RULE_KEY, ROUTER_TLS_KEY, ROUTER_CERTRESOLVER_KEY, 

8 TLS_DOMAINS_MAIN_KEY, TLS_DOMAINS_SANS_KEY, 

9 HOST_CALL_RE, HOSTSNI_CALL_RE, HOSTREGEXP_CALL_RE, 

10 BRACED_ALT_RE, ANCHORED_ALT_RE 

11) 

12from .utils import add_domains, split_sans_value 

13 

14logger = logging.getLogger(__name__) 

15 

16class DockerDiscovery: 

17 def __init__(self): 

18 self.session = requests_unixsocket.Session() 

19 self.url = "http+unix://" + settings.docker_sock.replace("/", "%2F") + "/services" 

20 

21 def get_services(self) -> list[dict[str, Any]]: 

22 try: 

23 r = self.session.get(self.url, timeout=20) 

24 r.raise_for_status() 

25 return r.json() 

26 except Exception as e: 

27 logger.error(f"Failed to fetch docker services: {e}") 

28 return [] 

29 

30 def extract_routers(self) -> tuple[dict[str, dict], list[str]]: 

31 """ 

32 Scan docker services and extract Traefik routers. 

33 Returns (routers_dict, warnings_list) 

34 """ 

35 routers: dict[str, dict] = {} 

36 warnings = [] 

37 

38 services = self.get_services() 

39 

40 for svc in services: 

41 labels = (svc.get("Spec", {}) or {}).get("Labels", {}) or {} 

42 if labels.get("traefik.enable") != "true": 

43 continue 

44 

45 # Build router meta + tls domains from labels 

46 for k, v in labels.items(): 

47 m = ROUTER_RULE_KEY.match(k) 

48 if m: 

49 routers.setdefault(m.group(1), {}).update({"rule": v}) 

50 continue 

51 m = ROUTER_TLS_KEY.match(k) 

52 if m: 

53 routers.setdefault(m.group(1), {}).update({"tls": v}) 

54 continue 

55 m = ROUTER_CERTRESOLVER_KEY.match(k) 

56 if m: 

57 routers.setdefault(m.group(1), {}).update({"certresolver": v}) 

58 continue 

59 

60 m = TLS_DOMAINS_MAIN_KEY.match(k) 

61 if m: 

62 r = routers.setdefault(m.group(1), {}) 

63 r.setdefault("tls_domains", []) 

64 add_domains(r["tls_domains"], [v]) 

65 continue 

66 

67 m = TLS_DOMAINS_SANS_KEY.match(k) 

68 if m: 

69 r = routers.setdefault(m.group(1), {}) 

70 r.setdefault("tls_domains", []) 

71 add_domains(r["tls_domains"], split_sans_value(v)) 

72 continue 

73 

74 return routers, warnings 

75 

76 def router_should_issue(self, router_meta: dict) -> bool: 

77 if settings.ignore_if_certresolver_present and router_meta.get("certresolver"): 

78 return False 

79 if settings.issue_if_tls_true: 

80 tls = (router_meta.get("tls") or "").lower() 

81 if tls not in ("true", "1", "yes"): 

82 return False 

83 return True 

84 

85 def extract_from_rules(self, rule: str) -> tuple[list[str], list[str]]: 

86 """ 

87 Returns (domains, warnings) 

88 """ 

89 domains = [] 

90 warnings = [] 

91 

92 if not rule: 

93 return domains, warnings 

94 

95 # Host(`a`) and HostSNI(`a`, `b`) 

96 for call_re in (HOST_CALL_RE, HOSTSNI_CALL_RE): 

97 for content in call_re.findall(rule): 

98 # Extract quoted domains: `example.com` 

99 # We use a local regex for this since it's specific to this context 

100 found = re.findall(r"`([^`]+)`", content) 

101 add_domains(domains, found) 

102 

103 # HostRegexp(`...`) -> attempt limited expansion 

104 for expr in HOSTREGEXP_CALL_RE.findall(rule): 

105 expr = expr.strip() 

106 

107 # Example: `{sub:(repos|nexus)}.example.com` 

108 m = BRACED_ALT_RE.match(expr) 

109 if m: 

110 alts = m.group(1).split("|") 

111 suffix = m.group(2) 

112 for a in alts: 

113 a = a.strip() 

114 if a: 

115 add_domains(domains, [f"{a}{suffix}"]) 

116 continue 

117 

118 # Example: `^(repos|nexus)\.example\.com$` 

119 m = ANCHORED_ALT_RE.match(expr) 

120 if m: 

121 alts = m.group(1).split("|") 

122 base = f"{m.group(2)}.{m.group(3)}" 

123 for a in alts: 

124 a = a.strip() 

125 if a: 

126 add_domains(domains, [f"{a}.{base}"]) 

127 continue 

128 

129 # If it's not an enumerable pattern, we can't mint a cert for it 

130 warnings.append(f"Unexpanded HostRegexp pattern: `{expr}` (cannot derive concrete hostnames)") 

131 return domains, warnings