diff --git a/code/khaganat/tools/manage.py b/code/khaganat/tools/manage.py index a49b93f26..70913df3f 100755 --- a/code/khaganat/tools/manage.py +++ b/code/khaganat/tools/manage.py @@ -21,16 +21,25 @@ Manage all process khaganat Launch this prorgam in background and use clientManager to manipulate process - you can launch command : - [POST] SHUTDOWN : Stop all process and stop manager - [POST] STARTALL : Start all process - [GET] STATUSALL : Get status all process - [POST] STOPALL : Stop all process - [POST] START {'name': program} : Start one program - [POST] ACTION {'name': program, 'action' : action} : Send action one program (send to input program) - [GET] STATUS {'name': program} : Get status one program - [POST] STOP {'name': program} : Stop one program - [GET] LOG {'name': program, 'first-line': firstline } : Get log for one program +Design + + Manager + + -> ManageCommand [NumCommand] + | + -> read_output (thread) + + -> ServerHttp -> khaganatHTTPServer + | + -> ManageHttpRequest (each request are send on this class) [NumRequestHttp] + + http(s) command : + [Method] /Path {json data} + -------------------------- + [POST] /SHUTDOWN : Stop all process and stop manager + [POST] /STARTALL : Start all process + [GET] /STATUSALL : Get status all process + [POST] /STOPALL : Stop all process + [POST] /START {'name': program} : Start one program + [POST] /ACTION {'name': program, 'action' : action} : Send action one program (send to input program) + [GET] /STATUS {'name': program} : Get status one program + [POST] /STOP {'name': program} : Stop one program + [GET] /LOG {'name': program, 'first-line': firstline } : Get log for one program Configuration File : This script need configuration file (see below for model) ------------------------------------------------------------------------------ @@ -95,7 +104,8 @@ import os class ManageHttpRequest(http.server.SimpleHTTPRequestHandler): """ - Class received all request and send to manager process + Class ManageHttpRequest receive all request https + * analyze and send to ManageCommand (with queueIn & queueOut) """ def __init__(self, request, client_address, server): """ Initialize object """ @@ -121,18 +131,15 @@ class ManageHttpRequest(http.server.SimpleHTTPRequestHandler): else: ctype = 'text' if ctype != 'application/json': - self.send_response(400) + logging.error("Received request with bad content-type") + self.send_response(400, "bad content-type") self.end_headers() return - if 'content-length' in self.headers: - try: - sizemsg = int(self.headers['content-length']) - except: - self.send_response(400) - self.end_headers() - return - else: - self.send_response(400) + try: + sizemsg = int(self.headers['content-length']) + except (TypeError, KeyError, ValueError): + logging.error("Received request with bad content-length") + self.send_response(400, "bad content-length") self.end_headers() return @@ -141,25 +148,25 @@ class ManageHttpRequest(http.server.SimpleHTTPRequestHandler): logging.debug(msgjson) if 'name' not in msgjson: - self.send_error(400,'Missing param name') + self.send_error(400, 'Missing param name') logging.error("Missing param name") return name = msgjson['name'] if name not in self.server.listQueueIn: - self.send_error(400,'Name unknown') + self.send_error(400, 'Name unknown') logging.error("Name unknwon '%s'" % name) return if 'first-line' not in msgjson: - self.send_error(400,'Missing param first-line') + self.send_error(400, 'Missing param first-line') logging.error("Missing param first-line '%s'" % name) return firstLine = 0 try: firstLine = int(msgjson['first-line']) - except: - self.send_error(400,'Impossible to read first-line') + except ValueError: + self.send_error(400, 'Impossible to read first-line') logging.error("Impossible to read first-line '%s'" % msgjson['first-line']) return logging.debug("%s:%s" % (name, firstLine)) @@ -190,7 +197,6 @@ class ManageHttpRequest(http.server.SimpleHTTPRequestHandler): for name in self.server.listQueueIn: self.server.listEvent[name].set() self.server.listQueueIn[name].put("SHUTDOWN") - self._set_headers() outjson = {'shutdown':'ok'} self.wfile.write(bytes(json.dumps(outjson), "utf-8")) @@ -201,14 +207,12 @@ class ManageHttpRequest(http.server.SimpleHTTPRequestHandler): for name in self.server.listQueueIn: self.server.listEvent[name].set() self.server.listQueueIn[name].put(action) - try: item = self.server.listQueueOut[name].get(timeout = 4) except queue.Empty: logging.debug("pas de message recu pour %s" % name) return outjson.setdefault(name, item) - self._set_headers() self.wfile.write(bytes(json.dumps(outjson), "utf-8")) @@ -219,18 +223,15 @@ class ManageHttpRequest(http.server.SimpleHTTPRequestHandler): else: ctype = 'text' if ctype != 'application/json': - self.send_response(400) + logging.error("Bad content-type") + self.send_response(400, "bad content-type") self.end_headers() return - if 'content-length' in self.headers: - try: - sizemsg = int(self.headers['content-length']) - except: - self.send_response(400) - self.end_headers() - return - else: - self.send_response(400) + try: + sizemsg = int(self.headers['content-length']) + except (TypeError, KeyError, ValueError): + logging.error("Bad content-length") + self.send_response(400, "bad content-length") self.end_headers() return @@ -239,26 +240,26 @@ class ManageHttpRequest(http.server.SimpleHTTPRequestHandler): logging.debug(msgjson) if 'name' not in msgjson: - self.send_error(400,'Missing param name') + self.send_error(400, 'Missing param name') logging.error("Missing param name") return name = msgjson['name'] if name not in self.server.listQueueIn: - self.send_error(400,'Name unknown') + self.send_error(400, 'Name unknown') logging.error("Name unknwon '%s'" % name) return if 'action' not in msgjson: - self.send_error(400,'Missing param action') + self.send_error(400, 'Missing param action') logging.error("Missing param action '%s'" % name) return action = '' try: action = msgjson['action'] - except: - self.send_error(400,'Impossible to read action') + except KeyError: logging.error("Impossible to read first-line '%s'" % msgjson['action']) + self.send_error(400, 'Impossible to read action') return logging.debug("%s:%s" % (name, action)) self.server.listEvent[name].set() @@ -281,39 +282,35 @@ class ManageHttpRequest(http.server.SimpleHTTPRequestHandler): else: ctype = 'text' if ctype != 'application/json': - self.send_response(400) + logging.error("Bad content-type") + self.send_response(400, "Bad content-type") self.end_headers() return - if 'content-length' in self.headers: - try: - sizemsg = int(self.headers['content-length']) - except: - self.send_response(400) - self.end_headers() - return - else: - self.send_response(400) + try: + sizemsg = int(self.headers['content-length']) + except (TypeError, KeyError, ValueError): + logging.error("Bad content-length") + self.send_response(400, "Bad content-length") self.end_headers() return msg = self.rfile.read(sizemsg) msgjson = json.loads(msg.decode()) if 'name' not in msgjson: - self.send_error(400,'Missing param name') + self.send_error(400, 'Missing param name') logging.error("Missing param name") return name = msgjson['name'] if name not in self.server.listQueueIn: - self.send_error(400,'Name unknown') + self.send_error(400, 'Name unknown') logging.error("Name unknwon '%s'" % name) return logging.debug("[%s %s] Send command" % (command, name)) self.server.listEvent[name].set() - logging.debug("[%s %s] Sent command" % (command, name)) self.server.listQueueIn[name].put(command) try: result = self.server.listQueueOut[name].get(timeout = 4) except queue.Empty: - self.send_error(500,'Missing return') + self.send_error(500, 'Missing return') logging.debug("[%s %s] Missing return" % (command, name)) return logging.debug("[%s %s] => %s" % (command, name, result)) @@ -323,8 +320,9 @@ class ManageHttpRequest(http.server.SimpleHTTPRequestHandler): self.wfile.write(bytes(json.dumps(outjson), "utf-8")) def do_GET(self): # READ - """ Manage request READ - currently, we execute LOG, STATUS & LIST + """ + Manage request READ + we can execute LOG, STATUS, LIST & STATUSALL """ logging.debug('get recieved : %s' % self.path) if self.path == '/LOG': @@ -336,13 +334,13 @@ class ManageHttpRequest(http.server.SimpleHTTPRequestHandler): elif self.path == '/STATUSALL': self.send_command_all("STATUS") else: - self.send_error(400,'Path unknown') + self.send_error(400, 'Path unknown') logging.error("Path unknwon '%s'" % self.path) return def do_POST(self): # CREATE """ Manage request POST - currently, we execute START, STOP, ACTION & SHUTDOWN + currently, we execute START, STOP, ACTION, SHUTDOWN, STARTALL & STOPALL """ logging.debug('post recieved : %s' % self.path) if self.path == '/START': @@ -384,6 +382,7 @@ class ManageHttpRequest(http.server.SimpleHTTPRequestHandler): class khaganatHTTPServer(http.server.HTTPServer): """ Class khaganatHTTPServer + Redefine HTTPServer (adding queue input & queue output, use by ManageHttpRequest) """ def __init__(self, listQueueIn, @@ -398,7 +397,10 @@ class khaganatHTTPServer(http.server.HTTPServer): self.listEvent = listEvent class ServerHttp(multiprocessing.Process): - """ Initialize server HTTPS """ + """ + Initialize server HTTPS + * define Dictionnary queueIn & queueOut (with key as section's name in configuration) + """ def __init__(self, keyfile, certfile, address = '', port=8000): multiprocessing.Process.__init__(self) self.listQueueIn = {} @@ -431,7 +433,10 @@ class ServerHttp(multiprocessing.Process): class ManageCommand(): """ - Thread manage all program + Manage Command (only one) + * start/stop/status/get log/send an action [stdin] for command (receive order with queueIn) + * read output [in other thread] + * communicate with ManageHttpRequest (with queueOut) """ def __init__(self, name, command, path, logsize, bufsize, queueIn, queueOut, event): self.process = None @@ -457,22 +462,18 @@ class ManageCommand(): fcntl.fcntl(self.process.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK) logging.debug("Start reader %s " % self.name) while self.eventRunning.is_set(): - #logging.debug("Start reader %s " % self.name) - try: - line = self.process.stdout.readline() - if not line: - time.sleep(1) - continue - now = time.strftime('%Y/%m/%d %H:%M:%S %Z') - logging.debug("line %s " % line) - self.poslastlog += 1 - while len(self.log) >= self.maxlog: - self.log.pop(0) - msg = line.decode().strip() - self.log.append(now + ' ' + msg) - logging.debug("recu: '%s'" %(msg)) - except: + line = self.process.stdout.readline() + if not line: + time.sleep(1) continue + now = time.strftime('%Y/%m/%d %H:%M:%S %Z') + logging.debug("line %s " % line) + self.poslastlog += 1 + while len(self.log) >= self.maxlog: + self.log.pop(0) + msg = line.decode().strip() + self.log.append(now + ' ' + msg) + logging.debug("recu: '%s'" %(msg)) logging.debug("End reader: '%s'" % self.name) def handler(self, signum, frame): @@ -643,7 +644,7 @@ class ManageCommand(): elif command == "LOG": try: firstline = int(msg.split(maxsplit=1)[1]) - except: + except ValueError: firstline = 0 self.queueOut.put(self.getlog(firstline)) else: @@ -655,9 +656,12 @@ class ManageCommand(): class Manager(): - """ Manage all services + """ + Manage all services + (read configuration, launch ManageCommand & launch ServerHttp & wait the end) * https service - * all child to manage each program + * all child to manage (it start ManageCommand by command define in configuration) + """ def __init__(self, filecfg, launch_program): self.threadCommand = [] @@ -673,19 +677,19 @@ class Manager(): logging.debug("read config '%s'" % name) try: port = int(config[name]['port']) - except: + except (TypeError, KeyError, ValueError): port = 8000 try: address = config[name]['address'] - except: + except (TypeError, KeyError): address = '' try: keyfile = config[name]['keyfile'] - except: + except (TypeError, KeyError): keyfile = 'crt/key.pem' try: certfile = config[name]['certfile'] - except: + except (TypeError, KeyError): certfile = 'crt/cert.pem' elif 'command' in config[name]: logging.debug("read command '%s'" % name) @@ -696,7 +700,7 @@ class Manager(): if 'logsize' in config[name]: try: logsize = int(config[name]['logsize']) - except: + except (TypeError, KeyError, ValueError): logsize = 100 logging.warning("Impossible to read param logsize (command:%s)", name) else: @@ -704,7 +708,7 @@ class Manager(): if 'bufsize' in config[name]: try: bufsize = int(config[name]['bufsize']) - except: + except (TypeError, KeyError, ValueError): bufsize = 100 logging.warning("Impossible to read param bufsize (command:%s)", name) else: @@ -744,14 +748,14 @@ class Manager(): event = multiprocessing.Event() self.serverHttp.append(name, queueIn, queueOut, event) threadCommand = multiprocessing.Process(target=self.runCommand, - args=(name, - self.param[name]['command'], - self.param[name]['path'], - self.param[name]['logsize'], - self.param[name]['bufsize'], - queueIn, - queueOut, - event)) + args=(name, + self.param[name]['command'], + self.param[name]['path'], + self.param[name]['logsize'], + self.param[name]['bufsize'], + queueIn, + queueOut, + event)) threadCommand.start() if self.launch_program: event.set() @@ -773,19 +777,25 @@ class Manager(): if self.serverHttp: self.serverHttp.terminate() + def wait_children_commands(self): + for child in self.threadCommand: + child.join() + + def wait_child_server_http(self): + self.serverHttp.terminate() + self.serverHttp.join() + def run(self): """ launch all """ self.launch_command() self.launch_server_http() logging.info('started') - for child in self.threadCommand: - child.join() + self.wait_children_commands() logging.info('end') signal.alarm(0) logging.info('wait thread http') time.sleep(1) - self.serverHttp.terminate() - self.serverHttp.join() + self.wait_child_server_http() logging.info('end')