49 lines
1.6 KiB
Python
49 lines
1.6 KiB
Python
"""Smart-corners IPC subscriber.

Listens to fenster's `:smart_corners` event and emits a per-output signal
when the WM's smart-corners state flips. Caches the latest state per output
so widgets created after the event can ask for the current value.
"""
|
|
from fabric.core.service import Service, Signal
|
|
from loguru import logger
|
|
|
|
from sims.services.fenster import get_i3_connection
|
|
|
|
|
|
# Module-level slot for the shared service instance. It starts out empty and
# is filled in by get_smart_corners_service() the first time that is called.
_service: "SmartCornersService | None" = None
|
|
|
|
|
|
class SmartCornersService(Service):
    """Tracks the smart-corners state reported for each output.

    On construction it connects to the ``event::smart_corners`` signal of the
    connection returned by ``get_i3_connection()``. Every valid event updates
    a per-output cache, and ``state_changed`` is emitted whenever the reported
    value differs from the cached one.
    """

    @Signal
    def state_changed(self, output: str, active: bool) -> None:
        """Signal carrying an output name and its new smart-corners state."""

    def __init__(self, **kwargs):
        """Create the empty cache and subscribe to smart-corners events.

        Keyword arguments are forwarded unchanged to ``Service.__init__``.
        """
        super().__init__(**kwargs)
        # Maps output name -> most recent state recorded for that output.
        self._state: dict[str, bool] = {}
        get_i3_connection().connect("event::smart_corners", self._on_event)

    def get(self, output: str) -> bool:
        """Return the cached state for *output*, or ``False`` if none exists."""
        try:
            return self._state[output]
        except KeyError:
            return False

    def _on_event(self, _i3, event):
        """Handle one smart-corners event delivered by the connection.

        Reads ``change`` and ``output`` from ``event.data``. A payload whose
        output is not a string, or whose change is neither ``"active"`` nor
        ``"inactive"``, is logged as a warning and dropped. Otherwise the
        cache is updated and ``state_changed`` is emitted, but only when the
        new value differs from what is already cached for that output.
        """
        payload = event.data
        transition = payload.get("change")
        name = payload.get("output")

        well_formed = isinstance(name, str) and transition in ("active", "inactive")
        if not well_formed:
            logger.warning(f"[SmartCorners] unexpected event payload: {event.data!r}")
            return

        is_active = transition == "active"
        # An output without a cached entry yields None here, so its first
        # valid event is always recorded and emitted, even for "inactive".
        if self._state.get(name) != is_active:
            self._state[name] = is_active
            self.state_changed(name, is_active)
|
|
|
|
|
|
def get_smart_corners_service() -> SmartCornersService:
    """Return the shared ``SmartCornersService``, creating it on first use.

    The instance is kept in the module-level ``_service`` variable, so every
    later call returns the same object.
    """
    global _service
    if _service is not None:
        return _service
    _service = SmartCornersService()
    return _service
|