jobs - lock and reload management
This commit is contained in:
parent
26db144df4
commit
f44e41cede
@ -5,30 +5,29 @@ class ReloadServerHandler(socketserver.StreamRequestHandler):
|
|||||||
def handle(self) :
|
def handle(self) :
|
||||||
locked = False
|
locked = False
|
||||||
try :
|
try :
|
||||||
# Get lock order from client
|
|
||||||
data = self.request.recv(512)
|
|
||||||
if not data or data != b"lock" :
|
|
||||||
return
|
|
||||||
self.server.controller.lock.acquire()
|
|
||||||
locked = True
|
|
||||||
|
|
||||||
# Get reload order from client
|
|
||||||
data = self.request.recv(512)
|
|
||||||
if not data or data != b"reload" :
|
|
||||||
self.server.controller.lock.release()
|
|
||||||
return
|
|
||||||
if self.server.controller.reload() :
|
|
||||||
self.request.sendall(b"ok")
|
|
||||||
else :
|
|
||||||
self.request.sendall(b"ko")
|
|
||||||
|
|
||||||
# Release the lock
|
|
||||||
self.server.controller.lock.release()
|
|
||||||
|
|
||||||
|
while True :
|
||||||
|
data = self.request.recv(512)
|
||||||
|
if not data or not data in [b"lock", b"reload", b"unlock"] :
|
||||||
|
break
|
||||||
|
if data == b"lock" :
|
||||||
|
self.server.controller.lock.acquire()
|
||||||
|
locked = True
|
||||||
|
self.request.sendall(b"ok")
|
||||||
|
elif data == b"unlock" :
|
||||||
|
self.server.controller.lock.release()
|
||||||
|
locked = False
|
||||||
|
self.request.sendall(b"ok")
|
||||||
|
elif data == b"reload" :
|
||||||
|
ret = self.server.controller.reload() :
|
||||||
|
if ret :
|
||||||
|
self.request.sendall(b"ok")
|
||||||
|
else :
|
||||||
|
self.request.sendall(b"ko")
|
||||||
except Exception as e :
|
except Exception as e :
|
||||||
utils.log("Exception ReloadServer : " + str(e))
|
utils.log("Exception ReloadServer : " + str(e))
|
||||||
if locked :
|
if locked :
|
||||||
self.server.controller.lock.release()
|
self.server.controller.lock.release()
|
||||||
|
|
||||||
def run_reload_server(controller) :
|
def run_reload_server(controller) :
|
||||||
server = socketserver.UnixStreamServer("/tmp/autoconf.sock", ReloadServerHandler)
|
server = socketserver.UnixStreamServer("/tmp/autoconf.sock", ReloadServerHandler)
|
||||||
|
|||||||
52
jobs/Job.py
52
jobs/Job.py
@ -7,6 +7,58 @@ class JobRet(enum.Enum) :
|
|||||||
OK_RELOAD = 1
|
OK_RELOAD = 1
|
||||||
OK_NO_RELOAD = 2
|
OK_NO_RELOAD = 2
|
||||||
|
|
||||||
|
class ReloadRet(enum.Enum) :
|
||||||
|
KO = 0
|
||||||
|
OK = 1
|
||||||
|
NO = 2
|
||||||
|
|
||||||
|
class JobManagement() :
|
||||||
|
|
||||||
|
def __init__(self) :
|
||||||
|
self.__local_nginx = False
|
||||||
|
if os.path.isfile("/usr/sbin/nginx") and os.path.isfile("/tmp/nginx.pid") :
|
||||||
|
self.__local_nginx = True
|
||||||
|
self.__autoconf_socket = None
|
||||||
|
if os.path.exists("/tmp/autoconf.sock") and stat.S_ISSOCK(os.stat("/tmp/autoconf.sock")) :
|
||||||
|
self.__autoconf_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||||
|
self.__autoconf_socket.connect()
|
||||||
|
|
||||||
|
def __autoconf_order(self, order) :
|
||||||
|
self.__autoconf_socket.sendall(order)
|
||||||
|
data = self.__autoconf_socket.recv(512)
|
||||||
|
if not data or data != b"ok" :
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
def lock(self) :
|
||||||
|
if self.__autoconf_socket != None :
|
||||||
|
return self.__autoconf_order(b"lock")
|
||||||
|
# TODO : local lock
|
||||||
|
return True
|
||||||
|
|
||||||
|
def unlock(self) :
|
||||||
|
if self.__autoconf_socket != None :
|
||||||
|
return self.__autoconf_order(b"unlock")
|
||||||
|
# TODO : local unlock
|
||||||
|
return True
|
||||||
|
|
||||||
|
def reload(self) :
|
||||||
|
if self.__autoconf_socket != None :
|
||||||
|
if self.__autoconf_order(b"reload") :
|
||||||
|
return ReloadRet.OK
|
||||||
|
return ReloadRet.KO
|
||||||
|
elif self.__local_nginx :
|
||||||
|
proc = subprocess.run(["/usr/sbin/nginx", "-s", "reload"], capture_output=True)
|
||||||
|
if proc.returncode != 0 :
|
||||||
|
log("reload", "ERROR", "can't reload nginx (status code = " + str(proc.returncode) + ")")
|
||||||
|
if len(proc.stdout.decode("ascii")) > 1 :
|
||||||
|
log("reload", "ERROR", proc.stdout.decode("ascii"))
|
||||||
|
if len(proc.stderr.decode("ascii")) > 1 :
|
||||||
|
log("reload", "ERROR", proc.stderr.decode("ascii"))
|
||||||
|
return ReloadRet.KO
|
||||||
|
return ReloadRet.OK
|
||||||
|
return ReloadRet.NO
|
||||||
|
|
||||||
class Job(abc.ABC) :
|
class Job(abc.ABC) :
|
||||||
|
|
||||||
def __init__(self, name, data, filename=None, redis_host=None, type="line", regex=r"^.+$", copy_cache=False) :
|
def __init__(self, name, data, filename=None, redis_host=None, type="line", regex=r"^.+$", copy_cache=False) :
|
||||||
|
|||||||
23
jobs/main.py
23
jobs/main.py
@ -5,9 +5,8 @@ import argparse, sys
|
|||||||
sys.path.append("/opt/bunkerized-nginx/jobs")
|
sys.path.append("/opt/bunkerized-nginx/jobs")
|
||||||
|
|
||||||
import Abusers, CertbotNew, CertbotRenew, ExitNodes, GeoIP, Proxies, Referrers, SelfSignedCert, UserAgents
|
import Abusers, CertbotNew, CertbotRenew, ExitNodes, GeoIP, Proxies, Referrers, SelfSignedCert, UserAgents
|
||||||
from Job import JobRet
|
from Job import JobRet, JobManagement, ReloadRet
|
||||||
|
|
||||||
from reload import reload
|
|
||||||
from logger import log
|
from logger import log
|
||||||
|
|
||||||
JOBS = {
|
JOBS = {
|
||||||
@ -45,6 +44,10 @@ if __name__ == "__main__" :
|
|||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
job = args.name
|
job = args.name
|
||||||
|
|
||||||
|
# Acquire the lock before
|
||||||
|
management = JobManagement()
|
||||||
|
management.lock()
|
||||||
|
|
||||||
# Run job
|
# Run job
|
||||||
log("job", "INFO", "executing job " + job)
|
log("job", "INFO", "executing job " + job)
|
||||||
ret = 0
|
ret = 0
|
||||||
@ -57,21 +60,29 @@ if __name__ == "__main__" :
|
|||||||
ret = instance.run()
|
ret = instance.run()
|
||||||
if ret == JobRet.KO :
|
if ret == JobRet.KO :
|
||||||
log("job", "ERROR", "error while running job " + job)
|
log("job", "ERROR", "error while running job " + job)
|
||||||
|
if reload_socket != None :
|
||||||
|
reload_socket.sendall(b"unlock")
|
||||||
|
reload_socket.recv(512)
|
||||||
|
reload_socket.close()
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
log("job", "INFO", "job " + job + " successfully executed")
|
log("job", "INFO", "job " + job + " successfully executed")
|
||||||
|
|
||||||
# Reload
|
# Reload
|
||||||
if ret == JobRet.OK_RELOAD and args.reload :
|
if ret == JobRet.OK_RELOAD and args.reload :
|
||||||
ret = reload()
|
ret = management.reload()
|
||||||
if ret == 0 :
|
if ret == ReloadRet.KO :
|
||||||
log("job", "ERROR", "error while doing reload operation (job = " + job + ")")
|
log("job", "ERROR", "error while doing reload operation (job = " + job + ")")
|
||||||
|
management.unlock()
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
elif ret == 1 :
|
elif ret == ReloadRet.OK :
|
||||||
log("job", "INFO", "reload operation successfully executed (job = " + job + ")")
|
log("job", "INFO", "reload operation successfully executed (job = " + job + ")")
|
||||||
elif ret == 2 :
|
elif ret == ReloadRet.NO :
|
||||||
log("job", "INFO", "skipped reload operation because nginx is not running (job = " + job + ")")
|
log("job", "INFO", "skipped reload operation because nginx is not running (job = " + job + ")")
|
||||||
else :
|
else :
|
||||||
log("job", "INFO", "skipped reload operation because it's not needed (job = " + job + ")")
|
log("job", "INFO", "skipped reload operation because it's not needed (job = " + job + ")")
|
||||||
|
|
||||||
|
# Release the lock
|
||||||
|
management.unlock()
|
||||||
|
|
||||||
# Done
|
# Done
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
|
|||||||
@ -1,46 +1,15 @@
|
|||||||
import docker, subprocess, os, stat, sys, traceback
|
import sys, traceback
|
||||||
|
|
||||||
|
from Job import JobManagement, ReloadRet
|
||||||
from logger import log
|
from logger import log
|
||||||
|
|
||||||
def reload() :
|
|
||||||
|
|
||||||
# Linux or single Docker use case
|
|
||||||
if os.path.isfile("/usr/sbin/nginx") and os.path.isfile("/tmp/nginx.pid") :
|
|
||||||
proc = subprocess.run(["/usr/sbin/nginx", "-s", "reload"], capture_output=True)
|
|
||||||
if proc.returncode != 0 :
|
|
||||||
log("reload", "ERROR", "can't reload nginx (status code = " + str(proc.returncode) + ")")
|
|
||||||
if len(proc.stdout.decode("ascii")) > 1 :
|
|
||||||
log("reload", "ERROR", proc.stdout.decode("ascii"))
|
|
||||||
if len(proc.stderr.decode("ascii")) > 1 :
|
|
||||||
log("reload", "ERROR", proc.stderr.decode("ascii"))
|
|
||||||
return 0
|
|
||||||
return 1
|
|
||||||
|
|
||||||
# Autoconf case (Docker, Swarm and Ingress)
|
|
||||||
if os.path.exists("/tmp/autoconf.sock") and stat.S_ISSOCK(os.stat("/tmp/autoconf.sock")) :
|
|
||||||
client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
||||||
client.connect("/tmp/autoconf.sock")
|
|
||||||
client.send("reload".encode("utf-8"))
|
|
||||||
data = client.recv(512)
|
|
||||||
client.close()
|
|
||||||
if not data or data.decode("utf-8") != "ok" :
|
|
||||||
log("reload", "ERROR", "can't reload nginx (data not ok)")
|
|
||||||
return 0
|
|
||||||
return 1
|
|
||||||
|
|
||||||
return 2
|
|
||||||
|
|
||||||
if __name__ == "__main__" :
|
if __name__ == "__main__" :
|
||||||
try :
|
try :
|
||||||
#print("[*] Starting reload operation ...")
|
management = JobManagement()
|
||||||
ret = reload()
|
ret = management.reload()
|
||||||
if ret == 0 :
|
if ret == ReloadRet.OK or ret == ReloadRet.NO :
|
||||||
sys.exit(1)
|
sys.exit(0)
|
||||||
#elif ret == 1 :
|
sys.exit(1)
|
||||||
#print("[*] Reload operation successfully executed")
|
|
||||||
#elif ret == 2 :
|
|
||||||
#print("[*] Skipped reload operation because nginx is not running")
|
|
||||||
sys.exit(0)
|
|
||||||
except :
|
except :
|
||||||
log("reload", "ERROR", "can't reload nginx (exception)")
|
log("reload", "ERROR", "can't reload nginx (exception)")
|
||||||
log("reload", "ERROR", traceback.format_exc())
|
log("reload", "ERROR", traceback.format_exc())
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user