autoconf - basic ingress controller support for kubernetes
This commit is contained in:
@@ -149,7 +149,14 @@ class Config :
|
||||
except :
|
||||
ret = False
|
||||
elif self.__type == Controller.Type.KUBERNETES :
|
||||
log("config", "ERROR", "TODO get urls for k8s")
|
||||
for instance in instances :
|
||||
name = instance.metadata.name
|
||||
try :
|
||||
dns_result = dns.resolver.query(name + ".default.svc.cluster.local")
|
||||
for ip in dns_result :
|
||||
urls.append("http://" + ip.to_text() + ":8080" + self.__api_uri + path)
|
||||
except :
|
||||
ret = False
|
||||
|
||||
for url in urls :
|
||||
req = None
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
from kubernetes import client, config, watch
|
||||
from threading import Thread, Lock
|
||||
import time
|
||||
|
||||
import Controller
|
||||
|
||||
@@ -13,20 +14,35 @@ class IngressController(Controller.Controller) :
|
||||
self.__api = client.CoreV1Api()
|
||||
self.__extensions_api = client.ExtensionsV1beta1Api()
|
||||
self.__old_env = {}
|
||||
self.__internal_lock = Lock()
|
||||
|
||||
def __get_pods(self) :
|
||||
return self.__api.list_pod_for_all_namespaces(watch=False, label_selector="bunkerized-nginx").items
|
||||
|
||||
def __get_ingresses(self) :
|
||||
return self.__extensions_api.list_ingress_for_all_namespaces(watch=False).items
|
||||
return self.__extensions_api.list_ingress_for_all_namespaces(watch=False, label_selector="bunkerized-nginx").items
|
||||
|
||||
def __get_services(self) :
|
||||
return self.__api.list_service_for_all_namespaces(watch=False).items
|
||||
def __get_services(self, autoconf=False) :
|
||||
services = self.__api.list_service_for_all_namespaces(watch=False, label_selector="bunkerized-nginx").items
|
||||
if not autoconf :
|
||||
return services
|
||||
services_autoconf = []
|
||||
for service in services :
|
||||
if service.metadata.annotations != None and "bunkerized-nginx.AUTOCONF" in service.metadata.annotations :
|
||||
services_autoconf.append(service)
|
||||
return services_autoconf
|
||||
|
||||
def __annotations_to_env(self, annotations, service=False) :
|
||||
def __pod_to_env(self, pod_env) :
|
||||
env = {}
|
||||
prefix = ""
|
||||
if service :
|
||||
if not "bunkerized-nginx.SERVER_NAME" in annotations :
|
||||
raise Exception("Missing bunkerized-nginx.SERVER_NAME annotation in Service.")
|
||||
prefix = annotations["bunkerized-nginx.SERVER_NAME"].split(" ")[0] + "_"
|
||||
for env_var in pod_env :
|
||||
env[env_var.name] = env_var.value
|
||||
if env_var.value == None :
|
||||
env[env_var.name] = ""
|
||||
return env
|
||||
|
||||
def __annotations_to_env(self, annotations) :
|
||||
env = {}
|
||||
prefix = annotations["bunkerized-nginx.SERVER_NAME"].split(" ")[0] + "_"
|
||||
for annotation in annotations :
|
||||
if annotation.startswith("bunkerized-nginx.") and annotation.replace("bunkerized-nginx.", "", 1) != "" and annotation.replace("bunkerized-nginx.", "", 1) != "AUTOCONF" :
|
||||
env[prefix + annotation.replace("bunkerized-nginx.", "", 1)] = annotations[annotation]
|
||||
@@ -34,64 +50,124 @@ class IngressController(Controller.Controller) :
|
||||
|
||||
def __rules_to_env(self, rules) :
|
||||
env = {}
|
||||
first_servers = []
|
||||
for rule in rules :
|
||||
rule = rule.to_dict()
|
||||
prefix = ""
|
||||
if "host" in rule :
|
||||
prefix = rule["host"] + "_"
|
||||
first_servers.append(rule["host"])
|
||||
if not "http" in rule or not "paths" in rule["http"] :
|
||||
continue
|
||||
for path in rule["http"]["paths"] :
|
||||
env[prefix + "USE_REVERSE_PROXY"] = "yes"
|
||||
env[prefix + "REVERSE_PROXY_URL"] = path["path"]
|
||||
env[prefix + "REVERSE_PROXY_HOST"] = "http://" + path["backend"]["serviceName"] + ":" + str(path["backend"]["servicePort"])
|
||||
env[prefix + "REVERSE_PROXY_HOST"] = "http://" + path["backend"]["service_name"] + ":" + str(path["backend"]["service_port"])
|
||||
env["SERVER_NAME"] = " ".join(first_servers)
|
||||
return env
|
||||
|
||||
def get_env(self) :
|
||||
pods = self.__get_pods()
|
||||
ingresses = self.__get_ingresses()
|
||||
services = self.__get_services()
|
||||
env = {}
|
||||
first_servers = []
|
||||
for pod in pods :
|
||||
env.update(self.__pod_to_env(pod.spec.containers[0].env))
|
||||
if "SERVER_NAME" in env and env["SERVER_NAME"] != "" :
|
||||
first_servers.extend(env["SERVER_NAME"].split(" "))
|
||||
for ingress in ingresses :
|
||||
if ingress.metadata.annotations == None :
|
||||
continue
|
||||
if "bunkerized-nginx.AUTOCONF" in ingress.metadata.annotations :
|
||||
env.update(self.__annotations_to_env(ingress.metadata.annotations))
|
||||
env.update(self.__rules_to_env(ingress.spec.rules))
|
||||
env.update(self.__rules_to_env(ingress.spec.rules))
|
||||
if "SERVER_NAME" in env and env["SERVER_NAME"] != "" :
|
||||
first_servers.extend(env["SERVER_NAME"].split(" "))
|
||||
for service in services :
|
||||
if service.metadata.annotations == None :
|
||||
continue
|
||||
if "bunkerized-nginx.AUTOCONF" in service.metadata.annotations :
|
||||
env.update(self.__annotations_to_env(service.metadata.annotations, service=True))
|
||||
if service.metadata.annotations != None and "bunkerized-nginx.SERVER_NAME" in service.metadata.annotations :
|
||||
env.update(self.__annotations_to_env(service.metadata.annotations))
|
||||
first_servers.append(service.metadata.annotations["SERVER_NAME"])
|
||||
first_servers = list(dict.fromkeys(first_servers))
|
||||
if len(first_servers) == 0 :
|
||||
env["SERVER_NAME"] = ""
|
||||
else :
|
||||
env["SERVER_NAME"] = " ".join(first_servers)
|
||||
return self._fix_env(env)
|
||||
|
||||
def process_events(self, current_env) :
|
||||
self.__old_env = current_env
|
||||
t_pod = Thread(target=self.__watch_pod)
|
||||
t_ingress = Thread(target=self.__watch_ingress)
|
||||
t_service = Thread(target=self.__watch_service)
|
||||
t_pod.start()
|
||||
t_ingress.start()
|
||||
t_service.start()
|
||||
t_pod.join()
|
||||
t_ingress.join()
|
||||
t_service.join()
|
||||
|
||||
def __watch_ingress(self) :
|
||||
def __watch_pod(self) :
|
||||
w = watch.Watch()
|
||||
for event in w.stream(self.__extensions_api.list_ingress_for_all_namespaces) :
|
||||
for event in w.stream(self.__api.list_pod_for_all_namespaces, label_selector="bunkerized-nginx") :
|
||||
self.__internal_lock.acquire()
|
||||
new_env = self.get_env()
|
||||
if new_env != self.__old_env() :
|
||||
if self.gen_conf(new_env, lock=False) :
|
||||
if new_env != self.__old_env :
|
||||
if self.gen_conf(new_env) :
|
||||
self.__old_env = new_env.copy()
|
||||
log("CONTROLLER", "INFO", "successfully generated new configuration")
|
||||
if self.reload() :
|
||||
log("controller", "INFO", "successful reload")
|
||||
else :
|
||||
log("controller", "ERROR", "failed reload")
|
||||
self.__internal_lock.release()
|
||||
|
||||
def __watch_ingress(self) :
|
||||
w = watch.Watch()
|
||||
for event in w.stream(self.__extensions_api.list_ingress_for_all_namespaces, label_selector="bunkerized-nginx") :
|
||||
self.__internal_lock.acquire()
|
||||
new_env = self.get_env()
|
||||
if new_env != self.__old_env :
|
||||
if self.gen_conf(new_env) :
|
||||
self.__old_env = new_env.copy()
|
||||
log("CONTROLLER", "INFO", "successfully generated new configuration")
|
||||
if self.reload() :
|
||||
log("controller", "INFO", "successful reload")
|
||||
else :
|
||||
log("controller", "ERROR", "failed reload")
|
||||
self.__internal_lock.release()
|
||||
|
||||
def __watch_service(self) :
|
||||
w = watch.Watch()
|
||||
for event in w.stream(self.__api.list_service_for_all_namespaces) :
|
||||
for event in w.stream(self.__api.list_service_for_all_namespaces, label_selector="bunkerized-nginx") :
|
||||
self.__internal_lock.acquire()
|
||||
new_env = self.get_env()
|
||||
if new_env != self.__old_env() :
|
||||
if self.gen_conf(new_env, lock=False) :
|
||||
if new_env != self.__old_env :
|
||||
if self.gen_conf(new_env) :
|
||||
self.__old_env = new_env.copy()
|
||||
log("CONTROLLER", "INFO", "successfully generated new configuration")
|
||||
if self.reload() :
|
||||
log("controller", "INFO", "successful reload")
|
||||
else :
|
||||
log("controller", "ERROR", "failed reload")
|
||||
self.__internal_lock.release()
|
||||
|
||||
def reload(self) :
|
||||
return self._reload(self.__get_ingresses())
|
||||
return self._reload(self.__get_services(autoconf=True))
|
||||
|
||||
def wait(self) :
|
||||
return self._config.wait(self.__get_ingresses())
|
||||
# Wait for at least one bunkerized-nginx pod
|
||||
pods = self.__get_pods()
|
||||
while len(pods) == 0 :
|
||||
time.sleep(1)
|
||||
pods = self.__get_pods()
|
||||
|
||||
# Wait for at least one bunkerized-nginx service
|
||||
services = self.__get_services(autoconf=True)
|
||||
while len(services) == 0 :
|
||||
time.sleep(1)
|
||||
services = self.__get_services(autoconf=True)
|
||||
|
||||
# Generate first config
|
||||
env = self.get_env()
|
||||
if not self.gen_conf(env) :
|
||||
return False, env
|
||||
|
||||
# Wait for bunkerized-nginx
|
||||
return self._config.wait(services), env
|
||||
|
||||
Reference in New Issue
Block a user