fabric-makussu/bar/river/widgets.py
2025-05-01 23:34:41 +02:00

182 lines
6.6 KiB
Python

from loguru import logger
from fabric.core.service import Property
from fabric.widgets.button import Button
from fabric.widgets.box import Box
from fabric.widgets.eventbox import EventBox
from fabric.utils.helpers import bulk_connect
from .service import River
from gi.repository import Gdk
_connection: River | None = None
def get_river_connection() -> River:
global _connection
if not _connection:
_connection = River()
return _connection
class RiverWorkspaceButton(Button):
@Property(int, "readable")
def id(self) -> int:
return self._id
@Property(bool, "read-write", default_value=False)
def active(self) -> bool:
return self._active
@active.setter
def active(self, value: bool):
self._active = value
(self.remove_style_class if not value else self.add_style_class)("active")
@Property(bool, "read-write", default_value=False)
def empty(self) -> bool:
return self._empty
@empty.setter
def empty(self, value: bool):
self._empty = value
(self.remove_style_class if not value else self.add_style_class)("empty")
def __init__(self, id: int, label: str = None, **kwargs):
super().__init__(label or str(id), **kwargs)
self._id = id
self._active = False
self._empty = True
class RiverWorkspaces(EventBox):
def __init__(self, output_id=None, max_tags=9, **kwargs):
super().__init__(events="scroll")
self.service = get_river_connection()
self._box = Box(**kwargs)
self.children = self._box
# Store output_id as received
self.output_id = output_id
self.max_tags = max_tags
# Create buttons for tags 0 to max_tags-1 (to match River's 0-based tag indexing)
self._buttons = {i: RiverWorkspaceButton(i) for i in range(max_tags)}
for btn in self._buttons.values():
btn.connect("clicked", self.on_workspace_click)
self._box.add(btn)
# Connect to service events
self.service.connect("event::focused_tags", self.on_focus_change_general)
self.service.connect("event::view_tags", self.on_view_change_general)
self.service.connect("event::output_removed", self.on_output_removed)
# Initial setup when service is ready
if self.service.ready:
self.on_ready(None)
else:
self.service.connect("event::ready", self.on_ready)
self.connect("scroll-event", self.on_scroll)
def on_ready(self, _):
"""Initialize widget state when service is ready"""
logger.debug(
f"[RiverWorkspaces] Service ready, outputs: {list(self.service.outputs.keys())}"
)
# If no output_id was specified, take the first one
if self.output_id is None and self.service.outputs:
self.output_id = next(iter(self.service.outputs.keys()))
logger.info(f"[RiverWorkspaces] Selected output {self.output_id}")
# Initialize state from selected output
if self.output_id is not None and self.output_id in self.service.outputs:
output_info = self.service.outputs[self.output_id]
# Initialize buttons with current state
# Access fields directly on the OutputInfo dataclass
focused_tags = output_info.tags_focused
view_tags = output_info.tags_view
logger.debug(
f"[RiverWorkspaces] Initial state - focused: {focused_tags}, view: {view_tags}"
)
for i, btn in self._buttons.items():
btn.active = i in focused_tags
btn.empty = i not in view_tags
def on_focus_change(self, _, tags):
"""Handle focused tags change for our specific output"""
logger.debug(
f"[RiverWorkspaces] Focus change on output {self.output_id}: {tags}"
)
for i, btn in self._buttons.items():
btn.active = i in tags
def on_view_change(self, _, tags):
"""Handle view tags change for our specific output"""
logger.debug(
f"[RiverWorkspaces] View change on output {self.output_id}: {tags}"
)
for i, btn in self._buttons.items():
btn.empty = i not in tags
def on_focus_change_general(self, _, event):
"""Handle general focused tags event"""
# Only handle event if it's for our output
if event.output_id == self.output_id:
logger.debug(
f"[RiverWorkspaces] General focus change for output {self.output_id}"
)
self.on_focus_change(_, event.data)
def on_view_change_general(self, _, event):
"""Handle general view tags event"""
# Only handle event if it's for our output
if event.output_id == self.output_id:
logger.debug(
f"[RiverWorkspaces] General view change for output {self.output_id}"
)
self.on_view_change(_, event.data)
def on_output_removed(self, _, event):
"""Handle output removal"""
removed_id = event.data[0]
if removed_id == self.output_id:
logger.info(f"[RiverWorkspaces] Our output {self.output_id} was removed")
# Try to find another output
if self.service.outputs:
self.output_id = next(iter(self.service.outputs.keys()))
logger.info(f"[RiverWorkspaces] Switching to output {self.output_id}")
# Update state for new output
if self.output_id in self.service.outputs:
output_info = self.service.outputs[self.output_id]
# Access fields directly on the OutputInfo dataclass
focused_tags = output_info.tags_focused
view_tags = output_info.tags_view
for i, btn in self._buttons.items():
btn.active = i in focused_tags
btn.empty = i not in view_tags
def on_workspace_click(self, btn):
"""Handle workspace button click"""
logger.info(f"[RiverWorkspaces] Clicked on workspace {btn.id}")
self.service.toggle_focused_tag(btn.id)
def on_scroll(self, _, event):
"""Handle scroll events"""
direction = event.direction
if direction == Gdk.ScrollDirection.DOWN:
logger.info("[RiverWorkspaces] Scroll down - focusing next view")
self.service.run_command("focus-view", "next")
elif direction == Gdk.ScrollDirection.UP:
logger.info("[RiverWorkspaces] Scroll up - focusing previous view")
self.service.run_command("focus-view", "previous")