Compare commits

17 Commits

Author SHA1 Message Date
8ecec8768d squash 2025-05-19 09:32:17 +02:00
5d08a48b6c init: window finder 2025-05-15 23:29:20 +02:00
82b0cf7aaa feat: nixos dbus availability 2025-05-15 18:38:00 +02:00
e4744bab81 right module order 2025-05-13 23:22:03 +02:00
872dbfc792 fix vinyl enable 2025-05-13 23:18:06 +02:00
64781af68f add default 2025-05-13 23:13:45 +02:00
0ebfbdb3a9 add yaml example 2025-05-13 23:10:23 +02:00
bf3920ad35 config 2025-05-13 23:09:05 +02:00
72c76c9fda move river service to fabric fork
so i could request merge to upstream later
2025-05-12 12:31:30 +02:00
743e1ed0c5 update fabric 2025-05-12 12:01:43 +02:00
f28dd0b6a2 move nix 2025-05-12 11:18:04 +02:00
0b8190ae8b not working rework 2025-05-10 12:24:52 +02:00
9495dfba62 feat: river-control-unstable-v1 2025-05-06 13:57:30 +02:00
0cf1c5aeb7 isort 2025-05-06 13:15:44 +02:00
f8b352d624 multiple monitor support 2025-05-06 13:09:40 +02:00
53713ee0f5 more modules 2025-05-05 20:13:39 +02:00
736e1a47c9 urgent tags 2025-05-05 09:31:17 +02:00
51 changed files with 3651 additions and 447 deletions

2
.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
__pycache__
.direnv

3
README.md Normal file
View File

@@ -0,0 +1,3 @@
# Todo
- https://github.com/jlumpe/pyorg
- https://github.com/jlumpe/ox-json

View File

@@ -1,174 +0,0 @@
/* Fabric bar.css
* https://github.com/Fabric-Development/fabric/blob/rewrite/examples/bar/bar.css
* Modified with Camellia Theme: https://github.com/camellia-theme/camellia
*/
/* we can use webcss variables, fabric compiles that to gtk css.
global variables can be stored in :vars */
:vars {
/* Base colors from Camellia theme */
--background: #17181C; /* BG */
--mid-bg: #1E1F24; /* Mid BG */
--light-bg: #26272B; /* Light BG */
--dark-grey: #333438; /* Dark Grey */
--light-grey: #8F9093; /* Light Grey */
--dark-fg: #B0B1B4; /* Dark FG */
--mid-fg: #CBCCCE; /* Mid FG */
--foreground: #E4E5E7; /* FG */
/* Accent colors from Camellia theme */
--pink: #FA3867; /* Pink */
--orange: #F3872F; /* Orange */
--gold: #FEBD16; /* Gold */
--lime: #3FD43B; /* Lime */
--turquoise: #47E7CE; /* Turquoise */
--blue: #53ADE1; /* Blue */
--violet: #AD60FF; /* Violet */
--red: #FC3F2C; /* Red */
/* Functional variables */
--window-bg: alpha(var(--background), 0.9);
--module-bg: alpha(var(--mid-bg), 0.8);
--border-color: var(--light-bg);
--ws-active: var(--pink);
--ws-inactive: var(--blue);
--ws-empty: var(--dark-grey);
--ws-hover: var(--turquoise);
--ws-urgent: var(--red);
}
/* unset so we can style everything from the ground up. */
* {
all: unset;
color: var(--foreground);
font-size: 16px;
font-family: "Jost*", sans-serif;
}
button {
background-size: 400% 400%;
}
#bar-inner {
padding: 4px;
border-bottom: solid 2px;
border-color: var(--border-color);
background-color: var(--window-bg);
min-height: 28px;
}
#workspaces {
padding: 6px;
min-width: 0px;
background-color: var(--module-bg);
}
#workspaces>button {
padding: 0px 8px;
transition: padding 0.05s steps(8);
background-color: var(--foreground);
border-radius: 100px;
}
#workspaces>button>label {
font-size: 0px;
}
#workspaces>button:hover {
background-color: var(--ws-hover);
}
#workspaces>button.urgent {
background-color: var(--ws-urgent);
}
#workspaces>button.empty {
background-color: var(--ws-empty);
}
#workspaces>button.active {
padding: 0px 32px;
background-color: var(--ws-active);
}
#center-container {
color: var(--foreground);
}
.active-window {
color: var(--foreground);
font-weight: bold;
}
#date-time,
menu>menuitem>label,
#date-time>label,
/* system tray */
#system-tray {
padding: 2px 4px;
background-color: var(--module-bg);
}
/* menu and menu items (written for the system tray) */
menu {
border: solid 2px;
border-radius: 10px;
border-color: var(--border-color);
background-color: var(--window-bg);
}
menu>menuitem {
border-radius: 0px;
background-color: var(--module-bg);
padding: 6px;
margin-left: 2px;
margin-right: 2px;
}
menu>menuitem:first-child {
margin-top: 1px;
border-radius: 8px 8px 0px 0px;
}
menu>menuitem:last-child {
margin-bottom: 1px;
border-radius: 0px 0px 8px 8px;
}
menu>menuitem:hover {
background-color: var(--pink);
}
#cpu-progress-bar,
#ram-progress-bar,
#volume-progress-bar {
color: transparent;
background-color: transparent
}
#cpu-progress-bar {
border: solid 0px alpha(var(--violet), 0.8);
}
#ram-progress-bar,
#volume-progress-bar {
border: solid 0px var(--blue);
}
#widgets-container {
background-color: var(--module-bg);
padding: 2px;
}
#nixos-label {
color: var(--blue);
}
tooltip {
border: solid 2px;
border-color: var(--border-color);
background-color: var(--window-bg);
}
tooltip>* {
padding: 2px 4px
}

52
bar/config.py Normal file
View File

@@ -0,0 +1,52 @@
import yaml
import os
from platformdirs import user_config_dir
import argparse
APP_NAME = "makku_bar"
XDG_CONFIG_HOME = user_config_dir(appname=APP_NAME)
XDG_CONFIG_FILE = os.path.join(XDG_CONFIG_HOME, "config.yaml")
def load_config(config_path=XDG_CONFIG_FILE):
"""Loads configuration from a YAML file."""
if config_path is None:
print("No configuration file path provided or found.")
return None
try:
with open(config_path, "r") as f:
config = yaml.safe_load(f)
return config
except FileNotFoundError:
print(f"Error: Configuration file not found at {config_path}")
return None
except yaml.YAMLError as e:
print(f"Error parsing YAML file '{config_path}': {e}")
return None
except Exception as e:
print(f"An unexpected error occurred loading config file '{config_path}': {e}")
return None
def load_args():
parser = argparse.ArgumentParser(description="makku_bar")
parser.add_argument(
"-c",
"--config",
help="Path to a custom configuration file.",
type=str,
)
args = parser.parse_args()
return args.config
app_config = load_config() if not load_args() else load_config(load_args())
if app_config is None:
raise Exception("Config file missing")
VINYL = app_config.get("vinyl", {"enable": False})

61
bar/main.py Normal file
View File

@@ -0,0 +1,61 @@
from loguru import logger
from fabric import Application
from fabric.system_tray.widgets import SystemTray
from fabric.widgets.wayland import WaylandWindow as Window
from fabric.utils import (
get_relative_path,
)
from .modules.bar import StatusBar
from .modules.window_fuzzy import FuzzyWindowFinder
from .services.river.widgets import get_river_connection
from .services.wlr.event_loop import WaylandEventLoopService
tray = SystemTray(name="system-tray", spacing=4)
wayland_event_loop = WaylandEventLoopService()
river = get_river_connection()
dummy = Window(visible=False)
finder = FuzzyWindowFinder()
bar_windows = []
app = Application("bar", dummy, finder)
app.set_stylesheet_from_file(get_relative_path("styles/main.css"))
def spawn_bars():
logger.info("[Bar] Spawning bars after river ready")
outputs = river.outputs
if not outputs:
logger.warning("[Bar] No outputs found — skipping bar spawn")
return
output_ids = sorted(outputs.keys())
for i, output_id in enumerate(output_ids):
bar = StatusBar(
display=output_id,
tray=tray if i == 0 else None,
monitor=i,
river_service=river,
)
bar_windows.append(bar)
return False
def main():
if river.ready:
spawn_bars()
else:
river.connect("notify::ready", lambda sender, pspec: spawn_bars())
app.run()
if __name__ == "__main__":
main()

View File

@@ -1,78 +1,36 @@
# fabric bar.py example
# https://github.com/Fabric-Development/fabric/blob/rewrite/examples/bar/bar.py
import psutil
from fabric import Application
from fabric.widgets.box import Box
from fabric.widgets.label import Label
from fabric.widgets.overlay import Overlay
from fabric.widgets.eventbox import EventBox
from fabric.widgets.datetime import DateTime
from fabric.widgets.centerbox import CenterBox
from fabric.system_tray.widgets import SystemTray
from fabric.widgets.circularprogressbar import CircularProgressBar
from bar.modules.player import Player
from bar.modules.vinyl import VinylButton
from fabric.widgets.wayland import WaylandWindow as Window
from .river.widgets import RiverWorkspaces, RiverWorkspaceButton, RiverActiveWindow
from fabric.system_tray.widgets import SystemTray
from fabric.utils import (
FormattedString,
bulk_replace,
invoke_repeater,
get_relative_path,
)
from fabric.widgets.circularprogressbar import CircularProgressBar
AUDIO_WIDGET = True
if AUDIO_WIDGET is True:
try:
from fabric.audio.service import Audio
except Exception as e:
print(e)
AUDIO_WIDGET = False
class VolumeWidget(Box):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.audio = Audio()
self.progress_bar = CircularProgressBar(
name="volume-progress-bar", pie=True, size=24
)
self.event_box = EventBox(
events="scroll",
child=Overlay(
child=self.progress_bar,
overlays=Label(
label="",
style="margin: 0px 6px 0px 0px; font-size: 12px", # to center the icon glyph
),
),
)
self.audio.connect("notify::speaker", self.on_speaker_changed)
self.event_box.connect("scroll-event", self.on_scroll)
self.add(self.event_box)
def on_scroll(self, _, event):
match event.direction:
case 0:
self.audio.speaker.volume += 8
case 1:
self.audio.speaker.volume -= 8
return
def on_speaker_changed(self, *_):
if not self.audio.speaker:
return
self.progress_bar.value = self.audio.speaker.volume / 100
self.audio.speaker.bind(
"volume", "value", self.progress_bar, lambda _, v: v / 100
)
return
from bar.config import VINYL
from bar.services.river.widgets import (
RiverWorkspaces,
RiverWorkspaceButton,
RiverActiveWindow,
get_river_connection,
)
class StatusBar(Window):
def __init__(self, display: int, monitor: int = 1, with_system_tray: bool = False):
def __init__(
self,
display: int,
tray: SystemTray | None = None,
monitor: int = 1,
river_service=None,
):
super().__init__(
name="bar",
layer="top",
@@ -83,18 +41,21 @@ class StatusBar(Window):
all_visible=False,
monitor=monitor,
)
if river_service:
self.river = river_service
self.workspaces = RiverWorkspaces(
display,
name="workspaces",
spacing=4,
buttons_factory=lambda ws_id: RiverWorkspaceButton(id=ws_id, label=None),
river_service=self.river,
)
self.date_time = DateTime(name="date-time", formatters="%d %b - %H:%M")
self.system_tray = None
if with_system_tray:
self.system_tray = SystemTray(name="system-tray", spacing=4)
self.system_tray = tray
self.active_window = RiverActiveWindow(
river_service=self.river,
name="active-window",
max_length=50,
style="color: #ffffff; font-size: 14px; font-weight: bold;",
@@ -106,13 +67,18 @@ class StatusBar(Window):
self.cpu_progress_bar = CircularProgressBar(
name="cpu-progress-bar", pie=True, size=24
)
self.progress_label = Label(
"", style="margin: 0px 6px 0px 0px; font-size: 12px"
)
self.progress_bars_overlay = Overlay(
child=self.ram_progress_bar,
overlays=[
self.cpu_progress_bar,
Label("", style="margin: 0px 6px 0px 0px; font-size: 12px"),
],
overlays=[self.cpu_progress_bar, self.progress_label],
)
self.player = Player()
self.vinyl = None
if VINYL["enable"]:
self.vinyl = VinylButton()
self.status_container = Box(
name="widgets-container",
@@ -120,19 +86,18 @@ class StatusBar(Window):
orientation="h",
children=self.progress_bars_overlay,
)
self.status_container.add(VolumeWidget()) if AUDIO_WIDGET is True else None
end_container_children = [
self.status_container,
self.date_time,
]
end_container_children = []
if self.vinyl:
end_container_children.append(self.vinyl)
end_container_children.append(self.status_container)
if self.system_tray:
end_container_children.append(self.system_tray)
end_container_children.append(self.date_time)
if self.system_tray is not None:
end_container_children = [
self.status_container,
self.system_tray,
self.date_time,
]
self.children = CenterBox(
name="bar-inner",
start_children=Box(
@@ -166,16 +131,3 @@ class StatusBar(Window):
self.ram_progress_bar.value = psutil.virtual_memory().percent / 100
self.cpu_progress_bar.value = psutil.cpu_percent() / 100
return True
def main():
bar = StatusBar(45)
bar_two = StatusBar(44, monitor=2, with_system_tray=True)
app = Application("bar", bar, bar_two)
app.set_stylesheet_from_file(get_relative_path("bar.css"))
app.run()
if __name__ == "__main__":
main()

274
bar/modules/cavalcade.py Normal file
View File

@@ -0,0 +1,274 @@
import os
import struct
import subprocess
import re
import ctypes
import signal
from gi.repository import GLib, Gtk, Gdk
from loguru import logger
from math import pi
from fabric.widgets.overlay import Overlay
from fabric.utils.helpers import get_relative_path
import configparser
def get_bars(file_path):
config = configparser.ConfigParser()
config.read(file_path)
return int(config["general"]["bars"])
CAVA_CONFIG = get_relative_path("../config/cavalcade/cava.ini")
bars = get_bars(CAVA_CONFIG)
def set_death_signal():
"""
Set the death signal of the child process to SIGTERM so that if the parent
process is killed, the child (cava) is automatically terminated.
"""
libc = ctypes.CDLL("libc.so.6")
PR_SET_PDEATHSIG = 1
libc.prctl(PR_SET_PDEATHSIG, signal.SIGTERM)
class Cava:
"""
CAVA wrapper.
Launch cava process with certain settings and read output.
"""
NONE = 0
RUNNING = 1
RESTARTING = 2
CLOSING = 3
def __init__(self, mainapp):
self.bars = bars
self.path = "/tmp/cava.fifo"
self.cava_config_file = CAVA_CONFIG
self.data_handler = mainapp.draw.update
self.command = ["cava", "-p", self.cava_config_file]
self.state = self.NONE
self.env = dict(os.environ)
self.env["LC_ALL"] = "en_US.UTF-8" # not sure if it's necessary
is_16bit = True
self.byte_type, self.byte_size, self.byte_norm = (
("H", 2, 65535) if is_16bit else ("B", 1, 255)
)
if not os.path.exists(self.path):
os.mkfifo(self.path)
self.fifo_fd = None
self.fifo_dummy_fd = None
self.io_watch_id = None
def _run_process(self):
logger.debug("Launching cava process...")
try:
self.process = subprocess.Popen(
self.command,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
env=self.env,
preexec_fn=set_death_signal, # Ensure cava gets killed when the parent dies.
)
logger.debug("cava successfully launched!")
self.state = self.RUNNING
except Exception:
logger.exception("Fail to launch cava")
def _start_io_reader(self):
logger.debug("Activating GLib IO watch for cava stream handler")
# Open FIFO in non-blocking mode for reading
self.fifo_fd = os.open(self.path, os.O_RDONLY | os.O_NONBLOCK)
# Open dummy write end to prevent getting an EOF on our FIFO
self.fifo_dummy_fd = os.open(self.path, os.O_WRONLY | os.O_NONBLOCK)
self.io_watch_id = GLib.io_add_watch(
self.fifo_fd, GLib.IO_IN, self._io_callback
)
def _io_callback(self, source, condition):
chunk = self.byte_size * self.bars # number of bytes for given format
try:
data = os.read(self.fifo_fd, chunk)
except OSError as e:
# logger.error("Error reading FIFO: {}".format(e))
return False
# When no data is read, do not remove the IO watch immediately.
if len(data) < chunk:
# Instead of closing the FIFO, we log a warning and continue.
# logger.warning("Incomplete data packet received (expected {} bytes, got {}). Waiting for more data...".format(chunk, len(data)))
# Returning True keeps the IO watch active. A real EOF will only occur when the writer closes.
return True
fmt = self.byte_type * self.bars # format string for struct.unpack
sample = [i / self.byte_norm for i in struct.unpack(fmt, data)]
GLib.idle_add(self.data_handler, sample)
return True
def _on_stop(self):
logger.debug("Cava stream handler deactivated")
if self.state == self.RESTARTING:
self.start()
elif self.state == self.RUNNING:
self.state = self.NONE
logger.error("Cava process was unexpectedly terminated.")
# self.restart() # May cause infinity loop, need more check
def start(self):
"""Launch cava"""
self._start_io_reader()
self._run_process()
def restart(self):
"""Restart cava process"""
if self.state == self.RUNNING:
logger.debug("Restarting cava process (normal mode) ...")
self.state = self.RESTARTING
if self.process.poll() is None:
self.process.kill()
elif self.state == self.NONE:
logger.warning("Restarting cava process (after crash) ...")
self.start()
def close(self):
"""Stop cava process"""
self.state = self.CLOSING
if self.process.poll() is None:
self.process.kill()
if self.io_watch_id:
GLib.source_remove(self.io_watch_id)
if self.fifo_fd:
os.close(self.fifo_fd)
if self.fifo_dummy_fd:
os.close(self.fifo_dummy_fd)
if os.path.exists(self.path):
os.remove(self.path)
class AttributeDict(dict):
"""Dictionary with keys as attributes. Does nothing but easy reading"""
def __getattr__(self, attr):
return self.get(attr, 3)
def __setattr__(self, attr, value):
self[attr] = value
class Spectrum:
"""Spectrum drawing"""
def __init__(self):
self.silence_value = 0
self.audio_sample = []
self.color = None
self.area = Gtk.DrawingArea()
self.area.connect("draw", self.redraw)
self.area.add_events(Gdk.EventMask.BUTTON_PRESS_MASK)
self.sizes = AttributeDict()
self.sizes.area = AttributeDict()
self.sizes.bar = AttributeDict()
self.silence = 10
self.max_height = 12
self.area.connect("configure-event", self.size_update)
self.color_update()
def is_silence(self, value):
"""Check if volume level critically low during last iterations"""
self.silence_value = 0 if value > 0 else self.silence_value + 1
return self.silence_value > self.silence
def update(self, data):
"""Audio data processing"""
self.color_update()
self.audio_sample = data
if not self.is_silence(self.audio_sample[0]):
self.area.queue_draw()
elif self.silence_value == (self.silence + 1):
self.audio_sample = [0] * self.sizes.number
self.area.queue_draw()
def redraw(self, widget, cr):
"""Draw spectrum graph"""
cr.set_source_rgba(*self.color)
dx = 3
center_y = self.sizes.area.height / 2 # center vertical of the drawing area
for i, value in enumerate(self.audio_sample):
width = self.sizes.area.width / self.sizes.number - self.sizes.padding
radius = width / 2
height = max(self.sizes.bar.height * min(value, 1), self.sizes.zero) / 2
if height == self.sizes.zero / 2 + 1:
height *= 0.5
height = min(height, self.max_height)
# Draw rectangle and arcs for rounded ends
cr.rectangle(dx, center_y - height, width, height * 2)
cr.arc(dx + radius, center_y - height, radius, 0, 2 * pi)
cr.arc(dx + radius, center_y + height, radius, 0, 2 * pi)
cr.close_path()
dx += width + self.sizes.padding
cr.fill()
def size_update(self, *args):
"""Update drawing geometry"""
self.sizes.number = bars
self.sizes.padding = 100 / bars
self.sizes.zero = 0
self.sizes.area.width = self.area.get_allocated_width()
self.sizes.area.height = self.area.get_allocated_height() - 2
tw = self.sizes.area.width - self.sizes.padding * (self.sizes.number - 1)
self.sizes.bar.width = max(int(tw / self.sizes.number), 1)
self.sizes.bar.height = self.sizes.area.height
def color_update(self):
"""Set drawing color according to current settings by reading primary color from CSS"""
color = "#a5c8ff" # default value
try:
with open(get_relative_path("../styles/colors.css"), "r") as f:
content = f.read()
m = re.search(r"--primary:\s*(#[0-9a-fA-F]{6})", content)
if m:
color = m.group(1)
except Exception as e:
logger.error("Failed to read primary color: {}".format(e))
red = int(color[1:3], 16) / 255
green = int(color[3:5], 16) / 255
blue = int(color[5:7], 16) / 255
self.color = Gdk.RGBA(red=red, green=green, blue=blue, alpha=1.0)
class SpectrumRender:
def __init__(self, mode=None, **kwargs):
super().__init__(**kwargs)
self.mode = mode
self.draw = Spectrum()
self.cava = Cava(self)
self.cava.start()
def get_spectrum_box(self):
# Get the spectrum box
box = Overlay(name="cavalcade", h_align="center", v_align="center")
box.set_size_request(180, 40)
box.add_overlay(self.draw.area)
return box

738
bar/modules/player.py Normal file
View File

@@ -0,0 +1,738 @@
import os
import urllib.parse
import urllib.request
import tempfile
from gi.repository import Gtk, GLib, Gio, Gdk
from fabric.widgets.box import Box
from fabric.widgets.centerbox import CenterBox
from fabric.widgets.label import Label
from fabric.widgets.button import Button
from fabric.widgets.circularprogressbar import CircularProgressBar
from fabric.widgets.overlay import Overlay
from fabric.widgets.stack import Stack
from ..widgets.circle_image import CircleImage
import bar.modules.icons as icons
from bar.services.mpris import MprisPlayerManager, MprisPlayer
# from bar.modules.cavalcade import SpectrumRender
def get_player_icon_markup_by_name(player_name):
if player_name:
pn = player_name.lower()
if pn == "firefox":
return icons.firefox
elif pn == "spotify":
return icons.spotify
elif pn in ("chromium", "brave"):
return icons.chromium
return icons.disc
def add_hover_cursor(widget):
widget.add_events(Gdk.EventMask.ENTER_NOTIFY_MASK | Gdk.EventMask.LEAVE_NOTIFY_MASK)
widget.connect(
"enter-notify-event",
lambda w, event: w.get_window().set_cursor(
Gdk.Cursor.new_from_name(Gdk.Display.get_default(), "pointer")
),
)
widget.connect(
"leave-notify-event", lambda w, event: w.get_window().set_cursor(None)
)
class PlayerBox(Box):
def __init__(self, mpris_player=None):
super().__init__(
orientation="v", h_align="fill", spacing=0, h_expand=False, v_expand=True
)
self.mpris_player = mpris_player
self._progress_timer_id = None # Initialize timer ID
self.cover = CircleImage(
name="player-cover",
image_file=os.path.expanduser("~/Pictures/wallpaper/background.jpg"),
size=162,
h_align="center",
v_align="center",
)
self.cover_placerholder = CircleImage(
name="player-cover",
size=198,
h_align="center",
v_align="center",
)
self.title = Label(
name="player-title",
h_expand=True,
h_align="fill",
ellipsization="end",
max_chars_width=1,
)
self.album = Label(
name="player-album",
h_expand=True,
h_align="fill",
ellipsization="end",
max_chars_width=1,
)
self.artist = Label(
name="player-artist",
h_expand=True,
h_align="fill",
ellipsization="end",
max_chars_width=1,
)
self.progressbar = CircularProgressBar(
name="player-progress",
size=198,
h_align="center",
v_align="center",
start_angle=180,
end_angle=360,
)
self.time = Label(name="player-time", label="--:-- / --:--")
self.overlay = Overlay(
child=self.cover_placerholder,
overlays=[self.progressbar, self.cover],
)
self.overlay_container = CenterBox(
name="player-overlay", center_children=[self.overlay]
)
self.title.set_label("Nothing Playing")
self.album.set_label("Enjoy the silence")
self.artist.set_label("¯\\_(ツ)_/¯")
self.progressbar.set_value(0.0)
self.prev = Button(
name="player-btn",
child=Label(name="player-btn-label", markup=icons.prev),
)
self.backward = Button(
name="player-btn",
child=Label(name="player-btn-label", markup=icons.skip_back),
)
self.play_pause = Button(
name="player-btn",
child=Label(name="player-btn-label", markup=icons.play),
)
self.forward = Button(
name="player-btn",
child=Label(name="player-btn-label", markup=icons.skip_forward),
)
self.next = Button(
name="player-btn",
child=Label(name="player-btn-label", markup=icons.next),
)
# Add hover effect to buttons
add_hover_cursor(self.prev)
add_hover_cursor(self.backward)
add_hover_cursor(self.play_pause)
add_hover_cursor(self.forward)
add_hover_cursor(self.next)
self.btn_box = CenterBox(
name="player-btn-box",
orientation="h",
center_children=[
Box(
orientation="h",
spacing=8,
h_expand=True,
h_align="fill",
children=[
self.prev,
self.backward,
self.play_pause,
self.forward,
self.next,
],
)
],
)
self.player_box = Box(
name="player-box",
orientation="v",
spacing=8,
children=[
self.overlay_container,
self.title,
self.album,
self.artist,
self.btn_box,
self.time,
],
)
self.add(self.player_box)
if mpris_player:
self._apply_mpris_properties() # This will handle starting the timer if needed
self.prev.connect("clicked", self._on_prev_clicked)
self.play_pause.connect("clicked", self._on_play_pause_clicked)
self.backward.connect("clicked", self._on_backward_clicked)
self.forward.connect("clicked", self._on_forward_clicked)
self.next.connect("clicked", self._on_next_clicked)
self.mpris_player.connect("changed", self._on_mpris_changed)
else:
self.play_pause.get_child().set_markup(icons.stop)
# Ensure buttons are disabled visually if no player
self.backward.add_style_class("disabled")
self.forward.add_style_class("disabled")
self.prev.add_style_class("disabled")
self.next.add_style_class("disabled")
self.progressbar.set_value(0.0)
self.time.set_text("--:-- / --:--")
def _apply_mpris_properties(self):
mp = self.mpris_player
self.title.set_visible(bool(mp.title and mp.title.strip()))
if mp.title and mp.title.strip():
self.title.set_text(mp.title)
self.album.set_visible(bool(mp.album and mp.album.strip()))
if mp.album and mp.album.strip():
self.album.set_text(mp.album)
self.artist.set_visible(bool(mp.artist and mp.artist.strip()))
if mp.artist and mp.artist.strip():
self.artist.set_text(mp.artist)
if mp.arturl:
parsed = urllib.parse.urlparse(mp.arturl)
if parsed.scheme == "file":
local_arturl = urllib.parse.unquote(parsed.path)
self._set_cover_image(local_arturl)
elif parsed.scheme in ("http", "https"):
GLib.Thread.new(
"download-artwork", self._download_and_set_artwork, mp.arturl
)
else:
self._set_cover_image(mp.arturl)
else:
fallback = os.path.expanduser("~/Pictures/wallpaper/background.jpg")
self._set_cover_image(fallback)
file_obj = Gio.File.new_for_path(fallback)
monitor = file_obj.monitor_file(Gio.FileMonitorFlags.NONE, None)
monitor.connect("changed", self.on_wallpaper_changed)
self._wallpaper_monitor = monitor
self.update_play_pause_icon()
# Keep progress bar and time visible always
self.progressbar.set_visible(True)
self.time.set_visible(True)
player_name = (
mp.player_name.lower()
if hasattr(mp, "player_name") and mp.player_name
else ""
)
can_seek = hasattr(mp, "can_seek") and mp.can_seek
if player_name == "firefox" or not can_seek:
# Disable seeking buttons and reset progress/time display
self.backward.add_style_class("disabled")
self.forward.add_style_class("disabled")
self.progressbar.set_value(0.0)
self.time.set_text("--:-- / --:--")
# Stop the timer if it's running
if self._progress_timer_id:
GLib.source_remove(self._progress_timer_id)
self._progress_timer_id = None
else:
# Enable seeking buttons
self.backward.remove_style_class("disabled")
self.forward.remove_style_class("disabled")
# Start the timer if it's not already running
if not self._progress_timer_id:
self._progress_timer_id = GLib.timeout_add(1000, self._update_progress)
# Initial progress update if possible
self._update_progress() # Call once for immediate update
# Enable/disable prev/next based on capabilities
if hasattr(mp, "can_go_previous") and mp.can_go_previous:
self.prev.remove_style_class("disabled")
else:
self.prev.add_style_class("disabled")
if hasattr(mp, "can_go_next") and mp.can_go_next:
self.next.remove_style_class("disabled")
else:
self.next.add_style_class("disabled")
def _set_cover_image(self, image_path):
if image_path and os.path.isfile(image_path):
self.cover.set_image_from_file(image_path)
else:
fallback = os.path.expanduser("~/Pictures/wallpaper/background.jpg")
self.cover.set_image_from_file(fallback)
file_obj = Gio.File.new_for_path(fallback)
monitor = file_obj.monitor_file(Gio.FileMonitorFlags.NONE, None)
monitor.connect("changed", self.on_wallpaper_changed)
self._wallpaper_monitor = monitor
def _download_and_set_artwork(self, arturl):
"""
Download the artwork from the given URL asynchronously and update the cover image
using GLib.idle_add to ensure UI updates occur on the main thread.
"""
try:
parsed = urllib.parse.urlparse(arturl)
suffix = os.path.splitext(parsed.path)[1] or ".png"
with urllib.request.urlopen(arturl) as response:
data = response.read()
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
temp_file.write(data)
temp_file.close()
local_arturl = temp_file.name
except Exception:
local_arturl = os.path.expanduser("~/.current.wall")
GLib.idle_add(self._set_cover_image, local_arturl)
return None
def update_play_pause_icon(self):
if self.mpris_player.playback_status == "playing":
self.play_pause.get_child().set_markup(icons.pause)
else:
self.play_pause.get_child().set_markup(icons.play)
def on_wallpaper_changed(self, monitor, file, other_file, event):
self.cover.set_image_from_file(os.path.expanduser("~/.current.wall"))
# --- Control methods, defined only once each ---
def _on_prev_clicked(self, button):
if self.mpris_player:
self.mpris_player.previous()
def _on_play_pause_clicked(self, button):
if self.mpris_player:
self.mpris_player.play_pause()
self.update_play_pause_icon()
def _on_backward_clicked(self, button):
# Only seek if player exists, can seek, and button is not disabled
if (
self.mpris_player
and self.mpris_player.can_seek
and "disabled" not in self.backward.get_style_context().list_classes()
):
new_pos = max(0, self.mpris_player.position - 5000000) # 5 seconds backward
self.mpris_player.position = new_pos
def _on_forward_clicked(self, button):
# Only seek if player exists, can seek, and button is not disabled
if (
self.mpris_player
and self.mpris_player.can_seek
and "disabled" not in self.forward.get_style_context().list_classes()
):
new_pos = self.mpris_player.position + 5000000 # 5 seconds forward
self.mpris_player.position = new_pos
def _on_next_clicked(self, button):
if self.mpris_player:
self.mpris_player.next()
def _update_progress(self):
# Timer is now only active if can_seek is true, so no need for the initial check
if not self.mpris_player: # Still need to check if player exists
# Should not happen if timer logic is correct, but good safeguard
if self._progress_timer_id:
GLib.source_remove(self._progress_timer_id)
self._progress_timer_id = None
return False # Stop timer
try:
current = self.mpris_player.position
except Exception:
current = 0
try:
total = int(self.mpris_player.length or 0)
except Exception:
total = 0
# Prevent division by zero or invalid updates
if total <= 0:
progress = 0.0
self.time.set_text("--:-- / --:--")
# Don't stop the timer here, length might become available later
else:
progress = current / total
self.time.set_text(
f"{self._format_time(current)} / {self._format_time(total)}"
)
self.progressbar.set_value(progress)
return True # Continue the timer
def _format_time(self, us):
seconds = int(us / 1000000)
minutes = seconds // 60
seconds = seconds % 60
return f"{minutes}:{seconds:02}"
def _update_metadata(self):
if not self.mpris_player:
return False
self._apply_mpris_properties()
return True
def _on_mpris_changed(self, *args):
# Debounce metadata updates to avoid excessive work on the main thread.
if not hasattr(self, "_update_pending") or not self._update_pending:
self._update_pending = True
# Use idle_add for potentially faster UI response than timeout_add(100)
GLib.idle_add(self._apply_mpris_properties_debounced)
def _apply_mpris_properties_debounced(self):
# Ensure player still exists before applying properties
if self.mpris_player:
self._apply_mpris_properties()
else:
# Player vanished, ensure timer is stopped if it was running
if self._progress_timer_id:
GLib.source_remove(self._progress_timer_id)
self._progress_timer_id = None
self._update_pending = False
return False
class Player(Box):
def __init__(self):
super().__init__(
name="player",
orientation="v",
h_align="fill",
spacing=0,
h_expand=False,
v_expand=True,
)
self.player_stack = Stack(
name="player-stack",
transition_type="slide-left-right",
transition_duration=500,
v_align="center",
v_expand=True,
)
self.switcher = Gtk.StackSwitcher(
name="player-switcher",
spacing=8,
)
self.switcher.set_stack(self.player_stack)
self.switcher.set_halign(Gtk.Align.CENTER)
self.mpris_manager = MprisPlayerManager()
players = self.mpris_manager.players
if players:
for p in players:
mp = MprisPlayer(p)
pb = PlayerBox(mpris_player=mp)
self.player_stack.add_titled(pb, mp.player_name, mp.player_name)
else:
pb = PlayerBox(mpris_player=None)
self.player_stack.add_titled(pb, "nothing", "Nothing Playing")
self.mpris_manager.connect("player-appeared", self.on_player_appeared)
self.mpris_manager.connect("player-vanished", self.on_player_vanished)
self.switcher.set_visible(True)
self.add(self.player_stack)
self.add(self.switcher)
GLib.idle_add(self._replace_switcher_labels)
def on_player_appeared(self, manager, player):
children = self.player_stack.get_children()
if len(children) == 1 and not getattr(children[0], "mpris_player", None):
self.player_stack.remove(children[0])
mp = MprisPlayer(player)
pb = PlayerBox(mpris_player=mp)
self.player_stack.add_titled(pb, mp.player_name, mp.player_name)
# Timer is now started conditionally within PlayerBox.__init__
self.switcher.set_visible(True)
GLib.idle_add(lambda: self._update_switcher_for_player(mp.player_name))
GLib.idle_add(self._replace_switcher_labels)
def on_player_vanished(self, manager, player_name):
for child in self.player_stack.get_children():
if (
hasattr(child, "mpris_player")
and child.mpris_player
and child.mpris_player.player_name == player_name
):
self.player_stack.remove(child)
break
if not any(
getattr(child, "mpris_player", None)
for child in self.player_stack.get_children()
):
pb = PlayerBox(mpris_player=None)
self.player_stack.add_titled(pb, "nothing", "Nothing Playing")
self.switcher.set_visible(True)
GLib.idle_add(self._replace_switcher_labels)
def _replace_switcher_labels(self):
buttons = self.switcher.get_children()
for btn in buttons:
if isinstance(btn, Gtk.ToggleButton):
default_label = None
for child in btn.get_children():
if isinstance(child, Gtk.Label):
default_label = child
break
if default_label:
label_player_name = getattr(
default_label, "player_name", default_label.get_text().lower()
)
icon_markup = get_player_icon_markup_by_name(label_player_name)
btn.remove(default_label)
new_label = Label(name="player-label", markup=icon_markup)
new_label.player_name = label_player_name
btn.add(new_label)
new_label.show_all()
return False
def _update_switcher_for_player(self, player_name):
for btn in self.switcher.get_children():
if isinstance(btn, Gtk.ToggleButton):
default_label = None
for child in btn.get_children():
if isinstance(child, Gtk.Label):
default_label = child
break
if default_label:
label_player_name = getattr(
default_label, "player_name", default_label.get_text().lower()
)
if label_player_name == player_name.lower():
icon_markup = get_player_icon_markup_by_name(player_name)
btn.remove(default_label)
new_label = Label(name="player-label", markup=icon_markup)
new_label.player_name = player_name.lower()
btn.add(new_label)
new_label.show_all()
return False
class PlayerSmall(CenterBox):
def __init__(self):
super().__init__(
name="player-small", orientation="h", h_align="fill", v_align="center"
)
self._show_artist = False # toggle flag
self._display_options = ["cavalcade", "title", "artist"]
self._display_index = 0
self._current_display = "cavalcade"
self.mpris_icon = Button(
name="compact-mpris-icon",
h_align="center",
v_align="center",
child=Label(name="compact-mpris-icon-label", markup=icons.disc),
)
# Remove scroll events; instead, add button press events.
self.mpris_icon.add_events(Gdk.EventMask.BUTTON_PRESS_MASK)
self.mpris_icon.connect("button-press-event", self._on_icon_button_press)
# Prevent the child from propagating events.
child = self.mpris_icon.get_child()
child.add_events(Gdk.EventMask.BUTTON_PRESS_MASK)
child.connect("button-press-event", lambda widget, event: True)
# Add hover effect
add_hover_cursor(self.mpris_icon)
self.mpris_label = Label(
name="compact-mpris-label",
label="Nothing Playing",
ellipsization="end",
max_chars_width=26,
h_align="center",
)
self.mpris_button = Button(
name="compact-mpris-button",
h_align="center",
v_align="center",
child=Label(name="compact-mpris-button-label", markup=icons.play),
)
self.mpris_button.add_events(Gdk.EventMask.BUTTON_PRESS_MASK)
self.mpris_button.connect(
"button-press-event", self._on_play_pause_button_press
)
# Add hover effect
add_hover_cursor(self.mpris_button)
# self.cavalcade = SpectrumRender()
# self.cavalcade_box = self.cavalcade.get_spectrum_box()
self.center_stack = Stack(
name="compact-mpris",
transition_type="crossfade",
transition_duration=100,
v_align="center",
v_expand=False,
children=[
# self.cavalcade_box,
self.mpris_label,
],
)
# self.center_stack.set_visible_child(self.cavalcade_box) # default to cavalcade
# Create additional compact view.
self.mpris_small = CenterBox(
name="compact-mpris",
orientation="h",
h_expand=True,
h_align="fill",
v_align="center",
v_expand=False,
start_children=self.mpris_icon,
center_children=self.center_stack, # Changed to center_stack to handle stack switching
end_children=self.mpris_button,
)
self.add(self.mpris_small)
self.mpris_manager = MprisPlayerManager()
self.mpris_player = None
# Almacenar el índice del reproductor actual
self.current_index = 0
players = self.mpris_manager.players
if players:
mp = MprisPlayer(players[self.current_index])
self.mpris_player = mp
self._apply_mpris_properties()
self.mpris_player.connect("changed", self._on_mpris_changed)
else:
self._apply_mpris_properties()
self.mpris_manager.connect("player-appeared", self.on_player_appeared)
self.mpris_manager.connect("player-vanished", self.on_player_vanished)
self.mpris_button.connect("clicked", self._on_play_pause_clicked)
def _apply_mpris_properties(self):
if not self.mpris_player:
self.mpris_label.set_text("Nothing Playing")
self.mpris_button.get_child().set_markup(icons.stop)
self.mpris_icon.get_child().set_markup(icons.disc)
if self._current_display != "cavalcade":
self.center_stack.set_visible_child(
self.mpris_label
) # if was title or artist, keep showing label
# else:
# self.center_stack.set_visible_child(
# self.cavalcade_box
# ) # default to cavalcade if no player
return
mp = self.mpris_player
# Choose icon based on player name.
player_name = (
mp.player_name.lower()
if hasattr(mp, "player_name") and mp.player_name
else ""
)
icon_markup = get_player_icon_markup_by_name(player_name)
self.mpris_icon.get_child().set_markup(icon_markup)
self.update_play_pause_icon()
if self._current_display == "title":
text = mp.title if mp.title and mp.title.strip() else "Nothing Playing"
self.mpris_label.set_text(text)
self.center_stack.set_visible_child(self.mpris_label)
elif self._current_display == "artist":
text = mp.artist if mp.artist else "Nothing Playing"
self.mpris_label.set_text(text)
self.center_stack.set_visible_child(self.mpris_label)
# else: # default cavalcade
# self.center_stack.set_visible_child(self.cavalcade_box)
def _on_icon_button_press(self, widget, event):
from gi.repository import Gdk
if event.type == Gdk.EventType.BUTTON_PRESS:
players = self.mpris_manager.players
if not players:
return True
if event.button == 2: # Middle-click: cycle display
self._display_index = (self._display_index + 1) % len(
self._display_options
)
self._current_display = self._display_options[self._display_index]
self._apply_mpris_properties() # Re-apply to update label/cavalcade
return True
# Cambiar de reproductor según el botón presionado.
if event.button == 1: # Left-click: next player
self.current_index = (self.current_index + 1) % len(players)
elif event.button == 3: # Right-click: previous player
self.current_index = (self.current_index - 1) % len(players)
if self.current_index < 0:
self.current_index = len(players) - 1
mp_new = MprisPlayer(players[self.current_index])
self.mpris_player = mp_new
# Conectar el evento "changed" para que se actualice
self.mpris_player.connect("changed", self._on_mpris_changed)
self._apply_mpris_properties()
return True # Se consume el evento
return True
def _on_play_pause_button_press(self, widget, event):
if event.type == Gdk.EventType.BUTTON_PRESS:
if event.button == 1: # Click izquierdo -> track anterior
if self.mpris_player:
self.mpris_player.previous()
self.mpris_button.get_child().set_markup(icons.prev)
GLib.timeout_add(500, self._restore_play_pause_icon)
elif event.button == 3: # Click derecho -> siguiente track
if self.mpris_player:
self.mpris_player.next()
self.mpris_button.get_child().set_markup(icons.next)
GLib.timeout_add(500, self._restore_play_pause_icon)
elif event.button == 2: # Click medio -> play/pausa
if self.mpris_player:
self.mpris_player.play_pause()
self.update_play_pause_icon()
return True
return True
def _restore_play_pause_icon(self):
self.update_play_pause_icon()
return False
def _on_icon_clicked(
self, widget
): # No longer used, logic moved to _on_icon_button_press
pass
def update_play_pause_icon(self):
if self.mpris_player and self.mpris_player.playback_status == "playing":
self.mpris_button.get_child().set_markup(icons.pause)
else:
self.mpris_button.get_child().set_markup(icons.play)
def _on_play_pause_clicked(self, button):
if self.mpris_player:
self.mpris_player.play_pause()
self.update_play_pause_icon()
def _on_mpris_changed(self, *args):
# Update properties when the player's state changes.
self._apply_mpris_properties()
def on_player_appeared(self, manager, player):
# When a new player appears, use it if no player is active.
if not self.mpris_player:
mp = MprisPlayer(player)
self.mpris_player = mp
self._apply_mpris_properties()
self.mpris_player.connect("changed", self._on_mpris_changed)
def on_player_vanished(self, manager, player_name):
players = self.mpris_manager.players
if (
players
and self.mpris_player
and self.mpris_player.player_name == player_name
):
if players: # Check if players is not empty after vanishing
self.current_index = self.current_index % len(players)
new_player = MprisPlayer(players[self.current_index])
self.mpris_player = new_player
self.mpris_player.connect("changed", self._on_mpris_changed)
else:
self.mpris_player = None # No players left
elif not players:
self.mpris_player = None
self._apply_mpris_properties()

92
bar/modules/vinyl.py Normal file
View File

@@ -0,0 +1,92 @@
from fabric.widgets.box import Box
from fabric.widgets.label import Label
from fabric.widgets.eventbox import EventBox
from fabric.widgets.overlay import Overlay
from fabric.core.service import Property
import subprocess
class VinylButton(Box):
@Property(bool, "read-write", default_value=False)
def active(self) -> bool:
return self._active
@active.setter
def active(self, value: bool):
self._active = value
# Update appearance based on state
self._update_appearance()
# Execute shell command based on new state
if self._active:
self._execute_active_command()
else:
self._execute_inactive_command()
def __init__(
self,
active_command="""pw-link alsa_input.pci-0000_12_00.6.analog-stereo:capture_FL alsa_output.usb-BEHRINGER_UMC1820_A71E9E3E-00.multichannel-output:playback_AUX0
pw-link alsa_input.pci-0000_12_00.6.analog-stereo:capture_FR alsa_output.usb-BEHRINGER_UMC1820_A71E9E3E-00.multichannel-output:playback_AUX1""",
inactive_command="""pw-link -d alsa_input.pci-0000_12_00.6.analog-stereo:capture_FL alsa_output.usb-BEHRINGER_UMC1820_A71E9E3E-00.multichannel-output:playback_AUX0
pw-link -d alsa_input.pci-0000_12_00.6.analog-stereo:capture_FR alsa_output.usb-BEHRINGER_UMC1820_A71E9E3E-00.multichannel-output:playback_AUX1 """,
**kwargs,
):
super().__init__(**kwargs)
# Initialize properties
self._active = False
self._active_command = active_command
self._inactive_command = inactive_command
# Set up the icon
self.icon = Label(
label="", # CD icon
name="vinyl-icon",
style="",
)
# Set up event box to handle clicks
self.event_box = EventBox(
events="button-press",
child=Overlay(
child=self.icon,
),
name="vinyl-button",
)
# Connect click event
self.event_box.connect("button-press-event", self._on_clicked)
# Add to parent box
self.add(self.event_box)
# Initialize appearance
self._update_appearance()
def _update_appearance(self):
"""Update CSS class based on active state"""
if self._active:
self.add_style_class("active")
else:
self.remove_style_class("active")
def _on_clicked(self, _, event):
"""Handle button click event"""
if event.button == 1: # Left click
# Toggle active state
self.active = not self.active
return True
def _execute_active_command(self):
"""Execute shell command when button is activated"""
try:
subprocess.Popen(self._active_command, shell=True)
except Exception as e:
print(f"Error executing active command: {e}")
def _execute_inactive_command(self):
"""Execute shell command when button is deactivated"""
try:
subprocess.Popen(self._inactive_command, shell=True)
except Exception as e:
print(f"Error executing inactive command: {e}")

48
bar/modules/volume.py Normal file
View File

@@ -0,0 +1,48 @@
from fabric.widgets.circularprogressbar import CircularProgressBar
from fabric.audio.service import Audio
from fabric.widgets.eventbox import EventBox
from fabric.widgets.box import Box
from fabric.widgets.overlay import Overlay
from fabric.widgets.label import Label
class VolumeWidget(Box):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.audio = Audio()
self.progress_bar = CircularProgressBar(
name="volume-progress-bar", pie=True, size=24
)
self.event_box = EventBox(
events="scroll",
child=Overlay(
child=self.progress_bar,
overlays=Label(
label="",
style="margin: 0px 6px 0px 0px; font-size: 12px", # to center the icon glyph
),
),
)
self.audio.connect("notify::speaker", self.on_speaker_changed)
self.event_box.connect("scroll-event", self.on_scroll)
self.add(self.event_box)
def on_scroll(self, _, event):
match event.direction:
case 0:
self.audio.speaker.volume += 8
case 1:
self.audio.speaker.volume -= 8
return
def on_speaker_changed(self, *_):
if not self.audio.speaker:
return
self.progress_bar.value = self.audio.speaker.volume / 100
self.audio.speaker.bind(
"volume", "value", self.progress_bar, lambda _, v: v / 100
)
return

View File

@@ -0,0 +1,88 @@
import operator
from fabric.widgets.wayland import WaylandWindow as Window
from fabric.widgets.box import Box
from fabric.widgets.label import Label
from fabric.widgets.entry import Entry
from gi.repository import Gdk
from bar.services.wlr.service import WaylandWindowTracker, Window as WaylandWindow
from pywayland.client import Display
class FuzzyWindowFinder(Window):
def __init__(
self,
monitor: int = 0,
):
super().__init__(
name="finder",
anchor="center",
monitor=monitor,
keyboard_mode="on-demand",
type="popup",
visible=False,
)
self.window_tracker = WaylandWindowTracker()
self.window_tracker.ready_signal.connect(lambda *_: print("Tracker is ready"))
self._all_windows: list[WaylandWindow] = []
self.viewport = Box(name="viewport", spacing=4, orientation="v")
self.search_entry = Entry(
name="search-entry",
placeholder="Search Windows...",
h_expand=True,
editable=True,
notify_text=self.notify_text,
on_activate=lambda entry, *_: self.on_search_entry_activate(
entry.get_text()
),
on_key_press_event=self.on_search_entry_key_press,
)
self.picker_box = Box(
name="picker-box",
spacing=4,
orientation="v",
children=[self.search_entry, self.viewport],
)
self.add(self.picker_box)
self.arrange_viewport("")
def open(self):
self._all_windows = self.window_tracker.windows
print(self._all_windows[0])
self.arrange_viewport("")
self.show()
def notify_text(self, entry, *_):
text = entry.get_text()
self.arrange_viewport(text) # Update list on typing
print(text)
def on_search_entry_key_press(self, widget, event):
# if event.keyval in (Gdk.KEY_Up, Gdk.KEY_Down, Gdk.KEY_Left, Gdk.KEY_Right):
# self.move_selection_2d(event.keyval)
# return True
print(event.keyval)
if event.keyval == Gdk.KEY_Return:
self.window_tracker.activate_window(self._filtered[0])
if event.keyval in [Gdk.KEY_Escape, 103]:
self.hide()
return True
return False
def on_search_entry_activate(self, text):
print(f"activate {text}")
def arrange_viewport(self, query: str = ""):
self.viewport.children = [] # Clear previous entries
self._filtered = [
w for w in self._all_windows if query.lower() in w.title.lower()
]
titles = [w.title for w in self._filtered]
for window in titles:
self.viewport.add(
Box(name="slot-box", orientation="h", children=[Label(label=window)])
)

View File

@@ -1,3 +0,0 @@
from .service import River
__all__ = ["River"]

0
bar/services/__init__.py Normal file
View File

282
bar/services/mpris.py Normal file
View File

@@ -0,0 +1,282 @@
# Standard library imports
import contextlib
# Third-party imports
import gi
from gi.repository import GLib # type: ignore
from loguru import logger
# Fabric imports
from fabric.core.service import Property, Service, Signal
from fabric.utils import bulk_connect
class PlayerctlImportError(ImportError):
"""An error to raise when playerctl is not installed."""
def __init__(self, *args):
super().__init__(
"Playerctl is not installed, please install it first",
*args,
)
# Try to import Playerctl, raise custom error if not available
try:
gi.require_version("Playerctl", "2.0")
from gi.repository import Playerctl
except ValueError:
raise PlayerctlImportError
class MprisPlayer(Service):
"""A service to manage a mpris player."""
@Signal
def exit(self, value: bool) -> bool: ...
@Signal
def changed(self) -> None: ...
def __init__(
self,
player: Playerctl.Player,
**kwargs,
):
self._signal_connectors: dict = {}
self._player: Playerctl.Player = player
super().__init__(**kwargs)
for sn in ["playback-status", "loop-status", "shuffle", "volume", "seeked"]:
self._signal_connectors[sn] = self._player.connect(
sn,
lambda *args, sn=sn: self.notifier(sn, args),
)
self._signal_connectors["exit"] = self._player.connect(
"exit",
self.on_player_exit,
)
self._signal_connectors["metadata"] = self._player.connect(
"metadata",
lambda *args: self.update_status(),
)
GLib.idle_add(lambda *args: self.update_status_once())
def update_status(self):
# schedule each notifier asynchronously.
def notify_property(prop):
if self.get_property(prop) is not None:
self.notifier(prop)
for prop in [
"metadata",
"title",
"artist",
"arturl",
"length",
]:
GLib.idle_add(lambda p=prop: (notify_property(p), False))
for prop in [
"can-seek",
"can-pause",
"can-shuffle",
"can-go-next",
"can-go-previous",
]:
GLib.idle_add(lambda p=prop: (self.notifier(p), False))
def update_status_once(self):
# schedule notifier calls for each property
def notify_all():
for prop in self.list_properties(): # type: ignore
self.notifier(prop.name)
return False
GLib.idle_add(notify_all, priority=GLib.PRIORITY_DEFAULT_IDLE)
def notifier(self, name: str, args=None):
def notify_and_emit():
self.notify(name)
self.emit("changed")
return False
GLib.idle_add(notify_and_emit, priority=GLib.PRIORITY_DEFAULT_IDLE)
def on_player_exit(self, player):
for id in list(self._signal_connectors.values()):
with contextlib.suppress(Exception):
self._player.disconnect(id)
del self._signal_connectors
GLib.idle_add(lambda: (self.emit("exit", True), False))
del self._player
def toggle_shuffle(self):
if self.can_shuffle:
# schedule the shuffle toggle in the GLib idle loop
GLib.idle_add(lambda: (setattr(self, "shuffle", not self.shuffle), False))
# else do nothing
def play_pause(self):
if self.can_pause:
GLib.idle_add(lambda: (self._player.play_pause(), False))
def next(self):
if self.can_go_next:
GLib.idle_add(lambda: (self._player.next(), False))
def previous(self):
if self.can_go_previous:
GLib.idle_add(lambda: (self._player.previous(), False))
# Properties
@Property(str, "readable")
def player_name(self) -> int:
return self._player.get_property("player-name") # type: ignore
@Property(int, "read-write", default_value=0)
def position(self) -> int:
return self._player.get_property("position") # type: ignore
@position.setter
def position(self, new_pos: int):
self._player.set_position(new_pos)
@Property(object, "readable")
def metadata(self) -> dict:
return self._player.get_property("metadata") # type: ignore
@Property(str or None, "readable")
def arturl(self) -> str | None:
if "mpris:artUrl" in self.metadata.keys(): # type: ignore # noqa: SIM118
return self.metadata["mpris:artUrl"] # type: ignore
return None
@Property(str or None, "readable")
def length(self) -> str | None:
if "mpris:length" in self.metadata.keys(): # type: ignore # noqa: SIM118
return self.metadata["mpris:length"] # type: ignore
return None
@Property(str, "readable")
def artist(self) -> str:
artist = self._player.get_artist() # type: ignore
if isinstance(artist, (list, tuple)):
return ", ".join(artist)
return artist
@Property(str, "readable")
def album(self) -> str:
return self._player.get_album() # type: ignore
@Property(str, "readable")
def title(self):
return self._player.get_title()
@Property(bool, "read-write", default_value=False)
def shuffle(self) -> bool:
return self._player.get_property("shuffle") # type: ignore
@shuffle.setter
def shuffle(self, do_shuffle: bool):
self.notifier("shuffle")
return self._player.set_shuffle(do_shuffle)
@Property(str, "readable")
def playback_status(self) -> str:
return {
Playerctl.PlaybackStatus.PAUSED: "paused",
Playerctl.PlaybackStatus.PLAYING: "playing",
Playerctl.PlaybackStatus.STOPPED: "stopped",
}.get(self._player.get_property("playback_status"), "unknown") # type: ignore
@Property(str, "read-write")
def loop_status(self) -> str:
return {
Playerctl.LoopStatus.NONE: "none",
Playerctl.LoopStatus.TRACK: "track",
Playerctl.LoopStatus.PLAYLIST: "playlist",
}.get(self._player.get_property("loop_status"), "unknown") # type: ignore
@loop_status.setter
def loop_status(self, status: str):
loop_status = {
"none": Playerctl.LoopStatus.NONE,
"track": Playerctl.LoopStatus.TRACK,
"playlist": Playerctl.LoopStatus.PLAYLIST,
}.get(status)
self._player.set_loop_status(loop_status) if loop_status else None
@Property(bool, "readable", default_value=False)
def can_go_next(self) -> bool:
return self._player.get_property("can_go_next") # type: ignore
@Property(bool, "readable", default_value=False)
def can_go_previous(self) -> bool:
return self._player.get_property("can_go_previous") # type: ignore
@Property(bool, "readable", default_value=False)
def can_seek(self) -> bool:
return self._player.get_property("can_seek") # type: ignore
@Property(bool, "readable", default_value=False)
def can_pause(self) -> bool:
return self._player.get_property("can_pause") # type: ignore
@Property(bool, "readable", default_value=False)
def can_shuffle(self) -> bool:
try:
self._player.set_shuffle(self._player.get_property("shuffle"))
return True
except Exception:
return False
@Property(bool, "readable", default_value=False)
def can_loop(self) -> bool:
try:
self._player.set_shuffle(self._player.get_property("shuffle"))
return True
except Exception:
return False
class MprisPlayerManager(Service):
"""A service to manage mpris players."""
@Signal
def player_appeared(self, player: Playerctl.Player) -> Playerctl.Player: ...
@Signal
def player_vanished(self, player_name: str) -> str: ...
def __init__(
self,
**kwargs,
):
self._manager = Playerctl.PlayerManager.new()
bulk_connect(
self._manager,
{
"name-appeared": self.on_name_appeard,
"name-vanished": self.on_name_vanished,
},
)
self.add_players()
super().__init__(**kwargs)
def on_name_appeard(self, manager, player_name: Playerctl.PlayerName):
logger.info(f"[MprisPlayer] {player_name.name} appeared")
new_player = Playerctl.Player.new_from_name(player_name)
manager.manage_player(new_player)
self.emit("player-appeared", new_player) # type: ignore
def on_name_vanished(self, manager, player_name: Playerctl.PlayerName):
logger.info(f"[MprisPlayer] {player_name.name} vanished")
self.emit("player-vanished", player_name.name) # type: ignore
def add_players(self):
for player in self._manager.get_property("player-names"): # type: ignore
self._manager.manage_player(Playerctl.Player.new_from_name(player)) # type: ignore
@Property(object, "readable")
def players(self):
return self._manager.get_property("players") # type: ignore

View File

@@ -0,0 +1,3 @@
from .service import River, RiverEvent
__all__ = ["River", "RiverEvent"]

View File

View File

@@ -16,14 +16,8 @@
from __future__ import annotations
from pywayland.protocol_core import (
Argument,
ArgumentType,
Global,
Interface,
Proxy,
Resource,
)
from pywayland.protocol_core import (Argument, ArgumentType, Global, Interface,
Proxy, Resource)
class ZriverCommandCallbackV1(Interface):

View File

@@ -25,7 +25,7 @@ from pywayland.protocol_core import (
Resource,
)
from ..wayland import WlSeat
from pywayland.protocol.wayland import WlSeat
from .zriver_command_callback_v1 import ZriverCommandCallbackV1

View File

@@ -16,14 +16,8 @@
from __future__ import annotations
from pywayland.protocol_core import (
Argument,
ArgumentType,
Global,
Interface,
Proxy,
Resource,
)
from pywayland.protocol_core import (Argument, ArgumentType, Global, Interface,
Proxy, Resource)
class ZriverOutputStatusV1(Interface):

View File

@@ -16,16 +16,9 @@
from __future__ import annotations
from pywayland.protocol_core import (
Argument,
ArgumentType,
Global,
Interface,
Proxy,
Resource,
)
from pywayland.protocol.wayland import WlOutput
from pywayland.protocol_core import (Argument, ArgumentType, Global, Interface,
Proxy, Resource)
class ZriverSeatStatusV1(Interface):

View File

@@ -16,17 +16,10 @@
from __future__ import annotations
from pywayland.protocol_core import (
Argument,
ArgumentType,
Global,
Interface,
Proxy,
Resource,
)
from pywayland.protocol.wayland import WlOutput, WlSeat
from pywayland.protocol_core import (Argument, ArgumentType, Global, Interface,
Proxy, Resource)
from pywayland.protocol.wayland import WlOutput
from pywayland.protocol.wayland import WlSeat
from .zriver_output_status_v1 import ZriverOutputStatusV1
from .zriver_seat_status_v1 import ZriverSeatStatusV1

View File

@@ -1,17 +1,19 @@
import os
import threading
import time
from loguru import logger
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any, Set
from typing import Any, Dict, List, Optional, Set
from fabric.core.service import Service, Signal, Property
from fabric.core.service import Property, Service, Signal
from fabric.utils.helpers import idle_add
from gi.repository import GLib
from loguru import logger
# Import pywayland components - ensure these imports are correct
from pywayland.client import Display
from pywayland.protocol.wayland import WlOutput, WlRegistry, WlSeat
from .generated.river_status_unstable_v1 import ZriverStatusManagerV1
from pywayland.protocol.wayland import WlOutput, WlSeat
from .protocols.generated.river_control_unstable_v1 import ZriverControlV1
from .protocols.generated.river_status_unstable_v1 import ZriverStatusManagerV1
@dataclass
@@ -23,6 +25,7 @@ class OutputInfo:
status: Any = None # ZriverOutputStatusV1
tags_view: List[int] = field(default_factory=list)
tags_focused: List[int] = field(default_factory=list)
tags_urgent: List[int] = field(default_factory=list)
@dataclass(frozen=True)
@@ -47,7 +50,7 @@ class River(Service):
return self._active_window_title
@Signal
def ready(self):
def ready_signal(self):
return self.notify("ready")
@Signal("event", flags="detailed")
@@ -59,23 +62,22 @@ class River(Service):
self._ready = False
self._active_window_title = ""
self.outputs: Dict[int, OutputInfo] = {}
self._display = None
self.river_status_mgr = None
self.river_control = None
self.seat = None
self.seat_status = None
# Start the connection in a separate thread
self.river_thread = threading.Thread(
target=self._river_connection_task, daemon=True, name="river-status-service"
self.river_thread = GLib.Thread.new(
"river-status-service", self._river_connection_task
)
self.river_thread.start()
def _river_connection_task(self):
"""Main thread that connects to River and listens for events"""
try:
# Create a new display connection - THIS IS WHERE THE ERROR OCCURS
logger.info("[RiverService] Starting connection to River")
# Let's add some more diagnostic info to help troubleshoot
logger.debug(
f"[RiverService] XDG_RUNTIME_DIR={os.environ.get('XDG_RUNTIME_DIR', 'Not set')}"
)
@@ -83,32 +85,24 @@ class River(Service):
f"[RiverService] WAYLAND_DISPLAY={os.environ.get('WAYLAND_DISPLAY', 'Not set')}"
)
# Create the display connection
# with Display() as display:
# registry = display.get_registry()
# logger.debug("[RiverService] Registry obtained")
# Discover globals
display = Display("wayland-1")
display.connect()
logger.debug("[RiverService] Display connection created")
self._display = Display()
self._display.connect()
# Get the registry
registry = display.get_registry()
registry = self._display.get_registry()
logger.debug("[RiverService] Registry obtained")
# Create state object to hold our data
state = {
"display": display,
"display": self._display,
"registry": registry,
"outputs": {},
"river_status_mgr": None,
"river_control": None,
"seat": None,
"seat_status": None,
}
# Set up registry handlers - using more direct approach like your example
def handle_global(registry, name, iface, version):
logger.debug(
f"[RiverService] Global: {iface} (v{version}, name={name})"
@@ -118,6 +112,11 @@ class River(Service):
name, ZriverStatusManagerV1, version
)
logger.info("[RiverService] Found river status manager")
elif iface == "zriver_control_v1":
state["river_control"] = registry.bind(
name, ZriverControlV1, version
)
logger.info("[RiverService] Found river control interface")
elif iface == "wl_output":
output = registry.bind(name, WlOutput, version)
state["outputs"][name] = OutputInfo(name=name, output=output)
@@ -143,7 +142,7 @@ class River(Service):
# Discover globals
logger.debug("[RiverService] Performing initial roundtrip")
display.roundtrip()
self._display.roundtrip()
# Check if we found the river status manager
if not state["river_status_mgr"]:
@@ -152,6 +151,12 @@ class River(Service):
# Handle the window title updates through seat status
if not state["river_control"]:
logger.error(
"[RiverService] River control interface not found - falling back to riverctl"
)
# You could still fall back to the old riverctl method here if needed
def focused_view_handler(_, title):
logger.debug(f"[RiverService] Focused view title: {title}")
self._active_window_title = title
@@ -190,32 +195,42 @@ class River(Service):
return handler
def make_urgent_tags_handler(output_id):
def handler(_, tags):
decoded = self._decode_bitfields(tags)
state["outputs"][output_id].tags_urgent = decoded
logger.debug(
f"[RiverService] Output {output_id} urgent tags: {decoded}"
)
idle_add(lambda: self._emit_urgent_tags(output_id, decoded))
return handler
# Bind output status listeners
for name, info in list(state["outputs"].items()):
status = state["river_status_mgr"].get_river_output_status(info.output)
status.dispatcher["view_tags"] = make_view_tags_handler(name)
status.dispatcher["focused_tags"] = make_focused_tags_handler(name)
status.dispatcher["urgent_tags"] = make_urgent_tags_handler(name)
info.status = status
logger.info(f"[RiverService] Set up status for output {name}")
# Initial data fetch
logger.debug("[RiverService] Performing second roundtrip")
display.roundtrip()
self._display.roundtrip()
# Update our outputs dictionary
self.outputs.update(state["outputs"])
self.river_status_mgr = state["river_status_mgr"]
self.river_control = state["river_control"]
self.seat = state["seat"]
self.seat_status = state.get("seat_status")
# Mark service as ready
idle_add(self._set_ready)
# Main event loop
logger.info("[RiverService] Entering main event loop")
while True:
display.dispatch(block=True)
time.sleep(0.01) # Small sleep to prevent CPU spinning
self._display.dispatch(block=True)
except Exception as e:
logger.error(f"[RiverService] Error in River connection: {e}")
@@ -223,11 +238,13 @@ class River(Service):
logger.error(traceback.format_exc())
return True
def _set_ready(self):
"""Set the service as ready (called on main thread via idle_add)"""
self._ready = True
logger.info("[RiverService] Service ready")
self.ready.emit()
self.ready_signal.emit()
return False # Don't repeat
def _emit_view_tags(self, output_id, tags):
@@ -251,6 +268,13 @@ class River(Service):
self.notify("active-window")
return False # Don't repeat
def _emit_urgent_tags(self, output_id, tags):
"""Emit urgent_tags events (called on main thread)"""
event = RiverEvent("urgent_tags", tags, output_id)
self.emit("event::urgent_tags", event)
self.emit(f"event::urgent_tags::{output_id}", tags)
return False # Don't repeat
@staticmethod
def _decode_bitfields(bitfields) -> List[int]:
"""Decode River's tag bitfields into a list of tag indices"""
@@ -267,8 +291,48 @@ class River(Service):
return sorted(tags)
def run_command(self, command, *args):
def run_command(self, command, *args, callback=None):
"""Run a riverctl command"""
if not self.river_control or not self.seat:
logger.warning(
"[RiverService] River control or seat not available, falling back to riverctl"
)
return self._run_command_fallback(command, *args)
self.river_control.add_argument(command)
for arg in args:
self.river_control.add_argument(str(arg))
# Execute the command
command_callback = self.river_control.run_command(self.seat)
# Set up callback handlers
result = {"stdout": None, "stderr": None, "success": None}
def handle_success(_, output):
logger.debug(f"[RiverService] Command success: {output}")
result["stdout"] = output
result["success"] = True
if callback:
idle_add(lambda: callback(True, output, None))
def handle_failure(_, failure_message):
logger.debug(f"[RiverService] Command failure: {failure_message}")
result["stderr"] = failure_message
result["success"] = False
if callback:
idle_add(lambda: callback(False, None, failure_message))
command_callback.dispatcher["success"] = handle_success
command_callback.dispatcher["failure"] = handle_failure
if hasattr(self, "_display"):
self._display.flush()
return True
def _run_command_fallback(self, command, *args):
"""Fallback to riverctl"""
import subprocess
cmd = ["riverctl", command] + [str(arg) for arg in args]
@@ -282,7 +346,7 @@ class River(Service):
)
return None
def toggle_focused_tag(self, tag):
def toggle_focused_tag(self, tag, callback=None):
"""Toggle a tag in the focused tags"""
tag_mask = 1 << int(tag)
self.run_command("set-focused-tags", str(tag_mask))
self.run_command("set-focused-tags", str(tag_mask), callback=callback)

View File

@@ -1,23 +1,21 @@
from loguru import logger
from fabric.core.service import Property
from fabric.widgets.button import Button
from fabric.widgets.box import Box
from fabric.widgets.button import Button
from fabric.widgets.eventbox import EventBox
from fabric.widgets.label import Label
from fabric.utils.helpers import bulk_connect
from gi.repository import Gdk
from loguru import logger
from .service import River
from gi.repository import Gdk
_connection: River | None = None
connection: River | None = None
def get_river_connection() -> River:
global _connection
if not _connection:
_connection = River()
return _connection
global connection
if not connection:
connection = River()
return connection
class RiverWorkspaceButton(Button):
@@ -43,20 +41,47 @@ class RiverWorkspaceButton(Button):
self._empty = value
(self.remove_style_class if not value else self.add_style_class)("empty")
@Property(bool, "read-write", default_value=False)
def urgent(self) -> bool:
return self._urgent
@urgent.setter
def urgent(self, value: bool):
self._urgent = value
self._update_style()
def __init__(self, id: int, label: str = None, **kwargs):
super().__init__(label or str(id), **kwargs)
self._id = id
self._active = False
self._empty = True
self._urgent = False
def _update_style(self):
"""Update button styles based on states"""
# Remove all state-related styles first
self.remove_style_class("active")
self.remove_style_class("empty")
self.remove_style_class("urgent")
# Then apply current states
if self._active:
self.add_style_class("active")
if self._empty:
self.add_style_class("empty")
if self._urgent:
self.add_style_class("urgent")
class RiverWorkspaces(EventBox):
def __init__(self, output_id=None, max_tags=9, **kwargs):
def __init__(self, output_id, river_service=None, max_tags=9, **kwargs):
super().__init__(events="scroll")
self.service = get_river_connection()
self._box = Box(**kwargs)
self.children = self._box
if river_service:
self.river = river_service
# Store output_id as received
self.output_id = output_id
@@ -69,49 +94,41 @@ class RiverWorkspaces(EventBox):
self._box.add(btn)
# Connect to service events
self.service.connect("event::focused_tags", self.on_focus_change_general)
self.service.connect("event::view_tags", self.on_view_change_general)
self.service.connect("event::output_removed", self.on_output_removed)
self.river.connect("event::focused_tags", self.on_focus_change_general)
self.river.connect("event::view_tags", self.on_view_change_general)
self.river.connect("event::urgent_tags", self.on_urgent_change_general)
self.river.connect("event::output_removed", self.on_output_removed)
# Initial setup when service is ready
if self.service.ready:
if self.river.ready:
self.on_ready(None)
else:
self.service.connect("event::ready", self.on_ready)
self.river.connect("event::ready", self.on_ready)
self.connect("scroll-event", self.on_scroll)
def on_ready(self, _):
"""Initialize widget state when service is ready"""
logger.debug(
f"[RiverWorkspaces] Service ready, outputs: {list(self.service.outputs.keys())}"
)
# If no output_id was specified, take the first one
if self.output_id is None and self.service.outputs:
self.output_id = next(iter(self.service.outputs.keys()))
if self.output_id is None and self.river.outputs:
self.output_id = next(iter(self.river.outputs.keys()))
logger.info(f"[RiverWorkspaces] Selected output {self.output_id}")
# Initialize state from selected output
if self.output_id is not None and self.output_id in self.service.outputs:
output_info = self.service.outputs[self.output_id]
if self.output_id is not None and self.output_id in self.river.outputs:
output_info = self.river.outputs[self.output_id]
# Initialize buttons with current state
# Access fields directly on the OutputInfo dataclass
focused_tags = output_info.tags_focused
view_tags = output_info.tags_view
logger.debug(
f"[RiverWorkspaces] Initial state - focused: {focused_tags}, view: {view_tags}"
)
urgent_tags = output_info.tags_urgent
for i, btn in self._buttons.items():
btn.active = i in focused_tags
btn.empty = i not in view_tags
btn.urgent = i in urgent_tags
def on_focus_change(self, _, tags):
"""Handle focused tags change for our specific output"""
logger.debug(
logger.info(
f"[RiverWorkspaces] Focus change on output {self.output_id}: {tags}"
)
for i, btn in self._buttons.items():
@@ -119,9 +136,7 @@ class RiverWorkspaces(EventBox):
def on_view_change(self, _, tags):
"""Handle view tags change for our specific output"""
logger.debug(
f"[RiverWorkspaces] View change on output {self.output_id}: {tags}"
)
logger.info(f"[RiverWorkspaces] View change on output {self.output_id}: {tags}")
for i, btn in self._buttons.items():
btn.empty = i not in tags
@@ -129,7 +144,7 @@ class RiverWorkspaces(EventBox):
"""Handle general focused tags event"""
# Only handle event if it's for our output
if event.output_id == self.output_id:
logger.debug(
logger.info(
f"[RiverWorkspaces] General focus change for output {self.output_id}"
)
self.on_focus_change(_, event.data)
@@ -138,11 +153,28 @@ class RiverWorkspaces(EventBox):
"""Handle general view tags event"""
# Only handle event if it's for our output
if event.output_id == self.output_id:
logger.debug(
logger.info(
f"[RiverWorkspaces] General view change for output {self.output_id}"
)
self.on_view_change(_, event.data)
def on_urgent_change(self, _, tags):
"""Handle urgent tags change for our specific output"""
logger.info(
f"[RiverWorkspaces] Urgent change on output {self.output_id}: {tags}"
)
for i, btn in self._buttons.items():
btn.urgent = i in tags
def on_urgent_change_general(self, _, event):
"""Handle general urgent tags event"""
# Only handle event if it's for our output
if event.output_id == self.output_id:
logger.info(
f"[RiverWorkspaces] General urgent change for output {self.output_id}"
)
self.on_urgent_change(_, event.data)
def on_output_removed(self, _, event):
"""Handle output removal"""
removed_id = event.data[0]
@@ -151,13 +183,13 @@ class RiverWorkspaces(EventBox):
logger.info(f"[RiverWorkspaces] Our output {self.output_id} was removed")
# Try to find another output
if self.service.outputs:
self.output_id = next(iter(self.service.outputs.keys()))
if self.river.outputs:
self.output_id = next(iter(self.river.outputs.keys()))
logger.info(f"[RiverWorkspaces] Switching to output {self.output_id}")
# Update state for new output
if self.output_id in self.service.outputs:
output_info = self.service.outputs[self.output_id]
if self.output_id in self.river.outputs:
output_info = self.river.outputs[self.output_id]
# Access fields directly on the OutputInfo dataclass
focused_tags = output_info.tags_focused
view_tags = output_info.tags_view
@@ -169,41 +201,44 @@ class RiverWorkspaces(EventBox):
def on_workspace_click(self, btn):
"""Handle workspace button click"""
logger.info(f"[RiverWorkspaces] Clicked on workspace {btn.id}")
self.service.toggle_focused_tag(btn.id)
self.river.toggle_focused_tag(btn.id)
def on_scroll(self, _, event):
"""Handle scroll events"""
direction = event.direction
if direction == Gdk.ScrollDirection.DOWN:
logger.info("[RiverWorkspaces] Scroll down - focusing next view")
self.service.run_command("focus-view", "next")
self.river.run_command("focus-view", "next")
elif direction == Gdk.ScrollDirection.UP:
logger.info("[RiverWorkspaces] Scroll up - focusing previous view")
self.service.run_command("focus-view", "previous")
self.river.run_command("focus-view", "previous")
class RiverActiveWindow(Label):
"""Widget to display the currently active window's title"""
def __init__(self, max_length=None, ellipsize="end", **kwargs):
def __init__(self, max_length=None, ellipsize="end", river_service=None, **kwargs):
super().__init__(**kwargs)
self.service = get_river_connection()
if river_service:
self.river = river_service
self.max_length = max_length
self.ellipsize = ellipsize
# Set initial state
if self.service.ready:
if self.river.ready:
self.on_ready(None)
else:
self.service.connect("event::ready", self.on_ready)
self.river.connect("event::ready", self.on_ready)
# Connect to active window changes
self.service.connect("event::active_window", self.on_active_window_changed)
self.river.connect("event::active_window", self.on_active_window_changed)
def on_ready(self, _):
"""Initialize widget when service is ready"""
logger.debug("[RiverActiveWindow] Service ready")
self.update_title(self.service.active_window)
logger.info("[RiverActiveWindow] Connected to service")
self.update_title(self.river.active_window)
def on_active_window_changed(self, _, event):
"""Update widget when active window changes"""

View File

@@ -0,0 +1,21 @@
from fabric.core.service import Service, Property
from pywayland.client import Display
from gi.repository import GLib
class WaylandEventLoopService(Service):
@Property(object, "readable", "display")
def display_property(self):
return self._display
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._display = Display()
self._display.connect()
self.thread = GLib.Thread.new("wayland-loop", self._loop)
def _loop(self):
while True:
self._display.dispatch(block=True)
print("DISPATCHING...")

View File

@@ -0,0 +1,233 @@
#!/usr/bin/env python3
import sys
from typing import Dict, List, Optional
import pywayland
from pywayland.client import Display
from pywayland.protocol.wayland import WlOutput, WlSeat
# Import the protocol interfaces from your files
from wlr_foreign_toplevel_management_unstable_v1.zwlr_foreign_toplevel_manager_v1 import (
ZwlrForeignToplevelManagerV1,
)
from wlr_foreign_toplevel_management_unstable_v1.zwlr_foreign_toplevel_handle_v1 import (
ZwlrForeignToplevelHandleV1,
)
class Window:
"""Represents a toplevel window in the compositor."""
def __init__(self, handle: ZwlrForeignToplevelHandleV1):
self.handle = handle
self.title: str = "Unknown"
self.app_id: str = "Unknown"
self.states: List[str] = []
self.outputs: List[WlOutput] = []
self.parent: Optional["Window"] = None
self.closed = False
def __str__(self) -> str:
state_str = (
", ".join([ZwlrForeignToplevelHandleV1.state(s).name for s in self.states])
if self.states
else "normal"
)
return (
f"Window(title='{self.title}', app_id='{self.app_id}', state={state_str})"
)
class WaylandWindowManager:
"""Manages Wayland windows using the foreign toplevel protocol."""
def __init__(self):
self.display = Display()
self.windows: Dict[ZwlrForeignToplevelHandleV1, Window] = {}
self.manager = None
self.running = False
def connect(self) -> bool:
"""Connect to the Wayland display and bind to the toplevel manager."""
try:
self.display.connect()
print("Connected to Wayland display")
# Get the registry to find the foreign toplevel manager
registry = self.display.get_registry()
registry.dispatcher["global"] = self._registry_global_handler
# Roundtrip to process registry events
self.display.roundtrip()
if not self.manager:
print(
"Foreign toplevel manager not found. Is wlr-foreign-toplevel-management protocol supported?"
)
return False
return True
except Exception as e:
print(f"Failed to connect: {e}")
return False
def _registry_global_handler(self, registry, id, interface, version):
"""Handle registry global objects."""
if interface == ZwlrForeignToplevelManagerV1.name:
print(f"Found foreign toplevel manager (id={id}, version={version})")
self.manager = registry.bind(
id, ZwlrForeignToplevelManagerV1, min(version, 3)
)
self.manager.dispatcher["toplevel"] = self._handle_toplevel
self.manager.dispatcher["finished"] = self._handle_manager_finished
def _handle_toplevel(self, manager, toplevel):
"""Handle a new toplevel window."""
window = Window(toplevel)
self.windows[toplevel] = window
print(window)
# Setup event dispatchers for the toplevel
toplevel.dispatcher["title"] = self._handle_title
toplevel.dispatcher["app_id"] = self._handle_app_id
toplevel.dispatcher["state"] = self._handle_state
toplevel.dispatcher["done"] = self._handle_done
toplevel.dispatcher["closed"] = self._handle_closed
toplevel.dispatcher["output_enter"] = self._handle_output_enter
toplevel.dispatcher["output_leave"] = self._handle_output_leave
def _handle_title(self, toplevel, title):
"""Handle toplevel title changes."""
window = self.windows.get(toplevel)
if window:
window.title = title
def _handle_app_id(self, toplevel, app_id):
"""Handle toplevel app_id changes."""
window = self.windows.get(toplevel)
if window:
window.app_id = app_id
def _handle_state(self, toplevel, states):
"""Handle toplevel state changes."""
window = self.windows.get(toplevel)
if window:
window.states = states
def _handle_done(self, toplevel):
"""Handle toplevel done event."""
window = self.windows.get(toplevel)
if window and not window.closed:
print(f"Window updated: {window}")
def _handle_closed(self, toplevel):
"""Handle toplevel closed event."""
window = self.windows.get(toplevel)
if window:
window.closed = True
print(f"Window closed: {window}")
# Clean up the toplevel object
toplevel.destroy()
# Remove from our dictionary
del self.windows[toplevel]
def _handle_output_enter(self, toplevel, output):
"""Handle toplevel entering an output."""
window = self.windows.get(toplevel)
if window and output not in window.outputs:
window.outputs.append(output)
def _handle_output_leave(self, toplevel, output):
"""Handle toplevel leaving an output."""
window = self.windows.get(toplevel)
if window and output in window.outputs:
window.outputs.remove(output)
def _handle_parent(self, toplevel, parent):
"""Handle toplevel parent changes."""
window = self.windows.get(toplevel)
if window:
if parent is None:
window.parent = None
else:
parent_window = self.windows.get(parent)
if parent_window:
window.parent = parent_window
def _handle_manager_finished(self, manager):
"""Handle manager finished event."""
print("Foreign toplevel manager finished")
self.running = False
def get_windows(self) -> List[Window]:
"""Get all currently active windows."""
# Filter out closed windows
active_windows = [
window for window in self.windows.values() if not window.closed
]
return active_windows
def run(self):
"""Run the event loop to receive window updates."""
self.running = True
print("Listening for window events (press Ctrl+C to exit)...")
try:
while self.running:
self.display.dispatch(block=True)
except KeyboardInterrupt:
print("\nExiting...")
finally:
self.cleanup()
def cleanup(self):
"""Clean up resources."""
print("cleanup")
if self.manager:
self.manager.stop()
# Destroy all toplevel handles
for toplevel, window in list(self.windows.items()):
if not window.closed:
toplevel.destroy()
# Disconnect from display
if self.display:
self.display.disconnect()
self.running = False
def main():
"""Main entry point."""
manager = WaylandWindowManager()
if not manager.connect():
return 1
# # Run for a short time to collect initial windows
for _ in range(1):
manager.display.dispatch(block=True)
# Print all windows
windows = manager.get_windows()
print("\nActive windows:")
if windows:
for i, window in enumerate(windows, 1):
print(f"{i}. {window}")
else:
print("No windows found")
# # Option to keep monitoring window events
# if len(sys.argv) > 1 and sys.argv[1] == "--monitor":
# manager.run()
# else:
manager.cleanup()
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,270 @@
<?xml version="1.0" encoding="UTF-8"?>
<protocol name="wlr_foreign_toplevel_management_unstable_v1">
<copyright>
Copyright © 2018 Ilia Bozhinov
Permission to use, copy, modify, distribute, and sell this
software and its documentation for any purpose is hereby granted
without fee, provided that the above copyright notice appear in
all copies and that both that copyright notice and this permission
notice appear in supporting documentation, and that the name of
the copyright holders not be used in advertising or publicity
pertaining to distribution of the software without specific,
written prior permission. The copyright holders make no
representations about the suitability of this software for any
purpose. It is provided "as is" without express or implied
warranty.
THE COPYRIGHT HOLDERS DISCLAIM ALL WARRANTIES WITH REGARD TO THIS
SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
FITNESS, IN NO EVENT SHALL THE COPYRIGHT HOLDERS BE LIABLE FOR ANY
SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN
AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION,
ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF
THIS SOFTWARE.
</copyright>
<interface name="zwlr_foreign_toplevel_manager_v1" version="3">
<description summary="list and control opened apps">
The purpose of this protocol is to enable the creation of taskbars
and docks by providing them with a list of opened applications and
letting them request certain actions on them, like maximizing, etc.
After a client binds the zwlr_foreign_toplevel_manager_v1, each opened
toplevel window will be sent via the toplevel event
</description>
<event name="toplevel">
<description summary="a toplevel has been created">
This event is emitted whenever a new toplevel window is created. It
is emitted for all toplevels, regardless of the app that has created
them.
All initial details of the toplevel(title, app_id, states, etc.) will
be sent immediately after this event via the corresponding events in
zwlr_foreign_toplevel_handle_v1.
</description>
<arg name="toplevel" type="new_id" interface="zwlr_foreign_toplevel_handle_v1"/>
</event>
<request name="stop">
<description summary="stop sending events">
Indicates the client no longer wishes to receive events for new toplevels.
However the compositor may emit further toplevel_created events, until
the finished event is emitted.
The client must not send any more requests after this one.
</description>
</request>
<event name="finished" type="destructor">
<description summary="the compositor has finished with the toplevel manager">
This event indicates that the compositor is done sending events to the
zwlr_foreign_toplevel_manager_v1. The server will destroy the object
immediately after sending this request, so it will become invalid and
the client should free any resources associated with it.
</description>
</event>
</interface>
<interface name="zwlr_foreign_toplevel_handle_v1" version="3">
<description summary="an opened toplevel">
A zwlr_foreign_toplevel_handle_v1 object represents an opened toplevel
window. Each app may have multiple opened toplevels.
Each toplevel has a list of outputs it is visible on, conveyed to the
client with the output_enter and output_leave events.
</description>
<event name="title">
<description summary="title change">
This event is emitted whenever the title of the toplevel changes.
</description>
<arg name="title" type="string"/>
</event>
<event name="app_id">
<description summary="app-id change">
This event is emitted whenever the app-id of the toplevel changes.
</description>
<arg name="app_id" type="string"/>
</event>
<event name="output_enter">
<description summary="toplevel entered an output">
This event is emitted whenever the toplevel becomes visible on
the given output. A toplevel may be visible on multiple outputs.
</description>
<arg name="output" type="object" interface="wl_output"/>
</event>
<event name="output_leave">
<description summary="toplevel left an output">
This event is emitted whenever the toplevel stops being visible on
the given output. It is guaranteed that an entered-output event
with the same output has been emitted before this event.
</description>
<arg name="output" type="object" interface="wl_output"/>
</event>
<request name="set_maximized">
<description summary="requests that the toplevel be maximized">
Requests that the toplevel be maximized. If the maximized state actually
changes, this will be indicated by the state event.
</description>
</request>
<request name="unset_maximized">
<description summary="requests that the toplevel be unmaximized">
Requests that the toplevel be unmaximized. If the maximized state actually
changes, this will be indicated by the state event.
</description>
</request>
<request name="set_minimized">
<description summary="requests that the toplevel be minimized">
Requests that the toplevel be minimized. If the minimized state actually
changes, this will be indicated by the state event.
</description>
</request>
<request name="unset_minimized">
<description summary="requests that the toplevel be unminimized">
Requests that the toplevel be unminimized. If the minimized state actually
changes, this will be indicated by the state event.
</description>
</request>
<request name="activate">
<description summary="activate the toplevel">
Request that this toplevel be activated on the given seat.
There is no guarantee the toplevel will be actually activated.
</description>
<arg name="seat" type="object" interface="wl_seat"/>
</request>
<enum name="state">
<description summary="types of states on the toplevel">
The different states that a toplevel can have. These have the same meaning
as the states with the same names defined in xdg-toplevel
</description>
<entry name="maximized" value="0" summary="the toplevel is maximized"/>
<entry name="minimized" value="1" summary="the toplevel is minimized"/>
<entry name="activated" value="2" summary="the toplevel is active"/>
<entry name="fullscreen" value="3" summary="the toplevel is fullscreen" since="2"/>
</enum>
<event name="state">
<description summary="the toplevel state changed">
This event is emitted immediately after the zlw_foreign_toplevel_handle_v1
is created and each time the toplevel state changes, either because of a
compositor action or because of a request in this protocol.
</description>
<arg name="state" type="array"/>
</event>
<event name="done">
<description summary="all information about the toplevel has been sent">
This event is sent after all changes in the toplevel state have been
sent.
This allows changes to the zwlr_foreign_toplevel_handle_v1 properties
to be seen as atomic, even if they happen via multiple events.
</description>
</event>
<request name="close">
<description summary="request that the toplevel be closed">
Send a request to the toplevel to close itself. The compositor would
typically use a shell-specific method to carry out this request, for
example by sending the xdg_toplevel.close event. However, this gives
no guarantees the toplevel will actually be destroyed. If and when
this happens, the zwlr_foreign_toplevel_handle_v1.closed event will
be emitted.
</description>
</request>
<request name="set_rectangle">
<description summary="the rectangle which represents the toplevel">
The rectangle of the surface specified in this request corresponds to
the place where the app using this protocol represents the given toplevel.
It can be used by the compositor as a hint for some operations, e.g
minimizing. The client is however not required to set this, in which
case the compositor is free to decide some default value.
If the client specifies more than one rectangle, only the last one is
considered.
The dimensions are given in surface-local coordinates.
Setting width=height=0 removes the already-set rectangle.
</description>
<arg name="surface" type="object" interface="wl_surface"/>
<arg name="x" type="int"/>
<arg name="y" type="int"/>
<arg name="width" type="int"/>
<arg name="height" type="int"/>
</request>
<enum name="error">
<entry name="invalid_rectangle" value="0"
summary="the provided rectangle is invalid"/>
</enum>
<event name="closed">
<description summary="this toplevel has been destroyed">
This event means the toplevel has been destroyed. It is guaranteed there
won't be any more events for this zwlr_foreign_toplevel_handle_v1. The
toplevel itself becomes inert so any requests will be ignored except the
destroy request.
</description>
</event>
<request name="destroy" type="destructor">
<description summary="destroy the zwlr_foreign_toplevel_handle_v1 object">
Destroys the zwlr_foreign_toplevel_handle_v1 object.
This request should be called either when the client does not want to
use the toplevel anymore or after the closed event to finalize the
destruction of the object.
</description>
</request>
<!-- Version 2 additions -->
<request name="set_fullscreen" since="2">
<description summary="request that the toplevel be fullscreened">
Requests that the toplevel be fullscreened on the given output. If the
fullscreen state and/or the outputs the toplevel is visible on actually
change, this will be indicated by the state and output_enter/leave
events.
The output parameter is only a hint to the compositor. Also, if output
is NULL, the compositor should decide which output the toplevel will be
fullscreened on, if at all.
</description>
<arg name="output" type="object" interface="wl_output" allow-null="true"/>
</request>
<request name="unset_fullscreen" since="2">
<description summary="request that the toplevel be unfullscreened">
Requests that the toplevel be unfullscreened. If the fullscreen state
actually changes, this will be indicated by the state event.
</description>
</request>
<!-- Version 3 additions -->
<event name="parent" since="3">
<description summary="parent change">
This event is emitted whenever the parent of the toplevel changes.
No event is emitted when the parent handle is destroyed by the client.
</description>
<arg name="parent" type="object" interface="zwlr_foreign_toplevel_handle_v1" allow-null="true"/>
</event>
</interface>
</protocol>

View File

@@ -0,0 +1,27 @@
# This file has been autogenerated by the pywayland scanner
# Copyright © 2018 Ilia Bozhinov
#
# Permission to use, copy, modify, distribute, and sell this
# software and its documentation for any purpose is hereby granted
# without fee, provided that the above copyright notice appear in
# all copies and that both that copyright notice and this permission
# notice appear in supporting documentation, and that the name of
# the copyright holders not be used in advertising or publicity
# pertaining to distribution of the software without specific,
# written prior permission. The copyright holders make no
# representations about the suitability of this software for any
# purpose. It is provided "as is" without express or implied
# warranty.
#
# THE COPYRIGHT HOLDERS DISCLAIM ALL WARRANTIES WITH REGARD TO THIS
# SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS, IN NO EVENT SHALL THE COPYRIGHT HOLDERS BE LIABLE FOR ANY
# SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN
# AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION,
# ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF
# THIS SOFTWARE.
from .zwlr_foreign_toplevel_handle_v1 import ZwlrForeignToplevelHandleV1 # noqa: F401
from .zwlr_foreign_toplevel_manager_v1 import ZwlrForeignToplevelManagerV1 # noqa: F401

View File

@@ -0,0 +1,352 @@
# This file has been autogenerated by the pywayland scanner
# Copyright © 2018 Ilia Bozhinov
#
# Permission to use, copy, modify, distribute, and sell this
# software and its documentation for any purpose is hereby granted
# without fee, provided that the above copyright notice appear in
# all copies and that both that copyright notice and this permission
# notice appear in supporting documentation, and that the name of
# the copyright holders not be used in advertising or publicity
# pertaining to distribution of the software without specific,
# written prior permission. The copyright holders make no
# representations about the suitability of this software for any
# purpose. It is provided "as is" without express or implied
# warranty.
#
# THE COPYRIGHT HOLDERS DISCLAIM ALL WARRANTIES WITH REGARD TO THIS
# SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS, IN NO EVENT SHALL THE COPYRIGHT HOLDERS BE LIABLE FOR ANY
# SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN
# AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION,
# ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF
# THIS SOFTWARE.
from __future__ import annotations
import enum
from pywayland.protocol_core import (
Argument,
ArgumentType,
Global,
Interface,
Proxy,
Resource,
)
from pywayland.protocol.wayland import WlOutput
from pywayland.protocol.wayland import WlSeat
from pywayland.protocol.wayland import WlSurface
class ZwlrForeignToplevelHandleV1(Interface):
"""An opened toplevel
A :class:`ZwlrForeignToplevelHandleV1` object represents an opened toplevel
window. Each app may have multiple opened toplevels.
Each toplevel has a list of outputs it is visible on, conveyed to the
client with the output_enter and output_leave events.
"""
name = "zwlr_foreign_toplevel_handle_v1"
version = 3
class state(enum.IntEnum):
maximized = 0
minimized = 1
activated = 2
fullscreen = 3
class error(enum.IntEnum):
invalid_rectangle = 0
class ZwlrForeignToplevelHandleV1Proxy(Proxy[ZwlrForeignToplevelHandleV1]):
interface = ZwlrForeignToplevelHandleV1
@ZwlrForeignToplevelHandleV1.request()
def set_maximized(self) -> None:
"""Requests that the toplevel be maximized
Requests that the toplevel be maximized. If the maximized state
actually changes, this will be indicated by the state event.
"""
self._marshal(0)
@ZwlrForeignToplevelHandleV1.request()
def unset_maximized(self) -> None:
"""Requests that the toplevel be unmaximized
Requests that the toplevel be unmaximized. If the maximized state
actually changes, this will be indicated by the state event.
"""
self._marshal(1)
@ZwlrForeignToplevelHandleV1.request()
def set_minimized(self) -> None:
"""Requests that the toplevel be minimized
Requests that the toplevel be minimized. If the minimized state
actually changes, this will be indicated by the state event.
"""
self._marshal(2)
@ZwlrForeignToplevelHandleV1.request()
def unset_minimized(self) -> None:
"""Requests that the toplevel be unminimized
Requests that the toplevel be unminimized. If the minimized state
actually changes, this will be indicated by the state event.
"""
self._marshal(3)
@ZwlrForeignToplevelHandleV1.request(
Argument(ArgumentType.Object, interface=WlSeat),
)
def activate(self, seat: WlSeat) -> None:
"""Activate the toplevel
Request that this toplevel be activated on the given seat. There is no
guarantee the toplevel will be actually activated.
:param seat:
:type seat:
:class:`~pywayland.protocol.wayland.WlSeat`
"""
self._marshal(4, seat)
@ZwlrForeignToplevelHandleV1.request()
def close(self) -> None:
"""Request that the toplevel be closed
Send a request to the toplevel to close itself. The compositor would
typically use a shell-specific method to carry out this request, for
example by sending the xdg_toplevel.close event. However, this gives no
guarantees the toplevel will actually be destroyed. If and when this
happens, the :func:`ZwlrForeignToplevelHandleV1.closed()` event will be
emitted.
"""
self._marshal(5)
@ZwlrForeignToplevelHandleV1.request(
Argument(ArgumentType.Object, interface=WlSurface),
Argument(ArgumentType.Int),
Argument(ArgumentType.Int),
Argument(ArgumentType.Int),
Argument(ArgumentType.Int),
)
def set_rectangle(
self, surface: WlSurface, x: int, y: int, width: int, height: int
) -> None:
"""The rectangle which represents the toplevel
The rectangle of the surface specified in this request corresponds to
the place where the app using this protocol represents the given
toplevel. It can be used by the compositor as a hint for some
operations, e.g minimizing. The client is however not required to set
this, in which case the compositor is free to decide some default
value.
If the client specifies more than one rectangle, only the last one is
considered.
The dimensions are given in surface-local coordinates. Setting
width=height=0 removes the already-set rectangle.
:param surface:
:type surface:
:class:`~pywayland.protocol.wayland.WlSurface`
:param x:
:type x:
`ArgumentType.Int`
:param y:
:type y:
`ArgumentType.Int`
:param width:
:type width:
`ArgumentType.Int`
:param height:
:type height:
`ArgumentType.Int`
"""
self._marshal(6, surface, x, y, width, height)
@ZwlrForeignToplevelHandleV1.request()
def destroy(self) -> None:
"""Destroy the :class:`ZwlrForeignToplevelHandleV1` object
Destroys the :class:`ZwlrForeignToplevelHandleV1` object.
This request should be called either when the client does not want to
use the toplevel anymore or after the closed event to finalize the
destruction of the object.
"""
self._marshal(7)
self._destroy()
@ZwlrForeignToplevelHandleV1.request(
Argument(ArgumentType.Object, interface=WlOutput, nullable=True),
version=2,
)
def set_fullscreen(self, output: WlOutput | None) -> None:
"""Request that the toplevel be fullscreened
Requests that the toplevel be fullscreened on the given output. If the
fullscreen state and/or the outputs the toplevel is visible on actually
change, this will be indicated by the state and output_enter/leave
events.
The output parameter is only a hint to the compositor. Also, if output
is NULL, the compositor should decide which output the toplevel will be
fullscreened on, if at all.
:param output:
:type output:
:class:`~pywayland.protocol.wayland.WlOutput` or `None`
"""
self._marshal(8, output)
@ZwlrForeignToplevelHandleV1.request(version=2)
def unset_fullscreen(self) -> None:
"""Request that the toplevel be unfullscreened
Requests that the toplevel be unfullscreened. If the fullscreen state
actually changes, this will be indicated by the state event.
"""
self._marshal(9)
class ZwlrForeignToplevelHandleV1Resource(Resource):
interface = ZwlrForeignToplevelHandleV1
@ZwlrForeignToplevelHandleV1.event(
Argument(ArgumentType.String),
)
def title(self, title: str) -> None:
"""Title change
This event is emitted whenever the title of the toplevel changes.
:param title:
:type title:
`ArgumentType.String`
"""
self._post_event(0, title)
@ZwlrForeignToplevelHandleV1.event(
Argument(ArgumentType.String),
)
def app_id(self, app_id: str) -> None:
"""App-id change
This event is emitted whenever the app-id of the toplevel changes.
:param app_id:
:type app_id:
`ArgumentType.String`
"""
self._post_event(1, app_id)
@ZwlrForeignToplevelHandleV1.event(
Argument(ArgumentType.Object, interface=WlOutput),
)
def output_enter(self, output: WlOutput) -> None:
"""Toplevel entered an output
This event is emitted whenever the toplevel becomes visible on the
given output. A toplevel may be visible on multiple outputs.
:param output:
:type output:
:class:`~pywayland.protocol.wayland.WlOutput`
"""
self._post_event(2, output)
@ZwlrForeignToplevelHandleV1.event(
Argument(ArgumentType.Object, interface=WlOutput),
)
def output_leave(self, output: WlOutput) -> None:
"""Toplevel left an output
This event is emitted whenever the toplevel stops being visible on the
given output. It is guaranteed that an entered-output event with the
same output has been emitted before this event.
:param output:
:type output:
:class:`~pywayland.protocol.wayland.WlOutput`
"""
self._post_event(3, output)
@ZwlrForeignToplevelHandleV1.event(
Argument(ArgumentType.Array),
)
def state(self, state: list) -> None:
"""The toplevel state changed
This event is emitted immediately after the
zlw_foreign_toplevel_handle_v1 is created and each time the toplevel
state changes, either because of a compositor action or because of a
request in this protocol.
:param state:
:type state:
`ArgumentType.Array`
"""
self._post_event(4, state)
@ZwlrForeignToplevelHandleV1.event()
def done(self) -> None:
"""All information about the toplevel has been sent
This event is sent after all changes in the toplevel state have been
sent.
This allows changes to the :class:`ZwlrForeignToplevelHandleV1`
properties to be seen as atomic, even if they happen via multiple
events.
"""
self._post_event(5)
@ZwlrForeignToplevelHandleV1.event()
def closed(self) -> None:
"""This toplevel has been destroyed
This event means the toplevel has been destroyed. It is guaranteed
there won't be any more events for this
:class:`ZwlrForeignToplevelHandleV1`. The toplevel itself becomes inert
so any requests will be ignored except the destroy request.
"""
self._post_event(6)
@ZwlrForeignToplevelHandleV1.event(
Argument(
ArgumentType.Object, interface=ZwlrForeignToplevelHandleV1, nullable=True
),
version=3,
)
def parent(self, parent: ZwlrForeignToplevelHandleV1 | None) -> None:
"""Parent change
This event is emitted whenever the parent of the toplevel changes.
No event is emitted when the parent handle is destroyed by the client.
:param parent:
:type parent:
:class:`ZwlrForeignToplevelHandleV1` or `None`
"""
self._post_event(7, parent)
class ZwlrForeignToplevelHandleV1Global(Global):
interface = ZwlrForeignToplevelHandleV1
ZwlrForeignToplevelHandleV1._gen_c()
ZwlrForeignToplevelHandleV1.proxy_class = ZwlrForeignToplevelHandleV1Proxy
ZwlrForeignToplevelHandleV1.resource_class = ZwlrForeignToplevelHandleV1Resource
ZwlrForeignToplevelHandleV1.global_class = ZwlrForeignToplevelHandleV1Global

View File

@@ -0,0 +1,112 @@
# This file has been autogenerated by the pywayland scanner
# Copyright © 2018 Ilia Bozhinov
#
# Permission to use, copy, modify, distribute, and sell this
# software and its documentation for any purpose is hereby granted
# without fee, provided that the above copyright notice appear in
# all copies and that both that copyright notice and this permission
# notice appear in supporting documentation, and that the name of
# the copyright holders not be used in advertising or publicity
# pertaining to distribution of the software without specific,
# written prior permission. The copyright holders make no
# representations about the suitability of this software for any
# purpose. It is provided "as is" without express or implied
# warranty.
#
# THE COPYRIGHT HOLDERS DISCLAIM ALL WARRANTIES WITH REGARD TO THIS
# SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS, IN NO EVENT SHALL THE COPYRIGHT HOLDERS BE LIABLE FOR ANY
# SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN
# AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION,
# ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF
# THIS SOFTWARE.
from __future__ import annotations
from pywayland.protocol_core import (
Argument,
ArgumentType,
Global,
Interface,
Proxy,
Resource,
)
from .zwlr_foreign_toplevel_handle_v1 import ZwlrForeignToplevelHandleV1
class ZwlrForeignToplevelManagerV1(Interface):
"""List and control opened apps
The purpose of this protocol is to enable the creation of taskbars and
docks by providing them with a list of opened applications and letting them
request certain actions on them, like maximizing, etc.
After a client binds the :class:`ZwlrForeignToplevelManagerV1`, each opened
toplevel window will be sent via the toplevel event
"""
name = "zwlr_foreign_toplevel_manager_v1"
version = 3
class ZwlrForeignToplevelManagerV1Proxy(Proxy[ZwlrForeignToplevelManagerV1]):
interface = ZwlrForeignToplevelManagerV1
@ZwlrForeignToplevelManagerV1.request()
def stop(self) -> None:
"""Stop sending events
Indicates the client no longer wishes to receive events for new
toplevels. However the compositor may emit further toplevel_created
events, until the finished event is emitted.
The client must not send any more requests after this one.
"""
self._marshal(0)
class ZwlrForeignToplevelManagerV1Resource(Resource):
interface = ZwlrForeignToplevelManagerV1
@ZwlrForeignToplevelManagerV1.event(
Argument(ArgumentType.NewId, interface=ZwlrForeignToplevelHandleV1),
)
def toplevel(self, toplevel: ZwlrForeignToplevelHandleV1) -> None:
"""A toplevel has been created
This event is emitted whenever a new toplevel window is created. It is
emitted for all toplevels, regardless of the app that has created them.
All initial details of the toplevel(title, app_id, states, etc.) will
be sent immediately after this event via the corresponding events in
:class:`~pywayland.protocol.wlr_foreign_toplevel_management_unstable_v1.ZwlrForeignToplevelHandleV1`.
:param toplevel:
:type toplevel:
:class:`~pywayland.protocol.wlr_foreign_toplevel_management_unstable_v1.ZwlrForeignToplevelHandleV1`
"""
self._post_event(0, toplevel)
@ZwlrForeignToplevelManagerV1.event()
def finished(self) -> None:
"""The compositor has finished with the toplevel manager
This event indicates that the compositor is done sending events to the
:class:`ZwlrForeignToplevelManagerV1`. The server will destroy the
object immediately after sending this request, so it will become
invalid and the client should free any resources associated with it.
"""
self._post_event(1)
class ZwlrForeignToplevelManagerV1Global(Global):
interface = ZwlrForeignToplevelManagerV1
ZwlrForeignToplevelManagerV1._gen_c()
ZwlrForeignToplevelManagerV1.proxy_class = ZwlrForeignToplevelManagerV1Proxy
ZwlrForeignToplevelManagerV1.resource_class = ZwlrForeignToplevelManagerV1Resource
ZwlrForeignToplevelManagerV1.global_class = ZwlrForeignToplevelManagerV1Global

238
bar/services/wlr/service.py Normal file
View File

@@ -0,0 +1,238 @@
import time
from gi.repository import GLib
from typing import Dict, List, Optional
from pywayland.client import Display
from pywayland.protocol.wayland import WlOutput, WlSeat
from fabric.core.service import Property, Service, Signal
from fabric.utils.helpers import idle_add
from bar.services.wlr.protocol.wlr_foreign_toplevel_management_unstable_v1.zwlr_foreign_toplevel_manager_v1 import (
ZwlrForeignToplevelManagerV1,
)
from bar.services.wlr.protocol.wlr_foreign_toplevel_management_unstable_v1.zwlr_foreign_toplevel_handle_v1 import (
ZwlrForeignToplevelHandleV1,
)
class Window:
"""Represents a toplevel window in the compositor."""
def __init__(self, handle: ZwlrForeignToplevelHandleV1):
self.handle = handle
self.title: str = "Unknown"
self.app_id: str = "Unknown"
self.states: List[str] = []
self.outputs: List[WlOutput] = []
self.parent: Optional["Window"] = None
self.closed = False
def __str__(self) -> str:
state_str = (
", ".join([ZwlrForeignToplevelHandleV1.state(s).name for s in self.states])
if self.states
else "normal"
)
return (
f"Window(title='{self.title}', app_id='{self.app_id}', state={state_str})"
)
class WaylandWindowTracker(Service):
"""Track Wayland windows in the background and provide access on demand."""
@Property(bool, "readable", "is-ready", default_value=False)
def ready(self) -> bool:
return self._ready
@Signal
def ready_signal(self):
return self.notify("ready")
@Property(list[Window], "readable", "windows")
def windows(self) -> list[Window]:
return [window for window in self._window_dict.values() if not window.closed]
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.display = None
self._window_dict: Dict[ZwlrForeignToplevelHandleV1, Window] = {}
self._windows = []
self.manager = None
self.seat: Optional[WlSeat] = None
self.thread = GLib.Thread.new(
"wayland-window-service", self._run_display_thread
)
def _run_display_thread(self):
"""Run the Wayland event loop in a background thread."""
try:
self.display = Display()
self.display.connect()
# Get the registry to find the foreign toplevel manager
registry = self.display.get_registry()
registry.dispatcher["global"] = self._registry_global_handler
# Process registry events
self.display.roundtrip()
if not self.manager:
print("Foreign toplevel manager not found")
return
# Process more events to get initial windows
for _ in range(5):
self.display.roundtrip()
idle_add(self._set_ready)
while True:
self.display.dispatch(block=True)
except Exception as e:
print(f"Display thread error: {e}")
finally:
self.cleanup()
def _registry_global_handler(self, registry, id, interface, version):
"""Handle registry global objects."""
if interface == WlSeat.name:
self.seat = registry.bind(id, WlSeat, version)
print(f"Found seat (id={id}, version={version})")
elif interface == ZwlrForeignToplevelManagerV1.name:
self.manager = registry.bind(
id, ZwlrForeignToplevelManagerV1, min(version, 3)
)
self.manager.dispatcher["toplevel"] = self._handle_toplevel
self.manager.dispatcher["finished"] = self._handle_manager_finished
def _handle_toplevel(self, manager, toplevel):
"""Handle a new toplevel window."""
print("TOPLEVEL IS TRIGGERD")
window = Window(toplevel)
self._window_dict[toplevel] = window
# Setup event dispatchers for the toplevel
toplevel.dispatcher["title"] = self._handle_title
toplevel.dispatcher["app_id"] = self._handle_app_id
toplevel.dispatcher["state"] = self._handle_state
toplevel.dispatcher["done"] = self._handle_done
toplevel.dispatcher["closed"] = self._handle_closed
toplevel.dispatcher["output_enter"] = self._handle_output_enter
toplevel.dispatcher["output_leave"] = self._handle_output_leave
def _handle_title(self, toplevel, title):
"""Handle toplevel title changes."""
window = self._window_dict.get(toplevel)
if window:
print("there is a window, putting title")
window.title = title
def _handle_app_id(self, toplevel, app_id):
"""Handle toplevel app_id changes."""
window = self._window_dict.get(toplevel)
if window:
window.app_id = app_id
def _handle_state(self, toplevel, states):
"""Handle toplevel state changes."""
window = self._window_dict.get(toplevel)
if window:
window.states = states
def _handle_done(self, toplevel):
"""Handle toplevel done event."""
# We don't need to print anything here as we're just tracking silently
pass
def _handle_closed(self, toplevel):
"""Handle toplevel closed event."""
window = self._window_dict.get(toplevel)
if window:
window.closed = True
# Remove from our dictionary
del self._window_dict[toplevel]
# Clean up the toplevel object
toplevel.destroy()
def _handle_output_enter(self, toplevel, output):
"""Handle toplevel entering an output."""
window = self._window_dict.get(toplevel)
if window and output not in window.outputs:
window.outputs.append(output)
def _handle_output_leave(self, toplevel, output):
"""Handle toplevel leaving an output."""
window = self._window_dict.get(toplevel)
if window and output in window.outputs:
window.outputs.remove(output)
def _handle_parent(self, toplevel, parent):
"""Handle toplevel parent changes."""
window = self._window_dict.get(toplevel)
if window:
if parent is None:
window.parent = None
else:
parent_window = self._window_dict.get(parent)
if parent_window:
window.parent = parent_window
def _handle_manager_finished(self, manager):
"""Handle manager finished event."""
self.running = False
def _set_ready(self):
print("IM READY")
self._ready = True
self.ready_signal.emit()
return False
def get_windows(self) -> List[Window]:
"""Get all currently active windows."""
print([window for window in self._window_dict.values()])
print("YOU CALLED WINDOWS")
return [window for window in self._window_dict.values() if not window.closed]
def activate_window(self, window: Window):
if self.seat is None:
print("Cannot activate window: no seat available")
return
print(f"Activating window: {window.title}")
window.handle.activate(self.seat)
self.display.flush() # flush the request to the Wayland server
def cleanup(self):
"""Clean up resources."""
self.running = False
print("Cleanup")
if self.manager:
try:
self.manager.stop()
except:
pass
# Disconnect from display
if self.display:
try:
self.display.disconnect()
except:
pass
def print_windows(tracker):
"""Print the current list of windows."""
windows = tracker.get_windows()
print(f"\nCurrent windows ({len(windows)}):")
if windows:
for i, window in enumerate(windows, 1):
print(f"{i}. {window}")
else:
print("No windows found")

51
bar/styles/bar.css Normal file
View File

@@ -0,0 +1,51 @@
#bar-inner {
padding: 4px;
border-bottom: solid 2px;
border-color: var(--border-color);
background-color: var(--window-bg);
min-height: 28px;
}
#center-container {
color: var(--foreground);
}
.active-window {
color: var(--foreground);
font-weight: bold;
}
#cpu-progress-bar,
#ram-progress-bar,
#volume-progress-bar {
color: transparent;
background-color: transparent
}
#cpu-progress-bar {
border: solid 0px alpha(var(--violet), 0.8);
}
#ram-progress-bar,
#volume-progress-bar {
border: solid 0px var(--blue);
}
#widgets-container {
background-color: var(--module-bg);
padding: 2px;
}
#nixos-label {
color: var(--blue);
}
tooltip {
border: solid 2px;
border-color: var(--border-color);
background-color: var(--window-bg);
}
tooltip>* {
padding: 2px 4px
}

28
bar/styles/colors.css Normal file
View File

@@ -0,0 +1,28 @@
:vars {
--background: #17181C;
--mid-bg: #1E1F24;
--light-bg: #26272B;
--dark-grey: #333438;
--light-grey: #8F9093;
--dark-fg: #B0B1B4;
--mid-fg: #CBCCCE;
--foreground: #E4E5E7;
--pink: #FA3867;
--orange: #F3872F;
--gold: #FEBD16;
--lime: #3FD43B;
--turquoise: #47E7CE;
--blue: #53ADE1;
--violet: #AD60FF;
--red: #FC3F2C;
--window-bg: alpha(var(--background), 0.9);
--module-bg: alpha(var(--mid-bg), 0.8);
--border-color: var(--light-bg);
--ws-active: var(--pink);
--ws-inactive: var(--blue);
--ws-empty: var(--dark-grey);
--ws-hover: var(--turquoise);
--ws-urgent: var(--red);
}

29
bar/styles/finder.css Normal file
View File

@@ -0,0 +1,29 @@
#picker-box {
padding: 12px;
background-color: rgba(40, 40, 40, 0.95); /* darker for contrast */
border-radius: 8px;
font-family: sans-serif;
font-size: 14px;
color: white;
}
#viewport {
padding: 8px;
background-color: rgba(30, 30, 30, 0.9); /* dark background for contrast */
border-radius: 6px;
font-family: sans-serif;
font-size: 14px;
color: white; /* ensure contrast */
}
#viewport > * {
padding: 6px 10px;
margin-bottom: 4px;
border-radius: 4px;
background-color: rgba(255, 255, 255, 0.05);
}
#viewport:hover {
background-color: rgba(255, 255, 255, 0.15); /* hover feedback */
}

20
bar/styles/main.css Normal file
View File

@@ -0,0 +1,20 @@
@import url("./colors.css");
@import url("./workspaces.css");
@import url("./menu.css");
@import url("./vinyl.css");
@import url("./bar.css");
@import url("./finder.css");
/* unset so we can style everything from the ground up. */
* {
all: unset;
color: var(--foreground);
font-size: 16px;
font-family: "Jost*", sans-serif;
}
button {
background-size: 400% 400%;
}

38
bar/styles/menu.css Normal file
View File

@@ -0,0 +1,38 @@
#date-time,
menu>menuitem>label,
#date-time>label,
/* system tray */
#system-tray {
padding: 2px 4px;
background-color: var(--module-bg);
}
/* menu and menu items (written for the system tray) */
menu {
border: solid 2px;
border-radius: 10px;
border-color: var(--border-color);
background-color: var(--window-bg);
}
menu>menuitem {
border-radius: 0px;
background-color: var(--module-bg);
padding: 6px;
margin-left: 2px;
margin-right: 2px;
}
menu>menuitem:first-child {
margin-top: 1px;
border-radius: 8px 8px 0px 0px;
}
menu>menuitem:last-child {
margin-bottom: 1px;
border-radius: 0px 0px 8px 8px;
}
menu>menuitem:hover {
background-color: var(--pink);
}

41
bar/styles/vinyl.css Normal file
View File

@@ -0,0 +1,41 @@
/* Vinyl button styling */
#vinyl-button {
padding: 0px 8px;
transition: padding 0.05s steps(8);
background-color: rgba(180, 180, 180, 0.2);
border-radius: 4px;
transition: all 0.2s ease;
}
/* Active state styling */
.active #vinyl-button {
background-color: rgba(108, 158, 175, 0.7);
padding: 0px 32px;
}
/* Icon styling */
#vinyl-icon {
color: #555555;
min-width: 36px;
}
/* Label styling */
#vinyl-label {
color: #333333;
}
/* Active state changes for icon and label */
.active #vinyl-icon,
.active #vinyl-label {
color: var(--pink);
padding: 0px 32px;
}
/* Hover effect */
#vinyl-button:hover {
background-color: rgba(180, 180, 180, 0.4);
}
.active #vinyl-button:hover {
background-color: rgba(108, 158, 175, 0.9);
}

42
bar/styles/workspaces.css Normal file
View File

@@ -0,0 +1,42 @@
#workspaces {
padding: 6px;
min-width: 0px;
background-color: var(--module-bg);
}
#workspaces>button {
padding: 0px 8px;
transition: padding 0.05s steps(8);
background-color: var(--foreground);
border-radius: 100px;
}
#workspaces>button>label {
font-size: 0px;
}
#workspaces button.hover {
background-color: var(--ws-hover);
}
#workspaces button.urgent {
background-color: var(--ws-urgent);
color: var(--foreground);
font-weight: bold;
animation: urgent-blink 1s infinite;
}
@keyframes urgent-blink {
0% { opacity: 1.0; }
50% { opacity: 0.5; }
100% { opacity: 1.0; }
}
#workspaces>button.empty {
background-color: var(--ws-empty);
}
#workspaces>button.active {
padding: 0px 32px;
background-color: var(--ws-active);
}

126
bar/widgets/circle_image.py Normal file
View File

@@ -0,0 +1,126 @@
import math
from typing import Literal
import cairo
import gi
from fabric.core.service import Property
from fabric.widgets.widget import Widget
gi.require_version("Gtk", "3.0")
from gi.repository import Gdk, GdkPixbuf, Gtk # noqa: E402
class CircleImage(Gtk.DrawingArea, Widget):
"""A widget that displays an image in a circular shape with a 1:1 aspect ratio."""
@Property(int, "read-write")
def angle(self) -> int:
return self._angle
@angle.setter
def angle(self, value: int):
self._angle = value % 360
self.queue_draw()
def __init__(
self,
image_file: str | None = None,
pixbuf: GdkPixbuf.Pixbuf | None = None,
name: str | None = None,
visible: bool = True,
all_visible: bool = False,
style: str | None = None,
tooltip_text: str | None = None,
tooltip_markup: str | None = None,
h_align: Literal["fill", "start", "end", "center", "baseline"]
| Gtk.Align
| None = None,
v_align: Literal["fill", "start", "end", "center", "baseline"]
| Gtk.Align
| None = None,
h_expand: bool = False,
v_expand: bool = False,
size: int | None = None,
**kwargs,
):
Gtk.DrawingArea.__init__(self)
Widget.__init__(
self,
name=name,
visible=visible,
all_visible=all_visible,
style=style,
tooltip_text=tooltip_text,
tooltip_markup=tooltip_markup,
h_align=h_align,
v_align=v_align,
h_expand=h_expand,
v_expand=v_expand,
size=size,
**kwargs,
)
self.size = size if size is not None else 100 # Default size if not provided
self._angle = 0
self._orig_image: GdkPixbuf.Pixbuf | None = (
None # Original image for reprocessing
)
self._image: GdkPixbuf.Pixbuf | None = None
if image_file:
pix = GdkPixbuf.Pixbuf.new_from_file(image_file)
self._orig_image = pix
self._image = self._process_image(pix)
elif pixbuf:
self._orig_image = pixbuf
self._image = self._process_image(pixbuf)
self.connect("draw", self.on_draw)
def _process_image(self, pixbuf: GdkPixbuf.Pixbuf) -> GdkPixbuf.Pixbuf:
"""Crop the image to a centered square and scale it to the widgets size."""
width, height = pixbuf.get_width(), pixbuf.get_height()
if width != height:
square_size = min(width, height)
x_offset = (width - square_size) // 2
y_offset = (height - square_size) // 2
pixbuf = pixbuf.new_subpixbuf(x_offset, y_offset, square_size, square_size)
else:
square_size = width
if square_size != self.size:
pixbuf = pixbuf.scale_simple(
self.size, self.size, GdkPixbuf.InterpType.BILINEAR
)
return pixbuf
def on_draw(self, widget: "CircleImage", ctx: cairo.Context):
if self._image:
ctx.save()
# Create a circular clipping path
ctx.arc(self.size / 2, self.size / 2, self.size / 2, 0, 2 * math.pi)
ctx.clip()
# Rotate around the center of the square image
ctx.translate(self.size / 2, self.size / 2)
ctx.rotate(self._angle * math.pi / 180.0)
ctx.translate(-self.size / 2, -self.size / 2)
Gdk.cairo_set_source_pixbuf(ctx, self._image, 0, 0)
ctx.paint()
ctx.restore()
def set_image_from_file(self, new_image_file: str):
if not new_image_file:
return
pixbuf = GdkPixbuf.Pixbuf.new_from_file(new_image_file)
self._orig_image = pixbuf
self._image = self._process_image(pixbuf)
self.queue_draw()
def set_image_from_pixbuf(self, pixbuf: GdkPixbuf.Pixbuf):
if not pixbuf:
return
self._orig_image = pixbuf
self._image = self._process_image(pixbuf)
self.queue_draw()
def set_image_size(self, size: int):
self.size = size
if self._orig_image:
self._image = self._process_image(self._orig_image)
self.queue_draw()

2
example.yaml Normal file
View File

@@ -0,0 +1,2 @@
vinyl:
enabled: true

36
flake.lock generated
View File

@@ -6,16 +6,16 @@
"utils": "utils"
},
"locked": {
"lastModified": 1725442219,
"narHash": "sha256-xgTjqwlAgfY0Kv6G6CogOV2pN6U0wllRYteVAAZs7BU=",
"owner": "wholikeel",
"repo": "fabric-nix",
"rev": "3bc86cfb8c988ff5488526a47e1914f03a34a87c",
"lastModified": 1745289078,
"narHash": "sha256-1dZTqsWPaHyWjZkX4MaJdwUAQoMXwr8hhHymxQIwFrY=",
"owner": "Fabric-Development",
"repo": "fabric",
"rev": "1831ced4d9bb9f4be3893be55a8d502b47bff29e",
"type": "github"
},
"original": {
"owner": "wholikeel",
"repo": "fabric-nix",
"owner": "Fabric-Development",
"repo": "fabric",
"type": "github"
}
},
@@ -41,32 +41,32 @@
},
"nixpkgs": {
"locked": {
"lastModified": 1717179513,
"narHash": "sha256-vboIEwIQojofItm2xGCdZCzW96U85l9nDW3ifMuAIdM=",
"lastModified": 1733261153,
"narHash": "sha256-eq51hyiaIwtWo19fPEeE0Zr2s83DYMKJoukNLgGGpek=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "63dacb46bf939521bdc93981b4cbb7ecb58427a0",
"rev": "b681065d0919f7eb5309a93cea2cfa84dec9aa88",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "24.05",
"ref": "nixos-24.11",
"repo": "nixpkgs",
"type": "github"
}
},
"nixpkgs_2": {
"locked": {
"lastModified": 1717179513,
"narHash": "sha256-vboIEwIQojofItm2xGCdZCzW96U85l9nDW3ifMuAIdM=",
"lastModified": 1731603435,
"narHash": "sha256-CqCX4JG7UiHvkrBTpYC3wcEurvbtTADLbo3Ns2CEoL8=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "63dacb46bf939521bdc93981b4cbb7ecb58427a0",
"rev": "8b27c1239e5c421a2bbc2c65d52e4a6fbf2ff296",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "24.05",
"ref": "24.11",
"repo": "nixpkgs",
"type": "github"
}
@@ -131,11 +131,11 @@
"systems": "systems"
},
"locked": {
"lastModified": 1710146030,
"narHash": "sha256-SZ5L6eA7HJ/nmkzGG7/ISclqe6oZdOZTNoesiInkXPQ=",
"lastModified": 1731533236,
"narHash": "sha256-l0KFg5HjrsfsO/JpG+r7fRrqm12kzFHyUHqHCVpMMbI=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "b1d9ab70662946ef0850d488da1c9019f3a9752a",
"rev": "11707dc2f618dd54ca8739b309ec4fc024de578b",
"type": "github"
},
"original": {

View File

@@ -2,10 +2,10 @@
description = "Fabric Bar Example";
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/24.05";
nixpkgs.url = "github:NixOS/nixpkgs/24.11";
unstable.url = "github:NixOS/nixpkgs/nixos-unstable";
utils.url = "github:numtide/flake-utils";
fabric.url = "github:wholikeel/fabric-nix";
fabric.url = "github:Fabric-Development/fabric";
home-manager.url = "github:nix-community/home-manager";
home-manager.inputs.nixpkgs.follows = "nixpkgs";
};
@@ -31,11 +31,22 @@
in
{
formatter = pkgs.nixfmt-rfc-style;
devShells.default = pkgs.callPackage ./shell.nix { inherit pkgs; };
packages.default = pkgs.callPackage ./derivation.nix { inherit (pkgs) lib python3Packages; };
apps.default = {
type = "app";
program = "${self.packages.${system}.default}/bin/bar";
devShells.default = pkgs.callPackage ./nix/shell.nix { inherit pkgs; };
packages = {
default = pkgs.callPackage ./nix/derivation.nix { inherit (pkgs) lib python3Packages; };
makku = pkgs.writeShellScriptBin "makku" ''
dbus-send --session --print-reply --dest=org.Fabric.fabric.bar /org/Fabric/fabric org.Fabric.fabric.Evaluate string:"finder.open()" > /dev/null 2>&1
'';
};
apps = {
default = {
type = "app";
program = "${self.packages.${system}.default}/bin/bar";
};
show = {
type = "app";
program = "${self.packages.${system}.makku}/bin/makku";
};
};
}
)
@@ -47,6 +58,11 @@
pkgs,
...
}:
let
cfg = config.services.makku-bar;
settingsFormat = pkgs.formats.yaml { };
in
{
options.services.makku-bar = {
enable = lib.mkEnableOption "makku-bar status bar";
@@ -56,24 +72,44 @@
default = self.packages.${pkgs.system}.default;
description = "The makku-bar package to use.";
};
settings = lib.mkOption {
type = lib.types.submodule {
options = {
vinyl = {
enable = lib.mkOption {
type = lib.types.bool;
default = false;
};
};
};
};
default = {
vinyl.enable = false;
};
};
};
config = lib.mkIf config.services.makku-bar.enable {
systemd.user.services.makku-bar = {
Unit = {
Description = "Makku Status Bar";
After = [ "graphical-session.target" ];
};
systemd.user.services.makku-bar =
let
configFile = settingsFormat.generate "config.yaml" cfg.settings;
in
{
Unit = {
Description = "Makku Status Bar";
After = [ "graphical-session.target" ];
};
Service = {
ExecStart = "${config.services.makku-bar.package}/bin/bar";
Restart = "on-failure";
};
Service = {
ExecStart = "${config.services.makku-bar.package}/bin/bar --config ${configFile}";
Restart = "on-failure";
};
Install = {
WantedBy = [ "default.target" ];
Install = {
WantedBy = [ "default.target" ];
};
};
};
};
};
};

View File

@@ -7,9 +7,11 @@
gobject-introspection,
libdbusmenu-gtk3,
gdk-pixbuf,
gnome,
cinnamon,
gnome-bluetooth,
cinnamon-desktop,
wrapGAppsHook3,
playerctl,
webp-pixbuf-loader,
...
}:
@@ -18,29 +20,49 @@ python3Packages.buildPythonApplication {
version = "0.0.1";
pyproject = true;
src = ./.;
src = ../.;
nativeBuildInputs = [
wrapGAppsHook3
gtk3
gobject-introspection
python3Packages.pygobject3
cairo
playerctl
];
buildInputs = [
libdbusmenu-gtk3
gtk-layer-shell
gnome-bluetooth
cinnamon-desktop
gdk-pixbuf
playerctl
webp-pixbuf-loader
];
# buildInputs = [
# libdbusmenu-gtk3
# gtk-layer-shell
# gnome.gnome-bluetooth
# cinnamon.cinnamon-desktop
# gdk-pixbuf
# ];
dependencies = with python3Packages; [
python-fabric
pywayland
pyyaml
platformdirs
];
doCheck = false;
dontWrapGApps = true;
installPhase = ''
runHook preInstall
mkdir -p $out/${python3Packages.python.sitePackages}
cp -r bar $out/${python3Packages.python.sitePackages}/
# If you have any scripts to install
mkdir -p $out/bin
cp scripts/launcher.py $out/bin/bar
chmod +x $out/bin/bar
runHook postInstall
'';
preFixup = ''
makeWrapperArgs+=("''${gappsWrapperArgs[@]}")
'';

View File

@@ -22,11 +22,13 @@ pkgs.mkShell {
gobject-introspection
libdbusmenu-gtk3
gdk-pixbuf
gnome.gnome-bluetooth
cinnamon.cinnamon-desktop
gnome-bluetooth
cinnamon-desktop
wayland-scanner
wayland
wayland-protocols
playerctl
(python3.withPackages (
ps: with ps; [
setuptools
@@ -39,6 +41,8 @@ pkgs.mkShell {
pylsp-mypy
pyls-isort
python-lsp-ruff
pyyaml
platformdirs
]
))
];

View File

@@ -14,11 +14,11 @@ description = "Fabric using Nix example."
readme = "README.md"
license = {file = "LICENSE"}
[project.scripts]
bar = "bar.bar:main"
[tool.setuptools]
include-package-data = true
[tool.setuptools.packages.find]
where = ["."]
[tool.setuptools.packages]
find = { namespaces = true }
[tool.setuptools.package-data]
bar = ["bar.css"]
"*" = ["*.css", "styles"]

21
scripts/launcher.py Normal file
View File

@@ -0,0 +1,21 @@
#!/usr/bin/env python3
import sys
import os
script_dir = os.path.dirname(os.path.abspath(__file__))
site_packages_dir = os.path.join(
script_dir,
os.pardir,
"lib",
f"python{sys.version_info.major}.{sys.version_info.minor}",
"site-packages",
)
if site_packages_dir not in sys.path:
sys.path.insert(0, site_packages_dir)
from bar.main import *
sys.argv[0] = os.path.join(script_dir, os.path.basename(__file__))
sys.exit(main())