mirror of
https://port.numenaute.org/aleajactaest/clientbot.git
synced 2024-11-08 08:19:03 +00:00
1049 lines
36 KiB
Python
1049 lines
36 KiB
Python
#!/usr/bin/python3
|
|
# -*- coding: utf-8 -*-
|
|
#
|
|
# module BitStream
|
|
#
|
|
# Copyright (C) 2019 AleaJactaEst
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import logging
|
|
from ctypes import *
|
|
import sys
|
|
import inspect
|
|
import copy
|
|
import struct
|
|
|
|
LOGGER='BitStream'
|
|
|
|
class OverflowError(Exception):
|
|
pass
|
|
|
|
class BitStream():
|
|
def __init__(self):
|
|
self._pos = 0
|
|
self._read = 0
|
|
self._tampon = []
|
|
self._groupRead = []
|
|
self._groupWrite = []
|
|
self._CheckStreamOverflow = True
|
|
|
|
def __len__(self):
|
|
return (self._pos + 7) // 8
|
|
|
|
def __deepcopy__(self, memo):
|
|
ret = BitStream()
|
|
ret._pos = self._pos
|
|
ret._read = self._read
|
|
ret._tampon = copy.deepcopy(self._tampon, memo)
|
|
ret._groupRead = copy.deepcopy(self._groupRead, memo)
|
|
ret._groupWrite = copy.deepcopy(self._groupWrite, memo)
|
|
return ret
|
|
|
|
def enable_LogErrorOnStreamOverflow(self):
|
|
self._CheckStreamOverflow = True
|
|
|
|
def disable_LogErrorOnStreamOverflow(self):
|
|
self._CheckStreamOverflow = False
|
|
|
|
def needRead(self):
|
|
return self._pos - self._read
|
|
|
|
def sizeData(self):
|
|
return self._pos
|
|
|
|
def sizeRead(self):
|
|
return self._read
|
|
|
|
def getRead(self):
|
|
return self._read
|
|
|
|
def getPos(self):
|
|
return self._pos
|
|
|
|
def buffer(self):
|
|
return self._tampon
|
|
|
|
def putRead(self, value):
|
|
if value > self._pos:
|
|
raise ValueError
|
|
self._read = value
|
|
|
|
# ------------------------------------
|
|
def internalSerial(self, value, nbits, decode=True, typeName=''):
|
|
p1 = self._pos
|
|
if nbits == 0:
|
|
return
|
|
elif nbits > 32:
|
|
raise "Out of range"
|
|
pos = self._pos % 8
|
|
if pos == 0:
|
|
self._tampon.append(0)
|
|
value = c_uint32(value).value
|
|
if nbits != 32:
|
|
mask = (1 << nbits) - 1;
|
|
v = value & mask;
|
|
else:
|
|
v = value;
|
|
_FreeBits = 8 - (self._pos % 8)
|
|
if nbits > _FreeBits:
|
|
self._tampon[-1] |= (v >> ( nbits - _FreeBits))
|
|
self._pos += _FreeBits
|
|
self.internalSerial( v , nbits - _FreeBits, decode=False)
|
|
else:
|
|
self._tampon[-1] |= (v << ( _FreeBits - nbits))
|
|
self._pos += nbits
|
|
if decode:
|
|
frame = inspect.currentframe()
|
|
frame = inspect.getouterframes(frame)[1]
|
|
string = inspect.getframeinfo(frame[0]).code_context[0].strip()
|
|
args = string[string.find('(') + 1:-1].split(',')
|
|
name = '?'
|
|
for i in args:
|
|
if i.find('=') != -1:
|
|
if i.split('=')[0].strip() == "value":
|
|
name = i.split('=')[1].strip()
|
|
break
|
|
else:
|
|
name = i
|
|
break
|
|
name = name.strip().split(' ')[0].strip()
|
|
self._groupWrite.append((p1, p1+nbits, name, typeName, value))
|
|
|
|
def pushBool(self, value, decode=True):
|
|
p1 = self._pos
|
|
if value:
|
|
v = 1
|
|
else:
|
|
v = 0
|
|
self.internalSerial(v, 1, decode=False)
|
|
if decode:
|
|
frame = inspect.currentframe()
|
|
frame = inspect.getouterframes(frame)[1]
|
|
string = inspect.getframeinfo(frame[0]).code_context[0].strip()
|
|
args = string[string.find('(') + 1:-1].split(',')
|
|
name = '?'
|
|
for i in args:
|
|
if i.find('=') != -1:
|
|
if i.split('=')[0].strip() == "value":
|
|
name = i.split('=')[1].strip()
|
|
break
|
|
else:
|
|
name = i
|
|
break
|
|
name = name.strip().split(' ')[0].strip()
|
|
self._groupWrite.append((p1, p1+1, name, 'Bool', value))
|
|
|
|
def pushUint32(self, value, decode=True, typeName='Uint32'):
|
|
p1 = self._pos
|
|
self.internalSerial(value, 32, decode=False)
|
|
if decode:
|
|
frame = inspect.currentframe()
|
|
frame = inspect.getouterframes(frame)[1]
|
|
string = inspect.getframeinfo(frame[0]).code_context[0].strip()
|
|
args = string[string.find('(') + 1:-1].split(',')
|
|
name = '?'
|
|
for i in args:
|
|
if i.find('=') != -1:
|
|
if i.split('=')[0].strip() == "value":
|
|
name = i.split('=')[1].strip()
|
|
break
|
|
else:
|
|
name = i
|
|
break
|
|
name = name.strip().split(' ')[0].strip()
|
|
self._groupWrite.append((p1, p1+32, name, typeName, value))
|
|
|
|
def pushSint32(self, value, decode=True):
|
|
p1 = self._pos
|
|
self.internalSerial(value, 32, decode=False)
|
|
if decode:
|
|
frame = inspect.currentframe()
|
|
frame = inspect.getouterframes(frame)[1]
|
|
string = inspect.getframeinfo(frame[0]).code_context[0].strip()
|
|
args = string[string.find('(') + 1:-1].split(',')
|
|
name = '?'
|
|
for i in args:
|
|
if i.find('=') != -1:
|
|
if i.split('=')[0].strip() == "value":
|
|
name = i.split('=')[1].strip()
|
|
break
|
|
else:
|
|
name = i
|
|
break
|
|
name = name.strip().split(' ')[0].strip()
|
|
self._groupWrite.append((p1, p1+32, name, 'Sint32', value))
|
|
|
|
def pushUint16(self, value, decode=True):
|
|
p1 = self._pos
|
|
self.internalSerial(value, 16, decode=False)
|
|
if decode:
|
|
frame = inspect.currentframe()
|
|
frame = inspect.getouterframes(frame)[1]
|
|
string = inspect.getframeinfo(frame[0]).code_context[0].strip()
|
|
args = string[string.find('(') + 1:-1].split(',')
|
|
name = '?'
|
|
for i in args:
|
|
if i.find('=') != -1:
|
|
if i.split('=')[0].strip() == "value":
|
|
name = i.split('=')[1].strip()
|
|
break
|
|
else:
|
|
name = i
|
|
break
|
|
name = name.strip().split(' ')[0].strip()
|
|
self._groupWrite.append((p1, p1+16, name, 'Uint16', value))
|
|
|
|
def pushSint16(self, value, decode=True):
|
|
p1 = self._pos
|
|
self.internalSerial(value, 16, decode=False)
|
|
if decode:
|
|
frame = inspect.currentframe()
|
|
frame = inspect.getouterframes(frame)[1]
|
|
string = inspect.getframeinfo(frame[0]).code_context[0].strip()
|
|
args = string[string.find('(') + 1:-1].split(',')
|
|
name = '?'
|
|
for i in args:
|
|
if i.find('=') != -1:
|
|
if i.split('=')[0].strip() == "value":
|
|
name = i.split('=')[1].strip()
|
|
break
|
|
else:
|
|
name = i
|
|
break
|
|
name = name.strip().split(' ')[0].strip()
|
|
self._groupWrite.append((p1, p1+16, name, 'Sint16', value))
|
|
|
|
def pushUint8(self, value, decode=True):
|
|
p1 = self._pos
|
|
self.internalSerial(value, 8, decode=False)
|
|
if decode:
|
|
frame = inspect.currentframe()
|
|
frame = inspect.getouterframes(frame)[1]
|
|
string = inspect.getframeinfo(frame[0]).code_context[0].strip()
|
|
args = string[string.find('(') + 1:-1].split(',')
|
|
name = '?'
|
|
for i in args:
|
|
if i.find('=') != -1:
|
|
if i.split('=')[0].strip() == "value":
|
|
name = i.split('=')[1].strip()
|
|
break
|
|
else:
|
|
name = i
|
|
break
|
|
name = name.strip().split(' ')[0].strip()
|
|
self._groupWrite.append((p1, p1+8, name, 'Uint8', value))
|
|
|
|
def pushSint8(self, value, decode=True):
|
|
p1 = self._pos
|
|
self.internalSerial(value, 8, decode=False)
|
|
if decode:
|
|
frame = inspect.currentframe()
|
|
frame = inspect.getouterframes(frame)[1]
|
|
string = inspect.getframeinfo(frame[0]).code_context[0].strip()
|
|
args = string[string.find('(') + 1:-1].split(',')
|
|
name = '?'
|
|
for i in args:
|
|
if i.find('=') != -1:
|
|
if i.split('=')[0].strip() == "value":
|
|
name = i.split('=')[1].strip()
|
|
break
|
|
else:
|
|
name = i
|
|
break
|
|
name = name.strip().split(' ')[0].strip()
|
|
self._groupWrite.append((p1, p1+8, name, 'Sint8', value))
|
|
|
|
def pushUint64(self, value, decode=True):
|
|
p1 = self._pos
|
|
if sys.byteorder == "little":
|
|
self.internalSerial(value>>32, 32, decode=False)
|
|
self.internalSerial(value, 32, decode=False)
|
|
else:
|
|
self.internalSerial(value, 32, decode=False)
|
|
self.internalSerial(value>>32, 32, decode=False)
|
|
if decode:
|
|
frame = inspect.currentframe()
|
|
frame = inspect.getouterframes(frame)[1]
|
|
string = inspect.getframeinfo(frame[0]).code_context[0].strip()
|
|
args = string[string.find('(') + 1:-1].split(',')
|
|
name = '?'
|
|
for i in args:
|
|
if i.find('=') != -1:
|
|
if i.split('=')[0].strip() == "value":
|
|
name = i.split('=')[1].strip()
|
|
break
|
|
else:
|
|
name = i
|
|
break
|
|
name = name.strip().split(' ')[0].strip()
|
|
self._groupWrite.append((p1, p1+64, name, 'Uint64', value))
|
|
|
|
def pushSint64(self, value, decode=True):
|
|
p1 = self._pos
|
|
if sys.byteorder == "little":
|
|
self.internalSerial(value>>32, 32, decode=False)
|
|
self.internalSerial(value, 32, decode=False)
|
|
else:
|
|
self.internalSerial(value, 32, decode=False)
|
|
self.internalSerial(value>>32, 32, decode=False)
|
|
if decode:
|
|
frame = inspect.currentframe()
|
|
frame = inspect.getouterframes(frame)[1]
|
|
string = inspect.getframeinfo(frame[0]).code_context[0].strip()
|
|
args = string[string.find('(') + 1:-1].split(',')
|
|
name = '?'
|
|
for i in args:
|
|
if i.find('=') != -1:
|
|
if i.split('=')[0].strip() == "value":
|
|
name = i.split('=')[1].strip()
|
|
break
|
|
else:
|
|
name = i
|
|
break
|
|
name = name.strip().split(' ')[0].strip()
|
|
self._groupWrite.append((p1, p1+64, name, 'Sint64', value))
|
|
|
|
def pushFloat(self, value, decode=True):
|
|
p1 = self._pos
|
|
v = c_float(value).value
|
|
v1 = struct.pack('f', v)
|
|
v2 = struct.unpack('<i',v1)[0]
|
|
self.internalSerial(v2, 32, decode=False)
|
|
if decode:
|
|
frame = inspect.currentframe()
|
|
frame = inspect.getouterframes(frame)[1]
|
|
string = inspect.getframeinfo(frame[0]).code_context[0].strip()
|
|
args = string[string.find('(') + 1:-1].split(',')
|
|
name = '?'
|
|
for i in args:
|
|
if i.find('=') != -1:
|
|
if i.split('=')[0].strip() == "value":
|
|
name = i.split('=')[1].strip()
|
|
break
|
|
else:
|
|
name = i
|
|
break
|
|
name = name.strip().split(' ')[0].strip()
|
|
self._groupWrite.append((p1, p1+32, name, 'Float', value))
|
|
|
|
def pushDouble(self, value, decode=True):
|
|
p1 = self._pos
|
|
v = c_double(value).value
|
|
v1 = struct.pack('d', v)
|
|
v2 = struct.unpack('<Q',v1)[0]
|
|
#self.internalSerial(v2, 32)
|
|
self.internalSerial(v2, 32, decode=False)
|
|
self.internalSerial(v2 >> 32, 32, decode=False)
|
|
if decode:
|
|
frame = inspect.currentframe()
|
|
frame = inspect.getouterframes(frame)[1]
|
|
string = inspect.getframeinfo(frame[0]).code_context[0].strip()
|
|
args = string[string.find('(') + 1:-1].split(',')
|
|
name = '?'
|
|
for i in args:
|
|
if i.find('=') != -1:
|
|
if i.split('=')[0].strip() == "value":
|
|
name = i.split('=')[1].strip()
|
|
break
|
|
else:
|
|
name = i
|
|
break
|
|
name = name.strip().split(' ')[0].strip()
|
|
self._groupWrite.append((p1, p1+64, name, 'Float', value))
|
|
|
|
def pushChar(self, value, decode=True):
|
|
p1 = self._pos
|
|
v = ord(value)
|
|
self.internalSerial(v, 8, decode=False)
|
|
if decode:
|
|
frame = inspect.currentframe()
|
|
frame = inspect.getouterframes(frame)[1]
|
|
string = inspect.getframeinfo(frame[0]).code_context[0].strip()
|
|
args = string[string.find('(') + 1:-1].split(',')
|
|
name = '?'
|
|
for i in args:
|
|
if i.find('=') != -1:
|
|
if i.split('=')[0].strip() == "value":
|
|
name = i.split('=')[1].strip()
|
|
break
|
|
else:
|
|
name = i
|
|
break
|
|
name = name.strip().split(' ')[0].strip()
|
|
self._groupWrite.append((p1, p1+8, name, 'Char', value))
|
|
|
|
def pushString(self, value, decode=True):
|
|
lenValue = len(value)
|
|
self.pushUint32(lenValue, decode=True, typeName='String:len')
|
|
p1 = self._pos
|
|
for _char in value:
|
|
self.pushChar(_char, decode=False)
|
|
p2 = self._pos
|
|
if decode:
|
|
frame = inspect.currentframe()
|
|
frame = inspect.getouterframes(frame)[1]
|
|
string = inspect.getframeinfo(frame[0]).code_context[0].strip()
|
|
args = string[string.find('(') + 1:-1].split(',')
|
|
name = '?'
|
|
for i in args:
|
|
if i.find('=') != -1:
|
|
if i.split('=')[0].strip() == "value":
|
|
name = i.split('=')[1].strip()
|
|
break
|
|
else:
|
|
name = i
|
|
break
|
|
name = name.strip().split(' ')[0].strip()
|
|
self._groupWrite.append((p1, p2, name, 'String', value))
|
|
|
|
def pushUString(self, value, decode=True):
|
|
lenValue = len(value)
|
|
self.pushUint32(lenValue, decode=True, typeName='UString:len')
|
|
p1 = self._pos
|
|
for x in value:
|
|
_char16bit = ord(x)
|
|
self.internalSerial(_char16bit, 16, decode=False)
|
|
p2 = self._pos
|
|
if decode:
|
|
frame = inspect.currentframe()
|
|
frame = inspect.getouterframes(frame)[1]
|
|
string = inspect.getframeinfo(frame[0]).code_context[0].strip()
|
|
args = string[string.find('(') + 1:-1].split(',')
|
|
name = '?'
|
|
for i in args:
|
|
if i.find('=') != -1:
|
|
if i.split('=')[0].strip() == "value":
|
|
name = i.split('=')[1].strip()
|
|
break
|
|
else:
|
|
name = i
|
|
break
|
|
name = name.strip().split(' ')[0].strip()
|
|
self._groupWrite.append((p1, p2, name, 'UString', value))
|
|
|
|
def pushArrayUint8(self, value, decode=True):
|
|
' ex.: pushArrayChar([0,1,3,4]) '
|
|
p1 = self._pos
|
|
for i in value:
|
|
self.pushUint8(i, decode=False)
|
|
p2 = self._pos
|
|
if decode:
|
|
frame = inspect.currentframe()
|
|
frame = inspect.getouterframes(frame)[1]
|
|
string = inspect.getframeinfo(frame[0]).code_context[0].strip()
|
|
args = string[string.find('(') + 1:-1].split(',')
|
|
name = '?'
|
|
for i in args:
|
|
if i.find('=') != -1:
|
|
if i.split('=')[0].strip() == "value":
|
|
name = i.split('=')[1].strip()
|
|
break
|
|
else:
|
|
name = i
|
|
break
|
|
name = name.strip().split(' ')[0].strip()
|
|
self._groupWrite.append((p1, p2, name, 'ArrayUint8', value))
|
|
|
|
def pushBuffer(self, source, decode=True):
|
|
'''
|
|
Push BitStream with all byte (extend to byte if necessary)
|
|
'''
|
|
p1 = self._pos
|
|
srcRead = source.getRead()
|
|
source.putRead(0)
|
|
for ele in source._tampon:
|
|
self.pushUint8(ele, decode=False)
|
|
source.putRead(srcRead)
|
|
p2 = self._pos
|
|
if decode:
|
|
frame = inspect.currentframe()
|
|
frame = inspect.getouterframes(frame)[1]
|
|
string = inspect.getframeinfo(frame[0]).code_context[0].strip()
|
|
args = string[string.find('(') + 1:-1].split(',')
|
|
name = '?'
|
|
for i in args:
|
|
if i.find('=') != -1:
|
|
if i.split('=')[0].strip() == "value":
|
|
name = i.split('=')[1].strip()
|
|
break
|
|
else:
|
|
name = i
|
|
break
|
|
name = name.strip().split(' ')[0].strip()
|
|
self._groupWrite.append((p1, p2, name, 'Buffer', ''))
|
|
|
|
def pushBitStream(self, source, decode=True):
|
|
p1 = self._pos
|
|
srcRead = source.getRead()
|
|
source.putRead(0)
|
|
need = 8 - (self._pos % 8)
|
|
if need != 8:
|
|
self.internalSerial(source.readSerial(need, decode=False), need, decode=False)
|
|
while source.needRead() >= 8:
|
|
self.pushUint8(source.readSerial(8, decode=False), decode=False)
|
|
|
|
need = source.needRead()
|
|
if need > 0:
|
|
self.internalSerial(source.readSerial(need, decode=False), need, decode=False)
|
|
|
|
source.putRead(srcRead)
|
|
p2 = self._pos
|
|
if decode:
|
|
frame = inspect.currentframe()
|
|
frame = inspect.getouterframes(frame)[1]
|
|
string = inspect.getframeinfo(frame[0]).code_context[0].strip()
|
|
args = string[string.find('(') + 1:-1].split(',')
|
|
name = '?'
|
|
for i in args:
|
|
if i.find('=') != -1:
|
|
if i.split('=')[0].strip() == "value":
|
|
name = i.split('=')[1].strip()
|
|
break
|
|
else:
|
|
name = i
|
|
break
|
|
name = name.strip().split(' ')[0].strip()
|
|
self._groupWrite.append((p1, p2, name, 'BitStream', ''))
|
|
|
|
# ------------------------------------
|
|
def readSerial(self, nbits, name="", decode=True, typeName='', emulate=False, signed=False):
|
|
v1 = self._read
|
|
if nbits == 0:
|
|
return
|
|
elif nbits > 32:
|
|
raise "Out of range"
|
|
if self._read + nbits > self._pos:
|
|
if self._CheckStreamOverflow:
|
|
logging.getLogger(LOGGER).error("Error: Stream Overflow - nbits:%d/%d name:'%s' decode:'%s' typeName:'%s' emulate:%s msg:%s" % (nbits, self._pos-self._read, name, str(decode), typeName, str(emulate), self.showAllData()))
|
|
raise OverflowError
|
|
if emulate:
|
|
oldRead = self._read
|
|
value = 0
|
|
pos = self._read // 8
|
|
_FreeBits = 8 - (self._read % 8)
|
|
v = self._tampon[pos] & ((1 << _FreeBits) - 1)
|
|
if nbits > _FreeBits:
|
|
value |= (v << (nbits-_FreeBits))
|
|
self._read += _FreeBits
|
|
value |= self.readSerial(nbits - _FreeBits, decode=False)
|
|
else:
|
|
value |= (v >> (_FreeBits-nbits))
|
|
self._read += nbits
|
|
if emulate:
|
|
self._read = oldRead
|
|
if decode and not emulate:
|
|
if signed:
|
|
value = c_int32(value).value
|
|
self._groupRead.append((v1, v1+nbits, name, typeName, value))
|
|
return value
|
|
|
|
def readBool(self, name):
|
|
v1 = self._read
|
|
v = self.readSerial(1, name=name, decode=False, typeName='Bool')
|
|
self._groupRead.append((v1, v1+1, name, 'Bool', 'True' if v != 0 else 'False'))
|
|
if v != 0:
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
def readUint32(self, name, decode=True, typeName='Uint32'):
|
|
v = self.readSerial(32, name=name, decode=decode, typeName=typeName)
|
|
return v
|
|
|
|
def readSint32(self, name, decode=True):
|
|
v = self.readSerial(32, name=name, decode=decode, typeName='Sint32', signed=True)
|
|
return c_int32(v).value
|
|
|
|
def readUint16(self, name, decode=True):
|
|
v = self.readSerial(16, name=name, decode=decode, typeName='Uint16')
|
|
return v
|
|
|
|
def readSint16(self, name):
|
|
v = self.readSerial(16, name=name, typeName='Sint16', signed=True)
|
|
return c_int16(v).value
|
|
|
|
def readUint8(self, name, decode=True, typeName='Uint8'):
|
|
v = self.readSerial(8, decode=decode, name=name, typeName=typeName)
|
|
return v
|
|
|
|
def readSint8(self, name):
|
|
v = self.readSerial(8, name=name, typeName='Sint8', signed=True)
|
|
return c_int8(v).value
|
|
|
|
def readUint64(self, name):
|
|
p1 = self._read
|
|
if sys.byteorder == "little":
|
|
v1 = self.readSerial(32, decode=False)
|
|
v2 = self.readSerial(32, decode=False)
|
|
else:
|
|
v2 = self.readSerial(32, decode=False)
|
|
v1 = self.readSerial(32, decode=False)
|
|
v3 = (v1 << 32) | v2
|
|
self._groupRead.append((p1, p1+64, name, 'Uint64', v3))
|
|
return v3
|
|
|
|
def readSint64(self, name):
|
|
p1 = self._read
|
|
if sys.byteorder == "little":
|
|
v1 = self.readSerial(32, decode=False)
|
|
v2 = self.readSerial(32, decode=False)
|
|
else:
|
|
v2 = self.readSerial(32, decode=False)
|
|
v1 = self.readSerial(32, decode=False)
|
|
v3 = (v1 << 32) | v2
|
|
self._groupRead.append((p1, p1+64, name, 'Sint64', c_int64(v3).value))
|
|
return c_int64(v3).value
|
|
|
|
def readFloat(self, name):
|
|
p1 = self._read
|
|
v = self.readSerial(32, name=name, decode=False, typeName='Float')
|
|
v1 = struct.pack('I', v)
|
|
v2 = struct.unpack('<f',v1)[0]
|
|
self._groupRead.append((p1, p1+32, name, 'Float', v2))
|
|
return v2
|
|
|
|
def readDouble(self, name):
|
|
p1 = self._read
|
|
v = self.readSerial(32, decode=False)
|
|
v1 = struct.pack('I', v)
|
|
w = self.readSerial(32, decode=False)
|
|
w1 = struct.pack('I', w)
|
|
x = v1 + w1
|
|
x1 = struct.unpack('<d', x)[0]
|
|
self._groupRead.append((p1, p1+64, name, 'Double', x1))
|
|
return x1
|
|
|
|
def readChar(self, name, decode=True):
|
|
v = self.readUint8(name=name, decode=decode, typeName='Char')
|
|
return chr(v)
|
|
|
|
def readString(self, name):
|
|
tmp = ''
|
|
_size = self.readUint32(name + ':size', decode=True)
|
|
v1 = self._read
|
|
while _size > 0:
|
|
x = self.readChar('', decode=False)
|
|
tmp += x
|
|
_size -= 1
|
|
v2 = self._read
|
|
if v2 > self._pos:
|
|
raise ValueError
|
|
if v1 < v2:
|
|
self._groupRead.append((v1, v2, name + ':string', 'String', tmp))
|
|
return tmp
|
|
|
|
def readUtf8String(self, name):
|
|
tmp = b''
|
|
_size = self.readUint32(name + ':size', decode=True)
|
|
v1 = self._read
|
|
while _size > 0:
|
|
x = self.readUint8(name='', decode=False)
|
|
tmp += bytes(chr(x), encoding='latin1')
|
|
_size -= 1
|
|
v2 = self._read
|
|
if v2 > self._pos:
|
|
raise ValueError
|
|
if v1 < v2:
|
|
self._groupRead.append((v1, v2, name + ':ustring', 'UString', tmp))
|
|
return tmp.decode(encoding='utf-8')
|
|
|
|
def readUString(self, name):
|
|
tmp = ''
|
|
_size = self.readUint32(name + ':size', decode=True)
|
|
v1 = self._read
|
|
while _size > 0:
|
|
x = self.readUint16(name='', decode=False)
|
|
tmp += chr(x)
|
|
_size -= 1
|
|
v2 = self._read
|
|
if v2 > self._pos:
|
|
raise ValueError
|
|
if v1 < v2:
|
|
self._groupRead.append((v1, v2, name + ':ustring', 'UString', tmp))
|
|
return tmp
|
|
|
|
def readArrayUint8(self, size, name):
|
|
ret = []
|
|
v1 = self._read
|
|
for i in range(0, size):
|
|
ret.append(self.readUint8('', decode=False))
|
|
v2 = self._read
|
|
self._groupRead.append((v1, v2, name, 'ArrayUint8', ''))
|
|
return ret
|
|
|
|
def readBitStreamUint8(self, size, name):
|
|
ret = BitStream()
|
|
v1 = self._read
|
|
for i in range(0, size):
|
|
ret.pushUint8(self.readUint8('', decode=False), decode=False)
|
|
v2 = self._read
|
|
self._groupRead.append((v1, v2, name, 'StreamUint8', ''))
|
|
return ret
|
|
|
|
def readCont(self, name):
|
|
ret = BitStream()
|
|
size = self.readSint32(name = name + ':len', decode=True)
|
|
v1 = self._read
|
|
for i in range(0, size):
|
|
ret.pushBool(self.readSerial(1,name = '', decode=False))
|
|
v2 = self._read
|
|
self._groupRead.append((v1, v2, name + ':data', 'readCont', ''))
|
|
return size, ret
|
|
|
|
def getNotRead(self):
|
|
ret = BitStream()
|
|
readBefore = self._read
|
|
while self._read < self._pos:
|
|
if self._pos - self._read >= 8:
|
|
nsize = 8
|
|
else:
|
|
nsize = self._pos - self._read
|
|
data = self.readSerial(nsize, decode=False)
|
|
ret.internalSerial(data, nsize, decode=False)
|
|
self._read = readBefore
|
|
return ret
|
|
# ------------------------------------
|
|
def __str__(self):
|
|
return ''.join([ chr(x) for x in self._tampon])
|
|
|
|
def message(self):
|
|
# return str(self._pos) + ':' + '.'.join([ format(x, "02x") for x in self._tampon])
|
|
return str(self._pos) + ':' + '.'.join([ "{0:08b}".format(x) for x in self._tampon])
|
|
|
|
def toBytes(self):
|
|
return bytes( self._tampon )
|
|
|
|
def fromBytes(self, data):
|
|
self._read = 0
|
|
self._tampon = [int(x) for x in data]
|
|
self._pos = len(self._tampon) * 8
|
|
|
|
def showLastData(self):
|
|
ret = ""
|
|
readBefore = self._read
|
|
while self._read < self._pos:
|
|
if self._pos - self._read >= 8:
|
|
nsize = 8
|
|
else:
|
|
nsize = self._pos - self._read
|
|
data = self.readSerial(nsize, decode=False)
|
|
if nsize == 1:
|
|
ret += "{0:01b}".format(data)
|
|
elif nsize == 2:
|
|
ret += "{0:02b}".format(data)
|
|
elif nsize == 3:
|
|
ret += "{0:03b}".format(data)
|
|
elif nsize == 4:
|
|
ret += "{0:04b}".format(data)
|
|
elif nsize == 5:
|
|
ret += "{0:05b}".format(data)
|
|
elif nsize == 6:
|
|
ret += "{0:06b}".format(data)
|
|
elif nsize == 7:
|
|
ret += "{0:07b}".format(data)
|
|
else:
|
|
ret += "{0:08b}".format(data)
|
|
if ret != "":
|
|
ret += "."
|
|
ret += "{0:08b}".format(data)
|
|
self._read = readBefore
|
|
return ret
|
|
|
|
def showAllData(self):
|
|
ret = ""
|
|
readBefore = self._read
|
|
self._read = 0
|
|
while self._read < self._pos:
|
|
if self._pos - self._read >= 8:
|
|
nsize = 8
|
|
else:
|
|
nsize = self._pos - self._read
|
|
data = self.readSerial(nsize, decode=False)
|
|
if nsize == 1:
|
|
ret += "{0:01b}".format(data)
|
|
elif nsize == 2:
|
|
ret += "{0:02b}".format(data)
|
|
elif nsize == 3:
|
|
ret += "{0:03b}".format(data)
|
|
elif nsize == 4:
|
|
ret += "{0:04b}".format(data)
|
|
elif nsize == 5:
|
|
ret += "{0:05b}".format(data)
|
|
elif nsize == 6:
|
|
ret += "{0:06b}".format(data)
|
|
elif nsize == 7:
|
|
ret += "{0:07b}".format(data)
|
|
else:
|
|
ret += "{0:08b}".format(data)
|
|
self._read = readBefore
|
|
ret2 = ""
|
|
|
|
last = 0
|
|
for x, y, name, typeName, value in self._groupRead:
|
|
ret2 += "[<" + str(x) + ':' + str(y-1) + "> " + str(name) + ' (' + typeName + ') : ' + ret[x:y] + ' => ' + str(value) + "]"
|
|
last = y
|
|
if last < self._pos:
|
|
ret2 += "{" + ret[last:] + "}"
|
|
|
|
return ret2
|
|
|
|
def showAllDataWrite(self):
|
|
ret = ""
|
|
readBefore = self._read
|
|
self._read = 0
|
|
while self._read < self._pos:
|
|
if self._pos - self._read >= 8:
|
|
nsize = 8
|
|
else:
|
|
nsize = self._pos - self._read
|
|
data = self.readSerial(nsize, decode=False)
|
|
if nsize == 1:
|
|
ret += "{0:01b}".format(data)
|
|
elif nsize == 2:
|
|
ret += "{0:02b}".format(data)
|
|
elif nsize == 3:
|
|
ret += "{0:03b}".format(data)
|
|
elif nsize == 4:
|
|
ret += "{0:04b}".format(data)
|
|
elif nsize == 5:
|
|
ret += "{0:05b}".format(data)
|
|
elif nsize == 6:
|
|
ret += "{0:06b}".format(data)
|
|
elif nsize == 7:
|
|
ret += "{0:07b}".format(data)
|
|
else:
|
|
ret += "{0:08b}".format(data)
|
|
self._read = readBefore
|
|
ret2 = ""
|
|
|
|
last = 0
|
|
for x, y, name, typeName, value in self._groupWrite:
|
|
ret2 += "[<" + str(x) + ':' + str(y-1) + "> " + str(name) + ' (' + typeName + ') : ' + ret[x:y] + ' => ' + str(value) + "]"
|
|
last = y
|
|
if last < self._pos:
|
|
ret2 += "{" + ret[last:] + "}"
|
|
|
|
return ret2
|
|
|
|
def showAllDataRaw(self):
|
|
ret = ""
|
|
readBefore = self._read
|
|
self._read = 0
|
|
while self._read < self._pos:
|
|
if self._pos - self._read >= 8:
|
|
nsize = 8
|
|
else:
|
|
nsize = self._pos - self._read
|
|
data = self.readSerial(nsize, decode=False)
|
|
if nsize == 1:
|
|
ret += "{0:01b}".format(data)
|
|
elif nsize == 2:
|
|
ret += "{0:02b}".format(data)
|
|
elif nsize == 3:
|
|
ret += "{0:03b}".format(data)
|
|
elif nsize == 4:
|
|
ret += "{0:04b}".format(data)
|
|
elif nsize == 5:
|
|
ret += "{0:05b}".format(data)
|
|
elif nsize == 6:
|
|
ret += "{0:06b}".format(data)
|
|
elif nsize == 7:
|
|
ret += "{0:07b}".format(data)
|
|
else:
|
|
ret += "{0:08b}".format(data)
|
|
if ret != "":
|
|
ret += "."
|
|
self._read = readBefore
|
|
return ret
|
|
|
|
def extractAllData(self):
|
|
ret = ""
|
|
readBefore = self._read
|
|
self._read = 0
|
|
while self._read < self._pos:
|
|
if self._pos - self._read >= 8:
|
|
nsize = 8
|
|
else:
|
|
nsize = self._pos - self._read
|
|
data = self.readSerial(nsize, decode=False)
|
|
if nsize == 1:
|
|
ret += "{0:01b}".format(data)
|
|
elif nsize == 2:
|
|
ret += "{0:02b}".format(data)
|
|
elif nsize == 3:
|
|
ret += "{0:03b}".format(data)
|
|
elif nsize == 4:
|
|
ret += "{0:04b}".format(data)
|
|
elif nsize == 5:
|
|
ret += "{0:05b}".format(data)
|
|
elif nsize == 6:
|
|
ret += "{0:06b}".format(data)
|
|
elif nsize == 7:
|
|
ret += "{0:07b}".format(data)
|
|
else:
|
|
ret += "{0:08b}".format(data)
|
|
self._read = readBefore
|
|
ret2 = []
|
|
|
|
last = 0
|
|
for x, y, name, typeName, value in self._groupRead:
|
|
if not typeName:
|
|
nbbit = y - x
|
|
typeName = "%d bit" % nbbit
|
|
if nbbit > 1:
|
|
typeName += "s"
|
|
ret2.append("<" + str(x) + ':' + str(y-1) + "> " + '(' + typeName + ') ' + str(name) + ' => ' + str(value) + " : " + ret[x:y])
|
|
last = y
|
|
if last < self._pos:
|
|
ret2.append("<" + str(last) + ':' + str(self._pos - 1) + "> " + ret[last:])
|
|
|
|
return ret2
|
|
|
|
def checkOnlyZeroAtEnd(self):
|
|
readBefore = self._read
|
|
while self._read < self._pos:
|
|
if self._pos - self._read >= 8:
|
|
nsize = 8
|
|
else:
|
|
nsize = self._pos - self._read
|
|
data = self.readSerial(nsize, decode=False)
|
|
if data > 0:
|
|
return False
|
|
self._read = readBefore
|
|
return True
|
|
|
|
def getPosInBit(self):
|
|
return self._pos
|
|
|
|
def TestBitStream():
|
|
a = BitStream()
|
|
vrai = True
|
|
a.pushBool(decode=True, value=vrai)
|
|
a.pushBool(False)
|
|
a.pushBool(True)
|
|
a.pushBool(True)
|
|
a.pushUint32(1234567890)
|
|
a.pushSint32(-1234567890)
|
|
a.pushUint16(12345)
|
|
a.pushSint16(-12345)
|
|
a.pushUint8(123)
|
|
a.pushSint8(-123)
|
|
#-3.4E+38) # 1.2339999675750732)
|
|
a.pushFloat(-3.3999999521443642e+38)
|
|
a.pushDouble(-1.7E+308)
|
|
a.pushUint64(16045690709418696365)
|
|
a.pushSint64(-1)
|
|
a.pushChar('a')
|
|
a.pushString("Test A Faire")
|
|
print("-" * 40)
|
|
print(a.showAllData())
|
|
print('raw:')
|
|
print(a.showAllDataRaw())
|
|
print("-" * 20)
|
|
print("-" * 80)
|
|
print(a.readBool('a1'))
|
|
print(a.readBool('a2'))
|
|
print(a.readBool('a3'))
|
|
print(a.readBool('a4'))
|
|
print(a.readUint32('a5'))
|
|
print(a.readSint32('a6'))
|
|
print(a.readUint16('a7'))
|
|
print(a.readSint16('a8'))
|
|
print(a.readUint8('a9'))
|
|
print(a.readSint8('a10'))
|
|
print(a.readFloat('a11'))
|
|
print(a.readDouble('a12'))
|
|
print(a.readUint64('a13'))
|
|
print(a.readSint64('a14'))
|
|
print(a.readChar('a15'))
|
|
print(a.readString('a16'))
|
|
print(a.toBytes())
|
|
print("-" * 40)
|
|
print(a.showAllData())
|
|
print("-" * 80)
|
|
b = BitStream()
|
|
b.fromBytes(a.toBytes())
|
|
print(b.readBool('b1'))
|
|
print(b.readBool('b2'))
|
|
print(b.readBool('b3'))
|
|
print(b.readBool('b4'))
|
|
print(b.readUint32('b5'))
|
|
print(b.readSint32('b6'))
|
|
print(b.readUint16('b7'))
|
|
print(b.readSint16('b8'))
|
|
print(b.readUint8('b9'))
|
|
print(b.readSint8('b10'))
|
|
print(b.readFloat('b11'))
|
|
print(b.readDouble('b12'))
|
|
print(b.readUint64('b13'))
|
|
print(b.readSint64('b14'))
|
|
print(b.readChar('b15'))
|
|
print(b.readString('b16'))
|
|
print(b.toBytes())
|
|
print("-" * 40)
|
|
print(b.showAllData())
|
|
print("-" * 80)
|
|
c = BitStream()
|
|
c.pushBool(True)
|
|
c.pushBitStream(a)
|
|
c.pushBitStream(b)
|
|
print(c.readBool('c1'))
|
|
print("-" * 80)
|
|
print(c.readBool('c2'))
|
|
print(c.readBool('c3'))
|
|
print(c.readBool('c4'))
|
|
print(c.readBool('c5'))
|
|
print(c.readUint32('c6'))
|
|
print(c.readSint32('c7'))
|
|
print(c.readUint16('c8'))
|
|
print(c.readSint16('c9'))
|
|
print(c.readUint8('c10'))
|
|
print(c.readSint8('c11'))
|
|
print(c.readFloat('c12'))
|
|
print(c.readDouble('c13'))
|
|
print(c.readUint64('c14'))
|
|
print(c.readSint64('c15'))
|
|
print(c.readChar('c16'))
|
|
print(c.readString('c17'))
|
|
print(c.toBytes())
|
|
print("-" * 50)
|
|
print(c.showAllData())
|
|
print("-" * 50)
|
|
print(c.readBool('c18'))
|
|
print(c.readBool('c19'))
|
|
print(c.readBool('c20'))
|
|
print(c.readBool('c21'))
|
|
print(c.readUint32('c22'))
|
|
print(c.readSint32('c23'))
|
|
print(c.readUint16('c24'))
|
|
print(c.readSint16('c25'))
|
|
print(c.readUint8('c26'))
|
|
print(c.readSint8('c27'))
|
|
print(c.readFloat('c28'))
|
|
print(c.readDouble('c29'))
|
|
print(c.readUint64('c30'))
|
|
print(c.readSint64('c31'))
|
|
print(c.readChar('c32'))
|
|
print("-" * 40)
|
|
print(c.showAllData())
|
|
print(c.readString('c33'))
|
|
print("-" * 40)
|
|
print(c.showAllData())
|
|
print(c.toBytes())
|
|
print("-" * 40)
|
|
print(c.showAllDataRaw())
|
|
print("-" * 20)
|
|
print(c.showAllData())
|
|
print("-" * 80)
|