import os from shutil import rmtree, copytree, chown from logging import getLogger, INFO, ERROR, StreamHandler, Formatter from flask import ( Flask, flash, jsonify, redirect, render_template, request, send_file, url_for, ) from flask_login import LoginManager, login_required, login_user, logout_user from flask_wtf.csrf import CSRFProtect, CSRFError from json import JSONDecodeError, load as json_load from bs4 import BeautifulSoup from datetime import datetime, timezone from dateutil.parser import parse as dateutil_parse from requests import get from requests.utils import default_headers from sys import path as sys_path, exit as sys_exit from copy import deepcopy from docker import DockerClient from docker.errors import ( NotFound as docker_NotFound, APIError as docker_APIError, DockerException, ) from uuid import uuid4 from time import time import tarfile import zipfile from ui.src.ConfigFiles import ConfigFiles from ui.src.Config import Config from ui.src.ReverseProxied import ReverseProxied from ui.src.User import User from ui.utils import ( check_settings, env_to_summary_class, form_plugin_gen, form_service_gen, form_service_gen_multiple, form_service_gen_multiple_values, gen_folders_tree_html, get_variables, path_to_dict, ) sys_path.append("/opt/bunkerweb/utils") from ui.src.Instances import Instances from api.API import API from utils.ApiCaller import ApiCaller # Set up logger logger = getLogger("flask_app") logger.setLevel(INFO) # create console handler with a higher log level ch = StreamHandler() ch.setLevel(ERROR) # create formatter and add it to the handlers formatter = Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") ch.setFormatter(formatter) # add the handlers to logger logger.addHandler(ch) # Flask app app = Flask( __name__, static_url_path="/", static_folder="static", template_folder="templates", ) app.wsgi_app = ReverseProxied(app.wsgi_app) # Set variables and instantiate objects vars = get_variables() if not vars["FLASK_ENV"] == "development" and vars["ADMIN_PASSWORD"] == "changeme": logger.error("Please change the default admin password.") sys_exit(1) if not vars["FLASK_ENV"] == "development" and ( vars["ABSOLUTE_URI"].endswith("/changeme/") or vars["ABSOLUTE_URI"].endswith("/changeme") ): logger.error("Please change the default URL.") sys_exit(1) with open("/opt/bunkerweb/tmp/ui.pid", "w") as f: f.write(str(os.getpid())) login_manager = LoginManager() login_manager.init_app(app) login_manager.login_view = "login" user = User(vars["ADMIN_USERNAME"], vars["ADMIN_PASSWORD"]) api_caller = ApiCaller() PLUGIN_KEYS = [ "id", "order", "name", "description", "version", "settings", ] try: docker_client: DockerClient = DockerClient(base_url=vars["DOCKER_HOST"]) except (docker_APIError, DockerException): docker_client = None if docker_client: apis: list[API] = [] for container in docker_client.containers.list(filters={"label": "bunkerweb.UI"}): env_variables = { x[0]: x[1] for x in [env.split("=") for env in container.attrs["Config"]["Env"]] } apis.append( API( f"http://{container.name}:{env_variables.get('API_HTTP_PORT', '5000')}", env_variables.get("API_SERVER_NAME", "bwapi"), ) ) api_caller._set_apis(apis) try: app.config.update( DEBUG=True, SECRET_KEY=vars["FLASK_SECRET"], ABSOLUTE_URI=vars["ABSOLUTE_URI"], INSTANCES=Instances(docker_client), CONFIG=Config(), CONFIGFILES=ConfigFiles(), SESSION_COOKIE_DOMAIN=vars["ABSOLUTE_URI"] .replace("http://", "") .replace("https://", "") .split("/")[0], WTF_CSRF_SSL_STRICT=False, USER=user, SEND_FILE_MAX_AGE_DEFAULT=86400, ) except FileNotFoundError as e: logger.error(repr(e), e.filename) sys_exit(1) # Declare functions for jinja2 app.jinja_env.globals.update(env_to_summary_class=env_to_summary_class) app.jinja_env.globals.update(form_plugin_gen=form_plugin_gen) app.jinja_env.globals.update(form_service_gen=form_service_gen) app.jinja_env.globals.update(form_service_gen_multiple=form_service_gen_multiple) app.jinja_env.globals.update( form_service_gen_multiple_values=form_service_gen_multiple_values ) app.jinja_env.globals.update(gen_folders_tree_html=gen_folders_tree_html) app.jinja_env.globals.update(check_settings=check_settings) @login_manager.user_loader def load_user(user_id): return User(user_id, vars["ADMIN_PASSWORD"]) # CSRF protection csrf = CSRFProtect() csrf.init_app(app) @app.errorhandler(CSRFError) def handle_csrf_error(e): """ It takes a CSRFError exception as an argument, and returns a Flask response :param e: The exception object :return: A template with the error message and a 401 status code. """ logout_user() flash("Wrong CSRF token !", "error") return render_template("login.html"), 403 @app.route("/") def index(): return redirect(url_for("login")) @app.route("/loading") @login_required def loading(): next_url = request.values.get("next") return render_template("loading.html", next=next_url) @app.route("/home") @login_required def home(): """ It returns the home page :return: The home.html template is being rendered with the following variables: check_version: a boolean indicating whether the local version is the same as the remote version remote_version: the remote version version: the local version instances_number: the number of instances services_number: the number of services posts: a list of posts """ r = get( "https://raw.githubusercontent.com/bunkerity/bunkerweb/master/VERSION", ) remote_version = None if r.status_code == 200: remote_version = r.text.strip() with open("/opt/bunkerweb/VERSION", "r") as f: version = f.read().strip() headers = default_headers() headers.update({"User-Agent": "bunkerweb-ui"}) r = get( "https://www.bunkerity.com/wp-json/wp/v2/posts", headers=headers, ) formatted_posts = None if r.status_code == 200: posts = r.json() formatted_posts = [] for post in posts[:5]: formatted_posts.append( { "link": post["link"], "title": post["title"]["rendered"], "description": BeautifulSoup( post["content"]["rendered"][ post["content"]["rendered"].index("") + 4 : post["content"]["rendered"].index("") ], features="html.parser", ).get_text()[:256] + ("..." if len(post["content"]["rendered"]) > 256 else ""), "date": dateutil_parse(post["date"]).strftime("%B %d, %Y"), "image_url": post["yoast_head_json"]["og_image"][0]["url"].replace( "wwwdev", "www" ), "reading_time": post["yoast_head_json"]["twitter_misc"][ "Est. reading time" ], } ) instances_number = len(app.config["INSTANCES"].get_instances()) services_number = len(app.config["CONFIG"].get_services()) return render_template( "home.html", check_version=not remote_version or version == remote_version, remote_version=remote_version, version=version, instances_number=instances_number, services_number=services_number, posts=formatted_posts, ) @app.route("/instances", methods=["GET", "POST"]) @login_required def instances(): # Manage instances if request.method == "POST": # Check operation if not "operation" in request.form or not request.form["operation"] in [ "reload", "start", "stop", "restart", ]: flash("Missing operation parameter on /instances.", "error") return redirect(url_for("loading", next=url_for("instances"))) # Check that all fields are present if not "INSTANCE_ID" in request.form: flash("Missing INSTANCE_ID parameter.", "error") return redirect(url_for("loading", next=url_for("instances"))) # Do the operation if request.form["operation"] == "reload": operation = app.config["INSTANCES"].reload_instance( request.form["INSTANCE_ID"] ) elif request.form["operation"] == "start": operation = app.config["INSTANCES"].start_instance( request.form["INSTANCE_ID"] ) elif request.form["operation"] == "stop": operation = app.config["INSTANCES"].stop_instance( request.form["INSTANCE_ID"] ) elif request.form["operation"] == "restart": operation = app.config["INSTANCES"].restart_instance( request.form["INSTANCE_ID"] ) if operation.startswith("Can't"): flash(operation, "error") else: flash(operation) return redirect(url_for("loading", next=url_for("instances"))) # Display instances instances = app.config["INSTANCES"].get_instances() return render_template("instances.html", title="Instances", instances=instances) @app.route("/services", methods=["GET", "POST"]) @login_required def services(): if request.method == "POST": # Check operation if not "operation" in request.form or not request.form["operation"] in [ "new", "edit", "delete", ]: flash("Missing operation parameter on /services.", "error") return redirect(url_for("loading", next=url_for("services"))) # Check variables variables = deepcopy(request.form.to_dict()) del variables["csrf_token"] if ( not "OLD_SERVER_NAME" in request.form and request.form["operation"] == "edit" ): flash("Missing OLD_SERVER_NAME parameter.", "error") return redirect(url_for("loading", next=url_for("services"))) if request.form["operation"] in ("new", "edit"): del variables["operation"] if request.form["operation"] == "edit": del variables["OLD_SERVER_NAME"] # Edit check fields and remove already existing ones config = app.config["CONFIG"].get_config() for variable in deepcopy(variables): if variables[variable] == "on": variables[variable] = "yes" elif variables[variable] == "off": variables[variable] = "no" if ( request.form["operation"] == "edit" and variable != "SERVER_NAME" and variables[variable] == config.get(variable, None) or not variables[variable].strip() ): del variables[variable] if not variables: flash( f"{variables['SERVER_NAME'].split(' ')[0]} was not edited because no values were changed." ) return redirect(url_for("loading", next=url_for("services"))) error = app.config["CONFIG"].check_variables(variables) if error: return redirect(url_for("loading", next=url_for("services"))) # Delete elif request.form["operation"] == "delete": if not "SERVER_NAME" in request.form: flash("Missing SERVER_NAME parameter.", "error") return redirect(url_for("loading", next=url_for("services"))) error = app.config["CONFIG"].check_variables( {"SERVER_NAME": request.form["SERVER_NAME"]} ) if error: return redirect(url_for("loading", next=url_for("services"))) error = 0 # Do the operation if request.form["operation"] == "new": operation, error = app.config["CONFIG"].new_service(variables) elif request.form["operation"] == "edit": operation = app.config["CONFIG"].edit_service( request.form["OLD_SERVER_NAME"], variables ) elif request.form["operation"] == "delete": operation, error = app.config["CONFIG"].delete_service( request.form["SERVER_NAME"] ) if error: flash(operation, "error") return redirect(url_for("loading", next=url_for("services"))) flash(operation) # Reload instances _reloads = app.config["INSTANCES"].reload_instances() if not _reloads: for _reload in _reloads: flash(f"Reload failed for the instance {_reload}", "error") else: flash("Successfully reloaded instances") return redirect(url_for("loading", next=url_for("services"))) # Display services services = app.config["CONFIG"].get_services() return render_template("services.html", services=services) @app.route("/global_config", methods=["GET", "POST"]) @login_required def global_config(): if request.method == "POST": # Check variables variables = deepcopy(request.form.to_dict()) del variables["csrf_token"] # Edit check fields and remove already existing ones config = app.config["CONFIG"].get_config() for variable in deepcopy(variables): if variables[variable] == "on": variables[variable] = "yes" elif variables[variable] == "off": variables[variable] = "no" if ( variables[variable] == config.get(variable, None) or not variables[variable].strip() ): del variables[variable] if not variables: flash( f"The global configuration was not edited because no values were changed." ) return redirect(url_for("loading", next=url_for("global_config"))) error = app.config["CONFIG"].check_variables(variables, True) if error: return redirect(url_for("loading", next=url_for("global_config"))) error = 0 # Do the operation operation = app.config["CONFIG"].edit_global_conf(variables) if error: flash(operation, "error") return redirect(url_for("loading", next=url_for("global_config"))) flash(operation) # Reload instances _reloads = app.config["INSTANCES"].reload_instances() if not _reloads: for _reload in _reloads: flash(f"Reload failed for the instance {_reload}", "error") else: flash("Successfully reloaded instances") return redirect(url_for("loading", next=url_for("global_config"))) # Display services services = app.config["CONFIG"].get_services() return render_template("global_config.html", services=services) @app.route("/configs", methods=["GET", "POST"]) @login_required def configs(): if request.method == "POST": operation = "" # Check operation if not "operation" in request.form or not request.form["operation"] in [ "new", "edit", "delete", ]: flash("Missing operation parameter on /configs.", "error") return redirect(url_for("loading", next=url_for("configs"))) # Check variables variables = deepcopy(request.form.to_dict()) del variables["csrf_token"] operation = app.config["CONFIGFILES"].check_path(variables["path"]) if operation: flash(operation, "error") return redirect(url_for("loading", next=url_for("configs"))), 500 if request.form["operation"] in ("new", "edit"): if not app.config["CONFIGFILES"].check_name(variables["name"]): flash( f"Invalid {variables['type']} name. (Can only contain numbers, letters, underscores and hyphens (min 4 characters and max 32))", "error", ) return redirect(url_for("loading", next=url_for("configs"))) if variables["type"] == "file": variables["name"] = f"{variables['name']}.conf" variables["content"] = BeautifulSoup( variables["content"], "html.parser" ).get_text() if request.form["operation"] == "new": if variables["type"] == "folder": operation, error = app.config["CONFIGFILES"].create_folder( variables["path"], variables["name"] ) elif variables["type"] == "file": operation, error = app.config["CONFIGFILES"].create_file( variables["path"], variables["name"], variables["content"] ) elif request.form["operation"] == "edit": if variables["type"] == "folder": operation, error = app.config["CONFIGFILES"].edit_folder( variables["path"], variables["name"] ) elif variables["type"] == "file": operation, error = app.config["CONFIGFILES"].edit_file( variables["path"], variables["name"], variables["content"] ) if error: flash(operation, "error") return redirect(url_for("loading", next=url_for("configs"))) else: operation, error = app.config["CONFIGFILES"].delete_path(variables["path"]) if error: flash(operation, "error") return redirect(url_for("loading", next=url_for("configs"))) flash(operation) # Reload instances _reloads = app.config["INSTANCES"].reload_instances() if not _reloads: for _reload in _reloads: flash(f"Reload failed for the instance {_reload}", "error") else: flash("Successfully reloaded instances") return redirect(url_for("loading", next=url_for("configs"))) return render_template( "configs.html", folders=[path_to_dict("/opt/bunkerweb/configs")] ) @app.route("/plugins", methods=["GET", "POST"]) @login_required def plugins(): if request.method == "POST": operation = "" error = 0 if "operation" in request.form and request.form["operation"] == "delete": # Check variables variables = deepcopy(request.form.to_dict()) del variables["csrf_token"] operation = app.config["CONFIGFILES"].check_path( variables["path"], "/opt/bunkerweb/plugins/" ) if operation: flash(operation, "error") return redirect(url_for("loading", next=url_for("plugins"))), 500 operation, error = app.config["CONFIGFILES"].delete_path(variables["path"]) if error: flash(operation, "error") return redirect(url_for("loading", next=url_for("plugins"))) else: if not os.path.exists("/opt/bunkerweb/tmp/ui") or not os.listdir( "/opt/bunkerweb/tmp/ui" ): flash("Please upload new plugins to reload plugins", "error") return redirect(url_for("loading", next=url_for("plugins"))) for file in os.listdir("/opt/bunkerweb/tmp/ui"): if not os.path.isfile(f"/opt/bunkerweb/tmp/ui/{file}"): continue folder_name = "" temp_folder_name = file.split(".")[0] try: if file.endswith(".zip"): try: with zipfile.ZipFile( f"/opt/bunkerweb/tmp/ui/{file}" ) as zip_file: try: zip_file.getinfo("plugin.json") zip_file.extractall( f"/opt/bunkerweb/tmp/ui/{temp_folder_name}" ) with open( f"/opt/bunkerweb/tmp/ui/{temp_folder_name}/plugin.json", "r", ) as f: plugin_file = json_load(f) if not all( key in plugin_file.keys() for key in PLUGIN_KEYS ): raise ValueError folder_name = plugin_file["id"] if not app.config["CONFIGFILES"].check_name( folder_name ): error = 1 flash( f"Invalid plugin name for {temp_folder_name}. (Can only contain numbers, letters, underscores and hyphens (min 4 characters and max 32))", "error", ) raise Exception if os.path.exists( f"/opt/bunkerweb/plugins/{folder_name}" ): raise FileExistsError copytree( f"/opt/bunkerweb/tmp/ui/{temp_folder_name}", f"/opt/bunkerweb/plugins/{folder_name}", ) except KeyError: zip_file.extractall( f"/opt/bunkerweb/tmp/ui/{temp_folder_name}" ) dirs = [ d for d in os.listdir( f"/opt/bunkerweb/tmp/ui/{temp_folder_name}" ) if os.path.isdir( f"/opt/bunkerweb/tmp/ui/{temp_folder_name}/{d}" ) ] if ( not dirs or len(dirs) > 1 or not os.path.exists( f"/opt/bunkerweb/tmp/ui/{temp_folder_name}/{dirs[0]}/plugin.json" ) ): raise KeyError with open( f"/opt/bunkerweb/tmp/ui/{temp_folder_name}/{dirs[0]}/plugin.json", "r", ) as f: plugin_file = json_load(f) if not all( key in plugin_file.keys() for key in PLUGIN_KEYS ): raise ValueError folder_name = plugin_file["id"] if not app.config["CONFIGFILES"].check_name( folder_name ): error = 1 flash( f"Invalid plugin name for {temp_folder_name}. (Can only contain numbers, letters, underscores and hyphens (min 4 characters and max 32))", "error", ) raise Exception if os.path.exists( f"/opt/bunkerweb/plugins/{folder_name}" ): raise FileExistsError copytree( f"/opt/bunkerweb/tmp/ui/{temp_folder_name}/{dirs[0]}", f"/opt/bunkerweb/plugins/{folder_name}", ) except zipfile.BadZipFile: error = 1 flash( f"{file} is not a valid zip file. ({folder_name if folder_name else temp_folder_name})", "error", ) else: try: with tarfile.open( f"/opt/bunkerweb/tmp/ui/{file}", errorlevel=2, ) as tar_file: try: tar_file.getmember("plugin.json") tar_file.extractall( f"/opt/bunkerweb/tmp/ui/{temp_folder_name}" ) with open( f"/opt/bunkerweb/tmp/ui/{temp_folder_name}/plugin.json", "r", ) as f: plugin_file = json_load(f) if not all( key in plugin_file.keys() for key in PLUGIN_KEYS ): raise ValueError folder_name = plugin_file["id"] if not app.config["CONFIGFILES"].check_name( folder_name ): error = 1 flash( f"Invalid plugin name for {temp_folder_name}. (Can only contain numbers, letters, underscores and hyphens (min 4 characters and max 32))", "error", ) raise Exception if os.path.exists( f"/opt/bunkerweb/plugins/{folder_name}" ): raise FileExistsError copytree( f"/opt/bunkerweb/tmp/ui/{temp_folder_name}", f"/opt/bunkerweb/plugins/{folder_name}", ) except KeyError: tar_file.extractall( f"/opt/bunkerweb/tmp/ui/{temp_folder_name}", ) dirs = [ d for d in os.listdir( f"/opt/bunkerweb/tmp/ui/{temp_folder_name}" ) if os.path.isdir( f"/opt/bunkerweb/tmp/ui/{temp_folder_name}/{d}" ) ] if ( not dirs or len(dirs) > 1 or not os.path.exists( f"/opt/bunkerweb/tmp/ui/{temp_folder_name}/{dirs[0]}/plugin.json" ) ): raise KeyError with open( f"/opt/bunkerweb/tmp/ui/{temp_folder_name}/{dirs[0]}/plugin.json", "r", ) as f: plugin_file = json_load(f) if not all( key in plugin_file.keys() for key in PLUGIN_KEYS ): raise ValueError folder_name = plugin_file["id"] if not app.config["CONFIGFILES"].check_name( folder_name ): error = 1 flash( f"Invalid plugin name for {temp_folder_name}. (Can only contain numbers, letters, underscores and hyphens (min 4 characters and max 32))", "error", ) raise Exception if os.path.exists( f"/opt/bunkerweb/plugins/{folder_name}" ): raise FileExistsError copytree( f"/opt/bunkerweb/tmp/ui/{temp_folder_name}/{dirs[0]}", f"/opt/bunkerweb/plugins/{folder_name}", ) except tarfile.ReadError: error = 1 flash( f"Couldn't read file {file} ({folder_name if folder_name else temp_folder_name})", "error", ) except tarfile.CompressionError: error = 1 flash( f"{file} is not a valid tar file ({folder_name if folder_name else temp_folder_name})", "error", ) except tarfile.HeaderError: error = 1 flash( f"The file plugin.json in {file} is not valid ({folder_name if folder_name else temp_folder_name})", "error", ) except KeyError: error = 1 flash( f"{file} is not a valid plugin (plugin.json file is missing) ({folder_name if folder_name else temp_folder_name})", "error", ) except JSONDecodeError as e: error = 1 flash( f"The file plugin.json in {file} is not valid ({e.msg}: line {e.lineno} column {e.colno} (char {e.pos})) ({folder_name if folder_name else temp_folder_name})", "error", ) except ValueError: error = 1 flash( f"The file plugin.json is missing one or more of the following keys: {', '.join(PLUGIN_KEYS)} ({folder_name if folder_name else temp_folder_name})", "error", ) except FileExistsError: error = 1 flash( f"A plugin named {folder_name} already exists", "error", ) except (tarfile.TarError, OSError) as e: error = 1 flash(f"{e}", "error") except Exception: pass finally: if error != 1: flash( f"Successfully created plugin: {folder_name}" ) error = 0 for root, dirs, files in os.walk("/opt/bunkerweb/plugins", topdown=False): for name in files + dirs: chown(os.path.join(root, name), "nginx", "nginx") os.chmod(os.path.join(root, name), 0o770) app.config["CONFIG"].reload_config() if operation: flash(operation) # Reload instances _reloads = app.config["INSTANCES"].reload_instances() if not _reloads: for _reload in _reloads: flash(f"Reload failed for the instance {_reload}", "error") else: flash("Successfully reloaded instances") if os.path.exists("/opt/bunkerweb/tmp/ui"): try: rmtree("/opt/bunkerweb/tmp/ui") except OSError: pass app.config["CONFIG"].reload_plugins() return redirect(url_for("loading", next=url_for("plugins"))) plugins = [ { "name": "plugins", "type": "folder", "path": "/opt/bunkerweb/plugins", "can_create_files": False, "can_create_folders": False, "can_edit": False, "can_delete": False, "children": [ { "name": _dir, "type": "folder", "path": f"/opt/bunkerweb/plugins/{_dir}", "can_create_files": False, "can_create_folders": False, "can_edit": False, "can_delete": True, } for _dir in os.listdir("/opt/bunkerweb/plugins") ], } ] return render_template("plugins.html", folders=plugins) @app.route("/plugins/upload", methods=["POST"]) @login_required def upload_plugin(): if not request.files: return {"status": "ko"}, 400 if not os.path.exists("/opt/bunkerweb/tmp/ui"): os.mkdir("/opt/bunkerweb/tmp/ui") for file in request.files.values(): if not file.filename.endswith((".zip", ".tar.gz", ".tar.xz")): return {"status": "ko"}, 422 with open( f"/opt/bunkerweb/tmp/ui/{uuid4()}{file.filename[file.filename.index('.'):]}", "wb", ) as f: f.write(file.read()) return {"status": "ok"}, 201 @app.route("/cache", methods=["GET"]) @login_required def cache(): return render_template( "cache.html", folders=[path_to_dict("/opt/bunkerweb/cache", is_cache=True)] ) @app.route("/cache/download", methods=["GET"]) @login_required def cache_download(): path = request.args.get("path") if not path: return redirect(url_for("loading", next=url_for("cache"))), 400 operation = app.config["CONFIGFILES"].check_path(path, "/opt/bunkerweb/cache/") if operation: flash(operation, "error") return redirect(url_for("loading", next=url_for("plugins"))), 500 return send_file(path, as_attachment=True) @app.route("/logs", methods=["GET"]) @login_required def logs(): instances = app.config["INSTANCES"].get_instances() first_instance = instances[0] if instances else None return render_template( "logs.html", first_instance=first_instance, instances=instances ) @app.route("/logs/local", methods=["GET"]) @login_required def logs_linux(): if not os.path.exists("/usr/sbin/nginx"): return ( jsonify( { "status": "ko", "message": "There are no linux instances running", } ), 404, ) last_update = request.args.get("last_update") raw_logs_access = [] raw_logs_error = [] if last_update: if os.path.exists("/var/log/nginx/error.log"): with open("/var/log/nginx/error.log", "r") as f: raw_logs_error = f.read().splitlines()[int(last_update.split(".")[0]) :] if os.path.exists("/var/log/nginx/access.log"): with open("/var/log/nginx/access.log", "r") as f: raw_logs_access = f.read().splitlines()[ int(last_update.split(".")[1]) : ] else: if os.path.exists("/var/log/nginx/error.log"): with open("/var/log/nginx/error.log", "r") as f: raw_logs_error = f.read().splitlines() if os.path.exists("/var/log/nginx/access.log"): with open("/var/log/nginx/access.log", "r") as f: raw_logs_access = f.read().splitlines() logs_error = [] temp_multiple_lines = [] NGINX_LOG_LEVELS = [ "debug", "notice", "info", "warn", "error", "crit", "alert", "emerg", ] for line in raw_logs_error: line_lower = line.lower() if "[info]" in line.lower() and line.endswith(":") or "[error]" in line.lower(): if temp_multiple_lines: logs_error.append("\n".join(temp_multiple_lines)) temp_multiple_lines = [ f"{datetime.strptime(' '.join(line.strip().split(' ')[0:2]), '%Y/%m/%d %H:%M:%S').replace(tzinfo=timezone.utc).timestamp()} {line}" ] elif ( all(f"[{log_level}]" not in line_lower for log_level in NGINX_LOG_LEVELS) and temp_multiple_lines ): temp_multiple_lines.append(line) else: logs_error.append( f"{datetime.strptime(' '.join(line.strip().split(' ')[0:2]), '%Y/%m/%d %H:%M:%S').replace(tzinfo=timezone.utc).timestamp()} {line}" ) if temp_multiple_lines: logs_error.append("\n".join(temp_multiple_lines)) logs_access = [ f"{datetime.strptime(line[line.find('[') + 1: line.find(']')], '%d/%b/%Y:%H:%M:%S %z').timestamp()} {line}" for line in raw_logs_access ] raw_logs = logs_error + logs_access raw_logs.sort( key=lambda x: float(x.split(" ")[0]) if x.split(" ")[0].isdigit() else 0 ) logs = [] for log in raw_logs: log_lower = log.lower() error_type = ( "error" if "[error]" in log_lower or "[crit]" in log_lower or "[alert]" in log_lower or "❌" in log_lower else ( "warn" if "[warn]" in log_lower else ("info" if "[info]" in log_lower else "message") ) ) if "\n" in log: splitted_one_line = log.split("\n") logs.append( { "content": " ".join( splitted_one_line.pop(0).strip().split(" ")[1:] ), "type": error_type, "separator": True, } ) for splitted_log in splitted_one_line: logs.append( { "content": splitted_log, "type": error_type, } ) else: logs.append( { "content": " ".join(log.strip().split(" ")[1:]), "type": error_type, } ) count_error_logs = 0 for log in logs_error: if "\n" in log: for _ in log.split("\n"): count_error_logs += 1 else: count_error_logs += 1 return jsonify( { "logs": logs, "last_update": f"{count_error_logs + int(last_update.split('.')[0])}.{len(logs_access) + int(last_update.split('.')[1])}" if last_update else f"{count_error_logs}.{len(logs_access)}", } ) @app.route("/logs/", methods=["GET"]) @login_required def logs_container(container_id): last_update = request.args.get("last_update") logs = [] if docker_client: try: if last_update: if not last_update.isdigit(): return ( jsonify( { "status": "ko", "message": "last_update must be an integer", } ), 422, ) docker_logs = docker_client.containers.get(container_id).logs( stdout=True, stderr=True, since=datetime.fromtimestamp(int(last_update)), ) else: docker_logs = docker_client.containers.get(container_id).logs( stdout=True, stderr=True, ) for log in docker_logs.decode("utf-8", errors="replace").split("\n")[0:-1]: log_lower = log.lower() logs.append( { "content": log, "type": "error" if "[error]" in log_lower or "[crit]" in log_lower or "[alert]" in log_lower or "❌" in log_lower else ( "warn" if "[warn]" in log_lower else ("info" if "[info]" in log_lower else "message") ), } ) except docker_NotFound: return ( jsonify( { "status": "ko", "message": f"Container with ID {container_id} not found!", } ), 404, ) return jsonify({"logs": logs, "last_update": int(time())}) @app.route("/login", methods=["GET", "POST"]) def login(): fail = False if ( request.method == "POST" and "username" in request.form and "password" in request.form ): if app.config["USER"].get_id() == request.form["username"] and app.config[ "USER" ].check_password(request.form["password"]): # log the user in next_url = request.form.get("next") login_user(app.config["USER"]) # redirect him to the page he originally wanted or to the home page return redirect(url_for("loading", next=next_url or url_for("home"))) else: fail = True if fail: return ( render_template("login.html", error="Invalid username or password"), 401, ) return render_template("login.html") @app.route("/logout") @login_required def logout(): logout_user() return redirect(url_for("login"))