#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# module BitStream
#
# Copyright (C) 2019 AleaJactaEst
#
#  This program is free software: you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program.  If not, see https://www.gnu.org/licenses/.
"""Bit-granular serialisation stream.

Values are appended most-significant-bit first into a list of byte values
and can be read back bit by bit.  Every push/read can also be recorded as an
annotation (bit range, name, type, value) used by the ``show*`` debug dumps.
"""

import logging
from ctypes import *
import sys
import inspect
import copy
import struct

try:
    from tools import CBitSet
except ImportError:
    # Project-local helper, only needed by the readCBitSet* readers; the rest
    # of the stream stays usable without it (those readers raise ImportError).
    CBitSet = None

LOGGER='BitStream'


class OverflowError(Exception):
    """Raised when a read asks for more bits than the stream still holds.

    NOTE: deliberately keeps its historical name although it shadows the
    builtin ``OverflowError`` inside this module; callers import it by name.
    """
    pass


class BitStream():
    """Growable bit buffer with independent write and read cursors."""

    def __init__(self):
        self._pos = 0            # write cursor: number of valid bits stored
        self._read = 0           # read cursor, in bits
        self._tampon = []        # buffer: one int (0..255) per byte
        self._groupRead = []     # (start, end, name, typeName, value, comment)
        self._groupWrite = []    # (start, end, name, typeName, value)
        self._CheckStreamOverflow = True

    def __len__(self):
        """Return the stream size in bytes (last partial byte included)."""
        return (self._pos + 7) // 8

    def reset_read(self):
        """Rewind the read cursor and forget the read annotations."""
        self._read = 0
        self._groupRead = []

    def __deepcopy__(self, memo):
        """Return an independent copy (data, cursors, annotations, flags)."""
        ret = BitStream()
        ret._pos = self._pos
        ret._read = self._read
        ret._tampon = copy.deepcopy(self._tampon, memo)
        ret._groupRead = copy.deepcopy(self._groupRead, memo)
        ret._groupWrite = copy.deepcopy(self._groupWrite, memo)
        # The overflow-logging flag used to be lost by the copy.
        ret._CheckStreamOverflow = self._CheckStreamOverflow
        return ret

    def enable_LogErrorOnStreamOverflow(self):
        """Log an error (in addition to raising) on read overflow."""
        self._CheckStreamOverflow = True

    def disable_LogErrorOnStreamOverflow(self):
        """Only raise on read overflow, without logging."""
        self._CheckStreamOverflow = False

    def needRead(self):
        """Return the number of bits not read yet."""
        return self._pos - self._read

    def sizeData(self):
        """Return the number of bits stored."""
        return self._pos

    def sizeRead(self):
        """Return the read cursor (bits already consumed)."""
        return self._read

    def getRead(self):
        """Return the read cursor, in bits."""
        return self._read

    def getPos(self):
        """Return the write cursor, in bits."""
        return self._pos

    def buffer(self):
        """Return the underlying list of byte values (not a copy)."""
        return self._tampon

    def putRead(self, value):
        """Move the read cursor; raise ValueError past the end of the data."""
        if value > self._pos:
            raise ValueError
        self._read = value

    # ------------------------------------
    @staticmethod
    def _callerArgName():
        """Guess the source text of the ``value`` argument at the call site.

        Looks at the source line of the caller of the method that called this
        helper, and returns either the ``value=...`` keyword text or the first
        positional argument text.  Returns '?' when no source line is
        available (interactive use, exec'd or frozen code).
        """
        frame = inspect.currentframe()
        caller = None
        if frame is not None and frame.f_back is not None:
            caller = frame.f_back.f_back
        if caller is None:
            return '?'
        try:
            context = inspect.getframeinfo(caller).code_context
        finally:
            # Drop frame references promptly to avoid reference cycles.
            del frame, caller
        if not context:
            return '?'
        line = context[0].strip()
        name = '?'
        for arg in line[line.find('(') + 1:-1].split(','):
            if '=' in arg:
                parts = arg.split('=')
                if parts[0].strip() == "value":
                    name = parts[1].strip()
                    break
            else:
                name = arg
                break
        return name.strip().split(' ')[0].strip()

    def _bitString(self):
        """Return all stored bits as a string of '0'/'1' (length ``_pos``)."""
        return ''.join("{0:08b}".format(octet) for octet in self._tampon)[:self._pos]

    @staticmethod
    def _newCBitSet(nbits):
        """Create a project CBitSet resized to *nbits* bits."""
        if CBitSet is None:
            raise ImportError("tools.CBitSet is required to read bit sets")
        bitSet = CBitSet.CBitSet()
        bitSet.resize(nbits)
        return bitSet

    # ------------------------------------
    def internalSerial(self, value, nbits, decode=True, typeName=''):
        """Append the *nbits* low bits of *value*, most significant bit first.

        value    -- integer; reduced to 32 bits (two's complement) first.
        nbits    -- 0..32; 0 is a no-op.
        decode   -- record a write annotation named after the call-site text.
        typeName -- label stored in the annotation.
        Raises ValueError when nbits > 32 (the original ``raise "..."`` of a
        plain string was itself a TypeError under Python 3).
        """
        p1 = self._pos
        if nbits == 0:
            return
        elif nbits > 32:
            raise ValueError("Out of range")
        value = c_uint32(value).value
        pending = value & ((1 << nbits) - 1)
        remaining = nbits
        while remaining > 0:
            offset = self._pos % 8
            if offset == 0:
                self._tampon.append(0)
            free = 8 - offset
            if remaining > free:
                # Fill the rest of the current byte with the top bits.
                self._tampon[-1] |= pending >> (remaining - free)
                self._pos += free
                remaining -= free
                pending &= (1 << remaining) - 1
            else:
                self._tampon[-1] |= pending << (free - remaining)
                self._pos += remaining
                remaining = 0
        if decode:
            self._groupWrite.append((p1, p1+nbits, self._callerArgName(), typeName, value))

    def pushSerial64(self, value, nbits, decode=True, typeName=None):
        """Append the *nbits* (0..64) low bits of *value*, high word first.

        Returns the value actually written (masked to *nbits* bits).  The
        previous implementation raised NameError for nbits > 32 (undefined
        ``msd2``/``name``) and ignored *decode* on that path.
        """
        if typeName is None:
            typeName = 'Uint{0}'.format(nbits)
        p1 = self._pos
        if nbits > 32:
            self.internalSerial(value >> 32, nbits - 32, decode=False)
            self.internalSerial(value, 32, decode=False)
        else:
            self.internalSerial(value, nbits, decode=False)
        written = value & ((1 << nbits) - 1)
        if decode and nbits:
            self._groupWrite.append((p1, p1+nbits, self._callerArgName(), typeName, written))
        return written

    def pushBool(self, value, decode=True):
        """Append one bit: 1 when *value* is truthy, else 0."""
        p1 = self._pos
        self.internalSerial(1 if value else 0, 1, decode=False)
        if decode:
            self._groupWrite.append((p1, p1+1, self._callerArgName(), 'Bool', value))

    def pushUint32(self, value, decode=True, typeName='Uint32'):
        """Append *value* on 32 bits."""
        p1 = self._pos
        self.internalSerial(value, 32, decode=False)
        if decode:
            self._groupWrite.append((p1, p1+32, self._callerArgName(), typeName, value))

    def pushSint32(self, value, decode=True):
        """Append signed *value* on 32 bits (two's complement)."""
        p1 = self._pos
        self.internalSerial(value, 32, decode=False)
        if decode:
            self._groupWrite.append((p1, p1+32, self._callerArgName(), 'Sint32', value))

    def pushUint16(self, value, decode=True):
        """Append *value* on 16 bits."""
        p1 = self._pos
        self.internalSerial(value, 16, decode=False)
        if decode:
            self._groupWrite.append((p1, p1+16, self._callerArgName(), 'Uint16', value))

    def pushSint16(self, value, decode=True):
        """Append signed *value* on 16 bits (two's complement)."""
        p1 = self._pos
        self.internalSerial(value, 16, decode=False)
        if decode:
            self._groupWrite.append((p1, p1+16, self._callerArgName(), 'Sint16', value))

    def pushUint8(self, value, decode=True):
        """Append *value* on 8 bits."""
        p1 = self._pos
        self.internalSerial(value, 8, decode=False)
        if decode:
            self._groupWrite.append((p1, p1+8, self._callerArgName(), 'Uint8', value))

    def pushSint8(self, value, decode=True):
        """Append signed *value* on 8 bits (two's complement)."""
        p1 = self._pos
        self.internalSerial(value, 8, decode=False)
        if decode:
            self._groupWrite.append((p1, p1+8, self._callerArgName(), 'Sint8', value))

    def _push64(self, value):
        """Write a 64-bit integer as two 32-bit words, host-order dependent."""
        if sys.byteorder == "little":
            self.internalSerial(value>>32, 32, decode=False)
            self.internalSerial(value, 32, decode=False)
        else:
            self.internalSerial(value, 32, decode=False)
            self.internalSerial(value>>32, 32, decode=False)

    def pushUint64(self, value, decode=True):
        """Append *value* on 64 bits (word order depends on sys.byteorder)."""
        p1 = self._pos
        self._push64(value)
        if decode:
            self._groupWrite.append((p1, p1+64, self._callerArgName(), 'Uint64', value))

    def pushSint64(self, value, decode=True):
        """Append signed *value* on 64 bits (see pushUint64)."""
        p1 = self._pos
        self._push64(value)
        if decode:
            self._groupWrite.append((p1, p1+64, self._callerArgName(), 'Sint64', value))

    def pushFloat(self, value, decode=True):
        """Append *value* as the 32-bit pattern of a single-precision float.

        NOTE(review): the end of this method was lost in the reviewed text;
        it is reconstructed as the inverse of readFloat -- confirm against
        the repository.
        """
        p1 = self._pos
        v = c_float(value).value    # round to single precision (inf if too big)
        bits = struct.unpack('=I', struct.pack('=f', v))[0]
        self.internalSerial(bits, 32, decode=False)
        if decode:
            self._groupWrite.append((p1, p1+32, self._callerArgName(), 'Float', value))

    def pushDouble(self, value, decode=True):
        """Append *value* as a 64-bit double: low 32-bit word, then high word.

        NOTE(review): the beginning of this method was lost in the reviewed
        text; only the final ``>> 32`` write and the 64-bit annotation
        survived.  The word order follows that fragment -- confirm against
        the repository.  The annotation label was 'Float' and is now 'Double'.
        """
        p1 = self._pos
        bits = struct.unpack('=Q', struct.pack('=d', c_double(value).value))[0]
        self.internalSerial(bits, 32, decode=False)
        self.internalSerial(bits >> 32, 32, decode=False)
        if decode:
            self._groupWrite.append((p1, p1+64, self._callerArgName(), 'Double', value))

    def pushChar(self, value, decode=True):
        """Append the code point of one character on 8 bits."""
        p1 = self._pos
        self.internalSerial(ord(value), 8, decode=False)
        if decode:
            self._groupWrite.append((p1, p1+8, self._callerArgName(), 'Char', value))

    def pushString(self, value, decode=True):
        """Append a 32-bit length followed by one 8-bit code per character."""
        lenValue = len(value)
        self.pushUint32(lenValue, decode=True, typeName='String:len')
        p1 = self._pos
        for _char in value:
            self.pushChar(_char, decode=False)
        p2 = self._pos
        if decode:
            self._groupWrite.append((p1, p2, self._callerArgName(), 'String', value))

    def pushUString(self, value, decode=True):
        """Append a 32-bit length followed by one 16-bit code per character."""
        lenValue = len(value)
        self.pushUint32(lenValue, decode=True, typeName='UString:len')
        p1 = self._pos
        for x in value:
            self.internalSerial(ord(x), 16, decode=False)
        p2 = self._pos
        if decode:
            self._groupWrite.append((p1, p2, self._callerArgName(), 'UString', value))

    def pushArrayUint8(self, value, decode=True):
        """Append every item of *value* on 8 bits.

        ex.: pushArrayUint8([0,1,3,4])
        """
        p1 = self._pos
        for i in value:
            self.pushUint8(i, decode=False)
        p2 = self._pos
        if decode:
            self._groupWrite.append((p1, p2, self._callerArgName(), 'ArrayUint8', value))

    def pushBuffer(self, source, decode=True):
        """Append every byte of *source*, padding bits of its last byte included."""
        p1 = self._pos
        for ele in source._tampon:
            self.pushUint8(ele, decode=False)
        p2 = self._pos
        if decode:
            self._groupWrite.append((p1, p2, self._callerArgName(), 'Buffer', ''))

    def pushBitStream(self, source, decode=True):
        """Append exactly the valid bits of *source* (no padding).

        The read cursor of *source* is restored afterwards, even on error.
        Sources shorter than the number of bits needed to byte-align this
        stream used to raise a stream overflow; they are now copied as-is.
        """
        p1 = self._pos
        srcRead = source.getRead()
        source.putRead(0)
        try:
            while source.needRead() > 0:
                chunk = min(32, source.needRead())
                self.internalSerial(source.readSerial(chunk, decode=False), chunk, decode=False)
        finally:
            source.putRead(srcRead)
        p2 = self._pos
        if decode:
            self._groupWrite.append((p1, p2, self._callerArgName(), 'BitStream', ''))

    # ------------------------------------
    def readSerial(self, nbits, name="", decode=True, typeName='', emulate=False, signed=False, commentValue=None):
        """Read *nbits* (0..32) bits, most significant bit first.

        emulate -- peek: return the value without moving the read cursor and
                   without recording an annotation.
        signed  -- when recording, reinterpret the value as a signed 32-bit
                   integer (narrower widths are not sign-extended here).
        Returns None for nbits == 0.  Raises ValueError when nbits > 32 and
        OverflowError (this module's class) when fewer bits remain.
        """
        v1 = self._read
        if nbits == 0:
            return
        elif nbits > 32:
            raise ValueError("Out of range")
        if self._read + nbits > self._pos:
            if self._CheckStreamOverflow:
                logging.getLogger(LOGGER).error(
                    "Error: Stream Overflow - nbits:%d/%d name:'%s' decode:'%s' typeName:'%s' emulate:%s msg:%s",
                    nbits, self._pos-self._read, name, str(decode), typeName, str(emulate), self.showAllData())
            raise OverflowError
        cursor = v1
        remaining = nbits
        value = 0
        while remaining > 0:
            free = 8 - (cursor % 8)
            chunk = self._tampon[cursor // 8] & ((1 << free) - 1)
            if remaining > free:
                value |= chunk << (remaining - free)
                cursor += free
                remaining -= free
            else:
                value |= chunk >> (free - remaining)
                cursor += remaining
                remaining = 0
        if not emulate:
            self._read = cursor
            if decode:
                if signed:
                    value = c_int32(value).value
                self._groupRead.append((v1, v1+nbits, name, typeName, value, commentValue))
        return value

    def readSerial64(self, nbits, name="", decode=True, typeName=None, commentValue=None):
        """Read *nbits* (0..64) bits, high word first, as an unsigned value."""
        if typeName is None:
            typeName = 'Uint{0}'.format(nbits)
        if nbits > 32:
            v1 = self._read
            msd = self.readSerial(nbits - 32, name, False, typeName)
            lsd = self.readSerial(32, name, False, typeName)
            value = msd << 32 | lsd
            if decode:
                self._groupRead.append((v1, v1+nbits, name, typeName, value, commentValue))
            return value
        return self.readSerial(nbits, name, decode, typeName, commentValue=commentValue)

    def readCBitSet(self, nbits, name="", decode=True, typeName=None, commentValue=None):
        """Read *nbits* bits into a project CBitSet, one 32-bit word at a time."""
        # khanat-opennel-code/code/nel/src/misc/bit_mem_stream.cpp void CBitMemStream::readBits( NLMISC::CBitSet& bitfield )
        log = logging.getLogger(LOGGER)
        v1 = self._read
        cBitSet = self._newCBitSet(nbits)
        pos = 0
        last = nbits
        log.debug("nbits:%d", nbits)
        # NOTE(review): when nbits is not a multiple of 32 the trailing word
        # index (nbits//32)-pos-1 evaluates to -1 -- confirm that this is what
        # CBitSet.setUint expects.
        while last >= cBitSet.NL_BITLEN:
            v = self.readSerial(cBitSet.NL_BITLEN, decode=False)
            log.debug("pos:%d v:%d reel:%d", pos, v, (nbits//32)-pos-1)
            cBitSet.setUint(v, (nbits//32)-pos-1)
            last -= cBitSet.NL_BITLEN
            pos += 1
        if last > 0:
            v = self.readSerial(last, decode=False)
            log.debug("pos:%d v:%d reel:%d", pos, v, (nbits//32)-pos-1)
            cBitSet.setUint(v, (nbits//32)-pos-1)
        if decode:
            valuereal = cBitSet.showBitString()
            self._groupRead.append((v1, v1+nbits, name, typeName, valuereal[0:nbits], commentValue))
        return cBitSet

    def _readCBitSetBitwise(self, nbits, name, decode, typeName, commentValue, reverse):
        """Read a CBitSet one bit at a time; *reverse* stores bit i at nbits-i-1."""
        log = logging.getLogger(LOGGER)
        v1 = self._read
        cBitSet = self._newCBitSet(nbits)
        log.debug("nbits:%d", nbits)
        for pos in range(nbits):
            v = self.readSerial(1, decode=False)
            log.debug("pos:%d value:%d", nbits-pos-1, v)
            cBitSet.set(nbits-pos-1 if reverse else pos, v)
        if decode:
            self._groupRead.append((v1, v1+nbits, name, typeName, '', commentValue))
        log.debug("cBitSet:%s", cBitSet.showBitString())
        return cBitSet

    def readCBitSetOld2(self, nbits, name="", decode=True, typeName=None, commentValue=None):
        """Legacy bit-by-bit CBitSet reader (first bit read is the highest index)."""
        # khanat-opennel-code/code/nel/src/misc/bit_mem_stream.cpp void CBitMemStream::readBits( NLMISC::CBitSet& bitfield )
        return self._readCBitSetBitwise(nbits, name, decode, typeName, commentValue, True)

    def readCBitSetOld3(self, nbits, name="", decode=True, typeName=None, commentValue=None):
        """Legacy bit-by-bit CBitSet reader (first bit read is index 0)."""
        # khanat-opennel-code/code/nel/src/misc/bit_mem_stream.cpp void CBitMemStream::readBits( NLMISC::CBitSet& bitfield )
        return self._readCBitSetBitwise(nbits, name, decode, typeName, commentValue, False)

    def readBool(self, name, decode=True):
        """Read one bit and return it as a bool."""
        v1 = self._read
        v = self.readSerial(1, name=name, decode=False, typeName='Bool')
        if decode:
            self._groupRead.append((v1, v1+1, name, 'Bool', 'True' if v != 0 else 'False', None))
        return v != 0

    def readUint32(self, name, decode=True, typeName='Uint32'):
        """Read an unsigned 32-bit integer."""
        return self.readSerial(32, name=name, decode=decode, typeName=typeName)

    def readSint32(self, name, decode=True):
        """Read a signed 32-bit integer."""
        v = self.readSerial(32, name=name, decode=decode, typeName='Sint32', signed=True)
        return c_int32(v).value

    def readUint16(self, name, decode=True):
        """Read an unsigned 16-bit integer."""
        return self.readSerial(16, name=name, decode=decode, typeName='Uint16')

    def readSint16(self, name, decode=True):
        """Read a signed 16-bit integer."""
        v = self.readSerial(16, name=name, decode=decode, typeName='Sint16', signed=True)
        return c_int16(v).value

    def readUint8(self, name, decode=True, typeName='Uint8'):
        """Read an unsigned 8-bit integer."""
        return self.readSerial(8, decode=decode, name=name, typeName=typeName)

    def readSint8(self, name, decode=True):
        """Read a signed 8-bit integer."""
        v = self.readSerial(8, name=name, decode=decode, typeName='Sint8', signed=True)
        return c_int8(v).value

    def _read64(self):
        """Read eight bytes and assemble them as the original code did.

        On a little-endian host the result is ``second_word << 32 |
        first_word`` (each word most-significant byte first); on a big-endian
        host the eight bytes are taken least-significant first.
        NOTE(review): on little-endian hosts this is NOT the inverse of
        pushUint64/pushSint64 (which write the high word first), so a
        push/read round trip swaps the two 32-bit halves.  The wire format
        must be confirmed before changing either side.
        """
        # khanat-opennel-code/code/nel/include/nel/misc/stream.h
        # #define NLMISC_BSWAP64(src) (src) = (((src)>>56)&0xFF) | ((((src)>>48)&0xFF)<<8) | ((((src)>>40)&0xFF)<<16) | ((((src)>>32)&0xFF)<<24) | ((((src)>>24)&0xFF)<<32) | ((((src)>>16)&0xFF)<<40) | ((((src)>>8)&0xFF)<<48) | (((src)&0xFF)<<56)
        octets = [self.readSerial(8, decode=False) for _ in range(8)]
        if sys.byteorder == "little":
            first = octets[0] << 24 | octets[1] << 16 | octets[2] << 8 | octets[3]
            second = octets[4] << 24 | octets[5] << 16 | octets[6] << 8 | octets[7]
            return second << 32 | first
        ret = 0
        for shift, octet in enumerate(octets):
            ret |= octet << (8 * shift)
        return ret

    def readUint64(self, name):
        """Read an unsigned 64-bit integer (see _read64 for the byte order).

        Used to return a single byte of the value instead of the value.
        """
        p1 = self._read
        ret = self._read64()
        self._groupRead.append((p1, p1+64, name, 'Uint64', ret, None))
        return ret

    def readSint64(self, name):
        """Read a signed 64-bit integer (see _read64 for the byte order).

        Used to return a single byte of the value instead of the value.
        """
        p1 = self._read
        ret = c_int64(self._read64()).value
        self._groupRead.append((p1, p1+64, name, 'Sint64', ret, None))
        return ret

    def readFloat(self, name):
        """Read a 32-bit pattern and return it as a float.

        NOTE(review): the end of this method was lost in the reviewed text;
        reconstructed as a plain bit reinterpretation -- confirm against the
        repository.
        """
        p1 = self._read
        v = self.readSerial(32, name=name, decode=False, typeName='Float')
        v2 = struct.unpack('=f', struct.pack('=I', v))[0]
        self._groupRead.append((p1, p1+32, name, 'Float', v2, None))
        return v2

    def readDouble(self, name):
        """Read a 64-bit double written as low word then high word.

        NOTE(review): this method was entirely lost in the reviewed text; it
        is reconstructed as the inverse of pushDouble -- confirm against the
        repository.
        """
        p1 = self._read
        low = self.readSerial(32, decode=False)
        high = self.readSerial(32, decode=False)
        v2 = struct.unpack('=d', struct.pack('=Q', high << 32 | low))[0]
        self._groupRead.append((p1, p1+64, name, 'Double', v2, None))
        return v2

    def readChar(self, name, decode=True):
        """Read 8 bits and return the matching one-character string.

        NOTE(review): this method was lost in the reviewed text; the
        signature is inferred from its call sites -- confirm against the
        repository.
        """
        return chr(self.readUint8(name, decode=decode, typeName='Char'))

    def readString(self, name):
        """Read a 32-bit length followed by that many 8-bit characters.

        NOTE(review): the first statements of this method were lost in the
        reviewed text and are reconstructed by analogy with readUString.
        """
        _size = self.readUint32(name + ':size', decode=True)
        v1 = self._read
        chars = []
        while _size > 0:
            chars.append(self.readChar('', decode=False))
            _size -= 1
        tmp = ''.join(chars)
        v2 = self._read
        if v2 > self._pos:
            raise ValueError
        if v1 < v2:
            self._groupRead.append((v1, v2, name + ':string', 'String', tmp, None))
        return tmp

    def readUtf8String(self, name):
        """Read a 32-bit byte count, then that many bytes decoded as UTF-8."""
        _size = self.readUint32(name + ':size', decode=True)
        v1 = self._read
        octets = []
        while _size > 0:
            octets.append(self.readUint8(name='', decode=False))
            _size -= 1
        tmp = bytes(octets)
        v2 = self._read
        if v2 > self._pos:
            raise ValueError
        if v1 < v2:
            self._groupRead.append((v1, v2, name + ':ustring', 'UString', tmp, None))
        return tmp.decode(encoding='utf-8')

    def readUString(self, name):
        """Read a 32-bit length followed by that many 16-bit characters."""
        _size = self.readUint32(name + ':size', decode=True)
        v1 = self._read
        chars = []
        while _size > 0:
            chars.append(chr(self.readUint16(name='', decode=False)))
            _size -= 1
        tmp = ''.join(chars)
        v2 = self._read
        if v2 > self._pos:
            raise ValueError
        if v1 < v2:
            self._groupRead.append((v1, v2, name + ':ustring', 'UString', tmp, None))
        return tmp

    def readArrayUint8(self, size, name):
        """Read *size* bytes and return them as a list of ints."""
        v1 = self._read
        ret = [self.readUint8('', decode=False) for _ in range(size)]
        v2 = self._read
        self._groupRead.append((v1, v2, name, 'ArrayUint8', '', None))
        return ret

    def readBitStreamUint8(self, size, name):
        """Read *size* bytes and return them as a new BitStream."""
        ret = BitStream()
        v1 = self._read
        for _ in range(size):
            ret.pushUint8(self.readUint8('', decode=False), decode=False)
        v2 = self._read
        self._groupRead.append((v1, v2, name, 'StreamUint8', '', None))
        return ret

    def readCont(self, name):
        """Read a signed 32-bit bit count, then that many bits.

        Returns ``(size, stream)`` where *stream* holds the bits read.
        """
        ret = BitStream()
        size = self.readSint32(name = name + ':len', decode=True)
        v1 = self._read
        for _ in range(size):
            # decode=False: a per-bit write annotation would cost a stack
            # inspection per bit and only capture the text of this line.
            ret.pushBool(self.readSerial(1, name='', decode=False), decode=False)
        v2 = self._read
        self._groupRead.append((v1, v2, name + ':data', 'readCont', '', None))
        return size, ret

    def getNotRead(self):
        """Return a new BitStream holding the unread bits (cursor unchanged)."""
        ret = BitStream()
        readBefore = self._read
        try:
            while self._read < self._pos:
                nsize = min(8, self._pos - self._read)
                ret.internalSerial(self.readSerial(nsize, decode=False), nsize, decode=False)
        finally:
            self._read = readBefore
        return ret

    # ------------------------------------
    def __str__(self):
        """Return the bytes as a string, one character per byte."""
        return ''.join([ chr(x) for x in self._tampon])

    def message(self):
        """Return '<bit count>:' followed by the dot-separated bytes in binary."""
        return str(self._pos) + ':' + '.'.join([ "{0:08b}".format(x) for x in self._tampon])

    def toBytes(self):
        """Return the buffer as bytes (last byte zero-padded)."""
        return bytes( self._tampon )

    def fromBytes(self, data):
        """Replace the content with *data* and rewind the read cursor."""
        self._read = 0
        self._tampon = [int(x) for x in data]
        self._pos = len(self._tampon) * 8

    def showLastData(self):
        """Return the unread bits, in dot-separated groups of up to 8 bits.

        Each group used to be emitted twice.
        """
        pending = self._bitString()[self._read:]
        return '.'.join(pending[i:i + 8] for i in range(0, len(pending), 8))

    def showAllData(self):
        """Return all bits, annotated with the recorded reads.

        Annotated ranges are rendered between [...]; the bits after the last
        annotated range are rendered between {...}.
        """
        ret = self._bitString()
        parts = []
        last = 0
        for x, y, name, typeName, value, commentValue in self._groupRead:
            if typeName is None:
                typeName = '-'
            strcommentValue = "" if commentValue is None else " " + str(commentValue)
            # Falsy values (0, '', None) are intentionally not printed.
            strvalue = ' => ' + str(value) if value else ''
            parts.append("[<" + str(x) + ':' + str(y-1) + "> " + str(name) + ' (' + typeName + ') : ' + ret[x:y] + strvalue + strcommentValue + "]")
            last = y
        if last < self._pos:
            parts.append("{" + ret[last:] + "}")
        return ''.join(parts)

    def showAllDataWrite(self):
        """Return all bits, annotated with the recorded writes."""
        ret = self._bitString()
        parts = []
        last = 0
        for x, y, name, typeName, value in self._groupWrite:
            if typeName is None:
                typeName = '-'
            parts.append("[<" + str(x) + ':' + str(y-1) + "> " + str(name) + ' (' + typeName + ') : ' + ret[x:y] + ' => ' + str(value) + "]")
            last = y
        if last < self._pos:
            parts.append("{" + ret[last:] + "}")
        return ''.join(parts)

    def showAllDataRaw(self):
        """Return all bits in groups of up to 8, each followed by a dot."""
        ret = self._bitString()
        return ''.join(ret[i:i + 8] + '.' for i in range(0, len(ret), 8))

    def extractAllData(self):
        """Return one description line per recorded read, plus the unread tail."""
        ret = self._bitString()
        ret2 = []
        last = 0
        for x, y, name, typeName, value, commentValue in self._groupRead:
            if commentValue is None:
                commentValue = ""
                plus = ""
            else:
                plus = " "
            if not typeName:
                nbbit = y - x
                typeName = "%d bit" % nbbit
                if nbbit > 1:
                    typeName += "s"
            ret2.append("<" + str(x) + ':' + str(y-1) + "> " + '(' + typeName + ') ' + str(name) + ' => ' + str(value) + " : " + ret[x:y] + plus + commentValue)
            last = y
        if last < self._pos:
            ret2.append("<" + str(last) + ':' + str(self._pos - 1) + "> " + ret[last:])
        return ret2

    def checkOnlyZeroAtEnd(self):
        """Return True when every unread bit is 0.

        The read cursor is never moved (it used to stay advanced when a
        non-zero bit was found).
        """
        return '1' not in self._bitString()[self._read:]

    def getPosInBit(self):
        """Return the write cursor, in bits (same as getPos)."""
        return self._pos


def TestBitStream():
    """Manual smoke test: push one value of each type, then dump and re-read."""
    readers = ('readBool', 'readBool', 'readBool', 'readBool',
               'readUint32', 'readSint32', 'readUint16', 'readSint16',
               'readUint8', 'readSint8', 'readFloat', 'readDouble',
               'readUint64', 'readSint64', 'readChar', 'readString')

    def dump(stream, prefix, first, count=len(readers)):
        # Print `count` reads, named prefix+first, prefix+(first+1), ...
        for offset, reader in enumerate(readers[:count]):
            print(getattr(stream, reader)('{0}{1}'.format(prefix, first + offset)))

    a = BitStream()
    vrai = True
    a.pushBool(decode=True, value=vrai)
    a.pushBool(False)
    a.pushBool(True)
    a.pushBool(True)
    a.pushUint32(1234567890)
    a.pushSint32(-1234567890)
    a.pushUint16(12345)
    a.pushSint16(-12345)
    a.pushUint8(123)
    a.pushSint8(-123)
    #-3.4E+38) # 1.2339999675750732)
    a.pushFloat(-3.3999999521443642e+38)
    a.pushDouble(-1.7E+308)
    a.pushUint64(16045690709418696365)
    a.pushSint64(-1)
    a.pushChar('a')
    a.pushString("Test A Faire")
    print("-" * 40)
    print(a.showAllData())
    print('raw:')
    print(a.showAllDataRaw())
    print("-" * 20)
    print("-" * 80)
    dump(a, 'a', 1)
    print(a.toBytes())
    print("-" * 40)
    print(a.showAllData())
    print("-" * 80)
    b = BitStream()
    b.fromBytes(a.toBytes())
    dump(b, 'b', 1)
    print(b.toBytes())
    print("-" * 40)
    print(b.showAllData())
    print("-" * 80)
    c = BitStream()
    c.pushBool(True)
    c.pushBitStream(a)
    c.pushBitStream(b)
    print(c.readBool('c1'))
    print("-" * 80)
    dump(c, 'c', 2)
    print(c.toBytes())
    print("-" * 50)
    print(c.showAllData())
    print("-" * 50)
    dump(c, 'c', 18, count=15)
    print("-" * 40)
    print(c.showAllData())
    print(c.readString('c33'))
    print("-" * 40)
    print(c.showAllData())
    print(c.toBytes())
    print("-" * 40)
    print(c.showAllDataRaw())
    print("-" * 20)
    print(c.showAllData())
    print("-" * 80)