clientbot/tools/BitStream.py
2020-11-27 23:32:51 +01:00

1230 lines
45 KiB
Python

#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# module BitStream
#
# Copyright (C) 2019 AleaJactaEst
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
from ctypes import *
import sys
import inspect
import copy
import struct
from tools import CBitSet
LOGGER='BitStream'
class OverflowError(Exception):
pass
class BitStream():
def __init__(self):
self._pos = 0
self._read = 0
self._tampon = []
self._groupRead = []
self._groupWrite = []
self._CheckStreamOverflow = True
def __len__(self):
return (self._pos + 7) // 8
def reset_read(self):
self._read = 0
self._groupRead = []
def __deepcopy__(self, memo):
ret = BitStream()
ret._pos = self._pos
ret._read = self._read
ret._tampon = copy.deepcopy(self._tampon, memo)
ret._groupRead = copy.deepcopy(self._groupRead, memo)
ret._groupWrite = copy.deepcopy(self._groupWrite, memo)
return ret
def enable_LogErrorOnStreamOverflow(self):
self._CheckStreamOverflow = True
def disable_LogErrorOnStreamOverflow(self):
self._CheckStreamOverflow = False
def needRead(self):
return self._pos - self._read
def sizeData(self):
return self._pos
def sizeRead(self):
return self._read
def getRead(self):
return self._read
def getPos(self):
return self._pos
def buffer(self):
return self._tampon
def putRead(self, value):
if value > self._pos:
raise ValueError
self._read = value
# ------------------------------------
def internalSerial(self, value, nbits, decode=True, typeName=''):
p1 = self._pos
if nbits == 0:
return
elif nbits > 32:
raise "Out of range"
pos = self._pos % 8
if pos == 0:
self._tampon.append(0)
value = c_uint32(value).value
if nbits != 32:
mask = (1 << nbits) - 1;
v = value & mask;
else:
v = value;
_FreeBits = 8 - (self._pos % 8)
if nbits > _FreeBits:
self._tampon[-1] |= (v >> ( nbits - _FreeBits))
self._pos += _FreeBits
self.internalSerial( v , nbits - _FreeBits, decode=False)
else:
self._tampon[-1] |= (v << ( _FreeBits - nbits))
self._pos += nbits
if decode:
frame = inspect.currentframe()
frame = inspect.getouterframes(frame)[1]
string = inspect.getframeinfo(frame[0]).code_context[0].strip()
args = string[string.find('(') + 1:-1].split(',')
name = '?'
for i in args:
if i.find('=') != -1:
if i.split('=')[0].strip() == "value":
name = i.split('=')[1].strip()
break
else:
name = i
break
name = name.strip().split(' ')[0].strip()
self._groupWrite.append((p1, p1+nbits, name, typeName, value))
def pushSerial64(self, value, nbits, decode=True, typeName=None):
if nbits > 32:
p1 = self._pos
msd = value >> 32
self.internalSerial(msd, nbits - 32, False, typeName)
self.internalSerial(value, 32, False, typeName)
value = msd << 32 | msd2
if typeName is None:
typeName = 'Uint{0}'.format(nbits)
self._groupWrite.append((p1, p1+nbits, name, typeName, value))
return value
else:
return self.internalSerial(value, nbits, decode, typeName)
def pushBool(self, value, decode=True):
p1 = self._pos
if value:
v = 1
else:
v = 0
self.internalSerial(v, 1, decode=False)
if decode:
frame = inspect.currentframe()
frame = inspect.getouterframes(frame)[1]
string = inspect.getframeinfo(frame[0]).code_context[0].strip()
args = string[string.find('(') + 1:-1].split(',')
name = '?'
for i in args:
if i.find('=') != -1:
if i.split('=')[0].strip() == "value":
name = i.split('=')[1].strip()
break
else:
name = i
break
name = name.strip().split(' ')[0].strip()
self._groupWrite.append((p1, p1+1, name, 'Bool', value))
def pushUint32(self, value, decode=True, typeName='Uint32'):
p1 = self._pos
self.internalSerial(value, 32, decode=False)
if decode:
frame = inspect.currentframe()
frame = inspect.getouterframes(frame)[1]
string = inspect.getframeinfo(frame[0]).code_context[0].strip()
args = string[string.find('(') + 1:-1].split(',')
name = '?'
for i in args:
if i.find('=') != -1:
if i.split('=')[0].strip() == "value":
name = i.split('=')[1].strip()
break
else:
name = i
break
name = name.strip().split(' ')[0].strip()
self._groupWrite.append((p1, p1+32, name, typeName, value))
def pushSint32(self, value, decode=True):
p1 = self._pos
self.internalSerial(value, 32, decode=False)
if decode:
frame = inspect.currentframe()
frame = inspect.getouterframes(frame)[1]
string = inspect.getframeinfo(frame[0]).code_context[0].strip()
args = string[string.find('(') + 1:-1].split(',')
name = '?'
for i in args:
if i.find('=') != -1:
if i.split('=')[0].strip() == "value":
name = i.split('=')[1].strip()
break
else:
name = i
break
name = name.strip().split(' ')[0].strip()
self._groupWrite.append((p1, p1+32, name, 'Sint32', value))
def pushUint16(self, value, decode=True):
p1 = self._pos
self.internalSerial(value, 16, decode=False)
if decode:
frame = inspect.currentframe()
frame = inspect.getouterframes(frame)[1]
string = inspect.getframeinfo(frame[0]).code_context[0].strip()
args = string[string.find('(') + 1:-1].split(',')
name = '?'
for i in args:
if i.find('=') != -1:
if i.split('=')[0].strip() == "value":
name = i.split('=')[1].strip()
break
else:
name = i
break
name = name.strip().split(' ')[0].strip()
self._groupWrite.append((p1, p1+16, name, 'Uint16', value))
def pushSint16(self, value, decode=True):
p1 = self._pos
self.internalSerial(value, 16, decode=False)
if decode:
frame = inspect.currentframe()
frame = inspect.getouterframes(frame)[1]
string = inspect.getframeinfo(frame[0]).code_context[0].strip()
args = string[string.find('(') + 1:-1].split(',')
name = '?'
for i in args:
if i.find('=') != -1:
if i.split('=')[0].strip() == "value":
name = i.split('=')[1].strip()
break
else:
name = i
break
name = name.strip().split(' ')[0].strip()
self._groupWrite.append((p1, p1+16, name, 'Sint16', value))
def pushUint8(self, value, decode=True):
p1 = self._pos
self.internalSerial(value, 8, decode=False)
if decode:
frame = inspect.currentframe()
frame = inspect.getouterframes(frame)[1]
string = inspect.getframeinfo(frame[0]).code_context[0].strip()
args = string[string.find('(') + 1:-1].split(',')
name = '?'
for i in args:
if i.find('=') != -1:
if i.split('=')[0].strip() == "value":
name = i.split('=')[1].strip()
break
else:
name = i
break
name = name.strip().split(' ')[0].strip()
self._groupWrite.append((p1, p1+8, name, 'Uint8', value))
def pushSint8(self, value, decode=True):
p1 = self._pos
self.internalSerial(value, 8, decode=False)
if decode:
frame = inspect.currentframe()
frame = inspect.getouterframes(frame)[1]
string = inspect.getframeinfo(frame[0]).code_context[0].strip()
args = string[string.find('(') + 1:-1].split(',')
name = '?'
for i in args:
if i.find('=') != -1:
if i.split('=')[0].strip() == "value":
name = i.split('=')[1].strip()
break
else:
name = i
break
name = name.strip().split(' ')[0].strip()
self._groupWrite.append((p1, p1+8, name, 'Sint8', value))
def pushUint64(self, value, decode=True):
p1 = self._pos
if sys.byteorder == "little":
self.internalSerial(value>>32, 32, decode=False)
self.internalSerial(value, 32, decode=False)
else:
self.internalSerial(value, 32, decode=False)
self.internalSerial(value>>32, 32, decode=False)
if decode:
frame = inspect.currentframe()
frame = inspect.getouterframes(frame)[1]
string = inspect.getframeinfo(frame[0]).code_context[0].strip()
args = string[string.find('(') + 1:-1].split(',')
name = '?'
for i in args:
if i.find('=') != -1:
if i.split('=')[0].strip() == "value":
name = i.split('=')[1].strip()
break
else:
name = i
break
name = name.strip().split(' ')[0].strip()
self._groupWrite.append((p1, p1+64, name, 'Uint64', value))
def pushSint64(self, value, decode=True):
p1 = self._pos
if sys.byteorder == "little":
self.internalSerial(value>>32, 32, decode=False)
self.internalSerial(value, 32, decode=False)
else:
self.internalSerial(value, 32, decode=False)
self.internalSerial(value>>32, 32, decode=False)
if decode:
frame = inspect.currentframe()
frame = inspect.getouterframes(frame)[1]
string = inspect.getframeinfo(frame[0]).code_context[0].strip()
args = string[string.find('(') + 1:-1].split(',')
name = '?'
for i in args:
if i.find('=') != -1:
if i.split('=')[0].strip() == "value":
name = i.split('=')[1].strip()
break
else:
name = i
break
name = name.strip().split(' ')[0].strip()
self._groupWrite.append((p1, p1+64, name, 'Sint64', value))
def pushFloat(self, value, decode=True):
p1 = self._pos
v = c_float(value).value
v1 = struct.pack('f', v)
v2 = struct.unpack('<i',v1)[0]
self.internalSerial(v2, 32, decode=False)
if decode:
frame = inspect.currentframe()
frame = inspect.getouterframes(frame)[1]
string = inspect.getframeinfo(frame[0]).code_context[0].strip()
args = string[string.find('(') + 1:-1].split(',')
name = '?'
for i in args:
if i.find('=') != -1:
if i.split('=')[0].strip() == "value":
name = i.split('=')[1].strip()
break
else:
name = i
break
name = name.strip().split(' ')[0].strip()
self._groupWrite.append((p1, p1+32, name, 'Float', value))
def pushDouble(self, value, decode=True):
p1 = self._pos
v = c_double(value).value
v1 = struct.pack('d', v)
v2 = struct.unpack('<Q',v1)[0]
#self.internalSerial(v2, 32)
self.internalSerial(v2, 32, decode=False)
self.internalSerial(v2 >> 32, 32, decode=False)
if decode:
frame = inspect.currentframe()
frame = inspect.getouterframes(frame)[1]
string = inspect.getframeinfo(frame[0]).code_context[0].strip()
args = string[string.find('(') + 1:-1].split(',')
name = '?'
for i in args:
if i.find('=') != -1:
if i.split('=')[0].strip() == "value":
name = i.split('=')[1].strip()
break
else:
name = i
break
name = name.strip().split(' ')[0].strip()
self._groupWrite.append((p1, p1+64, name, 'Float', value))
def pushChar(self, value, decode=True):
p1 = self._pos
v = ord(value)
self.internalSerial(v, 8, decode=False)
if decode:
frame = inspect.currentframe()
frame = inspect.getouterframes(frame)[1]
string = inspect.getframeinfo(frame[0]).code_context[0].strip()
args = string[string.find('(') + 1:-1].split(',')
name = '?'
for i in args:
if i.find('=') != -1:
if i.split('=')[0].strip() == "value":
name = i.split('=')[1].strip()
break
else:
name = i
break
name = name.strip().split(' ')[0].strip()
self._groupWrite.append((p1, p1+8, name, 'Char', value))
def pushString(self, value, decode=True):
lenValue = len(value)
self.pushUint32(lenValue, decode=True, typeName='String:len')
p1 = self._pos
for _char in value:
self.pushChar(_char, decode=False)
p2 = self._pos
if decode:
frame = inspect.currentframe()
frame = inspect.getouterframes(frame)[1]
string = inspect.getframeinfo(frame[0]).code_context[0].strip()
args = string[string.find('(') + 1:-1].split(',')
name = '?'
for i in args:
if i.find('=') != -1:
if i.split('=')[0].strip() == "value":
name = i.split('=')[1].strip()
break
else:
name = i
break
name = name.strip().split(' ')[0].strip()
self._groupWrite.append((p1, p2, name, 'String', value))
def pushUString(self, value, decode=True):
lenValue = len(value)
self.pushUint32(lenValue, decode=True, typeName='UString:len')
p1 = self._pos
for x in value:
_char16bit = ord(x)
self.internalSerial(_char16bit, 16, decode=False)
p2 = self._pos
if decode:
frame = inspect.currentframe()
frame = inspect.getouterframes(frame)[1]
string = inspect.getframeinfo(frame[0]).code_context[0].strip()
args = string[string.find('(') + 1:-1].split(',')
name = '?'
for i in args:
if i.find('=') != -1:
if i.split('=')[0].strip() == "value":
name = i.split('=')[1].strip()
break
else:
name = i
break
name = name.strip().split(' ')[0].strip()
self._groupWrite.append((p1, p2, name, 'UString', value))
def pushArrayUint8(self, value, decode=True):
' ex.: pushArrayChar([0,1,3,4]) '
p1 = self._pos
for i in value:
self.pushUint8(i, decode=False)
p2 = self._pos
if decode:
frame = inspect.currentframe()
frame = inspect.getouterframes(frame)[1]
string = inspect.getframeinfo(frame[0]).code_context[0].strip()
args = string[string.find('(') + 1:-1].split(',')
name = '?'
for i in args:
if i.find('=') != -1:
if i.split('=')[0].strip() == "value":
name = i.split('=')[1].strip()
break
else:
name = i
break
name = name.strip().split(' ')[0].strip()
self._groupWrite.append((p1, p2, name, 'ArrayUint8', value))
def pushBuffer(self, source, decode=True):
'''
Push BitStream with all byte (extend to byte if necessary)
'''
p1 = self._pos
srcRead = source.getRead()
source.putRead(0)
for ele in source._tampon:
self.pushUint8(ele, decode=False)
source.putRead(srcRead)
p2 = self._pos
if decode:
frame = inspect.currentframe()
frame = inspect.getouterframes(frame)[1]
string = inspect.getframeinfo(frame[0]).code_context[0].strip()
args = string[string.find('(') + 1:-1].split(',')
name = '?'
for i in args:
if i.find('=') != -1:
if i.split('=')[0].strip() == "value":
name = i.split('=')[1].strip()
break
else:
name = i
break
name = name.strip().split(' ')[0].strip()
self._groupWrite.append((p1, p2, name, 'Buffer', ''))
def pushBitStream(self, source, decode=True):
p1 = self._pos
srcRead = source.getRead()
source.putRead(0)
need = 8 - (self._pos % 8)
if need != 8:
self.internalSerial(source.readSerial(need, decode=False), need, decode=False)
while source.needRead() >= 8:
self.pushUint8(source.readSerial(8, decode=False), decode=False)
need = source.needRead()
if need > 0:
self.internalSerial(source.readSerial(need, decode=False), need, decode=False)
source.putRead(srcRead)
p2 = self._pos
if decode:
frame = inspect.currentframe()
frame = inspect.getouterframes(frame)[1]
string = inspect.getframeinfo(frame[0]).code_context[0].strip()
args = string[string.find('(') + 1:-1].split(',')
name = '?'
for i in args:
if i.find('=') != -1:
if i.split('=')[0].strip() == "value":
name = i.split('=')[1].strip()
break
else:
name = i
break
name = name.strip().split(' ')[0].strip()
self._groupWrite.append((p1, p2, name, 'BitStream', ''))
# ------------------------------------
def readSerial(self, nbits, name="", decode=True, typeName='', emulate=False, signed=False, commentValue=None):
v1 = self._read
if nbits == 0:
return
elif nbits > 32:
logging.getLogger(LOGGER).error("Error: Stream Overflow - nbits:%d/%d name:'%s' decode:'%s' typeName:'%s' emulate:%s msg:%s" % (nbits, self._pos-self._read, name, str(decode), typeName, str(emulate), self.showAllData()))
raise OverflowError
if self._read + nbits > self._pos:
if self._CheckStreamOverflow:
logging.getLogger(LOGGER).error("Error: Stream Overflow - nbits:%d/%d name:'%s' decode:'%s' typeName:'%s' emulate:%s msg:%s" % (nbits, self._pos-self._read, name, str(decode), typeName, str(emulate), self.showAllData()))
raise OverflowError
if emulate:
oldRead = self._read
value = 0
pos = self._read // 8
_FreeBits = 8 - (self._read % 8)
v = self._tampon[pos] & ((1 << _FreeBits) - 1)
if nbits > _FreeBits:
value |= (v << (nbits-_FreeBits))
self._read += _FreeBits
value |= self.readSerial(nbits - _FreeBits, decode=False)
else:
value |= (v >> (_FreeBits-nbits))
self._read += nbits
if emulate:
self._read = oldRead
if decode and not emulate:
if signed:
value = c_int32(value).value
self._groupRead.append((v1, v1+nbits, name, typeName, value, commentValue))
return value
def readSerialUint64(self, nbits, name="", decode=True, typeName=None, commentValue=None):
v1 = self._read
if nbits > 32:
msd = self.readSerial(nbits - 32, name, False, typeName)
msd2 = self.readSerial(32, name, False, typeName)
value = msd << 32 | msd2
if decode:
if typeName is None:
typeName = 'Uint{0}'.format(nbits)
self._groupRead.append((v1, v1+nbits, name, typeName, value, commentValue))
return value
else:
if decode:
if typeName is None:
typeName = 'Uint{0}'.format(nbits)
value = self.readSerial(nbits, name, False, None, commentValue=None)
self._groupRead.append((v1, v1+nbits, name, typeName, value, commentValue))
return value
def readSerialSint64(self, nbits, name="", decode=True, typeName=None, commentValue=None):
v1 = self._read
if nbits > 32:
msd = self.readSerial(nbits - 32, name, False, typeName)
msd2 = self.readSerial(32, name, False, typeName)
value = msd << 32 | msd2
if decode:
if typeName is None:
typeName = 'Sint{0}'.format(nbits)
self._groupRead.append((v1, v1+nbits, name, typeName, value, commentValue))
else:
if typeName is None:
typeName = 'Sint{0}'.format(nbits)
value = self.readSerial(nbits, name, False, None, commentValue=None)
self._groupRead.append((v1, v1+nbits, name, typeName, value, commentValue))
#value = self.readSerial(nbits, name, decode, typeName, commentValue=commentValue)
return c_int64(value).value
def readCBitSet(self, nbits, name="", decode=True, typeName=None, commentValue=None):
# khanat-opennel-code/code/nel/src/misc/bit_mem_stream.cpp void CBitMemStream::readBits( NLMISC::CBitSet& bitfield )
cBitSet = CBitSet.CBitSet()
v1 = self._read
cBitSet.resize(nbits)
pos = 0
last = nbits
logging.getLogger(LOGGER).debug("nbits:%d" % nbits)
while last >= cBitSet.NL_BITLEN:
v = self.readSerial(cBitSet.NL_BITLEN, decode=False)
logging.getLogger(LOGGER).debug("pos:%d v:%d reel:%d" % (pos, v, (nbits//32)-pos-1))
cBitSet.setUint(v, (nbits//32)-pos-1)
last -= cBitSet.NL_BITLEN
pos += 1
if last > 0:
v = self.readSerial(last, decode=False)
logging.getLogger(LOGGER).debug("pos:%d v:%d reel:%d" % (pos, v, (nbits//32)-pos-1))
cBitSet.setUint(v, (nbits//32)-pos-1)
# pos = 0
# while pos < nbits:
# v = self.readSerial(1, decode=False)
# cBitSet.set(pos, v)
# pos += 1
if decode:
valuereal = cBitSet.showBitString()
_size = len(valuereal)
# _start = _size - nbits
#self._groupRead.append((v1, v1+nbits, name, typeName, valuereal[0:nbits], commentValue))
self._groupRead.append((v1, v1+nbits, name, typeName, valuereal[-nbits:], commentValue))
return cBitSet
def readCBitSetOld2(self, nbits, name="", decode=True, typeName=None, commentValue=None):
# khanat-opennel-code/code/nel/src/misc/bit_mem_stream.cpp void CBitMemStream::readBits( NLMISC::CBitSet& bitfield )
cBitSet = CBitSet.CBitSet()
v1 = self._read
cBitSet.resize(nbits)
pos = 0
logging.getLogger(LOGGER).debug("nbits:%d" % nbits)
pos = 0
while pos < nbits:
v = self.readSerial(1, decode=False)
logging.getLogger(LOGGER).debug("pos:%d value:%d" % (nbits-pos-1, v))
cBitSet.set(nbits-pos-1, v)
pos += 1
if decode:
self._groupRead.append((v1, v1+nbits, name, typeName, '', commentValue))
logging.getLogger(LOGGER).debug("cBitSet:%s" % cBitSet.showBitString())
return cBitSet
def readCBitSetOld3(self, nbits, name="", decode=True, typeName=None, commentValue=None):
# khanat-opennel-code/code/nel/src/misc/bit_mem_stream.cpp void CBitMemStream::readBits( NLMISC::CBitSet& bitfield )
cBitSet = CBitSet.CBitSet()
v1 = self._read
cBitSet.resize(nbits)
pos = 0
logging.getLogger(LOGGER).debug("nbits:%d" % nbits)
pos = 0
while pos < nbits:
v = self.readSerial(1, decode=False)
logging.getLogger(LOGGER).debug("pos:%d value:%d" % (nbits-pos-1, v))
cBitSet.set(pos, v)
pos += 1
if decode:
self._groupRead.append((v1, v1+nbits, name, typeName, '', commentValue))
logging.getLogger(LOGGER).debug("cBitSet:%s" % cBitSet.showBitString())
return cBitSet
def readBool(self, name):
v1 = self._read
v = self.readSerial(1, name=name, decode=False, typeName='Bool')
self._groupRead.append((v1, v1+1, name, 'Bool', 'True' if v != 0 else 'False', None))
if v != 0:
return True
else:
return False
def readUint32(self, name, decode=True, typeName='Uint32'):
v = self.readSerial(32, name=name, decode=decode, typeName=typeName)
return v
def readSint32(self, name, decode=True):
v = self.readSerial(32, name=name, decode=decode, typeName='Sint32', signed=True)
return c_int32(v).value
def readUint16(self, name, decode=True):
v = self.readSerial(16, name=name, decode=decode, typeName='Uint16')
return v
def readSint16(self, name):
v = self.readSerial(16, name=name, typeName='Sint16', signed=True)
return c_int16(v).value
def readUint8(self, name, decode=True, typeName='Uint8'):
v = self.readSerial(8, decode=decode, name=name, typeName=typeName)
return v
def readSint8(self, name):
v = self.readSerial(8, name=name, typeName='Sint8', signed=True)
return c_int8(v).value
def readUint64(self, name):
# khanat-opennel-code/code/nel/include/nel/misc/stream.h #define NLMISC_BSWAP64(src) (src) = (((src)>>56)&0xFF) | ((((src)>>48)&0xFF)<<8) | ((((src)>>40)&0xFF)<<16) | ((((src)>>32)&0xFF)<<24) | ((((src)>>24)&0xFF)<<32) | ((((src)>>16)&0xFF)<<40) | ((((src)>>8)&0xFF)<<48) | (((src)&0xFF)<<56)
p1 = self._read
if sys.byteorder == "little":
v4 = self.readSerial(8, decode=False)
v3 = self.readSerial(8, decode=False)
v2 = self.readSerial(8, decode=False)
v1 = self.readSerial(8, decode=False)
v8 = self.readSerial(8, decode=False)
v7 = self.readSerial(8, decode=False)
v6 = self.readSerial(8, decode=False)
v5 = self.readSerial(8, decode=False)
else:
v1 = self.readSerial(8, decode=False)
v2 = self.readSerial(8, decode=False)
v3 = self.readSerial(8, decode=False)
v4 = self.readSerial(8, decode=False)
v5 = self.readSerial(8, decode=False)
v6 = self.readSerial(8, decode=False)
v7 = self.readSerial(8, decode=False)
v8 = self.readSerial(8, decode=False)
ret = v8 << 56 | v7 << 48 | v6 << 40 | v5 << 32 | v4 << 24 | v3 << 16 | v2 << 8 | v1
self._groupRead.append((p1, p1+64, name, 'Uint64', ret, None))
return v3
def readSint64(self, name):
# khanat-opennel-code/code/nel/include/nel/misc/stream.h #define NLMISC_BSWAP64(src) (src) = (((src)>>56)&0xFF) | ((((src)>>48)&0xFF)<<8) | ((((src)>>40)&0xFF)<<16) | ((((src)>>32)&0xFF)<<24) | ((((src)>>24)&0xFF)<<32) | ((((src)>>16)&0xFF)<<40) | ((((src)>>8)&0xFF)<<48) | (((src)&0xFF)<<56)
p1 = self._read
if sys.byteorder == "little":
v4 = self.readSerial(8, decode=False)
v3 = self.readSerial(8, decode=False)
v2 = self.readSerial(8, decode=False)
v1 = self.readSerial(8, decode=False)
v8 = self.readSerial(8, decode=False)
v7 = self.readSerial(8, decode=False)
v6 = self.readSerial(8, decode=False)
v5 = self.readSerial(8, decode=False)
else:
v1 = self.readSerial(8, decode=False)
v2 = self.readSerial(8, decode=False)
v3 = self.readSerial(8, decode=False)
v4 = self.readSerial(8, decode=False)
v5 = self.readSerial(8, decode=False)
v6 = self.readSerial(8, decode=False)
v7 = self.readSerial(8, decode=False)
v8 = self.readSerial(8, decode=False)
ret = v8 << 56 | v7 << 48 | v6 << 40 | v5 << 32 | v4 << 24 | v3 << 16 | v2 << 8 | v1
self._groupRead.append((p1, p1+64, name, 'Sint64', c_int64(ret).value, None))
return c_int64(v3).value
def readFloat(self, name):
p1 = self._read
v = self.readSerial(32, name=name, decode=False, typeName='Float')
v1 = struct.pack('I', v)
v2 = struct.unpack('<f',v1)[0]
self._groupRead.append((p1, p1+32, name, 'Float', v2, None))
return v2
def readDouble(self, name):
p1 = self._read
v = self.readSerial(32, decode=False)
v1 = struct.pack('I', v)
w = self.readSerial(32, decode=False)
w1 = struct.pack('I', w)
x = v1 + w1
x1 = struct.unpack('<d', x)[0]
self._groupRead.append((p1, p1+64, name, 'Double', x1, None))
return x1
def readChar(self, name, decode=True):
v = self.readUint8(name=name, decode=decode, typeName='Char')
return chr(v)
def readNbChar(self, size, name, decode=True):
tmp = ''
_size = size
v1 = self._read
while _size > 0:
x = self.readChar('', decode=False)
tmp += x
_size -= 1
v2 = self._read
if v2 > self._pos:
raise ValueError
if v1 < v2:
self._groupRead.append((v1, v2, name + ':string', 'String', tmp, None))
return tmp
def readString(self, name):
tmp = ''
_size = self.readUint32(name + ':size', decode=True)
v1 = self._read
while _size > 0:
x = self.readChar('', decode=False)
tmp += x
_size -= 1
v2 = self._read
if v2 > self._pos:
raise ValueError
if v1 < v2:
self._groupRead.append((v1, v2, name + ':string', 'String', tmp, None))
return tmp
def readUtf8String(self, name):
tmp = b''
_size = self.readUint32(name + ':size', decode=True)
v1 = self._read
while _size > 0:
x = self.readUint8(name='', decode=False)
tmp += bytes(chr(x), encoding='latin1')
_size -= 1
v2 = self._read
if v2 > self._pos:
raise ValueError
if v1 < v2:
self._groupRead.append((v1, v2, name + ':ustring', 'UString', tmp, None))
return tmp.decode(encoding='utf-8')
def readUString(self, name):
tmp = ''
_size = self.readUint32(name + ':size', decode=True)
v1 = self._read
while _size > 0:
x = self.readUint16(name='', decode=False)
tmp += chr(x)
_size -= 1
v2 = self._read
if v2 > self._pos:
raise ValueError
if v1 < v2:
self._groupRead.append((v1, v2, name + ':ustring', 'UString', tmp, None))
return tmp
def readArrayUint8(self, size, name):
ret = []
v1 = self._read
for i in range(0, size):
ret.append(self.readUint8('', decode=False))
v2 = self._read
self._groupRead.append((v1, v2, name, 'ArrayUint8', '', None))
return ret
def readBitStreamUint8(self, size, name):
ret = BitStream()
v1 = self._read
for i in range(0, size):
ret.pushUint8(self.readUint8('', decode=False), decode=False)
v2 = self._read
self._groupRead.append((v1, v2, name, 'StreamUint8', '', None))
return ret
def readCont(self, name):
ret = BitStream()
size = self.readSint32(name = name + ':len', decode=True)
v1 = self._read
for i in range(0, size):
ret.pushBool(self.readSerial(1,name = '', decode=False))
v2 = self._read
self._groupRead.append((v1, v2, name + ':data', 'readCont', '', None))
return size, ret
def getNotRead(self):
ret = BitStream()
readBefore = self._read
while self._read < self._pos:
if self._pos - self._read >= 8:
nsize = 8
else:
nsize = self._pos - self._read
data = self.readSerial(nsize, decode=False)
ret.internalSerial(data, nsize, decode=False)
self._read = readBefore
return ret
# ------------------------------------
def __str__(self):
return ''.join([ chr(x) for x in self._tampon])
def message(self):
# return str(self._pos) + ':' + '.'.join([ format(x, "02x") for x in self._tampon])
return str(self._pos) + ':' + '.'.join([ "{0:08b}".format(x) for x in self._tampon])
def toBytes(self):
return bytes( self._tampon )
def fromBytes(self, data):
self._read = 0
self._tampon = [int(x) for x in data]
self._pos = len(self._tampon) * 8
def showLastData(self):
ret = ""
readBefore = self._read
while self._read < self._pos:
if self._pos - self._read >= 8:
nsize = 8
else:
nsize = self._pos - self._read
data = self.readSerial(nsize, decode=False)
if nsize == 1:
ret += "{0:01b}".format(data)
elif nsize == 2:
ret += "{0:02b}".format(data)
elif nsize == 3:
ret += "{0:03b}".format(data)
elif nsize == 4:
ret += "{0:04b}".format(data)
elif nsize == 5:
ret += "{0:05b}".format(data)
elif nsize == 6:
ret += "{0:06b}".format(data)
elif nsize == 7:
ret += "{0:07b}".format(data)
else:
ret += "{0:08b}".format(data)
if ret != "":
ret += "."
ret += "{0:08b}".format(data)
self._read = readBefore
return ret
def showAllData(self):
ret = ""
readBefore = self._read
self._read = 0
while self._read < self._pos:
if self._pos - self._read >= 8:
nsize = 8
else:
nsize = self._pos - self._read
data = self.readSerial(nsize, decode=False)
if nsize == 1:
ret += "{0:01b}".format(data)
elif nsize == 2:
ret += "{0:02b}".format(data)
elif nsize == 3:
ret += "{0:03b}".format(data)
elif nsize == 4:
ret += "{0:04b}".format(data)
elif nsize == 5:
ret += "{0:05b}".format(data)
elif nsize == 6:
ret += "{0:06b}".format(data)
elif nsize == 7:
ret += "{0:07b}".format(data)
else:
ret += "{0:08b}".format(data)
self._read = readBefore
ret2 = ""
last = 0
for x, y, name, typeName, value, commentValue in self._groupRead:
if typeName == None:
typeName = '-'
if commentValue == None:
strcommentValue = ""
else:
strcommentValue = " " + commentValue
if value:
strvalue = ' => ' + str(value)
else:
strvalue = ''
ret2 += "[<" + str(x) + ':' + str(y-1) + "> " + str(name) + ' (' + typeName + ') : ' + ret[x:y] + strvalue + strcommentValue + "]"
last = y
if last < self._pos:
ret2 += "{" + ret[last:] + "}"
return ret2
def showAllDataWrite(self):
ret = ""
readBefore = self._read
self._read = 0
while self._read < self._pos:
if self._pos - self._read >= 8:
nsize = 8
else:
nsize = self._pos - self._read
data = self.readSerial(nsize, decode=False)
if nsize == 1:
ret += "{0:01b}".format(data)
elif nsize == 2:
ret += "{0:02b}".format(data)
elif nsize == 3:
ret += "{0:03b}".format(data)
elif nsize == 4:
ret += "{0:04b}".format(data)
elif nsize == 5:
ret += "{0:05b}".format(data)
elif nsize == 6:
ret += "{0:06b}".format(data)
elif nsize == 7:
ret += "{0:07b}".format(data)
else:
ret += "{0:08b}".format(data)
self._read = readBefore
ret2 = ""
last = 0
for x, y, name, typeName, value in self._groupWrite:
if typeName == None:
typeName = '-'
ret2 += "[<" + str(x) + ':' + str(y-1) + "> " + str(name) + ' (' + typeName + ') : ' + ret[x:y] + ' => ' + str(value) + "]"
last = y
if last < self._pos:
ret2 += "{" + ret[last:] + "}"
return ret2
def showAllDataRaw(self):
ret = ""
readBefore = self._read
self._read = 0
while self._read < self._pos:
if self._pos - self._read >= 8:
nsize = 8
else:
nsize = self._pos - self._read
data = self.readSerial(nsize, decode=False)
if nsize == 1:
ret += "{0:01b}".format(data)
elif nsize == 2:
ret += "{0:02b}".format(data)
elif nsize == 3:
ret += "{0:03b}".format(data)
elif nsize == 4:
ret += "{0:04b}".format(data)
elif nsize == 5:
ret += "{0:05b}".format(data)
elif nsize == 6:
ret += "{0:06b}".format(data)
elif nsize == 7:
ret += "{0:07b}".format(data)
else:
ret += "{0:08b}".format(data)
if ret != "":
ret += "."
self._read = readBefore
return ret
def extractAllData(self):
ret = ""
readBefore = self._read
self._read = 0
while self._read < self._pos:
if self._pos - self._read >= 8:
nsize = 8
else:
nsize = self._pos - self._read
data = self.readSerial(nsize, decode=False)
if nsize == 1:
ret += "{0:01b}".format(data)
elif nsize == 2:
ret += "{0:02b}".format(data)
elif nsize == 3:
ret += "{0:03b}".format(data)
elif nsize == 4:
ret += "{0:04b}".format(data)
elif nsize == 5:
ret += "{0:05b}".format(data)
elif nsize == 6:
ret += "{0:06b}".format(data)
elif nsize == 7:
ret += "{0:07b}".format(data)
else:
ret += "{0:08b}".format(data)
self._read = readBefore
ret2 = []
last = 0
for x, y, name, typeName, value, commentValue in self._groupRead:
if commentValue == None:
commentValue = ""
plus = ""
else:
plus = " "
if not typeName:
nbbit = y - x
typeName = "%d bit" % nbbit
if nbbit > 1:
typeName += "s"
ret2.append("<" + str(x) + ':' + str(y-1) + "> " + '(' + typeName + ') ' + str(name) + ' => ' + str(value) + " : " + ret[x:y] + plus + commentValue)
last = y
if last < self._pos:
ret2.append("<" + str(last) + ':' + str(self._pos - 1) + "> " + ret[last:])
return ret2
def checkOnlyZeroAtEnd(self):
readBefore = self._read
while self._read < self._pos:
if self._pos - self._read >= 8:
nsize = 8
else:
nsize = self._pos - self._read
data = self.readSerial(nsize, decode=False)
if data > 0:
return False
self._read = readBefore
return True
def getPosInBit(self):
return self._pos
def TestBitStream():
a = BitStream()
vrai = True
a.pushBool(decode=True, value=vrai)
a.pushBool(False)
a.pushBool(True)
a.pushBool(True)
a.pushUint32(1234567890)
a.pushSint32(-1234567890)
a.pushUint16(12345)
a.pushSint16(-12345)
a.pushUint8(123)
a.pushSint8(-123)
#-3.4E+38) # 1.2339999675750732)
a.pushFloat(-3.3999999521443642e+38)
a.pushDouble(-1.7E+308)
a.pushUint64(16045690709418696365)
a.pushSint64(-1)
a.pushChar('a')
a.pushString("Test A Faire")
print("-" * 40)
print(a.showAllData())
print('raw:')
print(a.showAllDataRaw())
print("-" * 20)
print("-" * 80)
print(a.readBool('a1'))
print(a.readBool('a2'))
print(a.readBool('a3'))
print(a.readBool('a4'))
print(a.readUint32('a5'))
print(a.readSint32('a6'))
print(a.readUint16('a7'))
print(a.readSint16('a8'))
print(a.readUint8('a9'))
print(a.readSint8('a10'))
print(a.readFloat('a11'))
print(a.readDouble('a12'))
print(a.readUint64('a13'))
print(a.readSint64('a14'))
print(a.readChar('a15'))
print(a.readString('a16'))
print(a.toBytes())
print("-" * 40)
print(a.showAllData())
print("-" * 80)
b = BitStream()
b.fromBytes(a.toBytes())
print(b.readBool('b1'))
print(b.readBool('b2'))
print(b.readBool('b3'))
print(b.readBool('b4'))
print(b.readUint32('b5'))
print(b.readSint32('b6'))
print(b.readUint16('b7'))
print(b.readSint16('b8'))
print(b.readUint8('b9'))
print(b.readSint8('b10'))
print(b.readFloat('b11'))
print(b.readDouble('b12'))
print(b.readUint64('b13'))
print(b.readSint64('b14'))
print(b.readChar('b15'))
print(b.readString('b16'))
print(b.toBytes())
print("-" * 40)
print(b.showAllData())
print("-" * 80)
c = BitStream()
c.pushBool(True)
c.pushBitStream(a)
c.pushBitStream(b)
print(c.readBool('c1'))
print("-" * 80)
print(c.readBool('c2'))
print(c.readBool('c3'))
print(c.readBool('c4'))
print(c.readBool('c5'))
print(c.readUint32('c6'))
print(c.readSint32('c7'))
print(c.readUint16('c8'))
print(c.readSint16('c9'))
print(c.readUint8('c10'))
print(c.readSint8('c11'))
print(c.readFloat('c12'))
print(c.readDouble('c13'))
print(c.readUint64('c14'))
print(c.readSint64('c15'))
print(c.readChar('c16'))
print(c.readString('c17'))
print(c.toBytes())
print("-" * 50)
print(c.showAllData())
print("-" * 50)
print(c.readBool('c18'))
print(c.readBool('c19'))
print(c.readBool('c20'))
print(c.readBool('c21'))
print(c.readUint32('c22'))
print(c.readSint32('c23'))
print(c.readUint16('c24'))
print(c.readSint16('c25'))
print(c.readUint8('c26'))
print(c.readSint8('c27'))
print(c.readFloat('c28'))
print(c.readDouble('c29'))
print(c.readUint64('c30'))
print(c.readSint64('c31'))
print(c.readChar('c32'))
print("-" * 40)
print(c.showAllData())
print(c.readString('c33'))
print("-" * 40)
print(c.showAllData())
print(c.toBytes())
print("-" * 40)
print(c.showAllDataRaw())
print("-" * 20)
print(c.showAllData())
print("-" * 80)