Source code for mcstatus.responses.forge

"""Decoder for data from Forge, that is included into a response object.

After 1.18.1, Forge started to compress its mod data into a
UTF-16 string that represents binary data containing data like
the forge mod loader network version, a big list of channels
that all the forge mods use, and a list of mods the server has.

Before 1.18.1, the mod data was in `forgeData` attribute inside
a response object. We support this implementation too.

For more information see this file from forge itself:
https://github.com/MinecraftForge/MinecraftForge/blob/54b08d2711a15418130694342a3fe9a5dfe005d2/src/main/java/net/minecraftforge/network/ServerStatusPing.java#L27-L73
"""

from __future__ import annotations

from dataclasses import dataclass
from io import StringIO
from typing import Final, TYPE_CHECKING, final

from mcstatus._protocol.io.base_io import BaseSyncReader, StructFormat
from mcstatus._protocol.io.buffer import Buffer
from mcstatus._utils import or_none

if TYPE_CHECKING:
    from typing_extensions import Self, override

    from mcstatus.responses._raw import RawForgeData, RawForgeDataChannel, RawForgeDataMod
else:
    override = lambda f: f  # noqa: E731

__all__ = [
    "ForgeData",
    "ForgeDataChannel",
    "ForgeDataMod",
]

_VERSION_FLAG_IGNORE_SERVER_ONLY: Final = 0b1
_IGNORE_SERVER_ONLY: Final = "<not required for client>"


[docs] @dataclass(frozen=True) class ForgeDataChannel: """A single Forge data channel.""" name: str """Channel name and ID (for example ``fml:handshake``).""" version: str """Channel version (for example ``1.2.3.4``).""" required: bool """Is this channel required for client to join?""" @classmethod def build(cls, raw: RawForgeDataChannel) -> Self: """Build an object about Forge channel from raw response. :param raw: ``channel`` element in raw forge response :class:`dict`. :return: :class:`ForgeDataChannel` object. """ return cls(name=raw["res"], version=raw["version"], required=raw["required"]) @classmethod def decode(cls, buffer: Buffer, mod_id: str | None = None) -> Self: """Decode an object about Forge channel from decoded optimized buffer. :param buffer: :class:`Buffer` object from UTF-16 encoded binary data. :param mod_id: Optional mod id prefix :class:`str`. :return: :class:`ForgeDataChannel` object. """ channel_identifier = buffer.read_utf() if mod_id is not None: channel_identifier = f"{mod_id}:{channel_identifier}" version = buffer.read_utf() client_required = buffer.read_value(StructFormat.BOOL) return cls( name=channel_identifier, version=version, required=client_required, )
[docs] @dataclass(frozen=True) class ForgeDataMod: """A single Forge mod.""" name: str """A mod name.""" marker: str """A mod marker. Usually a version.""" @classmethod def build(cls, raw: RawForgeDataMod) -> Self: """Build an object about Forge mod from raw response. :param raw: ``mod`` element in raw forge response :class:`dict`. :return: :class:`ForgeDataMod` object. """ # In FML v1, modmarker was version instead. mod_version = or_none(raw.get("modmarker"), raw.get("version")) if mod_version is None: raise KeyError(f"Mod version in Forge mod data must be provided. Mod info: {raw}") # In FML v2, modid was modId instead. At least one of the two should exist. mod_id = or_none(raw.get("modid"), raw.get("modId")) if mod_id is None: raise KeyError(f"Mod ID in Forge mod data must be provided. Mod info: {raw}.") return cls(name=mod_id, marker=mod_version) @classmethod def decode(cls, buffer: Buffer) -> tuple[Self, list[ForgeDataChannel]]: """Decode data about a Forge mod from decoded optimized buffer. :param buffer: :class:`Buffer` object from UTF-16 encoded binary data. :return: :class:`tuple` object of :class:`ForgeDataMod` object and :class:`list` of :class:`ForgeDataChannel` objects. """ channel_version_flags = buffer.read_varint() channel_count = channel_version_flags >> 1 is_server = channel_version_flags & _VERSION_FLAG_IGNORE_SERVER_ONLY != 0 mod_id = buffer.read_utf() mod_version = _IGNORE_SERVER_ONLY if not is_server: mod_version = buffer.read_utf() channels = [ForgeDataChannel.decode(buffer, mod_id) for _ in range(channel_count)] return cls(name=mod_id, marker=mod_version), channels
@final class _StringBuffer(BaseSyncReader): """String Buffer for reading utf-16 encoded binary data.""" __slots__ = ("received", "stringio") def __init__(self, stringio: StringIO) -> None: self.stringio = stringio self.received = bytearray() @override def read(self, length: int, /) -> bytes: """Read length bytes from ``self``, and return a byte array.""" data = bytearray() while self.received and len(data) < length: data.append(self.received.pop(0)) while len(data) < length: result = self.stringio.read(1) if not result: raise OSError(f"Not enough data to read! {len(data)} < {length}") data.extend(result.encode("utf-16be")) while len(data) > length: self.received.append(data.pop()) return bytes(data) def remaining(self) -> int: """Return number of reads remaining.""" return len(self.stringio.getvalue()) - self.stringio.tell() + len(self.received) def read_optimized_size(self) -> int: """Read encoded data length.""" return self.read_value(StructFormat.SHORT) | (self.read_value(StructFormat.SHORT) << 15) def read_optimized_buffer(self) -> Buffer: """Read encoded buffer.""" size = self.read_optimized_size() buffer = Buffer() value, bits = 0, 0 while buffer.remaining < size: if bits < 8 and self.remaining(): # Ignoring sign bit value |= (self.read_value(StructFormat.SHORT) & 0x7FFF) << bits bits += 15 buffer.write((value & 0xFF).to_bytes(1, "big")) value >>= 8 bits -= 8 return buffer
[docs] @dataclass(frozen=True) class ForgeData: """Class for storing information about Forge mods.""" fml_network_version: int """Forge Mod Loader network version.""" channels: list[ForgeDataChannel] """List of channels, both for mods and non-mods.""" mods: list[ForgeDataMod] """List of mods.""" truncated: bool """Is the mods list and or channel list incomplete?""" @staticmethod def _decode_optimized(string: str) -> Buffer: """Decode buffer from UTF-16 optimized binary data ``string``.""" with StringIO(string) as text: str_buffer = _StringBuffer(text) return str_buffer.read_optimized_buffer() @classmethod def build(cls, raw: RawForgeData) -> Self: """Build an object about Forge mods from raw response. :param raw: ``forgeData`` attribute in raw response :class:`dict`. :return: :class:`ForgeData` object. """ fml_network_version = raw.get("fmlNetworkVersion", 1) # see: # https://github.com/MinecraftForge/MinecraftForge/blob/7d0330eb08299935714e34ac651a293e2609aa86/src/main/java/net/minecraftforge/network/ServerStatusPing.java#L27-L73 if "d" not in raw: mod_list = raw.get("mods") or raw.get("modList") if mod_list is None: raise KeyError("Neither `mods` or `modList` keys exist.") return cls( fml_network_version=fml_network_version, channels=[ForgeDataChannel.build(channel) for channel in raw.get("channels", ())], mods=[ForgeDataMod.build(mod) for mod in mod_list], truncated=False, ) buffer = cls._decode_optimized(raw["d"]) channels: list[ForgeDataChannel] = [] mods: list[ForgeDataMod] = [] truncated = buffer.read_value(StructFormat.BOOL) mod_count = buffer.read_value(StructFormat.USHORT) try: for _ in range(mod_count): mod, mod_channels = ForgeDataMod.decode(buffer) channels.extend(mod_channels) mods.append(mod) non_mod_channel_count = buffer.read_varint() channels.extend(ForgeDataChannel.decode(buffer) for _ in range(non_mod_channel_count)) except OSError: if not truncated: raise # If answer wasn't truncated, we lost some data on the way return cls( fml_network_version=fml_network_version, channels=channels, mods=mods, truncated=truncated, )