#!/usr/bin/python3 # -*- coding: utf-8 -*- # # module Impulse # # Copyright (C) 2019 AleaJactaEst # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import logging from tools import getPowerOf2 from tools import BitStream # from tools import Enum LOGGER='Impulse' class ImpulseNoElement(Exception): pass class ImpulseBase: def __init__(self): self.name = "" self.id = "" self.param = {} #self.Reference = [] def set_name(self, name): self.name = name.replace(':', '_') def get_name(self): return self.name def add_reference(self, ref): #self.Reference.append(ref) self.param.setdefault('Reference', []) self.param['Reference'].append(ref) def get_reference(self): return self.param['Reference'] def get_parameter(self): return self.param def add_parameter(self, key, value): self.param.setdefault(key, []) self.param[key].append(value) def add_value(self, key, value): self.param.setdefault(key, "") self.param[key] = value def read(self, name, msgin, world): logging.getLogger(LOGGER).error("Not define") class ImpulseBotchatSetFilters(ImpulseBase): def __init__(self): super().__init__() def read(self, name, msgin, world): logging.getLogger(LOGGER).debug("read") self.name = name.replace(':', '_') self.param.setdefault('qualityMin', msgin.readUint32('qualityMin')) self.param.setdefault('qualityMax', msgin.readUint32('qualityMax')) self.param.setdefault('priceMin', msgin.readUint32('priceMin')) self.param.setdefault('priceMax', msgin.readUint32('priceMax')) self.param.setdefault('classMin', msgin.readUint8('classMin')) self.param.setdefault('classMax', msgin.readUint8('classMax')) self.param.setdefault('itemPart', msgin.readUint8('itemPart')) self.param.setdefault('itemType', msgin.readUint8('itemType')) class ImpulseConnectionAskName(ImpulseBase): def __init__(self): super().__init__() def read(self, name, msgin, world): logging.getLogger(LOGGER).debug("read") self.name = name.replace(':', '_') self.param.setdefault('Name', msgin.readUString('Name')) self.param.setdefault('HomeSessionId', msgin.readUint32('HomeSessionId')) class ImpulseConnectionCreateChar(ImpulseBase): def __init__(self): super().__init__() def read(self, name, msgin, world): logging.getLogger(LOGGER).debug("read") self.name = name.replace(':', '_') self.param.setdefault('Slot', msgin.readUint8('Slot')) self.param.setdefault('SheetId', msgin.readUint32('SheetId')) self.param.setdefault('CSessionId', msgin.readUint32('CSessionId')) self.param.setdefault('name', msgin.readUString('name')) self.param.setdefault('People', msgin.readUint8('People')) self.param.setdefault('Sex', msgin.readUint8('Sex')) self.param.setdefault('NbPointFighter', msgin.readUint8('NbPointFighter')) self.param.setdefault('NbPointCaster', msgin.readUint8('NbPointCaster')) self.param.setdefault('NbPointCrafter', msgin.readUint8('NbPointCrafter')) self.param.setdefault('NbPointHarvester', msgin.readUint8('NbPointHarvester')) self.param.setdefault('StartPoint', msgin.readSint32('StartPoint')) self.param.setdefault('HairType', msgin.readSint8('HairType')) self.param.setdefault('HairColor', msgin.readSint8('HairColor')) self.param.setdefault('GabaritHeight', msgin.readSint8('GabaritHeight')) self.param.setdefault('GabaritTorsoWidth', msgin.readSint8('GabaritTorsoWidth')) self.param.setdefault('GabaritArmsWidth', msgin.readSint8('GabaritArmsWidth')) self.param.setdefault('GabaritLegsWidth', msgin.readSint8('GabaritLegsWidth')) self.param.setdefault('GabaritBreastSize', msgin.readSint8('GabaritBreastSize')) self.param.setdefault('MorphTarget1', msgin.readSint8('MorphTarget1')) self.param.setdefault('MorphTarget2', msgin.readSint8('MorphTarget2')) self.param.setdefault('MorphTarget3', msgin.readSint8('MorphTarget3')) self.param.setdefault('MorphTarget4', msgin.readSint8('MorphTarget4')) self.param.setdefault('MorphTarget5', msgin.readSint8('MorphTarget5')) self.param.setdefault('MorphTarget6', msgin.readSint8('MorphTarget6')) self.param.setdefault('MorphTarget7', msgin.readSint8('MorphTarget7')) self.param.setdefault('MorphTarget8', msgin.readSint8('MorphTarget8')) self.param.setdefault('EyesColor', msgin.readSint8('EyesColor')) self.param.setdefault('Tattoo', msgin.readSint8('Tattoo')) self.param.setdefault('JacketColor', msgin.readSint8('JacketColor')) self.param.setdefault('TrousersColor', msgin.readSint8('TrousersColor')) self.param.setdefault('HatColor', msgin.readSint8('HatColor')) self.param.setdefault('ArmsColor', msgin.readSint8('ArmsColor')) self.param.setdefault('HandsColor', msgin.readSint8('HandsColor')) self.param.setdefault('FeetColor', msgin.readSint8('FeetColor')) class ImpulseConnectionReady(ImpulseBase): def __init__(self): super().__init__() def read(self, name, msgin, world): logging.getLogger(LOGGER).debug("read") self.name = name.replace(':', '_') self.param.setdefault('LanguageCode', msgin.readString('LanguageCode')) class ImpulseConnectionSelectChar(ImpulseBase): def __init__(self): super().__init__() def read(self, name, msgin, world): logging.getLogger(LOGGER).debug("read") self.name = name.replace(':', '_') self.param.setdefault('SelectCharMsg', msgin.readUint8('SelectCharMsg')) class ImpulseConnectionUserChar(ImpulseBase): def __init__(self): super().__init__() def read(self, name, msgin, world): logging.getLogger(LOGGER).debug("read") self.name = name.replace(':', '_') self.param.setdefault('X', msgin.readSint32('X')) self.param.setdefault('Y', msgin.readSint32('Y')) self.param.setdefault('Z', msgin.readSint32('Z')) self.param.setdefault('Heading', msgin.readFloat('Heading')) self.param.setdefault('season', msgin.readSerial(3, 'season')) self.param.setdefault('userRole', msgin.readSerial(3, 'userRole')) self.param.setdefault('highestMainlandSessionId', msgin.readUint32('highestMainlandSessionId')) self.param.setdefault('firstConnectedTime', msgin.readUint32('firstConnectedTime')) self.param.setdefault('playedTime', msgin.readUint32('playedTime')) class ImpulseConnectionShardId(ImpulseBase): def __init__(self): super().__init__() def read(self, name, msgin, world): logging.getLogger(LOGGER).debug("read") self.name = name.replace(':', '_') self.param.setdefault('shardId', msgin.readUint32('shardId')) self.param.setdefault('webHost', msgin.readString('webHost')) class ImpulseConnectionValidName(ImpulseBase): def __init__(self): super().__init__() def read(self, name, msgin, world): logging.getLogger(LOGGER).debug("read") self.name = name.replace(':', '_') self.param.setdefault('valide', msgin.readUint8('valide')) class ImpulseDebugPing(ImpulseBase): def __init__(self): super().__init__() def read(self, name, msgin, world): logging.getLogger(LOGGER).debug("read") self.name = name.replace(':', '_') self.param.setdefault('localTime', msgin.readUint32('localTime')) class ImpulseGuildFemaleTitles(ImpulseBase): def __init__(self): super().__init__() def read(self, name, msgin, world): logging.getLogger(LOGGER).debug("read") self.name = name.replace(':', '_') self.param.setdefault('UseFemaleTitles', msgin.readSerial(1, 'UseFemaleTitles')) class ImpulseNpsIconSetDesc(ImpulseBase): def __init__(self): super().__init__() def read(self, name, msgin, world): ''' khanat-opennel-code/code/ryzom/server/src/entities_game_service/player_manager/character.cpp:20976 void CCharacter::sendNpcMissionGiverIconDesc( const std::vector& npcKeys ) ''' logging.getLogger(LOGGER).debug("read") self.name = name.replace(':', '_') self.param.setdefault('nb8', msgin.readUint8('nb8')) nb8 = self.param['nb8'] for i in range(0, nb8): self.param.setdefault('%d:npcAlias' % nb8, msgin.readUint32('npcAlias')) self.param.setdefault('%d:state' % nb8, msgin.readUint32('state')) class ImpulsePhraseDownload(ImpulseBase): def __init__(self): super().__init__() def readSerialPhrase(self, msgin, id): self.param.setdefault(id, msgin.readUtf8String(id)) size = msgin.readSint32(id + ':len') self.param.setdefault(id + ':len', size) for i in range(0, size): self.param.setdefault('%s:compBricks:%d' % (id, i), msgin.readUint16('%s:compBricks:%d' % (id, i))) def readSerialPhrases(self, msgin, id): """ khanat-opennel-code/code/ryzom/server/src/entities_game_service/player_manager/character.cpp:13586 void CCharacter::sendPhrasesToClient() khanat-opennel-code/code/nel/include/nel/misc/stream.h:1089 void serialVector(T &cont) khanat-opennel-code/code/ryzom/common/src/game_share/sphrase_com.cpp:57 void CSPhraseCom::serial(NLMISC::IStream &impulse) """ size = msgin.readSint32(id + ':len') self.param.setdefault(id + ':len', size) for i in range(0, size): #self.param.setdefault('%d:Phrase' % i, msgin.readUtf8String('%d:Phrase:Name' % i)) #self.param.setdefault('%d:Phrase' % i, msgin.readString('%d:Phrase' % i)) self.readSerialPhrase(msgin, '%d:Phrase' % i) self.param.setdefault('%d:KnownSlot' % i, msgin.readUint16('%d:KnownSlot' % i)) self.param.setdefault('%d:PhraseSheetId:id' % i, msgin.readUint32('%d:PhraseSheetId:id' % i)) self.param.setdefault('%d:PhraseSheetId:Type' % i, msgin.readUint32('%d:PhraseSheetId:IdInfos:Type' % i)) self.param.setdefault('%d:PhraseSheetId:Type' % i, msgin.readUint32('%d:PhraseSheetId:IdInfos:Id' % i)) def readSerialMemorizedPhrases(self, msgin, id): size = msgin.readSint32(id + ':len') self.param.setdefault(id + ':len', size) for i in range(0, size): self.param.setdefault('%d:MemoryLineId' % i, msgin.readUint8('MemoryLineId')) self.param.setdefault('%d:MemorySlotId' % i, msgin.readUint8('MemorySlotId')) self.param.setdefault('%d:PhraseId' % i, msgin.readUint16('PhraseId')) def read(self, name, msgin, world): """ khanat-opennel-code/code/ryzom/server/src/entities_game_service/player_manager/character.cpp:13586 void CCharacter::sendPhrasesToClient() """ logging.getLogger(LOGGER).debug("read") self.name = name.replace(':', '_') return #self.param.setdefault(id + 'phrases', msgin.readString('phrases')) #self.param.setdefault(id + 'memorizedPhrases', msgin.readString('memorizedPhrases')) self.readSerialPhrases(msgin, 'knownPhrases') self.readSerialMemorizedPhrases(msgin, 'memorizedPhrases') logging.getLogger(LOGGER).error("[Client -> Server] msg:%s" % msgin.showAllData()) raise ValueError class ImpulsePosition(ImpulseBase): def __init__(self): super().__init__() def read(self, name, msgin, world): logging.getLogger(LOGGER).debug("read") self.name = name.replace(':', '_') self.param.setdefault('X', msgin.readSint32('X')) self.param.setdefault('Y', msgin.readSint32('Y')) self.param.setdefault('Z', msgin.readSint32('Z')) self.param.setdefault('Heading', msgin.readFloat('Heading')) class ImpulseSringDynString(ImpulseBase): def __init__(self): super().__init__() def read(self, name, msgin, world): logging.getLogger(LOGGER).debug("read") self.name = name.replace(':', '_') self.param.setdefault('phraseId', msgin.readUint32('phraseId')) class ImpulseSringManagerReloadCache(ImpulseBase): def __init__(self): super().__init__() def read(self, name, msgin, world): logging.getLogger(LOGGER).debug("read") self.name = name.replace(':', '_') self.param.setdefault('timestamp', msgin.readUint32('timestamp')) class ImpulseSringManagerPhraseSend(ImpulseBase): def __init__(self): super().__init__() def read(self, name, msgin, world): logging.getLogger(LOGGER).debug("read") self.name = name.replace(':', '_') self.param.setdefault('dynId', msgin.readUint32('dynId')) self.param.setdefault('StringId', msgin.readUint32('StringId')) try: id = 0 while True: self.param.setdefault('StringId:%d' % id, msgin.readUint32('StringId')) id += 1 except: pass class ImpulseSringManagerStringResp(ImpulseBase): def __init__(self): super().__init__() def read(self, name, msgin, world): logging.getLogger(LOGGER).debug("read") self.name = name.replace(':', '_') self.param.setdefault('stringId', msgin.readUint32('stringId')) self.param.setdefault('strUtf8', msgin.readUtf8String('strUtf8')) class ImpulseSringManagerStringRq(ImpulseBase): def __init__(self): super().__init__() def read(self, name, msgin, world): logging.getLogger(LOGGER).debug("read") self.name = name.replace(':', '_') self.param.setdefault('stringId', msgin.readUint32('stringId')) class DecodeImpulseSimple: def __init__(self): ''' khanat-opennel-code/code/ryzom/client/src/net_manager.cpp # void initializeNetwork() ''' self.msgXml = None self.databaseXml = None self.GenericMsgHeaderMngr = {} self.initializeNetwork() def initializeNetwork(self): # Send by client self.GenericMsgHeaderMngr.setdefault( "BOTCHAT:SET_FILTERS", ImpulseBotchatSetFilters ) self.GenericMsgHeaderMngr.setdefault( "CONNECTION:ASK_NAME", ImpulseConnectionAskName ) self.GenericMsgHeaderMngr.setdefault( "CONNECTION:CREATE_CHAR", ImpulseConnectionCreateChar ) self.GenericMsgHeaderMngr.setdefault( "CONNECTION:READY", ImpulseConnectionReady ) self.GenericMsgHeaderMngr.setdefault( "CONNECTION:SELECT_CHAR", ImpulseConnectionSelectChar ) self.GenericMsgHeaderMngr.setdefault( "CONNECTION:SHARD_ID", ImpulseConnectionShardId ) self.GenericMsgHeaderMngr.setdefault( "CONNECTION:USER_CHAR", ImpulseConnectionUserChar ) self.GenericMsgHeaderMngr.setdefault( "CONNECTION:VALID_NAME", ImpulseConnectionValidName ) self.GenericMsgHeaderMngr.setdefault( "DEBUG:PING", ImpulseDebugPing ) self.GenericMsgHeaderMngr.setdefault( "GUILD:USE_FEMALE_TITLES", ImpulseGuildFemaleTitles ) self.GenericMsgHeaderMngr.setdefault( "NPC_ICON:SET_DESC", ImpulseNpsIconSetDesc ) self.GenericMsgHeaderMngr.setdefault( "PHRASE:DOWNLOAD", ImpulsePhraseDownload ) self.GenericMsgHeaderMngr.setdefault( "POSITION", ImpulsePosition ) self.GenericMsgHeaderMngr.setdefault( "STRING:DYN_STRING", ImpulseSringDynString ) self.GenericMsgHeaderMngr.setdefault( "STRING_MANAGER:RELOAD_CACHE", ImpulseSringManagerReloadCache ) self.GenericMsgHeaderMngr.setdefault( "STRING_MANAGER:PHRASE_SEND", ImpulseSringManagerPhraseSend ) self.GenericMsgHeaderMngr.setdefault( "STRING_MANAGER:STRING_RESP", ImpulseSringManagerStringResp ) self.GenericMsgHeaderMngr.setdefault( "STRING_MANAGER:STRING_RQ", ImpulseSringManagerStringRq ) def execute(self, msgin, world, references = [], name=""): ''' khanat-opennel-code/code/ryzom/client/src/net_manager.cpp:3746 void impulseCallBack(NLMISC::CBitMemStream &impulse, sint32 packet, void *arg) khanat-opennel-code/code/ryzom/common/src/game_share/generic_xml_msg_mngr.cpp:121 void CGenericXmlMsgHeaderManager::execute(CBitMemStream &strm) khanat-opennel-code/code/ryzom/common/src/game_share/generic_xml_msg_mngr.h:431 CNode *select(NLMISC::CBitMemStream &strm) uint32 index = 0; q uint NbBits; strm.serialAndLog2(index, node->NbBits); ''' logging.getLogger(LOGGER).debug("execute") head = self.msgXml listpath = [] while True: nbBit = getPowerOf2.getPowerOf2(len(head)) #logging.getLogger(LOGGER).error("nbBit : %d " % nbBit) #for child in head: # print(child.tag, child.attrib) try: id = msgin.readSerial(nbBit, name='MsgXML', typeName='Number', emulate=True) except BitStream.OverflowError: raise ImpulseNoElement if id is None: raise ImpulseNoElement try: ele = head[id] except IndexError as e: logging.getLogger(LOGGER).error("Error to decode message: %s" % msgin.showAllData() ) raise e command = ele.attrib['name'] listpath.append(command) fullname = ':'.join(listpath) id = msgin.readSerial(nbBit, name='MsgXML', typeName='XML <' + command + '>') if fullname in self.GenericMsgHeaderMngr: logging.getLogger(LOGGER).debug("Found : %s" % fullname) impulse = self.GenericMsgHeaderMngr[fullname]() impulse.read(fullname, msgin, world) logging.getLogger(LOGGER).debug("MessageXML decoded: %s" % msgin.showAllData() ) impulse.add_value("command", fullname) for reference in references: impulse.add_reference(reference) impulse.add_value("Message", msgin.extractAllData()) return impulse else: #logging.getLogger(LOGGER).debug("Non trouve") for ele in head: if ele.attrib['name'] == command: head = ele break if head != ele: logging.getLogger(LOGGER).error("Impossible to found %s" % fullname ) logging.getLogger(LOGGER).debug("MessageXML decoded: %s" % msgin.showAllData() ) return None # End While logging.getLogger(LOGGER).debug("MessageXML decoded: %s" % msgin.showAllData() ) return None def loadMsg(self, msgXml): logging.getLogger(LOGGER).debug("msgXml") self.msgXml = msgXml def loadDatabase(self, databaseXml): self.databaseXml = databaseXml