fabric-makussu/bar/modules/cavalcade.py
2025-05-05 20:13:39 +02:00

275 lines
8.9 KiB
Python

import os
import struct
import subprocess
import re
import ctypes
import signal
from gi.repository import GLib, Gtk, Gdk
from loguru import logger
from math import pi
from fabric.widgets.overlay import Overlay
from fabric.utils.helpers import get_relative_path
import configparser
def get_bars(file_path):
    """Return the bar count declared in a cava INI configuration file.

    The value is taken from the ``bars`` option of the ``[general]`` section
    and converted to ``int``.

    Args:
        file_path: Path of the INI file to parse.

    Returns:
        The configured number of bars as an integer.

    Raises:
        KeyError: If the ``[general]`` section or its ``bars`` option is
            absent (this includes an unreadable/missing file, which
            ``ConfigParser.read`` skips silently).
        ValueError: If the option value is not a valid integer literal.
    """
    parser = configparser.ConfigParser()
    parser.read(file_path)
    general_section = parser["general"]
    return int(general_section["bars"])
# Location of the cava configuration file, resolved through fabric's
# get_relative_path helper (presumably relative to this module's directory --
# confirm against the fabric documentation).
CAVA_CONFIG = get_relative_path("../config/cavalcade/cava.ini")
# Number of spectrum bars, read once at import time from the [general] section
# of the cava config; shared by the Cava reader and the Spectrum geometry.
bars = get_bars(CAVA_CONFIG)
def set_death_signal():
    """Ask the kernel to send SIGTERM to this process when its parent dies.

    Intended to run inside the freshly forked child (it is passed as
    ``preexec_fn`` when cava is spawned), so that cava is terminated
    automatically if the parent process is killed.
    """
    # Option number 1 of prctl(2) is PR_SET_PDEATHSIG.
    parent_death_option = 1
    ctypes.CDLL("libc.so.6").prctl(parent_death_option, signal.SIGTERM)
class Cava:
    """
    CAVA wrapper.

    Launches a ``cava`` process with the project's configuration file and
    reads frames from the named pipe ``/tmp/cava.fifo`` through a GLib IO
    watch (the cava config is expected to direct its raw output to that
    FIFO -- confirm against ``cava.ini``). Every complete frame is unpacked,
    normalised to the 0..1 range and passed to ``mainapp.draw.update``.

    ``state`` is one of the class constants ``NONE``, ``RUNNING``,
    ``RESTARTING`` or ``CLOSING``.
    """

    NONE = 0
    RUNNING = 1
    RESTARTING = 2
    CLOSING = 3

    def __init__(self, mainapp):
        """Prepare the wrapper; nothing is launched until :meth:`start`.

        Args:
            mainapp: Object exposing ``draw.update``, the callable that
                receives each decoded frame (a list of floats).
        """
        self.bars = bars
        self.path = "/tmp/cava.fifo"
        self.cava_config_file = CAVA_CONFIG
        self.data_handler = mainapp.draw.update
        self.command = ["cava", "-p", self.cava_config_file]
        self.state = self.NONE
        # Defined up front so close()/restart() are safe before any launch.
        self.process = None

        self.env = dict(os.environ)
        self.env["LC_ALL"] = "en_US.UTF-8"  # not sure if it's necessary

        is_16bit = True
        # struct format character, bytes per bar, and the maximum raw value
        # used to normalise samples.
        self.byte_type, self.byte_size, self.byte_norm = (
            ("H", 2, 65535) if is_16bit else ("B", 1, 255)
        )

        # EAFP: avoids the race between an exists() check and mkfifo().
        try:
            os.mkfifo(self.path)
        except FileExistsError:
            pass

        self.fifo_fd = None
        self.fifo_dummy_fd = None
        self.io_watch_id = None
        # Bytes of a frame that has only been partially received so far.
        self._pending = b""

    def _run_process(self):
        """Spawn cava; set ``state`` to RUNNING on success, NONE on failure."""
        logger.debug("Launching cava process...")
        try:
            self.process = subprocess.Popen(
                self.command,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                env=self.env,
                preexec_fn=set_death_signal,  # Ensure cava gets killed when the parent dies.
            )
        except Exception:
            # Reset the state so a later restart() can try again instead of
            # being stuck in RESTARTING.
            self.process = None
            self.state = self.NONE
            logger.exception("Fail to launch cava")
        else:
            logger.debug("cava successfully launched!")
            self.state = self.RUNNING

    def _start_io_reader(self):
        """Open the FIFO and register the GLib IO watch (idempotent)."""
        if self.io_watch_id is not None:
            # Already watching: opening again would leak descriptors and
            # split the stream between two competing callbacks.
            return
        logger.debug("Activating GLib IO watch for cava stream handler")
        # Open FIFO in non-blocking mode for reading
        self.fifo_fd = os.open(self.path, os.O_RDONLY | os.O_NONBLOCK)
        try:
            # Open dummy write end to prevent getting an EOF on our FIFO
            self.fifo_dummy_fd = os.open(self.path, os.O_WRONLY | os.O_NONBLOCK)
        except OSError:
            self._close_fifo()
            raise
        self._pending = b""
        self.io_watch_id = GLib.io_add_watch(
            self.fifo_fd, GLib.IO_IN, self._io_callback
        )

    def _close_fifo(self):
        """Close both FIFO descriptors (if open) and forget them."""
        for attr in ("fifo_fd", "fifo_dummy_fd"):
            fd = getattr(self, attr)
            if fd is None:
                continue
            setattr(self, attr, None)
            try:
                os.close(fd)
            except OSError:
                # Best-effort cleanup: the descriptor is unusable either way.
                pass

    def _take_frame(self, data):
        """Buffer ``data`` and return one decoded frame once it is complete.

        Args:
            data: Bytes just read from the FIFO (possibly a partial frame).

        Returns:
            A list of ``self.bars`` floats (raw value / ``byte_norm``) when a
            whole frame is available, otherwise ``None``.
        """
        self._pending += data
        chunk = self.byte_size * self.bars  # number of bytes for given format
        if len(self._pending) < chunk:
            return None
        frame, self._pending = self._pending[:chunk], self._pending[chunk:]
        fmt = self.byte_type * self.bars  # format string for struct.unpack
        return [raw / self.byte_norm for raw in struct.unpack(fmt, frame)]

    def _io_callback(self, source, condition):
        """GLib IO watch callback: read from the FIFO and dispatch a frame.

        Returns:
            ``True`` to keep the watch installed, ``False`` to remove it
            (only after an unrecoverable read error).
        """
        chunk = self.byte_size * self.bars
        try:
            # Only ask for the remainder of the current frame so the stream
            # stays aligned on frame boundaries after a short read.
            data = os.read(self.fifo_fd, chunk - len(self._pending))
        except BlockingIOError:
            # Spurious wake-up on the non-blocking descriptor; try again later.
            return True
        except OSError:
            logger.exception("Error reading cava FIFO, stopping the IO watch")
            # Returning False makes GLib drop the source, so the stored id
            # must be forgotten to keep close()/start() consistent.
            self.io_watch_id = None
            self._close_fifo()
            return False

        sample = self._take_frame(data)
        if sample is not None:
            GLib.idle_add(self.data_handler, sample)
        return True

    def _on_stop(self):
        """React to the cava process having stopped, based on ``state``."""
        logger.debug("Cava stream handler deactivated")
        if self.state == self.RESTARTING:
            self.start()
        elif self.state == self.RUNNING:
            self.state = self.NONE
            logger.error("Cava process was unexpectedly terminated.")
            # self.restart()  # May cause infinity loop, need more check

    def _stop_process(self):
        """Kill the cava process if it is alive and reap it."""
        proc = self.process
        if proc is None or proc.poll() is not None:
            return
        proc.kill()
        try:
            proc.wait(timeout=1)  # reap the child so it does not linger as a zombie
        except subprocess.TimeoutExpired:
            logger.warning("cava did not exit within 1s after being killed")

    def start(self):
        """Launch cava"""
        if self.process is not None and self.process.poll() is None:
            # A second launch would orphan the process that is still running.
            logger.debug("cava is already running, start() ignored")
            return
        self._start_io_reader()
        self._run_process()

    def restart(self):
        """Restart cava process"""
        if self.state == self.RUNNING:
            logger.debug("Restarting cava process (normal mode) ...")
            self.state = self.RESTARTING
            self._stop_process()
            # Nothing else watches the child, so trigger the relaunch here.
            self._on_stop()
        elif self.state == self.NONE:
            logger.warning("Restarting cava process (after crash) ...")
            self.start()

    def close(self):
        """Stop cava process"""
        self.state = self.CLOSING
        self._stop_process()

        if self.io_watch_id is not None:
            GLib.source_remove(self.io_watch_id)
            self.io_watch_id = None

        self._close_fifo()

        try:
            os.remove(self.path)
        except FileNotFoundError:
            pass
class AttributeDict(dict):
    """Dict whose items can also be read and written as attributes.

    Purely a readability aid. Reading an attribute that has no matching key
    does not raise: it yields the fallback value ``3``.
    """

    def __getattr__(self, attr):
        # Only reached when normal attribute lookup fails.
        if attr in self:
            return self[attr]
        return 3

    def __setattr__(self, attr, value):
        # Attribute assignment is stored as a regular dictionary item.
        self.__setitem__(attr, value)
class Spectrum:
    """Spectrum drawing

    Owns a ``Gtk.DrawingArea`` and paints one rounded bar per audio sample
    value. Geometry lives in ``self.sizes`` (an ``AttributeDict``, so entries
    that have not been computed yet read as that class's fallback value).
    """

    # Marker meaning "the CSS file has never been inspected".
    _COLOR_UNSET = object()

    def __init__(self):
        self.silence_value = 0  # consecutive silent frames seen so far
        self.audio_sample = []
        self.color = None
        self._color_stamp = self._COLOR_UNSET

        self.area = Gtk.DrawingArea()
        self.area.connect("draw", self.redraw)
        self.area.add_events(Gdk.EventMask.BUTTON_PRESS_MASK)

        self.sizes = AttributeDict()
        self.sizes.area = AttributeDict()
        self.sizes.bar = AttributeDict()

        self.silence = 10  # silent frames tolerated before the graph is zeroed
        self.max_height = 12  # upper bound for a bar's half-height

        self.area.connect("configure-event", self.size_update)
        self.color_update()

    @staticmethod
    def _peak(data):
        """Return the largest value of a frame, or 0 for an empty frame."""
        return max(data, default=0)

    @staticmethod
    def _hex_to_rgb(color):
        """Convert ``"#rrggbb"`` into a tuple of three floats in 0..1."""
        return tuple(int(color[i:i + 2], 16) / 255 for i in (1, 3, 5))

    def is_silence(self, value):
        """Check if volume level critically low during last iterations"""
        self.silence_value = 0 if value > 0 else self.silence_value + 1
        return self.silence_value > self.silence

    def update(self, data):
        """Audio data processing

        Args:
            data: Sequence of bar values for one frame.
        """
        self.color_update()
        self.audio_sample = data
        # Judge silence on the loudest bar rather than on bar 0 alone, so that
        # audio with an empty first bar is not mistaken for silence. An empty
        # frame counts as silent instead of raising IndexError.
        if not self.is_silence(self._peak(data)):
            self.area.queue_draw()
        elif self.silence_value == (self.silence + 1):
            # First frame past the threshold: draw a flat graph once, then
            # stop redrawing until sound returns.
            self.audio_sample = [0] * self.sizes.number
            self.area.queue_draw()

    def redraw(self, widget, cr):
        """Draw spectrum graph

        Args:
            widget: The widget emitting ``draw`` (unused).
            cr: Cairo context to paint on.
        """
        cr.set_source_rgba(*self.color)
        if not self.audio_sample:
            return

        sizes = self.sizes
        padding = sizes.padding
        floor = sizes.zero
        bar_height = sizes.bar.height
        # Geometry is identical for every bar, so compute it once.
        width = sizes.area.width / sizes.number - padding
        radius = width / 2
        center_y = sizes.area.height / 2  # center vertical of the drawing area

        dx = 3
        for value in self.audio_sample:
            height = max(bar_height * min(value, 1), floor) / 2
            if height == floor / 2 + 1:
                height *= 0.5
            height = min(height, self.max_height)

            # Draw rectangle and arcs for rounded ends
            cr.rectangle(dx, center_y - height, width, height * 2)
            cr.arc(dx + radius, center_y - height, radius, 0, 2 * pi)
            cr.arc(dx + radius, center_y + height, radius, 0, 2 * pi)
            cr.close_path()

            dx += width + padding
        cr.fill()

    def size_update(self, *args):
        """Update drawing geometry"""
        self.sizes.number = bars
        self.sizes.padding = 100 / bars
        self.sizes.zero = 0

        self.sizes.area.width = self.area.get_allocated_width()
        self.sizes.area.height = self.area.get_allocated_height() - 2

        tw = self.sizes.area.width - self.sizes.padding * (self.sizes.number - 1)
        self.sizes.bar.width = max(int(tw / self.sizes.number), 1)
        self.sizes.bar.height = self.sizes.area.height

    def color_update(self):
        """Set drawing color according to current settings by reading primary color from CSS

        This runs for every audio frame, so the stylesheet is only re-read
        when its modification time or size changed since the previous call.
        Falls back to ``#a5c8ff`` when the file cannot be read or holds no
        ``--primary`` colour.
        """
        css_path = get_relative_path("../styles/colors.css")
        try:
            info = os.stat(css_path)
            stamp = (info.st_mtime_ns, info.st_size)
        except OSError:
            stamp = None  # missing/unreadable file is cached as well
        if self.color is not None and stamp == self._color_stamp:
            return
        self._color_stamp = stamp

        color = "#a5c8ff"  # default value
        try:
            with open(css_path, "r") as f:
                content = f.read()
            m = re.search(r"--primary:\s*(#[0-9a-fA-F]{6})", content)
            if m:
                color = m.group(1)
        except (OSError, UnicodeError) as e:
            logger.error("Failed to read primary color: {}".format(e))

        red, green, blue = self._hex_to_rgb(color)
        self.color = Gdk.RGBA(red=red, green=green, blue=blue, alpha=1.0)
class SpectrumRender:
    """Glue object that owns the spectrum drawing and its cava feeder.

    Creating an instance builds the ``Spectrum`` drawing, wraps it with a
    ``Cava`` reader and starts cava right away.
    """

    def __init__(self, mode=None, **kwargs):
        super().__init__(**kwargs)
        self.mode = mode
        # The drawing must exist before Cava is built: Cava looks up
        # ``draw.update`` on the object it is given.
        spectrum = Spectrum()
        self.draw = spectrum
        feeder = Cava(self)
        self.cava = feeder
        feeder.start()

    def get_spectrum_box(self):
        """Return a new 180x40 ``Overlay`` holding the spectrum drawing area."""
        container = Overlay(name="cavalcade", h_align="center", v_align="center")
        container.set_size_request(180, 40)
        container.add_overlay(self.draw.area)
        return container