275 lines
8.9 KiB
Python
275 lines
8.9 KiB
Python
import os
|
|
import struct
|
|
import subprocess
|
|
import re
|
|
import ctypes
|
|
import signal
|
|
|
|
from gi.repository import GLib, Gtk, Gdk
|
|
from loguru import logger
|
|
from math import pi
|
|
|
|
from fabric.widgets.overlay import Overlay
|
|
from fabric.utils.helpers import get_relative_path
|
|
|
|
import configparser
|
|
|
|
|
|
def get_bars(file_path):
    """Return the number of bars configured in a cava INI file.

    The file at ``file_path`` is parsed with ``configparser`` and the
    ``bars`` entry of its ``[general]`` section is converted to ``int``.
    """
    parser = configparser.ConfigParser()
    parser.read(file_path)
    general_section = parser["general"]
    return int(general_section["bars"])
|
|
|
|
|
|
# Location of the cava configuration file, resolved through fabric's
# get_relative_path helper.
CAVA_CONFIG = get_relative_path("../config/cavalcade/cava.ini")

# Bar count, read once at import time; used by both the FIFO reader (frame
# size) and the drawing code (bar geometry).
bars = get_bars(CAVA_CONFIG)
|
|
|
|
|
|
def set_death_signal():
    """
    Request SIGTERM for the calling process when its parent dies.

    Issues ``prctl(PR_SET_PDEATHSIG, SIGTERM)`` through libc. It is handed to
    ``subprocess.Popen`` as ``preexec_fn`` when cava is launched, so cava is
    terminated together with this application.
    """
    pr_set_pdeathsig = 1  # numeric value of the PR_SET_PDEATHSIG option
    ctypes.CDLL("libc.so.6").prctl(pr_set_pdeathsig, signal.SIGTERM)
|
|
|
|
|
|
class Cava:
    """
    CAVA wrapper.

    Launch a cava process with the project configuration, read the raw frames
    it writes to a FIFO from a GLib IO watch, and hand every decoded frame to
    the drawing callback.
    """

    # Life-cycle states stored in ``self.state``.
    NONE = 0
    RUNNING = 1
    RESTARTING = 2
    CLOSING = 3

    def __init__(self, mainapp):
        """Prepare command line, sample format and FIFO; nothing is launched.

        ``mainapp`` must expose ``draw.update``; it is called with a list of
        floats for every complete frame read from the FIFO.
        """
        self.bars = bars
        self.path = "/tmp/cava.fifo"

        self.cava_config_file = CAVA_CONFIG
        self.data_handler = mainapp.draw.update
        self.command = ["cava", "-p", self.cava_config_file]
        self.state = self.NONE
        # Assigned by _run_process(); stays None until cava has been launched
        # so close()/restart() are safe to call at any time.
        self.process = None

        self.env = dict(os.environ)
        self.env["LC_ALL"] = "en_US.UTF-8"  # not sure if it's necessary

        # struct code, bytes per bar, and divisor used to scale a raw value.
        is_16bit = True
        self.byte_type, self.byte_size, self.byte_norm = (
            ("H", 2, 65535) if is_16bit else ("B", 1, 255)
        )

        # Bytes of the frame currently being assembled (short reads happen).
        self._pending = bytearray()

        # EAFP: avoids the exists()/mkfifo() race of a look-before-you-leap test.
        try:
            os.mkfifo(self.path)
        except FileExistsError:
            pass

        self.fifo_fd = None
        self.fifo_dummy_fd = None
        self.io_watch_id = None

    def _run_process(self):
        """Spawn cava; set RUNNING on success, NONE (retry allowed) on failure."""
        logger.debug("Launching cava process...")
        try:
            self.process = subprocess.Popen(
                self.command,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                env=self.env,
                preexec_fn=set_death_signal,  # Ensure cava gets killed when the parent dies.
            )
        except Exception:
            logger.exception("Fail to launch cava")
            self.process = None
            # Leave a state from which restart() can try again.
            self.state = self.NONE
        else:
            logger.debug("cava successfully launched!")
            self.state = self.RUNNING

    def _start_io_reader(self):
        """Open the FIFO and install the GLib IO watch (no-op if already active)."""
        if self.io_watch_id is not None:
            # A watch is already installed; opening again would leak
            # descriptors and deliver every frame twice.
            return
        # Drop descriptors left behind by a watch that ended on a read error.
        self._close_fifo()

        logger.debug("Activating GLib IO watch for cava stream handler")
        # Open FIFO in non-blocking mode for reading
        self.fifo_fd = os.open(self.path, os.O_RDONLY | os.O_NONBLOCK)
        # Open dummy write end to prevent getting an EOF on our FIFO
        self.fifo_dummy_fd = os.open(self.path, os.O_WRONLY | os.O_NONBLOCK)
        self.io_watch_id = GLib.io_add_watch(
            self.fifo_fd, GLib.IO_IN, self._io_callback
        )

    def _read_frame(self):
        """Read from the FIFO; return one complete frame, or None if incomplete.

        Only the bytes still missing from the current frame are requested, so
        a short read is kept and completed by the next call instead of being
        thrown away (which would misalign every following frame).
        """
        frame_size = self.byte_size * self.bars  # number of bytes for given format
        self._pending += os.read(self.fifo_fd, frame_size - len(self._pending))
        if len(self._pending) < frame_size:
            return None
        frame = bytes(self._pending)
        self._pending.clear()
        return frame

    def _decode_frame(self, frame):
        """Unpack one raw frame into a list of floats divided by ``byte_norm``."""
        fmt = self.byte_type * self.bars  # format string for struct.unpack
        return [value / self.byte_norm for value in struct.unpack(fmt, frame)]

    def _io_callback(self, source, condition):
        """GLib IO watch handler; returns True to stay installed, False to stop."""
        try:
            frame = self._read_frame()
        except BlockingIOError:
            # Nothing to read right now on the non-blocking fd; keep watching.
            return True
        except OSError:
            logger.exception("Error reading cava FIFO; IO watch removed")
            # Returning False removes the source, so forget its id to keep
            # close() from removing it a second time.
            self.io_watch_id = None
            return False

        if frame is not None:
            GLib.idle_add(self.data_handler, self._decode_frame(frame))
        return True

    def _drain_fifo(self):
        """Discard buffered bytes so a relaunched cava starts on a frame boundary."""
        self._pending.clear()
        if self.fifo_fd is None:
            return
        try:
            while os.read(self.fifo_fd, 4096):
                pass
        except BlockingIOError:
            pass  # FIFO is empty

    def _kill_process(self):
        """Kill cava if it is still running and wait briefly to reap it."""
        proc = self.process
        if proc is None or proc.poll() is not None:
            return
        proc.kill()
        try:
            proc.wait(timeout=1)
        except subprocess.TimeoutExpired:
            logger.warning("cava did not exit within 1 second after kill()")

    def _close_fifo(self):
        """Close both FIFO descriptors, if open, and mark them as closed."""
        # Compare against None: 0 is a valid file descriptor.
        if self.fifo_fd is not None:
            fd, self.fifo_fd = self.fifo_fd, None
            os.close(fd)
        if self.fifo_dummy_fd is not None:
            fd, self.fifo_dummy_fd = self.fifo_dummy_fd, None
            os.close(fd)

    def _on_stop(self):
        """React to the stream handler stopping, based on the current state.

        NOTE(review): nothing in this file calls this hook; it is kept for
        callers elsewhere. restart() no longer depends on it.
        """
        logger.debug("Cava stream handler deactivated")
        if self.state == self.RESTARTING:
            self.start()
        elif self.state == self.RUNNING:
            self.state = self.NONE
            logger.error("Cava process was unexpectedly terminated.")
            # self.restart()  # May cause infinity loop, need more check

    def start(self):
        """Launch cava"""
        self._start_io_reader()
        self._run_process()

    def restart(self):
        """Restart cava process"""
        if self.state == self.RUNNING:
            logger.debug("Restarting cava process (normal mode) ...")
            self.state = self.RESTARTING
            self._kill_process()
            self._drain_fifo()
            # Relaunch here: no code path invokes _on_stop() after the kill,
            # so waiting for it would leave cava stopped for good.
            self.start()
        elif self.state == self.NONE:
            logger.warning("Restarting cava process (after crash) ...")
            self.start()

    def close(self):
        """Stop cava process"""
        self.state = self.CLOSING
        self._kill_process()
        if self.io_watch_id is not None:
            GLib.source_remove(self.io_watch_id)
            self.io_watch_id = None
        self._close_fifo()
        try:
            os.remove(self.path)
        except FileNotFoundError:
            pass
|
|
|
|
|
|
class AttributeDict(dict):
    """Dict whose items can be read and written with attribute syntax.

    Reading a name that is not stored as a key yields ``3`` instead of
    raising ``AttributeError``.
    """

    def __getattr__(self, attr):
        # Reached only when regular attribute lookup fails, so dict methods
        # such as ``keys`` are unaffected.
        try:
            return self[attr]
        except KeyError:
            return 3

    def __setattr__(self, attr, value):
        # Attribute assignment is stored as a dictionary item.
        self.__setitem__(attr, value)
|
|
|
|
|
|
class Spectrum:
    """Spectrum drawing"""

    # Color used when colors.css cannot be read or has no ``--primary`` entry.
    DEFAULT_COLOR = "#a5c8ff"
    _PRIMARY_RE = re.compile(r"--primary:\s*(#[0-9a-fA-F]{6})")

    def __init__(self):
        """Create the drawing area, connect its signals and load the color."""
        self.silence_value = 0  # consecutive silent frames seen so far
        self.audio_sample = []
        self.color = None
        # (mtime_ns, size) of colors.css when it was last parsed; None when
        # the file could not be stat'ed.
        self._color_stamp = None

        self.area = Gtk.DrawingArea()
        self.area.connect("draw", self.redraw)
        self.area.add_events(Gdk.EventMask.BUTTON_PRESS_MASK)

        self.sizes = AttributeDict()
        self.sizes.area = AttributeDict()
        self.sizes.bar = AttributeDict()

        self.silence = 10  # silent frames tolerated before drawing stops
        self.max_height = 12  # cap applied to each bar's half-height

        self.area.connect("configure-event", self.size_update)
        self.color_update()

    def is_silence(self, value):
        """Check if volume level critically low during last iterations"""
        self.silence_value = 0 if value > 0 else self.silence_value + 1
        return self.silence_value > self.silence

    def update(self, data):
        """Audio data processing

        Stores ``data`` as the current sample and queues a redraw unless the
        input has been silent for more than ``self.silence`` frames; on the
        first frame past that limit the bars are reset to zero once.
        Returns None.
        """
        self.color_update()
        self.audio_sample = data
        # Judge silence on the loudest bar, not only bar 0: a quiet first bar
        # with activity elsewhere is not silence. ``default`` covers an empty
        # frame, which previously raised IndexError.
        peak = max(data, default=0)
        if not self.is_silence(peak):
            self.area.queue_draw()
        elif self.silence_value == (self.silence + 1):
            self.audio_sample = [0] * self.sizes.number
            self.area.queue_draw()

    def redraw(self, widget, cr):
        """Draw spectrum graph"""
        cr.set_source_rgba(*self.color)
        if not self.audio_sample:
            return

        # Geometry is identical for every bar, so compute it once.
        sizes = self.sizes
        padding = sizes.padding
        width = sizes.area.width / sizes.number - padding
        radius = width / 2
        center_y = sizes.area.height / 2  # center vertical of the drawing area
        full_height = sizes.bar.height
        floor = sizes.zero

        dx = 3
        for value in self.audio_sample:
            height = max(full_height * min(value, 1), floor) / 2
            if height == floor / 2 + 1:
                height *= 0.5

            height = min(height, self.max_height)

            # Draw rectangle and arcs for rounded ends
            cr.rectangle(dx, center_y - height, width, height * 2)
            cr.arc(dx + radius, center_y - height, radius, 0, 2 * pi)
            cr.arc(dx + radius, center_y + height, radius, 0, 2 * pi)

            cr.close_path()
            dx += width + padding
        cr.fill()

    def size_update(self, *args):
        """Update drawing geometry"""
        self.sizes.number = bars
        self.sizes.padding = 100 / bars
        self.sizes.zero = 0

        self.sizes.area.width = self.area.get_allocated_width()
        self.sizes.area.height = self.area.get_allocated_height() - 2

        tw = self.sizes.area.width - self.sizes.padding * (self.sizes.number - 1)
        self.sizes.bar.width = max(int(tw / self.sizes.number), 1)
        self.sizes.bar.height = self.sizes.area.height

    @classmethod
    def _parse_primary_color(cls, css_text):
        """Return ``(red, green, blue)`` floats in 0..1 for ``--primary``.

        Falls back to ``DEFAULT_COLOR`` when ``css_text`` has no
        ``--primary: #rrggbb`` declaration.
        """
        match = cls._PRIMARY_RE.search(css_text)
        color = match.group(1) if match else cls.DEFAULT_COLOR
        return tuple(int(color[i:i + 2], 16) / 255 for i in (1, 3, 5))

    def color_update(self):
        """Set drawing color according to current settings by reading primary color from CSS"""
        css_path = get_relative_path("../styles/colors.css")

        # update() calls this for every audio frame, so only re-read the file
        # when its modification time or size has changed.
        try:
            info = os.stat(css_path)
            stamp = (info.st_mtime_ns, info.st_size)
        except OSError:
            stamp = None
        if self.color is not None and stamp == self._color_stamp:
            return
        self._color_stamp = stamp

        content = ""
        try:
            with open(css_path, "r", encoding="utf-8", errors="replace") as f:
                content = f.read()
        except OSError as e:
            logger.error("Failed to read primary color: {}".format(e))

        red, green, blue = self._parse_primary_color(content)
        self.color = Gdk.RGBA(red=red, green=green, blue=blue, alpha=1.0)
|
|
|
|
|
|
class SpectrumRender:
    """Tie a Spectrum drawing to a Cava reader and expose it as a widget."""

    def __init__(self, mode=None, **kwargs):
        """Build the drawing, then create and start the cava reader.

        ``mode`` is stored unchanged; ``kwargs`` are forwarded to the parent
        initializer.
        """
        super().__init__(**kwargs)
        self.mode = mode

        # The drawing must exist first: Cava reads ``self.draw.update``.
        self.draw = Spectrum()
        reader = Cava(self)
        self.cava = reader
        reader.start()

    def get_spectrum_box(self):
        """Return a new centered 180x40 Overlay containing the drawing area."""
        overlay = Overlay(name="cavalcade", h_align="center", v_align="center")
        overlay.set_size_request(180, 40)
        overlay.add_overlay(self.draw.area)
        return overlay
|