#!/usr/bin/python3 # -*- coding: utf-8 -*- # # module DecodeDatabase # # Copyright (C) 2019 AleaJactaEst # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import logging from tools import getPowerOf2 from tools import BitStream LOGGER='DecodeDatabase' def show_dico(dico, level=1): try: for ele in dico: if isinstance(dico[ele], dict): logging.getLogger(LOGGER).debug("%s %s %s" % ("." * level, ele , ":")) if isinstance(dico[ele], dict): show_dico(dico[ele], level+1) else: logging.getLogger(LOGGER).debug("%s %s %s" % ("." * level, ele, ':', dico[ele])) except: logging.getLogger(LOGGER).debug("empty") def print_dico(dico, level=1): try: for ele in dico: if isinstance(dico[ele], dict): print("%s %s %s" % ("." * level, ele , ":")) if isinstance(dico[ele], dict): show_dico(dico[ele], level+1) else: print("%s %s %s" % ("." * level, ele, ':', dico[ele])) except: pass def child(ele): ret = {} ret_branch = {} ref_other = {} id_branch = 0 id_other = 0 min_i = -1 max_i = -1 for k in ele.keys(): ret[k] = ele.get(k) logging.getLogger(LOGGER).debug("%s %s" % (str(k), str(ele.get(k)))) for _child in list(ele): x = child(_child) if x['name'] == 'branch': ret_branch.setdefault(id_branch, x) id_branch += 1 else: ref_other.setdefault(id_other, x) id_other += 1 if ret_branch or ref_other: ret['child'] = {} id = 0 for x in ret_branch: min_i = max_i + 1 max_i = min_i #show_dico( ret_branch[x]) if 'count' in ret_branch[x]: max_i = min_i + int(ret_branch[x]['count']) - 1 ret_branch[x]['min'] = min_i ret_branch[x]['max'] = max_i ret['child'].setdefault(id, ret_branch[x]) id += 1 for x in ref_other: min_i = max_i + 1 max_i = min_i #show_dico( ref_other[x]) if 'count' in ref_other[x]: max_i = min_i + int(ref_other[x]['count']) - 1 ref_other[x]['min'] = min_i ref_other[x]['max'] = max_i ret['child'].setdefault(id, ref_other[x]) id += 1 return ret def count_elements(head): #print(head) if 'child' in head.keys(): list_ele = [x for x in list(head['child'].keys()) if isinstance(x, int)] else: list_ele = [x for x in list(head.keys()) if isinstance(x, int)] logging.getLogger(LOGGER).debug(str(list_ele)) last_key = max(list_ele) logging.getLogger(LOGGER).debug(str(last_key )) try: last_key = max(list_ele) if 'child' in head.keys(): return head['child'][last_key]['max'] + 1 else: return head[last_key]['max'] + 1 except TypeError: return 0 def get_element(head, id): logging.getLogger(LOGGER).debug("id: %s" % str(id)) for ele in head['child']: logging.getLogger(LOGGER).debug("ele:%s" % str(ele)) logging.getLogger(LOGGER).debug("head:%s" % str(head)) logging.getLogger(LOGGER).debug("max:%s", str(head['child'][ele]['max'])) logging.getLogger(LOGGER).debug("min:%s", str(head['child'][ele]['min'])) if id <= head['child'][ele]['max'] and id >= head['child'][ele]['min']: return head['child'][ele] return None class LeafDatabase(): def __init__(self): self.name = "" self.type = "" def loadXmlWithoutCount(self, xmldata, extra=""): self.name = xmldata.get('name') + extra self.type = xmldata.get('type') def countLeaves(self): logging.getLogger(LOGGER).debug("countLeaves leaf %s (nb:1)" % (self.name)) return 1 def show(self, level=1): logging.getLogger(LOGGER).debug("%s %s Leaf %s [%s]" % ("*" * level, str(level), self.name, str(self.type))) def execute(self, msgin, name=""): if name: tmp = name else: tmp = self.name if self.type[0] == 'I': # Unsigned logging.getLogger(LOGGER).debug("Read:" + str (self.type)) value = int(self.type[1:]) data = msgin.readSerialUint64(value, name=tmp, typeName="Uint" + str(value)) elif self.type[0] == 'S': # Signed logging.getLogger(LOGGER).debug("Read:" + str (self.type)) value = int(self.type[1:]) # _ = msgin.readNbChar(value, name='DatabaseXML' + tmp) data = msgin.readSerialUint64(value, name=tmp, typeName="Sint" + str(value)) elif self.type == 'TEXT': logging.getLogger(LOGGER).debug("type:" + str (self.type)) value = 32 data = msgin.readSerialSint64(value, name=idname, typeName="Uint32/TEXT") else: logging.getLogger(LOGGER).error("Type inconnu:" + str (self.type)) raise "type not managed" return True, {tmp: data} def execute_atom(self, level, pos, msgin, name=""): if name: extraName = name + "/" + self.name else: extraName = self.name idname = extraName logging.getLogger(LOGGER).debug('level:'+ str(level) + ' pos:' + str(pos) + ' idname:' + idname) if level < pos: return level + 1, None if self.type[0] == 'I': value = int(self.type[1:]) if value > 64: raise "type not managed" logging.getLogger(LOGGER).debug('value:' + str(value)) data = msgin.readSerialUint64(value, name=idname, typeName="Uint" + str(value)) return level+1, {extraName: data} elif self.type[0] == 'S': logging.getLogger(LOGGER).debug("type:" + str (self.type)) value = int(self.type[1:]) data = msgin.readSerialSint64(value, name=idname, typeName="Sint" + str(value)) return level+1, {extraName: data} elif self.type == 'TEXT': logging.getLogger(LOGGER).debug("type:" + str (self.type)) value = 32 data = msgin.readSerialUint64(value, name=idname, typeName="Uint32/TEXT") return level+1, {extraName: data} else: logging.getLogger(LOGGER).debug("Type inconnu:", self.type) raise "type not managed" return level, None class BranchDatabase(): def __init__(self): self.name = None self.branch = [] self.leaf = [] self.min = None self.max = None self.atom = False def loadXml(self, xmldata, extra="", filter=None): if filter: if 'bank' in xmldata: if filter != xmldata['bank']: return else: return self.name = xmldata.get('name') + extra if xmldata.get('atom'): self.atom = True for ele in xmldata: if ele.get('count'): count = int(ele.get('count')) if ele.tag == 'branch': for i in range(0, count): newbranch = BranchDatabase() newbranch.loadXml(ele, str(i)) self.branch.append(newbranch) elif ele.tag == 'leaf': for i in range(0, count): newleaf = LeafDatabase() newleaf.loadXmlWithoutCount(ele, str(i)) self.leaf.append(newleaf) else: if ele.tag == 'branch': newbranch = BranchDatabase() newbranch.loadXml(ele, "") self.branch.append(newbranch) elif ele.tag == 'leaf': newleaf = LeafDatabase() newleaf.loadXmlWithoutCount(ele, "") self.leaf.append(newleaf) def loadRootXml(self, xmldata, filter=None): for ele in xmldata: if ele.tag == 'branch': if filter: if ele.get('bank') == filter: newbranch = BranchDatabase() newbranch.loadXml(ele) self.branch.append(newbranch) def getIdBits(self): count = 0 for ele in self.branch: count += 1 for ele in self.leaf: count += 1 return count def countLeaves(self): count = 0 for ele in self.branch: count += ele.countLeaves() for ele in self.leaf: count += ele.countLeaves() return count def execute_atom_found(self, level, pos, msgin, name=""): step=1 if name: extraName = name + self.name else: extraName = self.name logging.getLogger(LOGGER).debug('level:' + str(level) + ' pos:' + str(pos) + ' name:' + name) idname = extraName for ele in self.branch: level, data = ele.execute_atom_found(level, pos, msgin, idname) logging.getLogger(LOGGER).debug('level:' + str(level) + ' pos:' + str(pos) + ' name:' + name + ' idname:' + str(idname)) if level > pos: return level, data for ele in self.leaf: level, data = ele.execute_atom(level, pos, msgin, idname) logging.getLogger(LOGGER).debug('level:' + str(level) + ' pos:' + str(pos) + ' name:' + name + ' idname:' + str(idname)) if level > pos: return level, data return level, None def show(self, level=0, pos=0, filterlevel=None): logging.getLogger(LOGGER).debug( "%s %s pos:%s Branch:%s atom:%s getIdBits:%s" % ("*" * level, str(level), str(pos), str(self.name), str(self.atom), str(self.getIdBits()))) if filterlevel is not None: if filterlevel <= level: return i = 0 for ele in self.branch: ele.show(level + 1, i, filterlevel) i += 1 for ele in self.leaf: ele.show(level + 1) def execute_atom(self, msgin, parent): ret = {} nbchild = self.countLeaves() # nbchild = len(self.leaf) + len(self.branch) #nbBit = getPowerOf2.getPowerOf2_Bis(nbchild) logging.getLogger(LOGGER).debug("needRead:" + str(msgin.needRead()) + " nbchild:" + str(nbchild)) logging.getLogger(LOGGER).debug(msgin.showAllData()) if self.name: idname = parent + self.name +'/' else: idname = parent cBitSet = msgin.readCBitSet(nbchild, idname + ":Param", typeName = 'Uint' + str(nbchild)) #cBitSet = CBitSet.CBitSet() #cBitSet.readSerialExtra(msgin, nbchild, self.name + "/Param") logging.getLogger(LOGGER).debug(msgin.showAllData()) todelete_count_true = 0 for i in range(0, nbchild): #ii = nbchild - i logging.getLogger(LOGGER).debug(str(i) + " - " + str(cBitSet.get(i))) if cBitSet.get(i): todelete_count_true += 1 level, data = self.execute_atom_found(0, i, msgin, parent) #print(data) for key in data: ret.setdefault( key, data[key]) #ret.setdefault( self.name+str(i), data ) if todelete_count_true > 1: logging.getLogger(LOGGER).debug(msgin.showAllData()) logging.getLogger(LOGGER).debug(msgin.showAllData()) if ret: return True, ret else: return False, None def execute_normal(self, msgin, parent): nbchild = self.getIdBits() #nbBit = getPowerOf2.getPowerOf2_Bis(nbchild) nbBit = getPowerOf2.getPowerOf2_ter(nbchild) if nbBit > msgin.needRead() : logging.getLogger(LOGGER).debug("nbBit:" + str(nbBit) + " nbchild:" + str(nbchild) + " needRead:" + str(msgin.needRead() )) return False, None logging.getLogger(LOGGER).debug("needRead:" + str(msgin.needRead()) + " nbBit:" + str(nbBit)) id = msgin.readSerial(nbBit, name=parent, typeName='Uint'+str(nbBit), emulate=True) logging.getLogger(LOGGER).debug("read : needRead:" + str(msgin.needRead()) + " nbBit:" + str(nbBit) + " id:" + str(id)) i = 0 ii = 0 for ele in self.branch: #logging.getLogger(LOGGER).debug(str(i) + " id:" + str(id) + " name:" + str(ele.name)) i += 1 if i > id: comment = "" logging.getLogger(LOGGER).debug(str(i) + " id:" + str(id) + " name:" + str(ele.name)) if self.name: idnameshort = parent + self.name + '/' else: idnameshort = parent idname = idnameshort + ele.name _= msgin.readSerial(nbBit, name=idname, typeName='Uint'+str(nbBit), emulate=False, commentValue=ele.name+comment) logging.getLogger(LOGGER).debug("name:" + ele.name + ", atom:" + str(ele.atom)) state, data = ele.execute(msgin, idnameshort) if state: return True, data return False, None ii = i for ele in self.leaf: logging.getLogger(LOGGER).debug(str(i) + " id:" + str(id) + " name:" + str(ele.name)) i += 1 if i > id: if self.name: idnameshort = parent + self.name + '/' else: idnameshort = parent idname = idnameshort + ele.name logging.getLogger(LOGGER).debug(str(i) + " id:" + str(id) + " name:" + str(ele.name)) _ = msgin.readSerial(nbBit, name= idname, typeName='Uint'+str(nbBit), emulate=False, commentValue=ele.name) logging.getLogger(LOGGER).debug("name:" + ele.name ) state, data = ele.execute(msgin, name=idname) if state: return True, data return False, None ii = i id = msgin.readSerial(nbBit, name=parent, typeName='Uint'+str(nbBit)) logging.getLogger(LOGGER).debug("OUT : needRead:" + str(msgin.needRead()) + " nbBit:" + str(nbBit) + " id:" + str(id)+ " i:" + str(ii)) logging.getLogger(LOGGER).debug(msgin.showAllData()) return False, None def execute(self, msgin, parent='DatabaseXML/'): ret = False, None if self.atom: logging.getLogger(LOGGER).debug("execute:" + parent) state, data = self.execute_atom(msgin, parent) else: logging.getLogger(LOGGER).debug("execute:" + parent) state, data = self.execute_normal(msgin, parent) return state, data class DecodeDatabase(): def __init__(self): self.databaseXml = None self.databasePlr = None def loadDatabase(self, databaseXml): self.databasePlr = BranchDatabase() self.databasePlr.loadRootXml(databaseXml, 'PLR') self.databasePlr.show() #raise "ok" def execute(self, msgin, world, parent='DatabaseXML/'): logging.getLogger(LOGGER).debug("Start execute") ret = self.databasePlr.execute(msgin, parent) logging.getLogger(LOGGER).debug("End execute") return ret