from __future__ import annotations

import random
import re
import struct
from typing import TYPE_CHECKING

from mcstatus.motd import Motd
from mcstatus.protocol.connection import Connection, UDPAsyncSocketConnection, UDPSocketConnection

if TYPE_CHECKING:
    from typing_extensions import Self


class ServerQuerier:
    """Blocking query client built on top of a UDP socket connection.

    Typical use is :meth:`handshake` (stores the challenge returned by the
    server in ``self.challenge``) followed by :meth:`read_query`.
    """

    MAGIC_PREFIX = bytearray.fromhex("FEFD")
    PADDING = bytearray.fromhex("00000000")
    PACKET_TYPE_CHALLENGE = 9
    PACKET_TYPE_QUERY = 0

    def __init__(self, connection: UDPSocketConnection):
        self.connection = connection
        # Replaced by the server-provided value once handshake() has run.
        self.challenge = 0

    @staticmethod
    def _generate_session_id() -> int:
        """Return a random session id with only the low 4 bits of each byte set."""
        # minecraft only supports lower 4 bits
        candidate = random.randint(0, 2**31)
        return candidate & 0x0F0F0F0F

    def _begin_packet(self, packet_type: int) -> Connection:
        """Build the header shared by every request: magic, type byte, session id."""
        buffer = Connection()
        buffer.write(self.MAGIC_PREFIX)
        buffer.write(struct.pack("!B", packet_type))
        buffer.write_uint(self._generate_session_id())
        return buffer

    def _create_packet(self) -> Connection:
        """Build the query request: common header, current challenge, padding."""
        request = self._begin_packet(self.PACKET_TYPE_QUERY)
        request.write_int(self.challenge)
        request.write(self.PADDING)
        return request

    def _create_handshake_packet(self) -> Connection:
        """Build the challenge request, which consists of the common header only."""
        return self._begin_packet(self.PACKET_TYPE_CHALLENGE)

    def _read_packet(self) -> Connection:
        """Read everything currently available and return it without its first 5 bytes."""
        available = self.connection.remaining()
        payload = self.connection.read(available)

        reply = Connection()
        reply.receive(payload)
        # Discard 1 + 4 leading bytes (presumably the echoed packet type and
        # session id -- verify against the protocol description).
        reply.read(1 + 4)
        return reply

    def handshake(self) -> None:
        """Send the challenge request and store the integer challenge from the reply."""
        self.connection.write(self._create_handshake_packet())

        reply = self._read_packet()
        token = reply.read_ascii()
        self.challenge = int(token)

    def read_query(self) -> QueryResponse:
        """Send the query request and parse the reply into a :class:`QueryResponse`."""
        self.connection.write(self._create_packet())
        return QueryResponse.from_connection(self._read_packet())


class AsyncServerQuerier(ServerQuerier):
    """Asynchronous counterpart of :class:`ServerQuerier`; packet building is inherited."""

    def __init__(self, connection: UDPAsyncSocketConnection):
        # We do this to inform python about self.connection type (it's async)
        super().__init__(connection)  # type: ignore[arg-type]
        self.connection: UDPAsyncSocketConnection

    async def _read_packet(self) -> Connection:
        """Await everything currently available and return it without its first 5 bytes."""
        available = self.connection.remaining()
        payload = await self.connection.read(available)

        reply = Connection()
        reply.receive(payload)
        # Same 1 + 4 byte prefix that the synchronous reader discards.
        reply.read(1 + 4)
        return reply

    async def handshake(self) -> None:
        """Send the challenge request and store the integer challenge from the reply."""
        await self.connection.write(self._create_handshake_packet())

        reply = await self._read_packet()
        token = reply.read_ascii()
        self.challenge = int(token)

    async def read_query(self) -> QueryResponse:
        """Send the query request and parse the reply into a :class:`QueryResponse`."""
        await self.connection.write(self._create_packet())
        reply = await self._read_packet()
        return QueryResponse.from_connection(reply)
class QueryResponse:
    """Documentation for this class is written by hand, without docstrings.

    This is because the class is not supposed to be auto-documented. Please see
    https://mcstatus.readthedocs.io/en/latest/api/basic/#mcstatus.querier.QueryResponse
    for the actual documentation.
    """

    # THIS IS SO UNPYTHONIC
    # it's staying just because the tests depend on this structure
    class Players:
        online: int
        max: int
        names: list[str]

        # TODO: It's a bit weird that we accept str for number parameters, just to convert them in init
        def __init__(self, online: str | int, max: str | int, names: list[str]):
            # int() raises ValueError for non-numeric strings; that is left to propagate.
            self.online = int(online)
            self.max = int(max)
            self.names = names

    # NOTE(review): ``Software`` is referenced by ``__init__`` and by the
    # ``software`` annotation below, but was not defined in this class as
    # received, so every construction failed with AttributeError. This
    # definition assumes the ``plugins`` field has the shape
    # "<brand>: <plugin>; <plugin>; ..." -- confirm against the hand-written
    # API documentation linked in the class docstring.
    class Software:
        version: str
        brand: str
        plugins: list[str]

        def __init__(self, version: str, plugins: str):
            self.version = version
            # An empty plugins field carries no brand; fall back to "vanilla".
            self.brand = "vanilla"
            self.plugins = []

            if plugins:
                brand, separator, plugin_list = plugins.partition(":")
                self.brand = brand.strip()
                if separator:
                    self.plugins = [name.strip() for name in plugin_list.split(";")]

    # Locates the end of the MOTD: the shortest run of bytes that is followed by
    # a NUL byte and one of the other known keys. DOTALL lets the MOTD itself
    # contain newline bytes.
    _MOTD_TERMINATOR = re.compile(
        b"(.*?)\x00(hostip|hostport|game_id|gametype|map|maxplayers|numplayers|plugins|version)",
        flags=re.DOTALL,
    )

    motd: Motd
    map: str
    players: Players
    software: Software

    # Build a response from the raw key/value pairs and the player name list.
    #
    # raw must contain "hostname", "map", "numplayers", "maxplayers", "version"
    # and "plugins". A missing key raises ValueError, with the original KeyError
    # attached as __cause__ so the offending key is visible in the traceback.
    def __init__(self, raw: dict[str, str], players: list[str]):
        try:
            # Read every required key first so a missing one is reported
            # before any parsing work starts.
            hostname = raw["hostname"]
            map_name = raw["map"]
            online = raw["numplayers"]
            maximum = raw["maxplayers"]
            version = raw["version"]
            plugins = raw["plugins"]

            self.raw = raw
            self.motd = Motd.parse(hostname, bedrock=False)
            self.map = map_name
            self.players = QueryResponse.Players(online, maximum, players)
            self.software = QueryResponse.Software(version, plugins)
        except KeyError as err:
            raise ValueError("The provided data is not valid") from err

    # Parse a reply (already stripped of its 5-byte prefix by the querier) into
    # a QueryResponse. Reads a key/value section terminated by an empty key,
    # then a list of player names terminated by an empty name.
    @classmethod
    def from_connection(cls, response: Connection) -> Self:
        # Skip the fixed-size lead-in before the key/value section.
        response.read(len("splitnum") + 1 + 1 + 1)
        data = {}

        while True:
            key = response.read_ascii()
            if key == "hostname":  # hostname is actually motd in the query protocol
                match = cls._MOTD_TERMINATOR.search(response.received)
                # Stay in bytes on both branches; only the length is used.
                motd = match.group(1) if match else b""
                # Since the query protocol does not properly support unicode, the motd is still not resolved
                # correctly; however, this will avoid other parameter parsing errors.
                data[key] = response.read(len(motd)).decode("ISO-8859-1")
                response.read(1)  # ignore null byte
            elif not key:
                # An empty key ends the section; one more byte follows it.
                response.read(1)
                break
            else:
                data[key] = response.read_ascii()

        # Skip the fixed-size lead-in before the player list.
        response.read(len("player_") + 1 + 1)

        # Keep reading names until the empty string that terminates the list.
        players = list(iter(response.read_ascii, ""))

        return cls(data, players)