more modules

This commit is contained in:
Makesesama 2025-05-05 20:13:39 +02:00
parent 736e1a47c9
commit 53713ee0f5
11 changed files with 1578 additions and 7 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
__pycache__
.direnv

3
README.md Normal file
View File

@ -0,0 +1,3 @@
# Todo
- https://github.com/jlumpe/pyorg
- https://github.com/jlumpe/ox-json

View File

@ -181,3 +181,45 @@ tooltip {
tooltip>* {
padding: 2px 4px
}
/* Vinyl button styling */
#vinyl-button {
padding: 0px 8px;
transition: padding 0.05s steps(8);
background-color: rgba(180, 180, 180, 0.2);
border-radius: 4px;
transition: all 0.2s ease;
}
/* Active state styling */
.active #vinyl-button {
background-color: rgba(108, 158, 175, 0.7);
padding: 0px 32px;
}
/* Icon styling */
#vinyl-icon {
color: #555555;
min-width: 36px;
}
/* Label styling */
#vinyl-label {
color: #333333;
}
/* Active state changes for icon and label */
.active #vinyl-icon,
.active #vinyl-label {
color: var(--pink);
padding: 0px 32px;
}
/* Hover effect */
#vinyl-button:hover {
background-color: rgba(180, 180, 180, 0.4);
}
.active #vinyl-button:hover {
background-color: rgba(108, 158, 175, 0.9);
}

View File

@ -18,6 +18,8 @@ from fabric.utils import (
invoke_repeater,
get_relative_path,
)
from bar.modules.player import Player
from bar.modules.vinyl import VinylButton
AUDIO_WIDGET = True
@ -113,6 +115,8 @@ class StatusBar(Window):
Label("", style="margin: 0px 6px 0px 0px; font-size: 12px"),
],
)
self.player = Player()
self.vinyl = VinylButton()
self.status_container = Box(
name="widgets-container",
@ -123,12 +127,14 @@ class StatusBar(Window):
self.status_container.add(VolumeWidget()) if AUDIO_WIDGET is True else None
end_container_children = [
self.vinyl,
self.status_container,
self.date_time,
]
if self.system_tray is not None:
end_container_children = [
self.vinyl,
self.status_container,
self.system_tray,
self.date_time,

0
bar/modules/__init__.py Normal file
View File

274
bar/modules/cavalcade.py Normal file
View File

@ -0,0 +1,274 @@
import os
import struct
import subprocess
import re
import ctypes
import signal
from gi.repository import GLib, Gtk, Gdk
from loguru import logger
from math import pi
from fabric.widgets.overlay import Overlay
from fabric.utils.helpers import get_relative_path
import configparser
def get_bars(file_path):
config = configparser.ConfigParser()
config.read(file_path)
return int(config["general"]["bars"])
CAVA_CONFIG = get_relative_path("../config/cavalcade/cava.ini")
bars = get_bars(CAVA_CONFIG)
def set_death_signal():
"""
Set the death signal of the child process to SIGTERM so that if the parent
process is killed, the child (cava) is automatically terminated.
"""
libc = ctypes.CDLL("libc.so.6")
PR_SET_PDEATHSIG = 1
libc.prctl(PR_SET_PDEATHSIG, signal.SIGTERM)
class Cava:
"""
CAVA wrapper.
Launch cava process with certain settings and read output.
"""
NONE = 0
RUNNING = 1
RESTARTING = 2
CLOSING = 3
def __init__(self, mainapp):
self.bars = bars
self.path = "/tmp/cava.fifo"
self.cava_config_file = CAVA_CONFIG
self.data_handler = mainapp.draw.update
self.command = ["cava", "-p", self.cava_config_file]
self.state = self.NONE
self.env = dict(os.environ)
self.env["LC_ALL"] = "en_US.UTF-8" # not sure if it's necessary
is_16bit = True
self.byte_type, self.byte_size, self.byte_norm = (
("H", 2, 65535) if is_16bit else ("B", 1, 255)
)
if not os.path.exists(self.path):
os.mkfifo(self.path)
self.fifo_fd = None
self.fifo_dummy_fd = None
self.io_watch_id = None
def _run_process(self):
logger.debug("Launching cava process...")
try:
self.process = subprocess.Popen(
self.command,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
env=self.env,
preexec_fn=set_death_signal, # Ensure cava gets killed when the parent dies.
)
logger.debug("cava successfully launched!")
self.state = self.RUNNING
except Exception:
logger.exception("Fail to launch cava")
def _start_io_reader(self):
logger.debug("Activating GLib IO watch for cava stream handler")
# Open FIFO in non-blocking mode for reading
self.fifo_fd = os.open(self.path, os.O_RDONLY | os.O_NONBLOCK)
# Open dummy write end to prevent getting an EOF on our FIFO
self.fifo_dummy_fd = os.open(self.path, os.O_WRONLY | os.O_NONBLOCK)
self.io_watch_id = GLib.io_add_watch(
self.fifo_fd, GLib.IO_IN, self._io_callback
)
def _io_callback(self, source, condition):
chunk = self.byte_size * self.bars # number of bytes for given format
try:
data = os.read(self.fifo_fd, chunk)
except OSError as e:
# logger.error("Error reading FIFO: {}".format(e))
return False
# When no data is read, do not remove the IO watch immediately.
if len(data) < chunk:
# Instead of closing the FIFO, we log a warning and continue.
# logger.warning("Incomplete data packet received (expected {} bytes, got {}). Waiting for more data...".format(chunk, len(data)))
# Returning True keeps the IO watch active. A real EOF will only occur when the writer closes.
return True
fmt = self.byte_type * self.bars # format string for struct.unpack
sample = [i / self.byte_norm for i in struct.unpack(fmt, data)]
GLib.idle_add(self.data_handler, sample)
return True
def _on_stop(self):
logger.debug("Cava stream handler deactivated")
if self.state == self.RESTARTING:
self.start()
elif self.state == self.RUNNING:
self.state = self.NONE
logger.error("Cava process was unexpectedly terminated.")
# self.restart() # May cause infinity loop, need more check
def start(self):
"""Launch cava"""
self._start_io_reader()
self._run_process()
def restart(self):
"""Restart cava process"""
if self.state == self.RUNNING:
logger.debug("Restarting cava process (normal mode) ...")
self.state = self.RESTARTING
if self.process.poll() is None:
self.process.kill()
elif self.state == self.NONE:
logger.warning("Restarting cava process (after crash) ...")
self.start()
def close(self):
"""Stop cava process"""
self.state = self.CLOSING
if self.process.poll() is None:
self.process.kill()
if self.io_watch_id:
GLib.source_remove(self.io_watch_id)
if self.fifo_fd:
os.close(self.fifo_fd)
if self.fifo_dummy_fd:
os.close(self.fifo_dummy_fd)
if os.path.exists(self.path):
os.remove(self.path)
class AttributeDict(dict):
"""Dictionary with keys as attributes. Does nothing but easy reading"""
def __getattr__(self, attr):
return self.get(attr, 3)
def __setattr__(self, attr, value):
self[attr] = value
class Spectrum:
"""Spectrum drawing"""
def __init__(self):
self.silence_value = 0
self.audio_sample = []
self.color = None
self.area = Gtk.DrawingArea()
self.area.connect("draw", self.redraw)
self.area.add_events(Gdk.EventMask.BUTTON_PRESS_MASK)
self.sizes = AttributeDict()
self.sizes.area = AttributeDict()
self.sizes.bar = AttributeDict()
self.silence = 10
self.max_height = 12
self.area.connect("configure-event", self.size_update)
self.color_update()
def is_silence(self, value):
"""Check if volume level critically low during last iterations"""
self.silence_value = 0 if value > 0 else self.silence_value + 1
return self.silence_value > self.silence
def update(self, data):
"""Audio data processing"""
self.color_update()
self.audio_sample = data
if not self.is_silence(self.audio_sample[0]):
self.area.queue_draw()
elif self.silence_value == (self.silence + 1):
self.audio_sample = [0] * self.sizes.number
self.area.queue_draw()
def redraw(self, widget, cr):
"""Draw spectrum graph"""
cr.set_source_rgba(*self.color)
dx = 3
center_y = self.sizes.area.height / 2 # center vertical of the drawing area
for i, value in enumerate(self.audio_sample):
width = self.sizes.area.width / self.sizes.number - self.sizes.padding
radius = width / 2
height = max(self.sizes.bar.height * min(value, 1), self.sizes.zero) / 2
if height == self.sizes.zero / 2 + 1:
height *= 0.5
height = min(height, self.max_height)
# Draw rectangle and arcs for rounded ends
cr.rectangle(dx, center_y - height, width, height * 2)
cr.arc(dx + radius, center_y - height, radius, 0, 2 * pi)
cr.arc(dx + radius, center_y + height, radius, 0, 2 * pi)
cr.close_path()
dx += width + self.sizes.padding
cr.fill()
def size_update(self, *args):
"""Update drawing geometry"""
self.sizes.number = bars
self.sizes.padding = 100 / bars
self.sizes.zero = 0
self.sizes.area.width = self.area.get_allocated_width()
self.sizes.area.height = self.area.get_allocated_height() - 2
tw = self.sizes.area.width - self.sizes.padding * (self.sizes.number - 1)
self.sizes.bar.width = max(int(tw / self.sizes.number), 1)
self.sizes.bar.height = self.sizes.area.height
def color_update(self):
"""Set drawing color according to current settings by reading primary color from CSS"""
color = "#a5c8ff" # default value
try:
with open(get_relative_path("../styles/colors.css"), "r") as f:
content = f.read()
m = re.search(r"--primary:\s*(#[0-9a-fA-F]{6})", content)
if m:
color = m.group(1)
except Exception as e:
logger.error("Failed to read primary color: {}".format(e))
red = int(color[1:3], 16) / 255
green = int(color[3:5], 16) / 255
blue = int(color[5:7], 16) / 255
self.color = Gdk.RGBA(red=red, green=green, blue=blue, alpha=1.0)
class SpectrumRender:
def __init__(self, mode=None, **kwargs):
super().__init__(**kwargs)
self.mode = mode
self.draw = Spectrum()
self.cava = Cava(self)
self.cava.start()
def get_spectrum_box(self):
# Get the spectrum box
box = Overlay(name="cavalcade", h_align="center", v_align="center")
box.set_size_request(180, 40)
box.add_overlay(self.draw.area)
return box

738
bar/modules/player.py Normal file
View File

@ -0,0 +1,738 @@
import os
import urllib.parse
import urllib.request
import tempfile
from gi.repository import Gtk, GLib, Gio, Gdk
from fabric.widgets.box import Box
from fabric.widgets.centerbox import CenterBox
from fabric.widgets.label import Label
from fabric.widgets.button import Button
from fabric.widgets.circularprogressbar import CircularProgressBar
from fabric.widgets.overlay import Overlay
from fabric.widgets.stack import Stack
from ..widgets.circle_image import CircleImage
import bar.modules.icons as icons
from bar.services.mpris import MprisPlayerManager, MprisPlayer
# from bar.modules.cavalcade import SpectrumRender
def get_player_icon_markup_by_name(player_name):
if player_name:
pn = player_name.lower()
if pn == "firefox":
return icons.firefox
elif pn == "spotify":
return icons.spotify
elif pn in ("chromium", "brave"):
return icons.chromium
return icons.disc
def add_hover_cursor(widget):
widget.add_events(Gdk.EventMask.ENTER_NOTIFY_MASK | Gdk.EventMask.LEAVE_NOTIFY_MASK)
widget.connect(
"enter-notify-event",
lambda w, event: w.get_window().set_cursor(
Gdk.Cursor.new_from_name(Gdk.Display.get_default(), "pointer")
),
)
widget.connect(
"leave-notify-event", lambda w, event: w.get_window().set_cursor(None)
)
class PlayerBox(Box):
def __init__(self, mpris_player=None):
super().__init__(
orientation="v", h_align="fill", spacing=0, h_expand=False, v_expand=True
)
self.mpris_player = mpris_player
self._progress_timer_id = None # Initialize timer ID
self.cover = CircleImage(
name="player-cover",
image_file=os.path.expanduser("~/Pictures/wallpaper/background.jpg"),
size=162,
h_align="center",
v_align="center",
)
self.cover_placerholder = CircleImage(
name="player-cover",
size=198,
h_align="center",
v_align="center",
)
self.title = Label(
name="player-title",
h_expand=True,
h_align="fill",
ellipsization="end",
max_chars_width=1,
)
self.album = Label(
name="player-album",
h_expand=True,
h_align="fill",
ellipsization="end",
max_chars_width=1,
)
self.artist = Label(
name="player-artist",
h_expand=True,
h_align="fill",
ellipsization="end",
max_chars_width=1,
)
self.progressbar = CircularProgressBar(
name="player-progress",
size=198,
h_align="center",
v_align="center",
start_angle=180,
end_angle=360,
)
self.time = Label(name="player-time", label="--:-- / --:--")
self.overlay = Overlay(
child=self.cover_placerholder,
overlays=[self.progressbar, self.cover],
)
self.overlay_container = CenterBox(
name="player-overlay", center_children=[self.overlay]
)
self.title.set_label("Nothing Playing")
self.album.set_label("Enjoy the silence")
self.artist.set_label("¯\\_(ツ)_/¯")
self.progressbar.set_value(0.0)
self.prev = Button(
name="player-btn",
child=Label(name="player-btn-label", markup=icons.prev),
)
self.backward = Button(
name="player-btn",
child=Label(name="player-btn-label", markup=icons.skip_back),
)
self.play_pause = Button(
name="player-btn",
child=Label(name="player-btn-label", markup=icons.play),
)
self.forward = Button(
name="player-btn",
child=Label(name="player-btn-label", markup=icons.skip_forward),
)
self.next = Button(
name="player-btn",
child=Label(name="player-btn-label", markup=icons.next),
)
# Add hover effect to buttons
add_hover_cursor(self.prev)
add_hover_cursor(self.backward)
add_hover_cursor(self.play_pause)
add_hover_cursor(self.forward)
add_hover_cursor(self.next)
self.btn_box = CenterBox(
name="player-btn-box",
orientation="h",
center_children=[
Box(
orientation="h",
spacing=8,
h_expand=True,
h_align="fill",
children=[
self.prev,
self.backward,
self.play_pause,
self.forward,
self.next,
],
)
],
)
self.player_box = Box(
name="player-box",
orientation="v",
spacing=8,
children=[
self.overlay_container,
self.title,
self.album,
self.artist,
self.btn_box,
self.time,
],
)
self.add(self.player_box)
if mpris_player:
self._apply_mpris_properties() # This will handle starting the timer if needed
self.prev.connect("clicked", self._on_prev_clicked)
self.play_pause.connect("clicked", self._on_play_pause_clicked)
self.backward.connect("clicked", self._on_backward_clicked)
self.forward.connect("clicked", self._on_forward_clicked)
self.next.connect("clicked", self._on_next_clicked)
self.mpris_player.connect("changed", self._on_mpris_changed)
else:
self.play_pause.get_child().set_markup(icons.stop)
# Ensure buttons are disabled visually if no player
self.backward.add_style_class("disabled")
self.forward.add_style_class("disabled")
self.prev.add_style_class("disabled")
self.next.add_style_class("disabled")
self.progressbar.set_value(0.0)
self.time.set_text("--:-- / --:--")
def _apply_mpris_properties(self):
mp = self.mpris_player
self.title.set_visible(bool(mp.title and mp.title.strip()))
if mp.title and mp.title.strip():
self.title.set_text(mp.title)
self.album.set_visible(bool(mp.album and mp.album.strip()))
if mp.album and mp.album.strip():
self.album.set_text(mp.album)
self.artist.set_visible(bool(mp.artist and mp.artist.strip()))
if mp.artist and mp.artist.strip():
self.artist.set_text(mp.artist)
if mp.arturl:
parsed = urllib.parse.urlparse(mp.arturl)
if parsed.scheme == "file":
local_arturl = urllib.parse.unquote(parsed.path)
self._set_cover_image(local_arturl)
elif parsed.scheme in ("http", "https"):
GLib.Thread.new(
"download-artwork", self._download_and_set_artwork, mp.arturl
)
else:
self._set_cover_image(mp.arturl)
else:
fallback = os.path.expanduser("~/Pictures/wallpaper/background.jpg")
self._set_cover_image(fallback)
file_obj = Gio.File.new_for_path(fallback)
monitor = file_obj.monitor_file(Gio.FileMonitorFlags.NONE, None)
monitor.connect("changed", self.on_wallpaper_changed)
self._wallpaper_monitor = monitor
self.update_play_pause_icon()
# Keep progress bar and time visible always
self.progressbar.set_visible(True)
self.time.set_visible(True)
player_name = (
mp.player_name.lower()
if hasattr(mp, "player_name") and mp.player_name
else ""
)
can_seek = hasattr(mp, "can_seek") and mp.can_seek
if player_name == "firefox" or not can_seek:
# Disable seeking buttons and reset progress/time display
self.backward.add_style_class("disabled")
self.forward.add_style_class("disabled")
self.progressbar.set_value(0.0)
self.time.set_text("--:-- / --:--")
# Stop the timer if it's running
if self._progress_timer_id:
GLib.source_remove(self._progress_timer_id)
self._progress_timer_id = None
else:
# Enable seeking buttons
self.backward.remove_style_class("disabled")
self.forward.remove_style_class("disabled")
# Start the timer if it's not already running
if not self._progress_timer_id:
self._progress_timer_id = GLib.timeout_add(1000, self._update_progress)
# Initial progress update if possible
self._update_progress() # Call once for immediate update
# Enable/disable prev/next based on capabilities
if hasattr(mp, "can_go_previous") and mp.can_go_previous:
self.prev.remove_style_class("disabled")
else:
self.prev.add_style_class("disabled")
if hasattr(mp, "can_go_next") and mp.can_go_next:
self.next.remove_style_class("disabled")
else:
self.next.add_style_class("disabled")
def _set_cover_image(self, image_path):
if image_path and os.path.isfile(image_path):
self.cover.set_image_from_file(image_path)
else:
fallback = os.path.expanduser("~/Pictures/wallpaper/background.jpg")
self.cover.set_image_from_file(fallback)
file_obj = Gio.File.new_for_path(fallback)
monitor = file_obj.monitor_file(Gio.FileMonitorFlags.NONE, None)
monitor.connect("changed", self.on_wallpaper_changed)
self._wallpaper_monitor = monitor
def _download_and_set_artwork(self, arturl):
"""
Download the artwork from the given URL asynchronously and update the cover image
using GLib.idle_add to ensure UI updates occur on the main thread.
"""
try:
parsed = urllib.parse.urlparse(arturl)
suffix = os.path.splitext(parsed.path)[1] or ".png"
with urllib.request.urlopen(arturl) as response:
data = response.read()
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
temp_file.write(data)
temp_file.close()
local_arturl = temp_file.name
except Exception:
local_arturl = os.path.expanduser("~/.current.wall")
GLib.idle_add(self._set_cover_image, local_arturl)
return None
def update_play_pause_icon(self):
if self.mpris_player.playback_status == "playing":
self.play_pause.get_child().set_markup(icons.pause)
else:
self.play_pause.get_child().set_markup(icons.play)
def on_wallpaper_changed(self, monitor, file, other_file, event):
self.cover.set_image_from_file(os.path.expanduser("~/.current.wall"))
# --- Control methods, defined only once each ---
def _on_prev_clicked(self, button):
if self.mpris_player:
self.mpris_player.previous()
def _on_play_pause_clicked(self, button):
if self.mpris_player:
self.mpris_player.play_pause()
self.update_play_pause_icon()
def _on_backward_clicked(self, button):
# Only seek if player exists, can seek, and button is not disabled
if (
self.mpris_player
and self.mpris_player.can_seek
and "disabled" not in self.backward.get_style_context().list_classes()
):
new_pos = max(0, self.mpris_player.position - 5000000) # 5 seconds backward
self.mpris_player.position = new_pos
def _on_forward_clicked(self, button):
# Only seek if player exists, can seek, and button is not disabled
if (
self.mpris_player
and self.mpris_player.can_seek
and "disabled" not in self.forward.get_style_context().list_classes()
):
new_pos = self.mpris_player.position + 5000000 # 5 seconds forward
self.mpris_player.position = new_pos
def _on_next_clicked(self, button):
if self.mpris_player:
self.mpris_player.next()
def _update_progress(self):
# Timer is now only active if can_seek is true, so no need for the initial check
if not self.mpris_player: # Still need to check if player exists
# Should not happen if timer logic is correct, but good safeguard
if self._progress_timer_id:
GLib.source_remove(self._progress_timer_id)
self._progress_timer_id = None
return False # Stop timer
try:
current = self.mpris_player.position
except Exception:
current = 0
try:
total = int(self.mpris_player.length or 0)
except Exception:
total = 0
# Prevent division by zero or invalid updates
if total <= 0:
progress = 0.0
self.time.set_text("--:-- / --:--")
# Don't stop the timer here, length might become available later
else:
progress = current / total
self.time.set_text(
f"{self._format_time(current)} / {self._format_time(total)}"
)
self.progressbar.set_value(progress)
return True # Continue the timer
def _format_time(self, us):
seconds = int(us / 1000000)
minutes = seconds // 60
seconds = seconds % 60
return f"{minutes}:{seconds:02}"
def _update_metadata(self):
if not self.mpris_player:
return False
self._apply_mpris_properties()
return True
def _on_mpris_changed(self, *args):
# Debounce metadata updates to avoid excessive work on the main thread.
if not hasattr(self, "_update_pending") or not self._update_pending:
self._update_pending = True
# Use idle_add for potentially faster UI response than timeout_add(100)
GLib.idle_add(self._apply_mpris_properties_debounced)
def _apply_mpris_properties_debounced(self):
# Ensure player still exists before applying properties
if self.mpris_player:
self._apply_mpris_properties()
else:
# Player vanished, ensure timer is stopped if it was running
if self._progress_timer_id:
GLib.source_remove(self._progress_timer_id)
self._progress_timer_id = None
self._update_pending = False
return False
class Player(Box):
def __init__(self):
super().__init__(
name="player",
orientation="v",
h_align="fill",
spacing=0,
h_expand=False,
v_expand=True,
)
self.player_stack = Stack(
name="player-stack",
transition_type="slide-left-right",
transition_duration=500,
v_align="center",
v_expand=True,
)
self.switcher = Gtk.StackSwitcher(
name="player-switcher",
spacing=8,
)
self.switcher.set_stack(self.player_stack)
self.switcher.set_halign(Gtk.Align.CENTER)
self.mpris_manager = MprisPlayerManager()
players = self.mpris_manager.players
if players:
for p in players:
mp = MprisPlayer(p)
pb = PlayerBox(mpris_player=mp)
self.player_stack.add_titled(pb, mp.player_name, mp.player_name)
else:
pb = PlayerBox(mpris_player=None)
self.player_stack.add_titled(pb, "nothing", "Nothing Playing")
self.mpris_manager.connect("player-appeared", self.on_player_appeared)
self.mpris_manager.connect("player-vanished", self.on_player_vanished)
self.switcher.set_visible(True)
self.add(self.player_stack)
self.add(self.switcher)
GLib.idle_add(self._replace_switcher_labels)
def on_player_appeared(self, manager, player):
children = self.player_stack.get_children()
if len(children) == 1 and not getattr(children[0], "mpris_player", None):
self.player_stack.remove(children[0])
mp = MprisPlayer(player)
pb = PlayerBox(mpris_player=mp)
self.player_stack.add_titled(pb, mp.player_name, mp.player_name)
# Timer is now started conditionally within PlayerBox.__init__
self.switcher.set_visible(True)
GLib.idle_add(lambda: self._update_switcher_for_player(mp.player_name))
GLib.idle_add(self._replace_switcher_labels)
def on_player_vanished(self, manager, player_name):
for child in self.player_stack.get_children():
if (
hasattr(child, "mpris_player")
and child.mpris_player
and child.mpris_player.player_name == player_name
):
self.player_stack.remove(child)
break
if not any(
getattr(child, "mpris_player", None)
for child in self.player_stack.get_children()
):
pb = PlayerBox(mpris_player=None)
self.player_stack.add_titled(pb, "nothing", "Nothing Playing")
self.switcher.set_visible(True)
GLib.idle_add(self._replace_switcher_labels)
def _replace_switcher_labels(self):
buttons = self.switcher.get_children()
for btn in buttons:
if isinstance(btn, Gtk.ToggleButton):
default_label = None
for child in btn.get_children():
if isinstance(child, Gtk.Label):
default_label = child
break
if default_label:
label_player_name = getattr(
default_label, "player_name", default_label.get_text().lower()
)
icon_markup = get_player_icon_markup_by_name(label_player_name)
btn.remove(default_label)
new_label = Label(name="player-label", markup=icon_markup)
new_label.player_name = label_player_name
btn.add(new_label)
new_label.show_all()
return False
def _update_switcher_for_player(self, player_name):
for btn in self.switcher.get_children():
if isinstance(btn, Gtk.ToggleButton):
default_label = None
for child in btn.get_children():
if isinstance(child, Gtk.Label):
default_label = child
break
if default_label:
label_player_name = getattr(
default_label, "player_name", default_label.get_text().lower()
)
if label_player_name == player_name.lower():
icon_markup = get_player_icon_markup_by_name(player_name)
btn.remove(default_label)
new_label = Label(name="player-label", markup=icon_markup)
new_label.player_name = player_name.lower()
btn.add(new_label)
new_label.show_all()
return False
class PlayerSmall(CenterBox):
def __init__(self):
super().__init__(
name="player-small", orientation="h", h_align="fill", v_align="center"
)
self._show_artist = False # toggle flag
self._display_options = ["cavalcade", "title", "artist"]
self._display_index = 0
self._current_display = "cavalcade"
self.mpris_icon = Button(
name="compact-mpris-icon",
h_align="center",
v_align="center",
child=Label(name="compact-mpris-icon-label", markup=icons.disc),
)
# Remove scroll events; instead, add button press events.
self.mpris_icon.add_events(Gdk.EventMask.BUTTON_PRESS_MASK)
self.mpris_icon.connect("button-press-event", self._on_icon_button_press)
# Prevent the child from propagating events.
child = self.mpris_icon.get_child()
child.add_events(Gdk.EventMask.BUTTON_PRESS_MASK)
child.connect("button-press-event", lambda widget, event: True)
# Add hover effect
add_hover_cursor(self.mpris_icon)
self.mpris_label = Label(
name="compact-mpris-label",
label="Nothing Playing",
ellipsization="end",
max_chars_width=26,
h_align="center",
)
self.mpris_button = Button(
name="compact-mpris-button",
h_align="center",
v_align="center",
child=Label(name="compact-mpris-button-label", markup=icons.play),
)
self.mpris_button.add_events(Gdk.EventMask.BUTTON_PRESS_MASK)
self.mpris_button.connect(
"button-press-event", self._on_play_pause_button_press
)
# Add hover effect
add_hover_cursor(self.mpris_button)
# self.cavalcade = SpectrumRender()
# self.cavalcade_box = self.cavalcade.get_spectrum_box()
self.center_stack = Stack(
name="compact-mpris",
transition_type="crossfade",
transition_duration=100,
v_align="center",
v_expand=False,
children=[
# self.cavalcade_box,
self.mpris_label,
],
)
# self.center_stack.set_visible_child(self.cavalcade_box) # default to cavalcade
# Create additional compact view.
self.mpris_small = CenterBox(
name="compact-mpris",
orientation="h",
h_expand=True,
h_align="fill",
v_align="center",
v_expand=False,
start_children=self.mpris_icon,
center_children=self.center_stack, # Changed to center_stack to handle stack switching
end_children=self.mpris_button,
)
self.add(self.mpris_small)
self.mpris_manager = MprisPlayerManager()
self.mpris_player = None
# Almacenar el índice del reproductor actual
self.current_index = 0
players = self.mpris_manager.players
if players:
mp = MprisPlayer(players[self.current_index])
self.mpris_player = mp
self._apply_mpris_properties()
self.mpris_player.connect("changed", self._on_mpris_changed)
else:
self._apply_mpris_properties()
self.mpris_manager.connect("player-appeared", self.on_player_appeared)
self.mpris_manager.connect("player-vanished", self.on_player_vanished)
self.mpris_button.connect("clicked", self._on_play_pause_clicked)
def _apply_mpris_properties(self):
if not self.mpris_player:
self.mpris_label.set_text("Nothing Playing")
self.mpris_button.get_child().set_markup(icons.stop)
self.mpris_icon.get_child().set_markup(icons.disc)
if self._current_display != "cavalcade":
self.center_stack.set_visible_child(
self.mpris_label
) # if was title or artist, keep showing label
# else:
# self.center_stack.set_visible_child(
# self.cavalcade_box
# ) # default to cavalcade if no player
return
mp = self.mpris_player
# Choose icon based on player name.
player_name = (
mp.player_name.lower()
if hasattr(mp, "player_name") and mp.player_name
else ""
)
icon_markup = get_player_icon_markup_by_name(player_name)
self.mpris_icon.get_child().set_markup(icon_markup)
self.update_play_pause_icon()
if self._current_display == "title":
text = mp.title if mp.title and mp.title.strip() else "Nothing Playing"
self.mpris_label.set_text(text)
self.center_stack.set_visible_child(self.mpris_label)
elif self._current_display == "artist":
text = mp.artist if mp.artist else "Nothing Playing"
self.mpris_label.set_text(text)
self.center_stack.set_visible_child(self.mpris_label)
# else: # default cavalcade
# self.center_stack.set_visible_child(self.cavalcade_box)
def _on_icon_button_press(self, widget, event):
from gi.repository import Gdk
if event.type == Gdk.EventType.BUTTON_PRESS:
players = self.mpris_manager.players
if not players:
return True
if event.button == 2: # Middle-click: cycle display
self._display_index = (self._display_index + 1) % len(
self._display_options
)
self._current_display = self._display_options[self._display_index]
self._apply_mpris_properties() # Re-apply to update label/cavalcade
return True
# Cambiar de reproductor según el botón presionado.
if event.button == 1: # Left-click: next player
self.current_index = (self.current_index + 1) % len(players)
elif event.button == 3: # Right-click: previous player
self.current_index = (self.current_index - 1) % len(players)
if self.current_index < 0:
self.current_index = len(players) - 1
mp_new = MprisPlayer(players[self.current_index])
self.mpris_player = mp_new
# Conectar el evento "changed" para que se actualice
self.mpris_player.connect("changed", self._on_mpris_changed)
self._apply_mpris_properties()
return True # Se consume el evento
return True
def _on_play_pause_button_press(self, widget, event):
if event.type == Gdk.EventType.BUTTON_PRESS:
if event.button == 1: # Click izquierdo -> track anterior
if self.mpris_player:
self.mpris_player.previous()
self.mpris_button.get_child().set_markup(icons.prev)
GLib.timeout_add(500, self._restore_play_pause_icon)
elif event.button == 3: # Click derecho -> siguiente track
if self.mpris_player:
self.mpris_player.next()
self.mpris_button.get_child().set_markup(icons.next)
GLib.timeout_add(500, self._restore_play_pause_icon)
elif event.button == 2: # Click medio -> play/pausa
if self.mpris_player:
self.mpris_player.play_pause()
self.update_play_pause_icon()
return True
return True
def _restore_play_pause_icon(self):
self.update_play_pause_icon()
return False
def _on_icon_clicked(
self, widget
): # No longer used, logic moved to _on_icon_button_press
pass
def update_play_pause_icon(self):
if self.mpris_player and self.mpris_player.playback_status == "playing":
self.mpris_button.get_child().set_markup(icons.pause)
else:
self.mpris_button.get_child().set_markup(icons.play)
def _on_play_pause_clicked(self, button):
if self.mpris_player:
self.mpris_player.play_pause()
self.update_play_pause_icon()
def _on_mpris_changed(self, *args):
# Update properties when the player's state changes.
self._apply_mpris_properties()
def on_player_appeared(self, manager, player):
# When a new player appears, use it if no player is active.
if not self.mpris_player:
mp = MprisPlayer(player)
self.mpris_player = mp
self._apply_mpris_properties()
self.mpris_player.connect("changed", self._on_mpris_changed)
def on_player_vanished(self, manager, player_name):
players = self.mpris_manager.players
if (
players
and self.mpris_player
and self.mpris_player.player_name == player_name
):
if players: # Check if players is not empty after vanishing
self.current_index = self.current_index % len(players)
new_player = MprisPlayer(players[self.current_index])
self.mpris_player = new_player
self.mpris_player.connect("changed", self._on_mpris_changed)
else:
self.mpris_player = None # No players left
elif not players:
self.mpris_player = None
self._apply_mpris_properties()

92
bar/modules/vinyl.py Normal file
View File

@ -0,0 +1,92 @@
from fabric.widgets.box import Box
from fabric.widgets.label import Label
from fabric.widgets.eventbox import EventBox
from fabric.widgets.overlay import Overlay
from fabric.core.service import Property
import subprocess
class VinylButton(Box):
@Property(bool, "read-write", default_value=False)
def active(self) -> bool:
return self._active
@active.setter
def active(self, value: bool):
self._active = value
# Update appearance based on state
self._update_appearance()
# Execute shell command based on new state
if self._active:
self._execute_active_command()
else:
self._execute_inactive_command()
def __init__(
self,
active_command="""pw-link alsa_input.pci-0000_12_00.6.analog-stereo:capture_FL alsa_output.usb-BEHRINGER_UMC1820_A71E9E3E-00.multichannel-output:playback_AUX0
pw-link alsa_input.pci-0000_12_00.6.analog-stereo:capture_FR alsa_output.usb-BEHRINGER_UMC1820_A71E9E3E-00.multichannel-output:playback_AUX1""",
inactive_command="""pw-link -d alsa_input.pci-0000_12_00.6.analog-stereo:capture_FL alsa_output.usb-BEHRINGER_UMC1820_A71E9E3E-00.multichannel-output:playback_AUX0
pw-link -d alsa_input.pci-0000_12_00.6.analog-stereo:capture_FR alsa_output.usb-BEHRINGER_UMC1820_A71E9E3E-00.multichannel-output:playback_AUX1 """,
**kwargs,
):
super().__init__(**kwargs)
# Initialize properties
self._active = False
self._active_command = active_command
self._inactive_command = inactive_command
# Set up the icon
self.icon = Label(
label="", # CD icon
name="vinyl-icon",
style="",
)
# Set up event box to handle clicks
self.event_box = EventBox(
events="button-press",
child=Overlay(
child=self.icon,
),
name="vinyl-button",
)
# Connect click event
self.event_box.connect("button-press-event", self._on_clicked)
# Add to parent box
self.add(self.event_box)
# Initialize appearance
self._update_appearance()
def _update_appearance(self):
"""Update CSS class based on active state"""
if self._active:
self.add_style_class("active")
else:
self.remove_style_class("active")
def _on_clicked(self, _, event):
"""Handle button click event"""
if event.button == 1: # Left click
# Toggle active state
self.active = not self.active
return True
def _execute_active_command(self):
"""Execute shell command when button is activated"""
try:
subprocess.Popen(self._active_command, shell=True)
except Exception as e:
print(f"Error executing active command: {e}")
def _execute_inactive_command(self):
"""Execute shell command when button is deactivated"""
try:
subprocess.Popen(self._inactive_command, shell=True)
except Exception as e:
print(f"Error executing inactive command: {e}")

282
bar/services/mpris.py Normal file
View File

@ -0,0 +1,282 @@
# Standard library imports
import contextlib
# Third-party imports
import gi
from gi.repository import GLib # type: ignore
from loguru import logger
# Fabric imports
from fabric.core.service import Property, Service, Signal
from fabric.utils import bulk_connect
class PlayerctlImportError(ImportError):
"""An error to raise when playerctl is not installed."""
def __init__(self, *args):
super().__init__(
"Playerctl is not installed, please install it first",
*args,
)
# Try to import Playerctl, raise custom error if not available
try:
gi.require_version("Playerctl", "2.0")
from gi.repository import Playerctl
except ValueError:
raise PlayerctlImportError
class MprisPlayer(Service):
"""A service to manage a mpris player."""
@Signal
def exit(self, value: bool) -> bool: ...
@Signal
def changed(self) -> None: ...
def __init__(
self,
player: Playerctl.Player,
**kwargs,
):
self._signal_connectors: dict = {}
self._player: Playerctl.Player = player
super().__init__(**kwargs)
for sn in ["playback-status", "loop-status", "shuffle", "volume", "seeked"]:
self._signal_connectors[sn] = self._player.connect(
sn,
lambda *args, sn=sn: self.notifier(sn, args),
)
self._signal_connectors["exit"] = self._player.connect(
"exit",
self.on_player_exit,
)
self._signal_connectors["metadata"] = self._player.connect(
"metadata",
lambda *args: self.update_status(),
)
GLib.idle_add(lambda *args: self.update_status_once())
def update_status(self):
# schedule each notifier asynchronously.
def notify_property(prop):
if self.get_property(prop) is not None:
self.notifier(prop)
for prop in [
"metadata",
"title",
"artist",
"arturl",
"length",
]:
GLib.idle_add(lambda p=prop: (notify_property(p), False))
for prop in [
"can-seek",
"can-pause",
"can-shuffle",
"can-go-next",
"can-go-previous",
]:
GLib.idle_add(lambda p=prop: (self.notifier(p), False))
def update_status_once(self):
# schedule notifier calls for each property
def notify_all():
for prop in self.list_properties(): # type: ignore
self.notifier(prop.name)
return False
GLib.idle_add(notify_all, priority=GLib.PRIORITY_DEFAULT_IDLE)
def notifier(self, name: str, args=None):
def notify_and_emit():
self.notify(name)
self.emit("changed")
return False
GLib.idle_add(notify_and_emit, priority=GLib.PRIORITY_DEFAULT_IDLE)
def on_player_exit(self, player):
for id in list(self._signal_connectors.values()):
with contextlib.suppress(Exception):
self._player.disconnect(id)
del self._signal_connectors
GLib.idle_add(lambda: (self.emit("exit", True), False))
del self._player
def toggle_shuffle(self):
if self.can_shuffle:
# schedule the shuffle toggle in the GLib idle loop
GLib.idle_add(lambda: (setattr(self, "shuffle", not self.shuffle), False))
# else do nothing
def play_pause(self):
if self.can_pause:
GLib.idle_add(lambda: (self._player.play_pause(), False))
def next(self):
if self.can_go_next:
GLib.idle_add(lambda: (self._player.next(), False))
def previous(self):
if self.can_go_previous:
GLib.idle_add(lambda: (self._player.previous(), False))
# Properties
@Property(str, "readable")
def player_name(self) -> int:
return self._player.get_property("player-name") # type: ignore
@Property(int, "read-write", default_value=0)
def position(self) -> int:
return self._player.get_property("position") # type: ignore
@position.setter
def position(self, new_pos: int):
self._player.set_position(new_pos)
@Property(object, "readable")
def metadata(self) -> dict:
return self._player.get_property("metadata") # type: ignore
@Property(str or None, "readable")
def arturl(self) -> str | None:
if "mpris:artUrl" in self.metadata.keys(): # type: ignore # noqa: SIM118
return self.metadata["mpris:artUrl"] # type: ignore
return None
@Property(str or None, "readable")
def length(self) -> str | None:
if "mpris:length" in self.metadata.keys(): # type: ignore # noqa: SIM118
return self.metadata["mpris:length"] # type: ignore
return None
@Property(str, "readable")
def artist(self) -> str:
artist = self._player.get_artist() # type: ignore
if isinstance(artist, (list, tuple)):
return ", ".join(artist)
return artist
@Property(str, "readable")
def album(self) -> str:
return self._player.get_album() # type: ignore
@Property(str, "readable")
def title(self):
return self._player.get_title()
@Property(bool, "read-write", default_value=False)
def shuffle(self) -> bool:
return self._player.get_property("shuffle") # type: ignore
@shuffle.setter
def shuffle(self, do_shuffle: bool):
self.notifier("shuffle")
return self._player.set_shuffle(do_shuffle)
@Property(str, "readable")
def playback_status(self) -> str:
return {
Playerctl.PlaybackStatus.PAUSED: "paused",
Playerctl.PlaybackStatus.PLAYING: "playing",
Playerctl.PlaybackStatus.STOPPED: "stopped",
}.get(self._player.get_property("playback_status"), "unknown") # type: ignore
@Property(str, "read-write")
def loop_status(self) -> str:
return {
Playerctl.LoopStatus.NONE: "none",
Playerctl.LoopStatus.TRACK: "track",
Playerctl.LoopStatus.PLAYLIST: "playlist",
}.get(self._player.get_property("loop_status"), "unknown") # type: ignore
@loop_status.setter
def loop_status(self, status: str):
loop_status = {
"none": Playerctl.LoopStatus.NONE,
"track": Playerctl.LoopStatus.TRACK,
"playlist": Playerctl.LoopStatus.PLAYLIST,
}.get(status)
self._player.set_loop_status(loop_status) if loop_status else None
@Property(bool, "readable", default_value=False)
def can_go_next(self) -> bool:
return self._player.get_property("can_go_next") # type: ignore
@Property(bool, "readable", default_value=False)
def can_go_previous(self) -> bool:
return self._player.get_property("can_go_previous") # type: ignore
@Property(bool, "readable", default_value=False)
def can_seek(self) -> bool:
return self._player.get_property("can_seek") # type: ignore
@Property(bool, "readable", default_value=False)
def can_pause(self) -> bool:
return self._player.get_property("can_pause") # type: ignore
@Property(bool, "readable", default_value=False)
def can_shuffle(self) -> bool:
try:
self._player.set_shuffle(self._player.get_property("shuffle"))
return True
except Exception:
return False
@Property(bool, "readable", default_value=False)
def can_loop(self) -> bool:
try:
self._player.set_shuffle(self._player.get_property("shuffle"))
return True
except Exception:
return False
class MprisPlayerManager(Service):
"""A service to manage mpris players."""
@Signal
def player_appeared(self, player: Playerctl.Player) -> Playerctl.Player: ...
@Signal
def player_vanished(self, player_name: str) -> str: ...
def __init__(
self,
**kwargs,
):
self._manager = Playerctl.PlayerManager.new()
bulk_connect(
self._manager,
{
"name-appeared": self.on_name_appeard,
"name-vanished": self.on_name_vanished,
},
)
self.add_players()
super().__init__(**kwargs)
def on_name_appeard(self, manager, player_name: Playerctl.PlayerName):
logger.info(f"[MprisPlayer] {player_name.name} appeared")
new_player = Playerctl.Player.new_from_name(player_name)
manager.manage_player(new_player)
self.emit("player-appeared", new_player) # type: ignore
def on_name_vanished(self, manager, player_name: Playerctl.PlayerName):
logger.info(f"[MprisPlayer] {player_name.name} vanished")
self.emit("player-vanished", player_name.name) # type: ignore
def add_players(self):
for player in self._manager.get_property("player-names"): # type: ignore
self._manager.manage_player(Playerctl.Player.new_from_name(player)) # type: ignore
@Property(object, "readable")
def players(self):
return self._manager.get_property("players") # type: ignore

126
bar/widgets/circle_image.py Normal file
View File

@ -0,0 +1,126 @@
import math
from typing import Literal
import cairo
import gi
from fabric.core.service import Property
from fabric.widgets.widget import Widget
gi.require_version("Gtk", "3.0")
from gi.repository import Gdk, GdkPixbuf, Gtk # noqa: E402
class CircleImage(Gtk.DrawingArea, Widget):
"""A widget that displays an image in a circular shape with a 1:1 aspect ratio."""
@Property(int, "read-write")
def angle(self) -> int:
return self._angle
@angle.setter
def angle(self, value: int):
self._angle = value % 360
self.queue_draw()
def __init__(
self,
image_file: str | None = None,
pixbuf: GdkPixbuf.Pixbuf | None = None,
name: str | None = None,
visible: bool = True,
all_visible: bool = False,
style: str | None = None,
tooltip_text: str | None = None,
tooltip_markup: str | None = None,
h_align: Literal["fill", "start", "end", "center", "baseline"]
| Gtk.Align
| None = None,
v_align: Literal["fill", "start", "end", "center", "baseline"]
| Gtk.Align
| None = None,
h_expand: bool = False,
v_expand: bool = False,
size: int | None = None,
**kwargs,
):
Gtk.DrawingArea.__init__(self)
Widget.__init__(
self,
name=name,
visible=visible,
all_visible=all_visible,
style=style,
tooltip_text=tooltip_text,
tooltip_markup=tooltip_markup,
h_align=h_align,
v_align=v_align,
h_expand=h_expand,
v_expand=v_expand,
size=size,
**kwargs,
)
self.size = size if size is not None else 100 # Default size if not provided
self._angle = 0
self._orig_image: GdkPixbuf.Pixbuf | None = (
None # Original image for reprocessing
)
self._image: GdkPixbuf.Pixbuf | None = None
if image_file:
pix = GdkPixbuf.Pixbuf.new_from_file(image_file)
self._orig_image = pix
self._image = self._process_image(pix)
elif pixbuf:
self._orig_image = pixbuf
self._image = self._process_image(pixbuf)
self.connect("draw", self.on_draw)
def _process_image(self, pixbuf: GdkPixbuf.Pixbuf) -> GdkPixbuf.Pixbuf:
"""Crop the image to a centered square and scale it to the widgets size."""
width, height = pixbuf.get_width(), pixbuf.get_height()
if width != height:
square_size = min(width, height)
x_offset = (width - square_size) // 2
y_offset = (height - square_size) // 2
pixbuf = pixbuf.new_subpixbuf(x_offset, y_offset, square_size, square_size)
else:
square_size = width
if square_size != self.size:
pixbuf = pixbuf.scale_simple(
self.size, self.size, GdkPixbuf.InterpType.BILINEAR
)
return pixbuf
def on_draw(self, widget: "CircleImage", ctx: cairo.Context):
if self._image:
ctx.save()
# Create a circular clipping path
ctx.arc(self.size / 2, self.size / 2, self.size / 2, 0, 2 * math.pi)
ctx.clip()
# Rotate around the center of the square image
ctx.translate(self.size / 2, self.size / 2)
ctx.rotate(self._angle * math.pi / 180.0)
ctx.translate(-self.size / 2, -self.size / 2)
Gdk.cairo_set_source_pixbuf(ctx, self._image, 0, 0)
ctx.paint()
ctx.restore()
def set_image_from_file(self, new_image_file: str):
if not new_image_file:
return
pixbuf = GdkPixbuf.Pixbuf.new_from_file(new_image_file)
self._orig_image = pixbuf
self._image = self._process_image(pixbuf)
self.queue_draw()
def set_image_from_pixbuf(self, pixbuf: GdkPixbuf.Pixbuf):
if not pixbuf:
return
self._orig_image = pixbuf
self._image = self._process_image(pixbuf)
self.queue_draw()
def set_image_size(self, size: int):
self.size = size
if self._orig_image:
self._image = self._process_image(self._orig_image)
self.queue_draw()

View File

@ -10,6 +10,8 @@
gnome,
cinnamon,
wrapGAppsHook3,
playerctl,
webp-pixbuf-loader,
...
}:
@ -24,15 +26,19 @@ python3Packages.buildPythonApplication {
wrapGAppsHook3
gtk3
gobject-introspection
python3Packages.pygobject3
cairo
playerctl
];
buildInputs = [
libdbusmenu-gtk3
gtk-layer-shell
gnome.gnome-bluetooth
cinnamon.cinnamon-desktop
gdk-pixbuf
playerctl
webp-pixbuf-loader
];
# buildInputs = [
# libdbusmenu-gtk3
# gtk-layer-shell
# gnome.gnome-bluetooth
# cinnamon.cinnamon-desktop
# gdk-pixbuf
# ];
dependencies = with python3Packages; [
python-fabric