2025-05-19 09:32:17 +02:00

239 lines
7.8 KiB
Python

import time
from gi.repository import GLib
from typing import Dict, List, Optional
from pywayland.client import Display
from pywayland.protocol.wayland import WlOutput, WlSeat
from fabric.core.service import Property, Service, Signal
from fabric.utils.helpers import idle_add
from bar.services.wlr.protocol.wlr_foreign_toplevel_management_unstable_v1.zwlr_foreign_toplevel_manager_v1 import (
ZwlrForeignToplevelManagerV1,
)
from bar.services.wlr.protocol.wlr_foreign_toplevel_management_unstable_v1.zwlr_foreign_toplevel_handle_v1 import (
ZwlrForeignToplevelHandleV1,
)
class Window:
"""Represents a toplevel window in the compositor."""
def __init__(self, handle: ZwlrForeignToplevelHandleV1):
self.handle = handle
self.title: str = "Unknown"
self.app_id: str = "Unknown"
self.states: List[str] = []
self.outputs: List[WlOutput] = []
self.parent: Optional["Window"] = None
self.closed = False
def __str__(self) -> str:
state_str = (
", ".join([ZwlrForeignToplevelHandleV1.state(s).name for s in self.states])
if self.states
else "normal"
)
return (
f"Window(title='{self.title}', app_id='{self.app_id}', state={state_str})"
)
class WaylandWindowTracker(Service):
"""Track Wayland windows in the background and provide access on demand."""
@Property(bool, "readable", "is-ready", default_value=False)
def ready(self) -> bool:
return self._ready
@Signal
def ready_signal(self):
return self.notify("ready")
@Property(list[Window], "readable", "windows")
def windows(self) -> list[Window]:
return [window for window in self._window_dict.values() if not window.closed]
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.display = None
self._window_dict: Dict[ZwlrForeignToplevelHandleV1, Window] = {}
self._windows = []
self.manager = None
self.seat: Optional[WlSeat] = None
self.thread = GLib.Thread.new(
"wayland-window-service", self._run_display_thread
)
def _run_display_thread(self):
"""Run the Wayland event loop in a background thread."""
try:
self.display = Display()
self.display.connect()
# Get the registry to find the foreign toplevel manager
registry = self.display.get_registry()
registry.dispatcher["global"] = self._registry_global_handler
# Process registry events
self.display.roundtrip()
if not self.manager:
print("Foreign toplevel manager not found")
return
# Process more events to get initial windows
for _ in range(5):
self.display.roundtrip()
idle_add(self._set_ready)
while True:
self.display.dispatch(block=True)
except Exception as e:
print(f"Display thread error: {e}")
finally:
self.cleanup()
def _registry_global_handler(self, registry, id, interface, version):
"""Handle registry global objects."""
if interface == WlSeat.name:
self.seat = registry.bind(id, WlSeat, version)
print(f"Found seat (id={id}, version={version})")
elif interface == ZwlrForeignToplevelManagerV1.name:
self.manager = registry.bind(
id, ZwlrForeignToplevelManagerV1, min(version, 3)
)
self.manager.dispatcher["toplevel"] = self._handle_toplevel
self.manager.dispatcher["finished"] = self._handle_manager_finished
def _handle_toplevel(self, manager, toplevel):
"""Handle a new toplevel window."""
print("TOPLEVEL IS TRIGGERD")
window = Window(toplevel)
self._window_dict[toplevel] = window
# Setup event dispatchers for the toplevel
toplevel.dispatcher["title"] = self._handle_title
toplevel.dispatcher["app_id"] = self._handle_app_id
toplevel.dispatcher["state"] = self._handle_state
toplevel.dispatcher["done"] = self._handle_done
toplevel.dispatcher["closed"] = self._handle_closed
toplevel.dispatcher["output_enter"] = self._handle_output_enter
toplevel.dispatcher["output_leave"] = self._handle_output_leave
def _handle_title(self, toplevel, title):
"""Handle toplevel title changes."""
window = self._window_dict.get(toplevel)
if window:
print("there is a window, putting title")
window.title = title
def _handle_app_id(self, toplevel, app_id):
"""Handle toplevel app_id changes."""
window = self._window_dict.get(toplevel)
if window:
window.app_id = app_id
def _handle_state(self, toplevel, states):
"""Handle toplevel state changes."""
window = self._window_dict.get(toplevel)
if window:
window.states = states
def _handle_done(self, toplevel):
"""Handle toplevel done event."""
# We don't need to print anything here as we're just tracking silently
pass
def _handle_closed(self, toplevel):
"""Handle toplevel closed event."""
window = self._window_dict.get(toplevel)
if window:
window.closed = True
# Remove from our dictionary
del self._window_dict[toplevel]
# Clean up the toplevel object
toplevel.destroy()
def _handle_output_enter(self, toplevel, output):
"""Handle toplevel entering an output."""
window = self._window_dict.get(toplevel)
if window and output not in window.outputs:
window.outputs.append(output)
def _handle_output_leave(self, toplevel, output):
"""Handle toplevel leaving an output."""
window = self._window_dict.get(toplevel)
if window and output in window.outputs:
window.outputs.remove(output)
def _handle_parent(self, toplevel, parent):
"""Handle toplevel parent changes."""
window = self._window_dict.get(toplevel)
if window:
if parent is None:
window.parent = None
else:
parent_window = self._window_dict.get(parent)
if parent_window:
window.parent = parent_window
def _handle_manager_finished(self, manager):
"""Handle manager finished event."""
self.running = False
def _set_ready(self):
print("IM READY")
self._ready = True
self.ready_signal.emit()
return False
def get_windows(self) -> List[Window]:
"""Get all currently active windows."""
print([window for window in self._window_dict.values()])
print("YOU CALLED WINDOWS")
return [window for window in self._window_dict.values() if not window.closed]
def activate_window(self, window: Window):
if self.seat is None:
print("Cannot activate window: no seat available")
return
print(f"Activating window: {window.title}")
window.handle.activate(self.seat)
self.display.flush() # flush the request to the Wayland server
def cleanup(self):
"""Clean up resources."""
self.running = False
print("Cleanup")
if self.manager:
try:
self.manager.stop()
except:
pass
# Disconnect from display
if self.display:
try:
self.display.disconnect()
except:
pass
def print_windows(tracker):
"""Print the current list of windows."""
windows = tracker.get_windows()
print(f"\nCurrent windows ({len(windows)}):")
if windows:
for i, window in enumerate(windows, 1):
print(f"{i}. {window}")
else:
print("No windows found")