diff --git a/CHANGELOG.md b/CHANGELOG.md index 9c8397f..2c6314a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ See Git commit messages for full history. ## 10.2.0.dev0 (2026-xx-xx) +- Linux: add primary monitor detection, monitor device name, unique device interface name, and output name using XRandR (#153) - Windows: switch from `GetDIBits` to more memory efficient `CreateDIBSection` for `MSS.grab` implementation (#449) - Windows: fix gdi32.GetDIBits() failed after a couple of minutes of recording (#268) - Linux: check the server for Xrandr support version (#417) diff --git a/CHANGES.md b/CHANGES.md index 1f9ef07..202b224 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,40 @@ # Technical Changes +## 10.2.0 (2026-xx-xx) + +### windows.py +- Added `MONITORINFOEXW` structure for extended monitor information. +- Added `DISPLAY_DEVICEW` structure for device information. +- Added constants: `CCHDEVICENAME`, `MONITORINFOF_PRIMARY`, `EDD_GET_DEVICE_INTERFACE_NAME`. +- Added `GetMonitorInfoW` to `CFUNCTIONS` for querying monitor properties. +- Added `EnumDisplayDevicesW` to `CFUNCTIONS` for querying device details. +- Modified `_monitors_impl()` callback to extract primary monitor flag, device names, and device interface name (unique_id) using Win32 APIs; `unique_id` uses `EDD_GET_DEVICE_INTERFACE_NAME` when available. + +### linux/base.py +- Reworked `_monitors_impl()` to prefer XRandR 1.5+ `GetMonitors` when available, falling back to enumerating active CRTCs. +- Added monitor identification fields from RandR + EDID where available: `is_primary`, `output`, `name`, and `unique_id`. +- Added EDID lookup via RandR `EDID`/`EdidData` output property and parsing via `mss.tools.parse_edid()`. + +### linux/xcb.py +- Added `intern_atom()` helper with per-connection caching and support for predefined atoms. +- Added `XCB_NONE` constant (`Atom(0)`). +- Added additional XRandR request wrappers used for monitor identification (`GetMonitors`, `GetOutputInfo`, `GetOutputPrimary`, `GetOutputProperty`). + +### linux/xcbhelpers.py +- Added `InternAtomReply` structure and typed binding for `xcb_intern_atom`. +- Added `__eq__()`/`__hash__()` to `XID` for value-based comparisons. + +### xcbproto/gen_xcb_to_py.py +- Extended the generator to include additional XRandR requests used by the XCB backends (`GetOutputInfo`, `GetOutputPrimary`, `GetOutputProperty`, `GetMonitors`). +- Updated typedef generation to emit value-based `__eq__()`/`__hash__()` implementations. +- Refactored code generation helpers and formatting (use `textwrap.indent`/`dedent`). + +### tools.py +- Added `parse_edid()` helper for extracting identifying fields (legacy model id, serial number, manufacture/model year, and display name) from EDID blocks. + +### linux/xshmgetimage.py +- Fixed XID type handling for `drawable`/`visual` (avoid mixing raw `.value` with typed IDs). + ## 10.1.1 (2025-xx-xx) ### linux/__init__.py diff --git a/src/mss/linux/base.py b/src/mss/linux/base.py index f9921d5..036eb98 100644 --- a/src/mss/linux/base.py +++ b/src/mss/linux/base.py @@ -1,14 +1,18 @@ from __future__ import annotations from typing import TYPE_CHECKING, Any +from urllib.parse import urlencode from mss.base import MSSBase from mss.exception import ScreenShotError +from mss.tools import parse_edid from . import xcb from .xcb import LIB if TYPE_CHECKING: + from ctypes import Array + from mss.models import Monitor from mss.screenshot import ScreenShot @@ -66,7 +70,7 @@ def __init__(self, /, **kwargs: Any) -> None: # noqa: PLR0912 # we'll have to ask the server for its depth and visual. assert self.root == self.drawable # noqa: S101 self.drawable_depth = self.pref_screen.root_depth - self.drawable_visual_id = self.pref_screen.root_visual.value + self.drawable_visual_id = self.pref_screen.root_visual # Server image byte order if xcb_setup.image_byte_order != xcb.ImageOrder.LSBFirst: msg = "Only X11 servers using LSB-First images are supported." @@ -103,7 +107,7 @@ def __init__(self, /, **kwargs: Any) -> None: # noqa: PLR0912 msg = "Internal error: drawable's depth not found in screen's supported depths" raise ScreenShotError(msg) for visual_info in xcb.depth_visuals(xcb_depth): - if visual_info.visual_id.value == self.drawable_visual_id: + if visual_info.visual_id == self.drawable_visual_id: break else: msg = "Internal error: drawable's visual not found in screen's supported visuals" @@ -140,8 +144,37 @@ def _monitors_impl(self) -> None: msg = "Cannot identify monitors while the connection is closed" raise ScreenShotError(msg) - # The first entry is the whole X11 screen that the root is on. That's the one that covers all the - # monitors. + self._append_root_monitor() + + randr_version = self._randr_get_version() + if randr_version is None or randr_version < (1, 2): + return + + # XRandR terminology (very abridged, but enough for this code): + # - X screen / framebuffer: the overall drawable area for this root. + # - CRTC: a display controller that scans out a rectangular region of the X screen. A CRTC with zero + # outputs is inactive. A CRTC may drive multiple outputs in clone/mirroring mode. + # - Output: a physical connector (e.g. "HDMI-1", "DP-1"). The RandR "connection" state (connected vs + # disconnected) is separate from whether the output is currently driven by a CRTC. + # - Monitor (RandR 1.5+): a logical rectangle presented to clients. Monitors may be client-defined (useful + # for tiled displays) and are the closest match to what MSS wants. + # + # This implementation prefers RandR 1.5+ Monitors when available; otherwise it falls back to enumerating + # active CRTCs. + + primary_output = self._randr_get_primary_output(randr_version) + edid_atom = self._randr_get_edid_atom() + + if randr_version >= (1, 5): + self._monitors_from_randr_monitors(primary_output, edid_atom) + else: + self._monitors_from_randr_crtcs(randr_version, primary_output, edid_atom) + + def _append_root_monitor(self) -> None: + if self.conn is None: + msg = "Cannot identify monitors while the connection is closed" + raise ScreenShotError(msg) + root_geom = xcb.get_geometry(self.conn, self.root) self._monitors.append( { @@ -152,47 +185,181 @@ def _monitors_impl(self) -> None: } ) - # After that, we have one for each monitor on that X11 screen. For decades, that's been handled by - # Xrandr. We don't presently try to work with Xinerama. So, we're going to check the different outputs, - # according to Xrandr. If that fails, we'll just leave the one root covering everything. + def _randr_get_version(self) -> tuple[int, int] | None: + if self.conn is None: + msg = "Cannot identify monitors while the connection is closed" + raise ScreenShotError(msg) - # Make sure we have the Xrandr extension we need. This will query the cache that we started populating in - # __init__. randr_ext_data = xcb.get_extension_data(self.conn, LIB.randr_id) if not randr_ext_data.present: - return + return None - # We ask the server to give us anything up to the version we support (i.e., what we expect the reply - # structs to look like). If the server only supports 1.2, then that's what it'll give us, and we're ok - # with that, but we also use a faster path if the server implements at least 1.3. randr_version_data = xcb.randr_query_version(self.conn, xcb.RANDR_MAJOR_VERSION, xcb.RANDR_MINOR_VERSION) - randr_version = (randr_version_data.major_version, randr_version_data.minor_version) - if randr_version < (1, 2): - return + return (randr_version_data.major_version, randr_version_data.minor_version) + + def _randr_get_primary_output(self, randr_version: tuple[int, int], /) -> xcb.RandrOutput | None: + if self.conn is None: + msg = "Cannot identify monitors while the connection is closed" + raise ScreenShotError(msg) + + if randr_version >= (1, 3): + primary_output_data = xcb.randr_get_output_primary(self.conn, self.drawable) + return primary_output_data.output + # Python None means that there was no way to identify a primary output. This is distinct from XCB_NONE (that + # is, xcb.RandROutput(0)), which means that there is not a primary monitor. + return None + + def _randr_get_edid_atom(self) -> xcb.Atom | None: + if self.conn is None: + msg = "Cannot identify monitors while the connection is closed" + raise ScreenShotError(msg) + + edid_atom = xcb.intern_atom(self.conn, "EDID", only_if_exists=True) + if edid_atom is not None: + return edid_atom + + # Formerly, "EDID" was known as "EdidData". I don't know when it changed. + return xcb.intern_atom(self.conn, "EdidData", only_if_exists=True) + + def _randr_output_ids( + self, + output: xcb.RandrOutput, + timestamp: xcb.Timestamp, + edid_atom: xcb.Atom | None, + /, + ) -> dict[str, Any]: + if self.conn is None: + msg = "Cannot identify monitors while the connection is closed" + raise ScreenShotError(msg) + + output_info = xcb.randr_get_output_info(self.conn, output, timestamp) + if output_info.status != 0: + msg = "Display configuration changed while detecting monitors." + raise ScreenShotError(msg) + + rv: dict[str, Any] = {} + + output_name_arr = xcb.randr_get_output_info_name(output_info) + rv["output"] = bytes(output_name_arr).decode("utf_8", errors="replace") + + if edid_atom is not None: + edid_prop = xcb.randr_get_output_property( + self.conn, # connection + output, # output + edid_atom, # property + xcb.XCB_NONE, # property type: Any + 0, # long-offset: 0 + 1024, # long-length: in 4-byte units; 4k is plenty for an EDID + 0, # delete: false + 0, # pending: false + ) + if edid_prop.type_.value != 0: + edid_block = bytes(xcb.randr_get_output_property_data(edid_prop)) + edid_data = parse_edid(edid_block) + if (display_name := edid_data.get("display_name")) is not None: + rv["name"] = display_name + + edid_params: dict[str, str] = {} + if (id_legacy := edid_data.get("id_legacy")) is not None: + edid_params["model"] = id_legacy + if (serial_number := edid_data.get("serial_number")) is not None: + edid_params["serial"] = str(serial_number) + if (manufacture_year := edid_data.get("manufacture_year")) is not None: + if (manufacture_week := edid_data.get("manufacture_week")) is not None: + edid_params["mfr_date"] = f"{manufacture_year:04d}W{manufacture_week:02d}" + else: + edid_params["mfr_date"] = f"{manufacture_year:04d}" + if (model_year := edid_data.get("model_year")) is not None: + edid_params["model_year"] = f"{model_year:04d}" + if edid_params: + rv["unique_id"] = urlencode(edid_params) + + return rv + + @staticmethod + def _choose_randr_output( + outputs: Array[xcb.RandrOutput], primary_output: xcb.RandrOutput | None, / + ) -> xcb.RandrOutput: + if len(outputs) == 0: + msg = "No RandR outputs available" + raise ScreenShotError(msg) + if primary_output is None: + # We don't want to use the `in` check if this could be None, according to MyPy. + return outputs[0] + if primary_output in outputs: + return primary_output + return outputs[0] + + def _monitors_from_randr_monitors( + self, primary_output: xcb.RandrOutput | None, edid_atom: xcb.Atom | None, / + ) -> None: + if self.conn is None: + msg = "Cannot identify monitors while the connection is closed" + raise ScreenShotError(msg) + + monitors_reply = xcb.randr_get_monitors(self.conn, self.drawable, 1) + timestamp = monitors_reply.timestamp + for randr_monitor in xcb.randr_get_monitors_monitors(monitors_reply): + monitor = { + "left": randr_monitor.x, + "top": randr_monitor.y, + "width": randr_monitor.width, + "height": randr_monitor.height, + } + # Under XRandR, it's legal for no monitor to be primary. In this case, case MSSBase.primary_monitor will + # return the first monitor. That said, we note in the dict that we explicitly are told by XRandR that + # all of the monitors are not primary. (This is distinct from the XRandR 1.2 path, which doesn't have + # any information about primary monitors.) + monitor["is_primary"] = bool(randr_monitor.primary) + + if randr_monitor.nOutput > 0: + outputs = xcb.randr_monitor_info_outputs(randr_monitor) + chosen_output = self._choose_randr_output(outputs, primary_output) + monitor |= self._randr_output_ids(chosen_output, timestamp, edid_atom) + + self._monitors.append(monitor) + + def _monitors_from_randr_crtcs( + self, + randr_version: tuple[int, int], + primary_output: xcb.RandrOutput | None, + edid_atom: xcb.Atom | None, + /, + ) -> None: + if self.conn is None: + msg = "Cannot identify monitors while the connection is closed" + raise ScreenShotError(msg) screen_resources: xcb.RandrGetScreenResourcesReply | xcb.RandrGetScreenResourcesCurrentReply - # Check to see if we have the xcb_randr_get_screen_resources_current function in libxcb-randr, and that - # the server supports it. if hasattr(LIB.randr, "xcb_randr_get_screen_resources_current") and randr_version >= (1, 3): - screen_resources = xcb.randr_get_screen_resources_current(self.conn, self.drawable.value) + screen_resources = xcb.randr_get_screen_resources_current(self.conn, self.drawable) crtcs = xcb.randr_get_screen_resources_current_crtcs(screen_resources) else: - # Either the client or the server doesn't support the _current form. That's ok; we'll use the old - # function, which forces a new query to the physical monitors. screen_resources = xcb.randr_get_screen_resources(self.conn, self.drawable) crtcs = xcb.randr_get_screen_resources_crtcs(screen_resources) + timestamp = screen_resources.config_timestamp for crtc in crtcs: - crtc_info = xcb.randr_get_crtc_info(self.conn, crtc, screen_resources.config_timestamp) + crtc_info = xcb.randr_get_crtc_info(self.conn, crtc, timestamp) if crtc_info.num_outputs == 0: continue - self._monitors.append( - {"left": crtc_info.x, "top": crtc_info.y, "width": crtc_info.width, "height": crtc_info.height} - ) + monitor = { + "left": crtc_info.x, + "top": crtc_info.y, + "width": crtc_info.width, + "height": crtc_info.height, + } + + outputs = xcb.randr_get_crtc_info_outputs(crtc_info) + chosen_output = self._choose_randr_output(outputs, primary_output) + monitor |= self._randr_output_ids(chosen_output, timestamp, edid_atom) + # The concept of primary outputs was added in XRandR 1.3. We distinguish between "all the monitors are + # not primary" (RRGetOutputPrimary returned XCB_NONE, a valid case) and "we have no way to get + # information about the primary monitor": in the latter case, we don't populate "is_primary". + if primary_output is not None: + monitor["is_primary"] = chosen_output == primary_output - # Extra credit would be to enumerate the virtual desktops; see - # https://specifications.freedesktop.org/wm/latest/ar01s03.html. But I don't know how widely-used that - # style is. + self._monitors.append(monitor) def _cursor_impl_check_xfixes(self) -> bool: """Check XFixes availability and version. @@ -277,11 +444,11 @@ def _grab_impl_xgetimage(self, monitor: Monitor, /) -> ScreenShot: # Copy this into a new bytearray, so that it will persist after we clear the image structure. img_data = bytearray(img_data_arr) - if img_reply.depth != self.drawable_depth or img_reply.visual.value != self.drawable_visual_id: + if img_reply.depth != self.drawable_depth or img_reply.visual != self.drawable_visual_id: # This should never happen; a window can't change its visual. msg = ( "Server returned an image with a depth or visual different than it initially reported: " - f"expected {self.drawable_depth},{hex(self.drawable_visual_id)}, " + f"expected {self.drawable_depth},{hex(self.drawable_visual_id.value)}, " f"got {img_reply.depth},{hex(img_reply.visual.value)}" ) raise ScreenShotError(msg) diff --git a/src/mss/linux/xcb.py b/src/mss/linux/xcb.py index f599e2a..64ea929 100644 --- a/src/mss/linux/xcb.py +++ b/src/mss/linux/xcb.py @@ -1,11 +1,13 @@ from __future__ import annotations -from ctypes import _Pointer, c_int +import contextlib +from ctypes import _Pointer, addressof, c_int +from typing import Literal, overload from . import xcbgen # We import these just so they're re-exported to our users. -# ruff: noqa: F401, TC001 +# ruff: noqa: F401 from .xcbgen import ( RANDR_MAJOR_VERSION, RANDR_MINOR_VERSION, @@ -29,12 +31,19 @@ ImageOrder, Keycode, Pixmap, + RandrConnection, RandrCrtc, RandrGetCrtcInfoReply, + RandrGetMonitorsReply, + RandrGetOutputInfoReply, + RandrGetOutputPrimaryReply, + RandrGetOutputPropertyReply, RandrGetScreenResourcesCurrentReply, RandrGetScreenResourcesReply, RandrMode, RandrModeInfo, + RandrMonitorInfo, + RandrMonitorInfoIterator, RandrOutput, RandrQueryVersionReply, RandrSetConfig, @@ -75,6 +84,16 @@ randr_get_crtc_info, randr_get_crtc_info_outputs, randr_get_crtc_info_possible, + randr_get_monitors, + randr_get_monitors_monitors, + randr_get_output_info, + randr_get_output_info_clones, + randr_get_output_info_crtcs, + randr_get_output_info_modes, + randr_get_output_info_name, + randr_get_output_primary, + randr_get_output_property, + randr_get_output_property_data, randr_get_screen_resources, randr_get_screen_resources_crtcs, randr_get_screen_resources_current, @@ -85,6 +104,7 @@ randr_get_screen_resources_modes, randr_get_screen_resources_names, randr_get_screen_resources_outputs, + randr_monitor_info_outputs, randr_query_version, render_pictdepth_visuals, render_pictscreen_depths, @@ -109,7 +129,7 @@ ) # These are also here to re-export. -from .xcbhelpers import LIB, XID, Connection, QueryExtensionReply, XcbExtension, XError +from .xcbhelpers import LIB, XID, Connection, InternAtomReply, QueryExtensionReply, XcbExtension, XError XCB_CONN_ERROR = 1 XCB_CONN_CLOSED_EXT_NOTSUPPORTED = 2 @@ -134,6 +154,147 @@ #### High-level XCB function wrappers +# XCB_NONE is the universal null resource or null atom parameter value for many core X requests +XCB_NONE = Atom(0) + +# Some atoms are defined by the spec, to avoid apps having to look them up. It's fine to look them up anyway. +_PREDEFINED_ATOMS = { + "PRIMARY": Atom(1), + "SECONDARY": Atom(2), + "ARC": Atom(3), + "ATOM": Atom(4), + "BITMAP": Atom(5), + "CARDINAL": Atom(6), + "COLORMAP": Atom(7), + "CURSOR": Atom(8), + "CUT_BUFFER0": Atom(9), + "CUT_BUFFER1": Atom(10), + "CUT_BUFFER2": Atom(11), + "CUT_BUFFER3": Atom(12), + "CUT_BUFFER4": Atom(13), + "CUT_BUFFER5": Atom(14), + "CUT_BUFFER6": Atom(15), + "CUT_BUFFER7": Atom(16), + "DRAWABLE": Atom(17), + "FONT": Atom(18), + "INTEGER": Atom(19), + "PIXMAP": Atom(20), + "POINT": Atom(21), + "RECTANGLE": Atom(22), + "RESOURCE_MANAGER": Atom(23), + "RGB_COLOR_MAP": Atom(24), + "RGB_BEST_MAP": Atom(25), + "RGB_BLUE_MAP": Atom(26), + "RGB_DEFAULT_MAP": Atom(27), + "RGB_GRAY_MAP": Atom(28), + "RGB_GREEN_MAP": Atom(29), + "RGB_RED_MAP": Atom(30), + "STRING": Atom(31), + "VISUALID": Atom(32), + "WINDOW": Atom(33), + "WM_COMMAND": Atom(34), + "WM_HINTS": Atom(35), + "WM_CLIENT_MACHINE": Atom(36), + "WM_ICON_NAME": Atom(37), + "WM_ICON_SIZE": Atom(38), + "WM_NAME": Atom(39), + "WM_NORMAL_HINTS": Atom(40), + "WM_SIZE_HINTS": Atom(41), + "WM_ZOOM_HINTS": Atom(42), + "MIN_SPACE": Atom(43), + "NORM_SPACE": Atom(44), + "MAX_SPACE": Atom(45), + "END_SPACE": Atom(46), + "SUPERSCRIPT_X": Atom(47), + "SUPERSCRIPT_Y": Atom(48), + "SUBSCRIPT_X": Atom(49), + "SUBSCRIPT_Y": Atom(50), + "UNDERLINE_POSITION": Atom(51), + "UNDERLINE_THICKNESS": Atom(52), + "STRIKEOUT_ASCENT": Atom(53), + "STRIKEOUT_DESCENT": Atom(54), + "ITALIC_ANGLE": Atom(55), + "X_HEIGHT": Atom(56), + "QUAD_WIDTH": Atom(57), + "WEIGHT": Atom(58), + "POINT_SIZE": Atom(59), + "RESOLUTION": Atom(60), + "COPYRIGHT": Atom(61), + "NOTICE": Atom(62), + "FONT_NAME": Atom(63), + "FAMILY_NAME": Atom(64), + "FULL_NAME": Atom(65), + "CAP_HEIGHT": Atom(66), + "WM_CLASS": Atom(67), + "WM_TRANSIENT_FOR": Atom(68), +} + +# The atom cache needs to be per-connection. Rather than keying on a (connection, name) tuple, we use a two-level +# cache keyed by the integer address of the underlying XCB connection (see ctypes.addressof in intern_atom). +_ATOM_CACHE: dict[int, dict[str, Atom]] = {} + + +@overload +def intern_atom( + xcb_conn: Connection | _Pointer[Connection], + name: str, + *, + only_if_exists: Literal[False] = False, +) -> Atom: ... +@overload +def intern_atom( + xcb_conn: Connection | _Pointer[Connection], + name: str, + *, + only_if_exists: Literal[True] = True, +) -> Atom | None: ... +@overload +def intern_atom( + xcb_conn: Connection | _Pointer[Connection], + name: str, + *, + only_if_exists: bool, +) -> Atom | None: ... + + +def intern_atom( + xcb_conn: Connection | _Pointer[Connection], + name: str, + *, + only_if_exists: bool = False, +) -> Atom | None: + if name in _PREDEFINED_ATOMS: + return _PREDEFINED_ATOMS[name] + + if isinstance(xcb_conn, _Pointer): + # Dereference the pointer before using the cache. + xcb_conn = xcb_conn.contents + cache_key = addressof(xcb_conn) + if cache_key not in _ATOM_CACHE: + # This can happen if the connection was closed and its cache cleared, but some code still has a reference to + # the connection object. We could re-create the cache entry, but it's safer to just fail instead of silently + # allowing lookups to succeed when they shouldn't. + msg = "Connection to X server is closed" + raise XError(msg) + if name in _ATOM_CACHE[cache_key]: + return _ATOM_CACHE[cache_key][name] + + # Atom names are required to be Latin-1, per the X protocol spec, although anything that's not in the XPCS (a + # subset of ASCII) is vendor-defined. + name_encoded = name.encode("latin_1", errors="strict") + cookie = LIB.xcb.xcb_intern_atom(xcb_conn, 1 if only_if_exists else 0, len(name_encoded), name_encoded) + atom_as_xid = cookie.reply(xcb_conn).atom + if atom_as_xid.value == 0: + if not only_if_exists: + # This shouldn't be possible. We at least need to have a path for the type-checker to be happy, though. + msg = f"X server failed to intern atom '{name}'" + raise XError(msg) + # We don't do negative caching, since any app might intern the atom at any time. + return None + atom = Atom(atom_as_xid.value) + _ATOM_CACHE[cache_key][name] = atom + return atom + def get_extension_data( xcb_conn: Connection | _Pointer[Connection], ext: XcbExtension | _Pointer[XcbExtension] @@ -210,13 +371,23 @@ def connect(display: str | bytes | None = None) -> tuple[Connection, int]: prefetch_extension_data(conn_p, LIB.shm_id) prefetch_extension_data(conn_p, LIB.xfixes_id) + _ATOM_CACHE[addressof(conn_p.contents)] = {} + return conn_p.contents, pref_screen_num.value -def disconnect(conn: Connection) -> None: - conn_err = LIB.xcb.xcb_connection_has_error(conn) +def disconnect(xcb_conn: Connection | _Pointer[Connection]) -> None: + if isinstance(xcb_conn, _Pointer): + # Dereference the pointer before using the cache. + xcb_conn = xcb_conn.contents + + # The cache might already be cleared if the connection had an error, or if disconnect was called multiple times. + with contextlib.suppress(KeyError): + del _ATOM_CACHE[addressof(xcb_conn)] + + conn_err = LIB.xcb.xcb_connection_has_error(xcb_conn) # XCB won't free its connection structures until we disconnect, even in the event of an error. - LIB.xcb.xcb_disconnect(conn) + LIB.xcb.xcb_disconnect(xcb_conn) if conn_err != 0: msg = "Connection to X server closed: " conn_errmsg = XCB_CONN_ERRMSG.get(conn_err) diff --git a/src/mss/linux/xcbgen.py b/src/mss/linux/xcbgen.py index 6fba72b..606ad29 100644 --- a/src/mss/linux/xcbgen.py +++ b/src/mss/linux/xcbgen.py @@ -41,6 +41,12 @@ # Enum classes +class RandrConnection(IntEnum): + Connected = 0 + Disconnected = 1 + Unknown = 2 + + class RandrSetConfig(IntEnum): Success = 0 InvalidConfigTime = 1 @@ -96,7 +102,13 @@ class Drawable(XID): class Keycode(c_uint8): - pass + def __eq__(self, other: object) -> bool: + if isinstance(other, Keycode): + return self.value == other.value + return NotImplemented + + def __hash__(self) -> int: + return hash(self.value) class Format(Structure): @@ -117,7 +129,13 @@ class Colormap(XID): class Visualid(c_uint32): - pass + def __eq__(self, other: object) -> bool: + if isinstance(other, Visualid): + return self.value == other.value + return NotImplemented + + def __hash__(self) -> int: + return hash(self.value) class Visualtype(Structure): @@ -261,7 +279,13 @@ class RandrQueryVersionReply(Structure): class Timestamp(c_uint32): - pass + def __eq__(self, other: object) -> bool: + if isinstance(other, Timestamp): + return self.value == other.value + return NotImplemented + + def __hash__(self) -> int: + return hash(self.value) class RandrCrtc(XID): @@ -345,6 +369,81 @@ class RandrGetCrtcInfoReply(Structure): ) +class RandrGetOutputInfoReply(Structure): + _fields_ = ( + ("response_type", c_uint8), + ("status", c_uint8), + ("sequence", c_uint16), + ("length", c_uint32), + ("timestamp", Timestamp), + ("crtc", RandrCrtc), + ("mm_width", c_uint32), + ("mm_height", c_uint32), + ("connection", c_uint8), + ("subpixel_order", c_uint8), + ("num_crtcs", c_uint16), + ("num_modes", c_uint16), + ("num_preferred", c_uint16), + ("num_clones", c_uint16), + ("name_len", c_uint16), + ) + + +class RandrGetOutputPrimaryReply(Structure): + _fields_ = ( + ("response_type", c_uint8), + ("pad0", c_uint8 * 1), + ("sequence", c_uint16), + ("length", c_uint32), + ("output", RandrOutput), + ) + + +class RandrGetOutputPropertyReply(Structure): + _fields_ = ( + ("response_type", c_uint8), + ("format_", c_uint8), + ("sequence", c_uint16), + ("length", c_uint32), + ("type_", Atom), + ("bytes_after", c_uint32), + ("num_items", c_uint32), + ("pad0", c_uint8 * 12), + ) + + +class RandrMonitorInfo(Structure): + _fields_ = ( + ("name", Atom), + ("primary", c_uint8), + ("automatic", c_uint8), + ("nOutput", c_uint16), + ("x", c_int16), + ("y", c_int16), + ("width", c_uint16), + ("height", c_uint16), + ("width_in_millimeters", c_uint32), + ("height_in_millimeters", c_uint32), + ) + + +class RandrMonitorInfoIterator(Structure): + _fields_ = (("data", POINTER(RandrMonitorInfo)), ("rem", c_int), ("index", c_int)) + + +class RandrGetMonitorsReply(Structure): + _fields_ = ( + ("response_type", c_uint8), + ("pad0", c_uint8 * 1), + ("sequence", c_uint16), + ("length", c_uint32), + ("timestamp", Timestamp), + ("nMonitors", c_uint32), + ("nOutputs", c_uint32), + ("pad1", c_uint8 * 12), + ) + + class RenderQueryVersionReply(Structure): _fields_ = ( ("response_type", c_uint8), @@ -597,6 +696,42 @@ def randr_get_crtc_info_possible(r: RandrGetCrtcInfoReply) -> Array[RandrOutput] ) +def randr_get_output_info_crtcs(r: RandrGetOutputInfoReply) -> Array[RandrCrtc]: + return array_from_xcb( + LIB.randr.xcb_randr_get_output_info_crtcs, LIB.randr.xcb_randr_get_output_info_crtcs_length, r + ) + + +def randr_get_output_info_modes(r: RandrGetOutputInfoReply) -> Array[RandrMode]: + return array_from_xcb( + LIB.randr.xcb_randr_get_output_info_modes, LIB.randr.xcb_randr_get_output_info_modes_length, r + ) + + +def randr_get_output_info_clones(r: RandrGetOutputInfoReply) -> Array[RandrOutput]: + return array_from_xcb( + LIB.randr.xcb_randr_get_output_info_clones, LIB.randr.xcb_randr_get_output_info_clones_length, r + ) + + +def randr_get_output_info_name(r: RandrGetOutputInfoReply) -> Array[c_uint8]: + return array_from_xcb(LIB.randr.xcb_randr_get_output_info_name, LIB.randr.xcb_randr_get_output_info_name_length, r) + + +def randr_get_output_property_data(r: RandrGetOutputPropertyReply) -> Array[c_uint8]: + return array_from_xcb( + LIB.randr.xcb_randr_get_output_property_data, LIB.randr.xcb_randr_get_output_property_data_length, r + ) + + +def randr_monitor_info_outputs(r: RandrMonitorInfo) -> Array[RandrOutput]: + return array_from_xcb(LIB.randr.xcb_randr_monitor_info_outputs, LIB.randr.xcb_randr_monitor_info_outputs_length, r) + + +def randr_get_monitors_monitors(r: RandrGetMonitorsReply) -> list[RandrMonitorInfo]: + return list_from_xcb(LIB.randr.xcb_randr_get_monitors_monitors_iterator, LIB.randr.xcb_randr_monitor_info_next, r) + + def render_pictdepth_visuals(r: RenderPictdepth) -> Array[RenderPictvisual]: return array_from_xcb(LIB.render.xcb_render_pictdepth_visuals, LIB.render.xcb_render_pictdepth_visuals_length, r) @@ -686,6 +821,33 @@ def randr_get_crtc_info(c: Connection, crtc: RandrCrtc, config_timestamp: Timest return LIB.randr.xcb_randr_get_crtc_info(c, crtc, config_timestamp).reply(c) +def randr_get_output_info(c: Connection, output: RandrOutput, config_timestamp: Timestamp) -> RandrGetOutputInfoReply: + return LIB.randr.xcb_randr_get_output_info(c, output, config_timestamp).reply(c) + + +def randr_get_output_primary(c: Connection, window: Window) -> RandrGetOutputPrimaryReply: + return LIB.randr.xcb_randr_get_output_primary(c, window).reply(c) + + +def randr_get_output_property( + c: Connection, + output: RandrOutput, + property_: Atom, + type_: Atom, + long_offset: c_uint32 | int, + long_length: c_uint32 | int, + delete: c_uint8 | int, + pending: c_uint8 | int, +) -> RandrGetOutputPropertyReply: + return LIB.randr.xcb_randr_get_output_property( + c, output, property_, type_, long_offset, long_length, delete, pending + ).reply(c) + + +def randr_get_monitors(c: Connection, window: Window, get_active: c_uint8 | int) -> RandrGetMonitorsReply: + return LIB.randr.xcb_randr_get_monitors(c, window, get_active).reply(c) + + def render_query_version( c: Connection, client_major_version: c_uint32 | int, client_minor_version: c_uint32 | int ) -> RenderQueryVersionReply: @@ -746,6 +908,8 @@ def initialize() -> None: # noqa: PLR0915 LIB.xcb.xcb_screen_next.restype = None LIB.xcb.xcb_setup_next.argtypes = (POINTER(SetupIterator),) LIB.xcb.xcb_setup_next.restype = None + LIB.randr.xcb_randr_monitor_info_next.argtypes = (POINTER(RandrMonitorInfoIterator),) + LIB.randr.xcb_randr_monitor_info_next.restype = None LIB.render.xcb_render_pictdepth_next.argtypes = (POINTER(RenderPictdepthIterator),) LIB.render.xcb_render_pictdepth_next.restype = None LIB.render.xcb_render_pictscreen_next.argtypes = (POINTER(RenderPictscreenIterator),) @@ -822,6 +986,32 @@ def initialize() -> None: # noqa: PLR0915 LIB.randr.xcb_randr_get_crtc_info_possible.restype = POINTER(RandrOutput) LIB.randr.xcb_randr_get_crtc_info_possible_length.argtypes = (POINTER(RandrGetCrtcInfoReply),) LIB.randr.xcb_randr_get_crtc_info_possible_length.restype = c_int + LIB.randr.xcb_randr_get_output_info_crtcs.argtypes = (POINTER(RandrGetOutputInfoReply),) + LIB.randr.xcb_randr_get_output_info_crtcs.restype = POINTER(RandrCrtc) + LIB.randr.xcb_randr_get_output_info_crtcs_length.argtypes = (POINTER(RandrGetOutputInfoReply),) + LIB.randr.xcb_randr_get_output_info_crtcs_length.restype = c_int + LIB.randr.xcb_randr_get_output_info_modes.argtypes = (POINTER(RandrGetOutputInfoReply),) + LIB.randr.xcb_randr_get_output_info_modes.restype = POINTER(RandrMode) + LIB.randr.xcb_randr_get_output_info_modes_length.argtypes = (POINTER(RandrGetOutputInfoReply),) + LIB.randr.xcb_randr_get_output_info_modes_length.restype = c_int + LIB.randr.xcb_randr_get_output_info_clones.argtypes = (POINTER(RandrGetOutputInfoReply),) + LIB.randr.xcb_randr_get_output_info_clones.restype = POINTER(RandrOutput) + LIB.randr.xcb_randr_get_output_info_clones_length.argtypes = (POINTER(RandrGetOutputInfoReply),) + LIB.randr.xcb_randr_get_output_info_clones_length.restype = c_int + LIB.randr.xcb_randr_get_output_info_name.argtypes = (POINTER(RandrGetOutputInfoReply),) + LIB.randr.xcb_randr_get_output_info_name.restype = POINTER(c_uint8) + LIB.randr.xcb_randr_get_output_info_name_length.argtypes = (POINTER(RandrGetOutputInfoReply),) + LIB.randr.xcb_randr_get_output_info_name_length.restype = c_int + LIB.randr.xcb_randr_get_output_property_data.argtypes = (POINTER(RandrGetOutputPropertyReply),) + LIB.randr.xcb_randr_get_output_property_data.restype = POINTER(c_uint8) + LIB.randr.xcb_randr_get_output_property_data_length.argtypes = (POINTER(RandrGetOutputPropertyReply),) + LIB.randr.xcb_randr_get_output_property_data_length.restype = c_int + LIB.randr.xcb_randr_monitor_info_outputs.argtypes = (POINTER(RandrMonitorInfo),) + LIB.randr.xcb_randr_monitor_info_outputs.restype = POINTER(RandrOutput) + LIB.randr.xcb_randr_monitor_info_outputs_length.argtypes = (POINTER(RandrMonitorInfo),) + LIB.randr.xcb_randr_monitor_info_outputs_length.restype = c_int + LIB.randr.xcb_randr_get_monitors_monitors_iterator.argtypes = (POINTER(RandrGetMonitorsReply),) + LIB.randr.xcb_randr_get_monitors_monitors_iterator.restype = RandrMonitorInfoIterator LIB.render.xcb_render_pictdepth_visuals.argtypes = (POINTER(RenderPictdepth),) LIB.render.xcb_render_pictdepth_visuals.restype = POINTER(RenderPictvisual) LIB.render.xcb_render_pictdepth_visuals_length.argtypes = (POINTER(RenderPictdepth),) @@ -842,10 +1032,7 @@ def initialize() -> None: # noqa: PLR0915 LIB.xfixes.xcb_xfixes_get_cursor_image_cursor_image.restype = POINTER(c_uint32) LIB.xfixes.xcb_xfixes_get_cursor_image_cursor_image_length.argtypes = (POINTER(XfixesGetCursorImageReply),) LIB.xfixes.xcb_xfixes_get_cursor_image_cursor_image_length.restype = c_int - LIB.shm.xcb_shm_create_segment_reply_fds.argtypes = ( - POINTER(Connection), - POINTER(ShmCreateSegmentReply), - ) + LIB.shm.xcb_shm_create_segment_reply_fds.argtypes = (POINTER(Connection), POINTER(ShmCreateSegmentReply)) LIB.shm.xcb_shm_create_segment_reply_fds.restype = POINTER(c_int) initialize_xcb_typed_func(LIB.xcb, "xcb_get_geometry", [POINTER(Connection), Drawable], GetGeometryReply) initialize_xcb_typed_func( @@ -877,6 +1064,21 @@ def initialize() -> None: # noqa: PLR0915 initialize_xcb_typed_func( LIB.randr, "xcb_randr_get_crtc_info", [POINTER(Connection), RandrCrtc, Timestamp], RandrGetCrtcInfoReply ) + initialize_xcb_typed_func( + LIB.randr, "xcb_randr_get_output_info", [POINTER(Connection), RandrOutput, Timestamp], RandrGetOutputInfoReply + ) + initialize_xcb_typed_func( + LIB.randr, "xcb_randr_get_output_primary", [POINTER(Connection), Window], RandrGetOutputPrimaryReply + ) + initialize_xcb_typed_func( + LIB.randr, + "xcb_randr_get_output_property", + [POINTER(Connection), RandrOutput, Atom, Atom, c_uint32, c_uint32, c_uint8, c_uint8], + RandrGetOutputPropertyReply, + ) + initialize_xcb_typed_func( + LIB.randr, "xcb_randr_get_monitors", [POINTER(Connection), Window, c_uint8], RandrGetMonitorsReply + ) initialize_xcb_typed_func( LIB.render, "xcb_render_query_version", [POINTER(Connection), c_uint32, c_uint32], RenderQueryVersionReply ) @@ -890,20 +1092,12 @@ def initialize() -> None: # noqa: PLR0915 [POINTER(Connection), Drawable, c_int16, c_int16, c_uint16, c_uint16, c_uint32, c_uint8, ShmSeg, c_uint32], ShmGetImageReply, ) - LIB.shm.xcb_shm_attach_fd_checked.argtypes = ( - POINTER(Connection), - ShmSeg, - c_int, - c_uint8, - ) + LIB.shm.xcb_shm_attach_fd_checked.argtypes = (POINTER(Connection), ShmSeg, c_int, c_uint8) LIB.shm.xcb_shm_attach_fd_checked.restype = VoidCookie initialize_xcb_typed_func( LIB.shm, "xcb_shm_create_segment", [POINTER(Connection), ShmSeg, c_uint32, c_uint8], ShmCreateSegmentReply ) - LIB.shm.xcb_shm_detach_checked.argtypes = ( - POINTER(Connection), - ShmSeg, - ) + LIB.shm.xcb_shm_detach_checked.argtypes = (POINTER(Connection), ShmSeg) LIB.shm.xcb_shm_detach_checked.restype = VoidCookie initialize_xcb_typed_func( LIB.xfixes, "xcb_xfixes_query_version", [POINTER(Connection), c_uint32, c_uint32], XfixesQueryVersionReply diff --git a/src/mss/linux/xcbhelpers.py b/src/mss/linux/xcbhelpers.py index f387c79..de5fafd 100644 --- a/src/mss/linux/xcbhelpers.py +++ b/src/mss/linux/xcbhelpers.py @@ -10,6 +10,7 @@ Structure, _Pointer, addressof, + c_char, c_char_p, c_int, c_uint, @@ -100,7 +101,13 @@ class Connection(Structure): class XID(c_uint32): - pass + def __eq__(self, other: object) -> bool: + if isinstance(other, XID): + return self.value == other.value + return NotImplemented + + def __hash__(self) -> int: + return hash(self.value) class GenericErrorStructure(Structure): @@ -119,6 +126,18 @@ class GenericErrorStructure(Structure): ) +# We special-case InternAtom for convenience. +class InternAtomReply(Structure): + _fields_ = ( + ("response_type", c_uint8), + ("pad0", c_uint8 * 1), + ("sequence", c_uint16), + ("length", c_uint32), + # This is actually an Atom, not a raw XID, but we handle the type conversion in intern_atom. + ("atom", XID), + ) + + #### Request / response handling # # The following recaps a lot of what's in the xcb-requests(3) man page, with a few notes about what we're doing in @@ -462,6 +481,14 @@ def initialize(self, callbacks: Iterable[Callable[[], None]] = frozenset()) -> N self.xcb.xcb_disconnect.argtypes = [POINTER(Connection)] self.xcb.xcb_disconnect.restype = None + # We special-case InternAtom for convenience. + initialize_xcb_typed_func( + LIB.xcb, + "xcb_intern_atom", + [POINTER(Connection), c_uint8, c_uint16, POINTER(c_char)], + InternAtomReply, + ) + libxcb_randr_so = ctypes.util.find_library("xcb-randr") if libxcb_randr_so is None: msg = "Library libxcb-randr.so not found" diff --git a/src/mss/linux/xshmgetimage.py b/src/mss/linux/xshmgetimage.py index cdd882c..6b5a492 100644 --- a/src/mss/linux/xshmgetimage.py +++ b/src/mss/linux/xshmgetimage.py @@ -166,7 +166,7 @@ def _grab_impl_xshmgetimage(self, monitor: Monitor) -> ScreenShot: img_reply = xcb.shm_get_image( self.conn, - self.drawable.value, + self.drawable, monitor["left"], monitor["top"], monitor["width"], @@ -177,11 +177,11 @@ def _grab_impl_xshmgetimage(self, monitor: Monitor) -> ScreenShot: 0, ) - if img_reply.depth != self.drawable_depth or img_reply.visual.value != self.drawable_visual_id: + if img_reply.depth != self.drawable_depth or img_reply.visual != self.drawable_visual_id: # This should never happen; a window can't change its visual. msg = ( "Server returned an image with a depth or visual different than it initially reported: " - f"expected {self.drawable_depth},{hex(self.drawable_visual_id)}, " + f"expected {self.drawable_depth},{hex(self.drawable_visual_id.value)}, " f"got {img_reply.depth},{hex(img_reply.visual.value)}" ) raise ScreenShotError(msg) diff --git a/src/mss/tools.py b/src/mss/tools.py index 7f05366..9e4b7e9 100644 --- a/src/mss/tools.py +++ b/src/mss/tools.py @@ -12,6 +12,136 @@ from pathlib import Path +_EDID_BLOCK_LEN = 128 +_EDID_SERIAL_NUMBER_NOT_SET = 0 # The serial number field is unused +_EDID_MANUFACTURE_WEEK_YEAR_IS_MODEL = 0xFF # The year field is for the model year, not the year of manufacture. +_EDID_MANUFACTURE_WEEK_UNKNOWN = 0 # Only the year of manufacture, not the week, is known. +_EDID_YEAR_BASE = 1990 # The starting year for EDID years +_EDID_DESCR_OFFSETS = [0x48, 0x5A, 0x6C] # Display descriptor definition locations in the base block +_EDID_DESCR_LEN = 18 +_EDID_DESCR_ZERO_LOCS = [0, 1, 2, 4] # Locations that must be 0 to mark a display descriptor definition +_EDID_DESCR_TAG_LOC = 3 # Location of display descriptor tag +_EDID_DESCR_TAG_SN = 0xFF # Descriptor is a string serial number +_EDID_DESCR_TAG_NAME = 0xFC # Descriptor is a string model name +_EDID_DESCR_STR_LOC = slice(5, 18) # Location of string in a display descriptor + + +def parse_edid(edid_data: bytes) -> dict: + """Parse a monitor's EDID block. + + Many fields are currently ignored, but may be added in the future. + + If the EDID block cannot be parsed, this returns an empty dict. + + The dict defines the following fields. Any of these may be + missing, if the EDID block does not define them. + + - id_legacy (str): The legacy monitor ID, used in a number of + APIs. This is simply f"{manufacturer}{product_code:04X}". + Those subfields are not part of the returned dict, but are + nominally described as: + + - manufacturer (str): A three-letter, all-uppercase code + specifying the manufacturer's legacy PnP ID. The registry is + managed by UEFI forum. + - product_code (int): A 16-bit product code. This is typically + displayed as four hex digits if rendered to a string. + + - serial_number (str | int): Serial number of the monitor. EDID + block may provide this as an int, string, or both; the string + version is preferred. + - manufacture_week (int): The week, 1-54, of manufacture. This + may not be populated, even if the year is. (The way the weeks + are numbered is up to the manufacturer.) + - manufacture_year (int): The year, 1990 or later, of manufacture. + - model_year (int): The year, 1990 or later, that the model was + released. This is used if the manufacturer doesn't want to + update their EDID block each year; the manufacture_year field is + more common. + - display_name (str): The monitor's model. This is the preferred + value for display. If this field is not present, then id_legacy + is a distant second. + + Currently, the serial_number and name fields are always in ASCII. + This function doesn't currently try to implement the + internationalization extensions defined in the VESA LS-EXT + standard. However, we may in the future. + + We also don't currently inspect the extension blocks. The name + and serial number can be in CTA-861 extension blocks; I'll need to + see how common that is. + """ + # See also https://glenwing.github.io/docs/ for a lot of the relevant specs. + + if len(edid_data) < _EDID_BLOCK_LEN: + # Too short + return {} + + # Get the basic identification information from the start of the + # header. This has been part of EDID for a very long time. + block0 = edid_data[:_EDID_BLOCK_LEN] + if sum(block0) % 256 != 0: + # Checksum failure + return {} + + ( + header, + id_manufacturer_msb, + id_manufacturer_lsb, + id_product_code, + id_serial_number, + manufacture_week, + manufacture_year, + _edid_version, + _edid_revision, + _ext_count, + ) = struct.unpack("<8s2BHIBBBB106xBx", block0) + + if header != b"\x00\xff\xff\xff\xff\xff\xff\x00": + # Header incorrect + return {} + id_manufacturer_packed = id_manufacturer_msb << 8 | id_manufacturer_lsb + id_manufacturer = ( + chr(((id_manufacturer_packed >> 10) % 32) + 64) + + chr(((id_manufacturer_packed >> 5) % 32) + 64) + + chr((id_manufacturer_packed % 32) + 64) + ) + rv: dict[str, int | str] = { + "id_legacy": f"{id_manufacturer}{id_product_code:04X}", + } + if id_serial_number != _EDID_SERIAL_NUMBER_NOT_SET: + rv["serial_number"] = id_serial_number + if manufacture_week == _EDID_MANUFACTURE_WEEK_YEAR_IS_MODEL: + rv["model_year"] = manufacture_year + _EDID_YEAR_BASE + else: + if manufacture_week != _EDID_MANUFACTURE_WEEK_UNKNOWN: + rv["manufacture_week"] = manufacture_week + rv["manufacture_year"] = manufacture_year + _EDID_YEAR_BASE + + # Read the display descriptor definitions, which can have more useful information. + for descr_offset in _EDID_DESCR_OFFSETS: + descr = block0[descr_offset : descr_offset + _EDID_DESCR_LEN] + if any(descr[field_offset] != 0 for field_offset in _EDID_DESCR_ZERO_LOCS): + # Not a display descriptor definition + continue + # Check the tag in descr[3]. + # These strings are in ASCII, optionally terminated by \x0A then right-padded with \x20. In case a + # manufacturer got it a little wrong, we ignore everything after \x0A, and we also strip trailing \x20. (The + # spec requires the \x0A, but some manufacturers don't follow that.) + if descr[_EDID_DESCR_TAG_LOC] == _EDID_DESCR_TAG_SN: # Serial number + sn = descr[_EDID_DESCR_STR_LOC] + sn, _, _ = sn.partition(b"\x0a") + sn = sn.rstrip(b" ") + rv["serial_number"] = sn.decode("ascii", errors="replace") + elif descr[_EDID_DESCR_TAG_LOC] == _EDID_DESCR_TAG_NAME: # Name + name = descr[_EDID_DESCR_STR_LOC] + name, _, _ = name.partition(b"\x0a") + name = name.rstrip(b" ") + rv["display_name"] = name.decode("ascii", errors="replace") + + return rv + + def to_png(data: bytes, size: tuple[int, int], /, *, level: int = 6, output: Path | str | None = None) -> bytes | None: """Dump data to a PNG file. If `output` is `None`, create no file but return the whole PNG data. diff --git a/src/tests/test_tools.py b/src/tests/test_tools.py index 78feea7..7183b2d 100644 --- a/src/tests/test_tools.py +++ b/src/tests/test_tools.py @@ -5,12 +5,13 @@ from __future__ import annotations import io +import struct from pathlib import Path from typing import TYPE_CHECKING import pytest -from mss.tools import to_png +from mss.tools import parse_edid, to_png if TYPE_CHECKING: from collections.abc import Callable @@ -57,3 +58,116 @@ def test_output_raw_bytes() -> None: raw = to_png(data, (WIDTH, HEIGHT)) assert isinstance(raw, bytes) assert_is_valid_png(raw=raw) + + +# --------------------------------------------------------------------------- +# Helpers and tests for parse_edid +# --------------------------------------------------------------------------- + + +def _make_edid( # noqa: PLR0913 + *, + manufacturer: str = "TST", + product_code: int = 0x1234, + serial_number: int = 0, + manufacture_week: int = 0, + manufacture_year: int = 30, + descriptors: list[tuple[int, int, str]] | None = None, + bad_checksum: bool = False, +) -> bytes: + """Build a minimal 128-byte EDID block.""" + data = bytearray(128) + data[0:8] = b"\x00\xff\xff\xff\xff\xff\xff\x00" + packed = ( + ((ord(manufacturer[0]) - ord("@")) << 10) + | ((ord(manufacturer[1]) - ord("@")) << 5) + | (ord(manufacturer[2]) - ord("@")) + ) + data[8] = (packed >> 8) & 0xFF + data[9] = packed & 0xFF + struct.pack_into(" None: + assert parse_edid(b"") == {} + assert parse_edid(b"\x00" * 64) == {} + assert parse_edid(b"\x00" * 127) == {} + + +def test_parse_edid_invalid_checksum() -> None: + assert parse_edid(_make_edid(bad_checksum=True)) == {} + + +def test_parse_edid_invalid_header() -> None: + data = bytearray(_make_edid()) + data[0] = 0x01 # corrupt the header magic + data[127] = (-sum(data[:127])) % 256 # recompute checksum + assert parse_edid(bytes(data)) == {} + + +def test_parse_edid_basic() -> None: + result = parse_edid(_make_edid(manufacturer="TST", product_code=0x1234)) + assert result["id_legacy"] == "TST1234" + + +def test_parse_edid_manufacture_year_only() -> None: + result = parse_edid(_make_edid(manufacture_week=0, manufacture_year=30)) + assert result["manufacture_year"] == 2020 + assert "manufacture_week" not in result + assert "model_year" not in result + + +def test_parse_edid_manufacture_week_and_year() -> None: + result = parse_edid(_make_edid(manufacture_week=10, manufacture_year=30)) + assert result["manufacture_year"] == 2020 + assert result["manufacture_week"] == 10 + assert "model_year" not in result + + +def test_parse_edid_model_year() -> None: + result = parse_edid(_make_edid(manufacture_week=0xFF, manufacture_year=31)) + assert result["model_year"] == 2021 + assert "manufacture_year" not in result + assert "manufacture_week" not in result + + +def test_parse_edid_serial_number_integer() -> None: + result = parse_edid(_make_edid(serial_number=12345)) + assert result["serial_number"] == 12345 + + +def test_parse_edid_serial_number_not_set() -> None: + result = parse_edid(_make_edid(serial_number=0)) + assert "serial_number" not in result + + +def test_parse_edid_descriptor_serial_number() -> None: + result = parse_edid(_make_edid(descriptors=[(0x48, 0xFF, "SN123456")])) + assert result["serial_number"] == "SN123456" + + +def test_parse_edid_descriptor_display_name() -> None: + result = parse_edid(_make_edid(descriptors=[(0x5A, 0xFC, "Test Monitor")])) + assert result["display_name"] == "Test Monitor" + + +def test_parse_edid_descriptor_string_serial_overrides_integer() -> None: + result = parse_edid(_make_edid(serial_number=99, descriptors=[(0x48, 0xFF, "STRSERIAL")])) + assert result["serial_number"] == "STRSERIAL" diff --git a/src/tests/test_xcb.py b/src/tests/test_xcb.py index 24903ac..f51a920 100644 --- a/src/tests/test_xcb.py +++ b/src/tests/test_xcb.py @@ -1,6 +1,7 @@ from __future__ import annotations import gc +import platform from ctypes import ( POINTER, Structure, @@ -12,10 +13,13 @@ sizeof, ) from types import SimpleNamespace -from typing import Any, Callable +from typing import TYPE_CHECKING, Any, Callable from unittest.mock import Mock from weakref import finalize +if TYPE_CHECKING: + from collections.abc import Generator + import pytest from mss.exception import ScreenShotError @@ -27,6 +31,9 @@ list_from_xcb, ) +if platform.system().lower() != "linux": + pytestmark = pytest.mark.skip + def _force_gc() -> None: gc.collect() @@ -224,6 +231,77 @@ def visual_validation_env(monkeypatch: pytest.MonkeyPatch) -> _VisualValidationH return _VisualValidationHarness(monkeypatch) +#### intern_atom tests + + +class TestInternAtom: + """Tests for xcb.intern_atom and the _ATOM_CACHE mechanism.""" + + @pytest.fixture(autouse=True) + def setup_intern_atom(self) -> Generator[None, None, None]: + self.conn, _ = xcb.connect() + yield + xcb.disconnect(self.conn) + + def _mock_xcb_intern_atom(self, monkeypatch: pytest.MonkeyPatch, atom_value: int) -> Mock: + """Patch LIB.xcb.xcb_intern_atom to return a fake reply with the given atom value.""" + fake_reply = SimpleNamespace(atom=SimpleNamespace(value=atom_value)) + fake_cookie = Mock() + fake_cookie.reply.return_value = fake_reply + mock = Mock(return_value=fake_cookie) + monkeypatch.setattr(xcb.LIB.xcb, "xcb_intern_atom", mock) + return mock + + def test_predefined_atom_skips_xcb(self, monkeypatch: pytest.MonkeyPatch) -> None: + mock = self._mock_xcb_intern_atom(monkeypatch, 0) + atom = xcb.intern_atom(self.conn, "PRIMARY") + assert atom == xcb.Atom(1) + mock.assert_not_called() + + def test_cache_miss_calls_xcb_and_caches_result(self, monkeypatch: pytest.MonkeyPatch) -> None: + mock = self._mock_xcb_intern_atom(monkeypatch, 100) + cache_key = addressof(self.conn) + atom = xcb.intern_atom(self.conn, "_NET_WM_NAME") + assert atom == xcb.Atom(100) + mock.assert_called_once() + assert xcb._ATOM_CACHE[cache_key]["_NET_WM_NAME"] == xcb.Atom(100) + + def test_cache_hit_skips_xcb(self, monkeypatch: pytest.MonkeyPatch) -> None: + mock = self._mock_xcb_intern_atom(monkeypatch, 0) + # xcb.connect() in setup_intern_atom guarantees a cache entry for self.conn. + xcb._ATOM_CACHE[addressof(self.conn)]["_NET_WM_NAME"] = xcb.Atom(100) + atom = xcb.intern_atom(self.conn, "_NET_WM_NAME") + assert atom == xcb.Atom(100) + mock.assert_not_called() + + def test_only_if_exists_returns_none_when_missing(self) -> None: + atom = xcb.intern_atom(self.conn, "_MSS_TEST_NONEXISTENT_ATOM_12345", only_if_exists=True) + assert atom is None + + def test_raises_when_missing_and_not_only_if_exists(self, monkeypatch: pytest.MonkeyPatch) -> None: + # Exercises the "shouldn't be possible" code path where the server returns 0 with only_if_exists=False. + self._mock_xcb_intern_atom(monkeypatch, 0) + with pytest.raises(xcb.XError, match="X server failed to intern atom"): + xcb.intern_atom(self.conn, "_NET_NONEXISTENT") + + def test_pointer_connection_uses_correct_cache_key(self) -> None: + atom = xcb.intern_atom(pointer(self.conn), "_NET_WM_NAME") + assert atom is not None + assert addressof(self.conn) in xcb._ATOM_CACHE + + +def test_atom_cache_lifecycle() -> None: + """connect() initializes and disconnect() clears the per-connection atom cache entry.""" + before = set(xcb._ATOM_CACHE) + conn, _ = xcb.connect() + cache_key = addressof(conn) + assert cache_key in xcb._ATOM_CACHE + assert xcb._ATOM_CACHE[cache_key] == {} + xcb.disconnect(conn) + assert cache_key not in xcb._ATOM_CACHE + assert set(xcb._ATOM_CACHE) == before + + def test_xgetimage_visual_validation_accepts_default_setup(visual_validation_env: _VisualValidationHarness) -> None: visual_validation_env.reset() mss_instance = xgetimage.MSS() diff --git a/src/xcbproto/README.md b/src/xcbproto/README.md index 351e0b9..797f8f8 100644 --- a/src/xcbproto/README.md +++ b/src/xcbproto/README.md @@ -19,8 +19,9 @@ The generator is a **maintainer tool**, not part of the normal build process: ``` 3. The generator reads the XML protocol definitions and emits `xcbgen.py`. -4. The maintainer ensures that this worked correctly, and moves the file to `src/mss/linux/xcbgen.py`. -5. The generated `xcbgen.py` is committed to version control and distributed with the package, so end users never need to run the generator. +4. The maintainer ensures that this worked correctly, and runs `ruff check --fix` and `ruff format` on the emitted file. +5. The maintainer moves the file to `src/mss/linux/xcbgen.py`. +6. The generated `xcbgen.py` is committed to version control and distributed with the package, so end users never need to run the generator. ## Protocol XML Files diff --git a/src/xcbproto/gen_xcb_to_py.py b/src/xcbproto/gen_xcb_to_py.py index 1e41acc..648a5e4 100755 --- a/src/xcbproto/gen_xcb_to_py.py +++ b/src/xcbproto/gen_xcb_to_py.py @@ -37,6 +37,7 @@ from contextlib import contextmanager from dataclasses import dataclass, field from pathlib import Path +from textwrap import dedent, indent from typing import TYPE_CHECKING from lxml import etree as ET # noqa: N812 (traditional name) @@ -58,7 +59,8 @@ "GetGeometry", "GetImage", "GetProperty", - # We handle InternAtom specially. + # We handle InternAtom specially in xcb.py: it's the only request we use that includes a list (the name) in + # the request. Rather than writing the code for autogeneration, we just open-code that one. "NoOperation", ], "randr": [ @@ -66,6 +68,10 @@ "GetScreenResources", "GetScreenResourcesCurrent", "GetCrtcInfo", + "GetOutputInfo", + "GetOutputPrimary", + "GetOutputProperty", + "GetMonitors", ], "render": [ "QueryVersion", @@ -364,11 +370,7 @@ def _parse_child(self, child: ET._Element) -> None: ) ) case "fd": - self.members.append( - FdField( - name=child.attrib["name"], - ) - ) + self.members.append(FdField(name=child.attrib["name"])) case "pad": self.members.append(parse_pad(child)) case "list": @@ -377,10 +379,7 @@ def _parse_child(self, child: ET._Element) -> None: return case _: msg = f"Unsupported member {child.tag} in {self.protocol}:{self.name}" - raise GenerationError( - msg, - element=child, - ) + raise GenerationError(msg, element=child) def _parse(self) -> None: with parsing_note(f"while parsing {self.protocol}:{self.name}", self._element): @@ -442,10 +441,7 @@ def parse_enum(protocol: str, elem: ET.Element) -> EnumDefn: items.append(EnumerationItem(item.attrib["name"], 1 << int(bit.text, 0))) else: msg = f"Unsupported enum item in {protocol}:{name}:{item}" - raise GenerationError( - msg, - element=item, - ) + raise GenerationError(msg, element=item) return EnumDefn(protocol, name, items) @@ -465,10 +461,7 @@ def parse_xidunion(protocol: str, elem: ET.Element) -> XidUnionDefn: continue else: msg = f"Unsupported xidunion member {child.tag} in {protocol}:{name}" - raise GenerationError( - msg, - element=child, - ) + raise GenerationError(msg, element=child) if not members: msg = f"xidunion {protocol}:{name} must include at least one type" raise GenerationError(msg, element=elem) @@ -508,52 +501,34 @@ def parse_protocol(path: Path) -> ProtocolModule: # noqa: PLR0912, PLR0915 case "enum": if child.attrib["name"] in module.enums: msg = f"Duplicate enum {child.attrib['name']} in protocol {protocol}" - raise GenerationError( - msg, - element=child, - ) + raise GenerationError(msg, element=child) module.enums[child.attrib["name"]] = parse_enum(protocol, child) case "typedef": if child.attrib["newname"] in module.types: msg = f"Duplicate type {child.attrib['newname']} in protocol {protocol}" - raise GenerationError( - msg, - element=child, - ) + raise GenerationError(msg, element=child) module.types[child.attrib["newname"]] = TypedefDefn( protocol, child.attrib["newname"], child.attrib["oldname"] ) case "xidtype": if child.attrib["name"] in module.types: msg = f"Duplicate type {child.attrib['name']} in protocol {protocol}" - raise GenerationError( - msg, - element=child, - ) + raise GenerationError(msg, element=child) module.types[child.attrib["name"]] = XidTypeDefn(protocol, child.attrib["name"]) case "xidunion": if child.attrib["name"] in module.types: msg = f"Duplicate type {child.attrib['name']} in protocol {protocol}" - raise GenerationError( - msg, - element=child, - ) + raise GenerationError(msg, element=child) module.types[child.attrib["name"]] = parse_xidunion(protocol, child) case "struct": if child.attrib["name"] in module.types: msg = f"Duplicate type {child.attrib['name']} in protocol {protocol}" - raise GenerationError( - msg, - element=child, - ) + raise GenerationError(msg, element=child) module.types[child.attrib["name"]] = StructDefn(protocol, child.attrib["name"], child) case "request": if child.attrib["name"] in module.requests: msg = f"Duplicate request {child.attrib['name']} in protocol {protocol}" - raise GenerationError( - msg, - element=child, - ) + raise GenerationError(msg, element=child) module.requests[child.attrib["name"]] = RequestDefn(protocol, child.attrib["name"], child) case "import": # There's actually some leeway in how the imports are resolved. We only require the imported @@ -734,7 +709,7 @@ def __init__(self, fh: io.TextIOBase) -> None: def write(self, line: str = "") -> None: if line: - self._fh.write(" " * self._indent + line + "\n") + self._fh.write(indent(line, " " * self._indent) + "\n") else: self._fh.write("\n") @@ -930,7 +905,17 @@ def emit_typedef(writer: CodeWriter, registry: ProtocolRegistry, entry: TypedefD writer.write() writer.write(f"class {class_name}({base}):") with writer.indent(): - writer.write("pass") + writer.write( + dedent(f""" + def __eq__(self, other: object) -> bool: + if isinstance(other, {class_name}): + return self.value == other.value + return NotImplemented + + def __hash__(self) -> int: + return hash(self.value) + """) + ) # Struct-like types @@ -964,9 +949,7 @@ def emit_structlike( elif isinstance(member, Field): if seen_list: msg = f"Structure {entry.protocol}:{entry.name} has fields after lists, which is unsupported" - raise GenerationError( - msg, - ) + raise GenerationError(msg) name = format_field_name(member) type_expr = python_type_for(registry, entry.protocol, member.type) field_entries.append((name, type_expr)) @@ -975,9 +958,7 @@ def emit_structlike( continue if member.align is not None or member.bytes is None: msg = f"Struct {entry.protocol}:{entry.name} uses align-based padding, which is unsupported" - raise GenerationError( - msg, - ) + raise GenerationError(msg) name = f"pad{pad_index}" pad_index += 1 field_entries.append((name, f"c_uint8 * {member.bytes}")) @@ -1091,9 +1072,7 @@ def emit_types( rv += emit_reply(writer, registry, typ) else: msg = f"Unsupported type kind {type(typ).__name__} for {typ.protocol}:{typ.name}" - raise GenerationError( - msg, - ) + raise GenerationError(msg) return rv