autoconf - init annotations parser for k8s
This commit is contained in:
parent
22612f1757
commit
556836b499
@ -1,6 +1,5 @@
|
|||||||
from kubernetes import client, config, watch
|
from kubernetes import client, config, watch
|
||||||
|
from threading import Thread, Lock
|
||||||
import asyncio
|
|
||||||
|
|
||||||
class IngressController :
|
class IngressController :
|
||||||
|
|
||||||
@ -8,32 +7,68 @@ class IngressController :
|
|||||||
config.load_kube_config()
|
config.load_kube_config()
|
||||||
self.__api = client.CoreV1Api()
|
self.__api = client.CoreV1Api()
|
||||||
self.__extensions_api = client.ExtensionsV1beta1Api()
|
self.__extensions_api = client.ExtensionsV1beta1Api()
|
||||||
|
self.__lock = Lock()
|
||||||
|
self.__last_conf = {}
|
||||||
|
|
||||||
|
def __annotations_to_env(self, annotations, service=False) :
|
||||||
|
env = {}
|
||||||
|
prefix = ""
|
||||||
|
if service :
|
||||||
|
if not "bunkerized-nginx.SERVER_NAME" in annotations :
|
||||||
|
raise Exception("Missing bunkerized-nginx.SERVER_NAME annotation in Service.")
|
||||||
|
prefix = annotations["bunkerized-nginx.SERVER_NAME"].split(" ")[0] + "_"
|
||||||
|
for annotation in annotations :
|
||||||
|
if annotation.startswith("bunkerized-nginx.") and annotation.split(".")[1] != "" and annotation.split(".")[1] != "AUTOCONF" :
|
||||||
|
env[prefix + annotation.split(".")[1]] = annotations[annotation]
|
||||||
|
return env
|
||||||
|
|
||||||
|
def gen_conf(self) :
|
||||||
|
ingresses = self.get_ingresses()
|
||||||
|
services = self.get_services()
|
||||||
|
env = {}
|
||||||
|
for ingress in ingresses :
|
||||||
|
if ingress.metadata.annotations == None :
|
||||||
|
continue
|
||||||
|
if "bunkerized-nginx.AUTOCONF" in ingress.metadata.annotations :
|
||||||
|
env.update(self.__annotations_to_env(ingress.metadata.annotations))
|
||||||
|
for service in services :
|
||||||
|
if service.metadata.annotations == None :
|
||||||
|
continue
|
||||||
|
if "bunkerized-nginx.AUTOCONF" in service.metadata.annotations :
|
||||||
|
env.update(self.__annotations_to_env(service.metadata.annotations, service=True))
|
||||||
|
if self.__last_conf != env :
|
||||||
|
self.__last_conf = env
|
||||||
|
print("*** NEW CONF ***")
|
||||||
|
for k, v in env.items() :
|
||||||
|
print(k + " = " + v)
|
||||||
|
|
||||||
def get_ingresses(self) :
|
def get_ingresses(self) :
|
||||||
return self.__extensions_api.list_ingress_for_all_namespaces(watch=False)
|
return self.__extensions_api.list_ingress_for_all_namespaces(watch=False).items
|
||||||
|
|
||||||
def get_services(self) :
|
def get_services(self) :
|
||||||
return self.__api.list_service_for_all_namespaces(watch=False)
|
return self.__api.list_service_for_all_namespaces(watch=False).items
|
||||||
|
|
||||||
async def watch_ingress(self) :
|
def watch_ingress(self) :
|
||||||
print("ok ingress", flush=True)
|
|
||||||
w = watch.Watch()
|
w = watch.Watch()
|
||||||
for event in w.stream(self.__extensions_api.list_ingress_for_all_namespaces) :
|
for event in w.stream(self.__extensions_api.list_ingress_for_all_namespaces) :
|
||||||
print("*** NEW INGRESS EVENT ***", flush=True)
|
self.__lock.acquire()
|
||||||
for k, v in event.items() :
|
# print("*** NEW INGRESS EVENT ***", flush=True)
|
||||||
print(k + " :", flush=True)
|
# for k, v in event.items() :
|
||||||
print(v, flush=True)
|
# print(k + " :", flush=True)
|
||||||
await asyncio.sleep(0)
|
# print(v, flush=True)
|
||||||
|
self.gen_conf()
|
||||||
|
self.__lock.release()
|
||||||
|
|
||||||
async def watch_service(self) :
|
def watch_service(self) :
|
||||||
print("ok service", flush=True)
|
|
||||||
w = watch.Watch()
|
w = watch.Watch()
|
||||||
for event in w.stream(self.__api.list_service_for_all_namespaces) :
|
for event in w.stream(self.__api.list_service_for_all_namespaces) :
|
||||||
print("*** NEW SERVICE EVENT ***", flush=True)
|
self.__lock.acquire()
|
||||||
for k, v in event.items() :
|
self.gen_conf()
|
||||||
print(k + " :", flush=True)
|
# print("*** NEW SERVICE EVENT ***", flush=True)
|
||||||
print(v, flush=True)
|
# for k, v in event.items() :
|
||||||
await asyncio.sleep(0)
|
# print(k + " :", flush=True)
|
||||||
|
# print(v, flush=True)
|
||||||
|
self.__lock.release()
|
||||||
|
|
||||||
ic = IngressController()
|
ic = IngressController()
|
||||||
|
|
||||||
@ -44,11 +79,9 @@ print("*** SERVICES ***")
|
|||||||
print(ic.get_services())
|
print(ic.get_services())
|
||||||
|
|
||||||
print("*** LISTENING FOR EVENTS ***")
|
print("*** LISTENING FOR EVENTS ***")
|
||||||
|
t_ingress = Thread(target=ic.watch_service)
|
||||||
ioloop = asyncio.get_event_loop()
|
t_service = Thread(target=ic.watch_service)
|
||||||
print("ok1")
|
t_ingress.start()
|
||||||
ioloop.create_task(ic.watch_ingress())
|
t_service.start()
|
||||||
print("ok2")
|
t_ingress.join()
|
||||||
ioloop.create_task(ic.watch_service())
|
t_service.join()
|
||||||
print("ok3")
|
|
||||||
ioloop.run_forever()
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user