Files
sims/sims/services/fenster.py

56 lines
1.7 KiB
Python

"""
Fenster/Sway IPC connection helper.
Provides a singleton I3 connection configured for Fenster's SWAYSOCK.
"""
import os
from sims.services.i3 import I3, I3MessageType
# Module-level cache for the shared I3 instance; stays None until
# get_i3_connection() creates the connection on its first call.
_connection: I3 | None = None
def get_i3_connection() -> I3:
    """Return the shared I3 connection, creating it on the first call.

    On the first call the socket path is resolved before the connection is
    built, in this order:

    1. A non-empty ``SWAYSOCK`` environment variable always wins.
    2. Otherwise, if ``I3.SOCKET_PATH`` is still unset/empty, look for
       ``fenster.sock`` under ``XDG_RUNTIME_DIR`` (defaulting to
       ``/run/user/<uid>``) and use it only if that path exists.

    The resolved path is written to the class attribute ``I3.SOCKET_PATH``.
    Later calls return the cached instance without re-resolving anything.
    """
    global _connection

    # Fast path: the singleton has already been built.
    if _connection is not None:
        return _connection

    env_socket = os.environ.get("SWAYSOCK")
    if env_socket:
        I3.SOCKET_PATH = env_socket
    elif not I3.SOCKET_PATH:
        base_dir = os.environ.get(
            "XDG_RUNTIME_DIR", f"/run/user/{os.getuid()}"
        )
        candidate = os.path.join(base_dir, "fenster.sock")
        # Only adopt the fallback when the socket file is actually present.
        if os.path.exists(candidate):
            I3.SOCKET_PATH = candidate

    _connection = I3()
    return _connection
def focused_output_index() -> int:
    """Return the position of the focused output among the active outputs.

    The focused output is taken from the first workspace in the
    GET_WORKSPACES reply that is marked ``focused``; its ``output`` name is
    then looked up among the entries of the GET_OUTPUTS reply that are marked
    ``active``, in reply order.

    This is the same indexing main.spawn_bars uses for `monitor=`, so a
    layer-shell window opened with the returned index lands on the focused
    output.

    Returns 0 when either reply is not OK or is not a list, when no focused
    workspace (or no output name for it) is found, or when no active output
    carries that name.
    """
    workspaces = I3.send_command("", I3MessageType.GET_WORKSPACES)
    if not workspaces.is_ok or not isinstance(workspaces.reply, list):
        return 0

    # Output name of the first focused workspace, if there is one.
    target = None
    for workspace in workspaces.reply:
        if workspace.get("focused"):
            target = workspace.get("output")
            break
    if not target:
        return 0

    outputs = I3.send_command("", I3MessageType.GET_OUTPUTS)
    if not outputs.is_ok or not isinstance(outputs.reply, list):
        return 0

    # Indices are counted over active outputs only, not the raw reply.
    active_outputs = [entry for entry in outputs.reply if entry.get("active")]
    return next(
        (
            position
            for position, entry in enumerate(active_outputs)
            if entry.get("name") == target
        ),
        0,
    )