clientbot/tools/CAction.py

595 lines
23 KiB
Python

#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# module CActionFactory
#
# Copyright (C) 2019 AleaJactaEst
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#from tools import Enum
import logging
import struct
import math
from tools import TPropIndex
from tools import TPVPMode
from tools import Enum
from tools import PropVisual
LOGGER='CActionFactory'
INVALID_SLOT = 0xff
# khanat-opennel-code/code/ryzom/common/src/game_share/action_sint64.cpp:55 void CActionSint64::registerNumericPropertiesRyzom()
PROPERTY_TO_NB_BIT = [0 for _ in range(0, Enum.TPropIndex.NB_VISUAL_PROPERTIES)]
PROPERTY_TO_NB_BIT[Enum.TPropIndex.PROPERTY_ORIENTATION] = 32
PROPERTY_TO_NB_BIT[Enum.TPropIndex.PROPERTY_SHEET] = 52
PROPERTY_TO_NB_BIT[Enum.TPropIndex.PROPERTY_BEHAVIOUR] = 48
PROPERTY_TO_NB_BIT[Enum.TPropIndex.PROPERTY_NAME_STRING_ID] = 32
PROPERTY_TO_NB_BIT[Enum.TPropIndex.PROPERTY_TARGET_ID] = 8
PROPERTY_TO_NB_BIT[Enum.TPropIndex.PROPERTY_MODE] = 44
PROPERTY_TO_NB_BIT[Enum.TPropIndex.PROPERTY_VPA] = 64
PROPERTY_TO_NB_BIT[Enum.TPropIndex.PROPERTY_VPB] = 47
PROPERTY_TO_NB_BIT[Enum.TPropIndex.PROPERTY_VPC] = 58
PROPERTY_TO_NB_BIT[Enum.TPropIndex.PROPERTY_ENTITY_MOUNTED_ID] = 8
PROPERTY_TO_NB_BIT[Enum.TPropIndex.PROPERTY_RIDER_ENTITY_ID] = 8
PROPERTY_TO_NB_BIT[Enum.TPropIndex.PROPERTY_CONTEXTUAL] = 16
PROPERTY_TO_NB_BIT[Enum.TPropIndex.PROPERTY_BARS] = 32
PROPERTY_TO_NB_BIT[Enum.TPropIndex.PROPERTY_TARGET_LIST] = 32 # USER_DEFINED_PROPERTY_NB_BITS
PROPERTY_TO_NB_BIT[Enum.TPropIndex.PROPERTY_VISUAL_FX] = 11
PROPERTY_TO_NB_BIT[Enum.TPropIndex.PROPERTY_GUILD_SYMBOL] = 60
PROPERTY_TO_NB_BIT[Enum.TPropIndex.PROPERTY_GUILD_NAME_ID] = 32
PROPERTY_TO_NB_BIT[Enum.TPropIndex.PROPERTY_EVENT_FACTION_ID] = 32
PROPERTY_TO_NB_BIT[Enum.TPropIndex.PROPERTY_PVP_MODE] = Enum.TPVPMode.NbBits
PROPERTY_TO_NB_BIT[Enum.TPropIndex.PROPERTY_PVP_CLAN] = 32
PROPERTY_TO_NB_BIT[Enum.TPropIndex.PROPERTY_OWNER_PEOPLE] = 3 # 4 races + unknow
PROPERTY_TO_NB_BIT[Enum.TPropIndex.PROPERTY_OUTPOST_INFOS] = 16 # 15+1
class CAction:
def __init__(self, slot, code, world):
self.Code = code
self.PropertyCode = code
self.Slot = slot
self._Priority = 1
self.Timeout = 0
self.GameCycle = 0
self.world = world
self.Reference = []
self.Name = ""
def set_name(self, name):
self.Name = name
def get_name(self):
return self.Name
def add_reference(self, ref):
self.Reference.append(ref)
def get_reference(self):
return self.Reference
def get_parameter(self):
return {"Type": "CAction", "Code": self.Code, "Slot": self.Slot, "PropertyCode": self.PropertyCode, "GameCycle": self.GameCycle, "Reference": self.Reference}
def unpack(self, message):
raise RuntimeError
def pack(self, msgout):
if self.Code < 4:
modeShort = True
msgout.pushBool(modeShort)
code = self.Code
msgout.internalSerial(self.Code, 2)
else:
modeShort = False
msgout.pushBool(modeShort)
code = self.Code
msgout.pushUint8(code)
def serialIn(self, msgin):
raise RuntimeError
def serialOut(self, msgout):
raise RuntimeError
def size(self):
headerBitSize = 0
if self.Code < 4:
headerBitSize = 1 + 2
else:
headerBitSize = 1 + (1 * 8)
return headerBitSize
def getMaxSizeInBit(self):
logging.getLogger(LOGGER).debug("New Object but missing method reset")
raise RuntimeError
def setPriority(self, prio):
logging.getLogger(LOGGER).debug("New Object but missing method reset")
raise RuntimeError
def priority(self):
logging.getLogger(LOGGER).debug("New Object but missing method reset")
raise RuntimeError
def getValue(self):
logging.getLogger(LOGGER).debug("New Object but missing method reset")
raise RuntimeError
def setValue(self, value):
logging.getLogger(LOGGER).debug("New Object but missing method reset")
raise RuntimeError
def isContinuous(self):
logging.getLogger(LOGGER).debug("New Object but missing method reset")
raise RuntimeError
def reset(self):
logging.getLogger(LOGGER).debug("New Object but missing method reset")
raise RuntimeError
def __str__(self):
return "[%d,%d]" % (self.Code , self.Slot)
class CActionPosition(CAction):
"""
Method use to decode position
khanat/khanat-opennel-code/code/ryzom/client/src/property_decoder.cpp:39 - void CPropertyDecoder::receive(TPacketNumber /* packetNumber */, CAction *action)
"""
def __init__(self, slot, code, world):
super().__init__(slot, code, world)
self.Position = [0, 0, 0]
self.Position16 = [0, 0, 0]
self.IsRelative = False
self.Interior = False
def __str__(self):
return "CActionPosition" + super().__str__() + " x:" + str(self.Position16[0]) + " y:" + str(self.Position16[1]) + " z:" + str(self.Position16[2]) + " IsRelative:" + str(self.IsRelative) + " Interior:" + str(self.Interior)
def get_parameter(self):
ret = super().get_parameter()
ret["Type"] = "CActionPosition"
ret.setdefault("Position", ', '.join([str(x) for x in self.Position]))
ret.setdefault("Position16", ', '.join([str(x) for x in self.Position16]))
ret.setdefault("IsRelative", str(self.IsRelative))
ret.setdefault("IsRelative", str(self.IsRelative))
return ret
def unpack(self, message):
self.Position16[0] = message.readUint16('px')
self.Position16[1] = message.readUint16('py')
self.Position16[2] = message.readUint16('pz')
self.IsRelative = (self.Position16[2] & 0x1) != 0
self.Interior = (self.Position16[2] & 0x2) != 0
# message.serialAndLog1( Position16[0] );
# message.serialAndLog1( Position16[1] );
# message.serialAndLog1( Position16[2] );
# IsRelative = (Position16[2] & (uint16)0x1)!=0;
# Interior = (Position16[2] & (uint16)0x2)!=0;
def reset(self):
pass
class CActionSync(CAction):
def __init__(self, slot, code, world):
super().__init__(slot, code, world)
def get_parameter(self):
ret = super().get_parameter()
ret["Type"] = "CActionSync"
return ret
def __str__(self):
return "CActionSync" + super().__str__()
class CActionDisconnection(CAction):
def __init__(self, slot, code, world):
super().__init__(slot, code, world)
def get_parameter(self):
ret = super().get_parameter()
ret["Type"] = "CActionDisconnection"
return ret
def __str__(self):
return "CActionDisconnection" + super().__str__()
class CActionAssociation(CAction):
def __init__(self, slot, code, world):
super().__init__(slot, code, world)
def get_parameter(self):
ret = super().get_parameter()
ret["Type"] = "CActionAssociation"
return ret
def __str__(self):
return "CActionAssociation" + super().__str__()
class CActionDummy(CAction):
def __init__(self, slot, code, world):
super().__init__(slot, code, world)
def get_parameter(self):
ret = super().get_parameter()
ret["Type"] = "CActionDummy"
return ret
def __str__(self):
return "CActionDummy" + super().__str__()
class CActionLogin(CAction):
def __init__(self, slot, code, world):
super().__init__(slot, code, world)
def get_parameter(self):
ret = super().get_parameter()
ret["Type"] = "CActionLogin"
return ret
def __str__(self):
return "CActionLogin" + super().__str__()
class CActionTargetSlot(CAction):
def __init__(self, slot, code, world):
super().__init__(slot, code, world)
def get_parameter(self):
ret = super().get_parameter()
ret["Type"] = "CActionTargetSlot"
return ret
def __str__(self):
return "CActionTargetSlot" + super().__str__()
class CActionGeneric(CAction):
def __init__(self, slot, code, world):
super().__init__(slot, code, world)
self._Message = None
self.decoded = False
def get_parameter(self):
ret = super().get_parameter()
ret["Type"] = "CActionGeneric"
if not self._Message.checkOnlyZeroAtEnd():
ret["state"] = "message partially decoded"
else:
ret["state"] = 'message decoded'
ret["Message"] = self._Message.extractAllData()
return ret
def set(self, message):
self._Message = message
def unpack(self, message):
size = message.readUint32('size')
self._Message = message.readBitStreamUint8(size, 'message')
def pack(self, msgout):
super().pack(msgout)
sizeMessage = len(self._Message)
msgout.pushUint32(sizeMessage)
msgout.pushBuffer(self._Message)
def reset(self):
self._Message = None
def genericAction(self, decodeImpulse, world, cGenericMultiPartTemp):
decodeImpulse.execute(self._Message, world)
self.decoded = True
def decodeImpulseSimple(self, decodeImpulseSimple, world, cGenericMultiPartTemp, Reference = None, Name = ""):
ret = decodeImpulseSimple.execute(self._Message, world, Reference, Name)
self.decoded = True
return ret
def __str__(self):
if self.decoded:
return "CActionGeneric" + super().__str__() + ' => ' + self._Message.showAllData()
else:
return "CActionGeneric" + super().__str__() + "[read:" + self._Message.showAllData() + '/write:' + self._Message.showAllDataWrite() + ']'
def size(self):
size = super().size()
if self._Message:
return 4 + len(self._Message) * 8 + size
else:
return 4 + size
class CActionGenericMultiPart(CAction):
'''
khanat-opennel-code/code/ryzom/common/src/game_share/action_generic_multi_part.h # class CActionGenericMultiPart
'''
def __init__(self, slot, code, world):
super().__init__(slot, code, world)
self.PartCont = []
self.Number = 0
self.Part = 0
self.NbBlock = 0
def get_parameter(self):
ret = super().get_parameter()
ret["Name"] = self.Name
ret["Type"] = "CActionGenericMultiPart"
ret["Number"] = "%d" % self.Number
ret["Part"] = "%d" % self.Part
ret["NbBlock"] = "%d" % self.NbBlock
ret["PartCont"] = self.PartCont.showAllData()
return ret
def set(self, number, part, buffer, bytelen, size, nbBlock):
'''
khanat-opennel-code/code/ryzom/common/src/game_share/action_generic_multi_part.h # void set (uint8 number, uint16 part, const uint8 *buffer, uint32 bytelen, uint32 size, uint16 nbBlock)
'''
logging.getLogger(LOGGER).debug("number:%d part:%d bytelen:%d size:%d nbBlock:%d" %(number, part, bytelen, size, nbBlock))
start = part*size
end = start + size
logging.getLogger(LOGGER).debug("start:%d end:%d bytelen:%d" % (start, end, bytelen))
if end > bytelen:
end = bytelen
logging.getLogger(LOGGER).debug("start:%d end:%d" % (start, end))
self.PartCont = []
# memcpy( &PartCont[0], buffer + start, end - start );
for i in range(start, end):
logging.getLogger(LOGGER).debug("Append : %d/%d" % (i, len(buffer)))
self.PartCont.append(buffer[i])
self.Number = number
self.Part = part
self.NbBlock = nbBlock
def unpack(self, message):
self.Number = message.readUint8('Number')
self.Part = message.readUint16('Part')
self.NbBlock = message.readUint16('NbBlock')
size = message.readUint32('size')
self.PartCont = message.readBitStreamUint8(size, 'PartCont')
logging.getLogger(LOGGER).debug("unpack - Number:%d Part:%d NbBlock:%d" % (self.Number, self.Part, self.NbBlock))
def pack(self, msgout):
super().pack(msgout)
msgout.pushUint8(self.Number)
msgout.pushUint16(self.Part)
msgout.pushUint16(self.NbBlock)
raise "Missing size param"
msgout.pushArrayUint8(self.PartCont)
def reset(self):
self.PartCont = []
self.Number = 0
self.Part = 0
self.NbBlock = 0
def genericAction(self, decodeImpulse, world, cGenericMultiPartTemp, Reference = None, Name = None):
'''
khanat-opennel-code/code/ryzom/client/src/network_connection.cpp # void CNetworkConnection::genericAction (CActionGenericMultiPart *agmp)
'''
logging.getLogger(LOGGER).debug("Number:%d Part:%d NbBlock:%d" % (self.Number, self.Part, self.NbBlock))
cGenericMultiPartTemp.addGenericMultiPartTemp(self.Number)
cGenericMultiPartTemp.setGenericMultiPartTemp(self.Number, self.Part, self.NbBlock, self.PartCont, decodeImpulse, world, Reference, Name)
def decodeImpulseSimple(self, decodeImpulseSimple, world, cGenericMultiPartTemp, Reference = None, Name = ""):
logging.getLogger(LOGGER).debug("Number:%d Part:%d NbBlock:%d" % (self.Number, self.Part, self.NbBlock))
cGenericMultiPartTemp.addGenericMultiPartTemp(self.Number)
ret = cGenericMultiPartTemp.setGenericMultiPartTemp(self.Number, self.Part, self.NbBlock, self.PartCont, decodeImpulseSimple, world, Reference, Name)
#ret = decodeImpulseSimple.execute(self._Message, world)
self.decoded = True
return ret
def __str__(self):
return "CActionGenericMultiPart" + super().__str__() + "[" + str(self.Number) + ',' + str(self.Part) + ',' + str(self.NbBlock) + ',read:' + self.PartCont.showAllData() + ',write:' + self.PartCont.showAllDataWrite() + ']'
def size(self):
size = super().size()
bytesize = 1 + 2 + 2 + 4 # header
# self.PartCont
for ele in self.PartCont:
bytesize += len(ele)
return bytesize * 8 + size
class CActionSint64(CAction):
'''
khanat-opennel-code/code/ryzom/common/src/game_share/action_sint64.cpp
A lire :
/home/jerome/Projets/khanat/khanat-opennel-code/code/ryzom/client/src/network_connection.cpp:1311 receiveNormalMessage(CBitMemStream &msgin)
/home/jerome/Projets/khanat/khanat-opennel-code/code/ryzom/client/src/network_connection.cpp:1472 receiveNormalMessage(CBitMemStream &msgin)
'''
def __init__(self, slot, code, world):
super().__init__(slot, code, world)
self.value = 0
self.NbBits = 64
self.NameProperty = 'None'
self.PropertyIndex = None
self.PropertyToNbBit = {
TPropIndex.TPropIndex.PROPERTY_ORIENTATION: 32,
TPropIndex.TPropIndex.PROPERTY_SHEET: 52,
TPropIndex.TPropIndex.PROPERTY_BEHAVIOUR: 48,
TPropIndex.TPropIndex.PROPERTY_NAME_STRING_ID: 32,
TPropIndex.TPropIndex.PROPERTY_TARGET_ID: 8,
TPropIndex.TPropIndex.PROPERTY_MODE: 44,
TPropIndex.TPropIndex.PROPERTY_VPA: 64,
TPropIndex.TPropIndex.PROPERTY_VPB: 47,
TPropIndex.TPropIndex.PROPERTY_VPC: 58,
TPropIndex.TPropIndex.PROPERTY_ENTITY_MOUNTED_ID: 8 , # slot
TPropIndex.TPropIndex.PROPERTY_RIDER_ENTITY_ID: 8 , # slot
TPropIndex.TPropIndex.PROPERTY_CONTEXTUAL: 16 ,
TPropIndex.TPropIndex.PROPERTY_BARS: 32 , # please do not lower it (or tell Olivier: used for forage sources)
TPropIndex.TPropIndex.PROPERTY_TARGET_LIST: TPropIndex.USER_DEFINED_PROPERTY_NB_BITS ,
TPropIndex.TPropIndex.PROPERTY_VISUAL_FX: 11 , # please do not lower it (or tell Olivier: used for forage sources)
TPropIndex.TPropIndex.PROPERTY_GUILD_SYMBOL: 60 ,
TPropIndex.TPropIndex.PROPERTY_GUILD_NAME_ID: 32 ,
TPropIndex.TPropIndex.PROPERTY_EVENT_FACTION_ID: 32 ,
TPropIndex.TPropIndex.PROPERTY_PVP_MODE: TPVPMode.NbBits ,
TPropIndex.TPropIndex.PROPERTY_PVP_CLAN: 32 ,
TPropIndex.TPropIndex.PROPERTY_OWNER_PEOPLE: 3 , # 4 races and unknow
TPropIndex.TPropIndex.PROPERTY_OUTPOST_INFOS: 16 } # 15+1
def get_parameter(self):
ret = super().get_parameter()
ret["Type"] = "CActionSint64"
return ret
def __str__(self):
return "CActionSint64" + super().__str__()
def unpack(self, msgin):
logging.getLogger(LOGGER).debug("nb bit:{0}".format(self.NbBits))
logging.getLogger(LOGGER).debug("msgin:%s" % msgin.showAllData())
# self.value = msgin.readSerial( self.NbBits, 'value')
self.value = msgin.readSerial64( self.NbBits, self.NameProperty)
logging.getLogger(LOGGER).debug("msgin:%s" % msgin.showAllData())
#self.NbBits = msgin.readUint32('NbBits')
logging.getLogger(LOGGER).debug("value:%u" % self.value)
logging.getLogger(LOGGER).debug("msgin:%s" % msgin.showAllData())
def pack(self, msgout):
super().pack(msgout)
#msgout.pushUint64(self.value)
msgout.pushSerial64(self.value, self.NbBits)
#msgout.pushUint32(self.NbBits)
def reset(self):
self.value = 0
self.NbBits = 0
self.NameProperty = 'None'
def setNbBits(self, propIndex, nameproperty):
self.NbBits = PROPERTY_TO_NB_BIT[propIndex]
self.NameProperty = nameproperty
self.PropertyIndex = propIndex
self.set_name(nameproperty)
logging.getLogger(LOGGER).debug("NameProperty:{1} NbBits:{0}".format(self.NbBits, self.NameProperty ))
def get_property(self):
ret = ""
if self.PropertyIndex == TPropIndex.TPropIndex.PROPERTY_ORIENTATION:
v1 = struct.pack('I', self.value)
angle = struct.unpack('<f',v1)[0] * 180 / math.pi
ret = "{0} [{1}]".format(angle, self.value)
else:
ret = "{0}".format(self.value)
return ret
def print_yaml(self, outyaml, space):
if self.PropertyIndex == TPropIndex.TPropIndex.PROPERTY_ORIENTATION:
v1 = struct.pack('I', self.value)
angle_radius = struct.unpack('<f',v1)[0]
angle_degree = angle_radius * 180 / math.pi
outyaml.write("{0} {1}:\n".format(space, self.get_name() ))
outyaml.write("{0} raw: {1}\n".format(space, self.value ))
outyaml.write("{0} radius: {1}\n".format(space, angle_radius ))
outyaml.write("{0} degree: {1}\n".format(space, angle_degree ))
elif self.PropertyIndex == TPropIndex.TPropIndex.PROPERTY_VPA:
outyaml.write("{0} {1}:\n".format(space, self.get_name()))
outyaml.write("{0} raw: {1}\n".format(space, self.value))
vba = PropVisual.PropVisualA()
vba.set_compress_data(self.value)
vba.print_yaml(outyaml, space+' ')
elif self.PropertyIndex == TPropIndex.TPropIndex.PROPERTY_VPB:
outyaml.write("{0} {1}:\n".format(space, self.get_name()))
outyaml.write("{0} raw: {1}\n".format(space, self.value))
vbb= PropVisual.PropVisualB()
vbb.set_compress_data(self.value)
vbb.print_yaml(outyaml, space+' ')
elif self.PropertyIndex == TPropIndex.TPropIndex.PROPERTY_VPC:
outyaml.write("{0} {1}:\n".format(space, self.get_name()))
outyaml.write("{0} raw: {1}\n".format(space, self.value))
vbc= PropVisual.PropVisualC()
vbc.set_compress_data(self.value)
vbc.print_yaml(outyaml, space+' ')
else:
outyaml.write("{0} {1}: {2}\n".format(space, self.get_name(), self.get_property()))
class CActionBlock:
def __init__(self):
self.Cycle = 0
self.FirstPacket = 0
self.Actions = []
self.Success = True
def serial(self, msgout, actionFactory):
msgout.pushUint32(self.Cycle)
msgout.pushUint8(len(self.Actions))
for action in self.Actions:
# msgPosBefore = msgout.getPosInBit()
actionFactory.pack(action, msgout)
# msgPosAfter = msgout.getPosInBit()
# actionSize = actionFactory.size(action)
def writeSerial(self, msgout):
msgout.pushUint32(self.Cycle)
numberActions = len(self.Actions)
msgout.pushUint8(numberActions)
for action in self.Actions:
action.pack(msgout)
def readSerial(self, msgin):
actions = []
_cycle = msgin.readUint32("Cycle")
_numberActions = msgin.readUint8("numberActions")
logging.getLogger(LOGGER).debug("_cycle:%d _numberActions:%d" % (_cycle, _numberActions))
for i in range(0,_numberActions):
actions.append( msgin.unpack(msgin) )
return actions
def insert(self, actions, begin, end):
for i in range(0, end):
if i>= begin:
self.Actions.append(actions[i])
def eraseToEnd(self, begin):
while len(self.Actions) >= begin:
self.Actions.pop()
def push_back(self, action):
self.Actions.append(action)
def __str__(self):
return "CActionBlock [Cycle:" + str(self.Cycle) + ', FirstPacket:' + str(self.FirstPacket) + ', Data:' + ', '.join([ str(x) for x in self.Actions]) + "]"
class CActionFake():
def __init__(self, Type, msgin, addon = None, Reference=None, Name=""):
self.Type = Type
self.Code = Enum.TActionCode.ACTION_NONE
self._Message = msgin
self.addon = addon
self.Reference = [ Reference ]
self.Name = Name
def get_name(self):
return self.Name
def get_parameter(self):
ret = {"Type": "CActionFake.%s" % self.Type}
if not self._Message.checkOnlyZeroAtEnd():
ret["state"] = "message partially decoded"
else:
ret["state"] = 'message decoded'
if self.addon:
for key in self.addon:
ret[key] = str(self.addon[key])
ret["Reference"] = self.Reference
ret["Message"] = self._Message.extractAllData()
return ret
def decodeImpulseSimple(self, decodeImpulseSimple, world, cGenericMultiPartTemp, Reference = None, Name = ""):
ret = decodeImpulseSimple.execute(self._Message, world, Reference, Name)
self.decoded = True
return ret