#!/usr/bin/python3 # # script to start/stop/status/send command/read log for khaganat process # # Copyright (C) 2017 AleaJactaEst # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . """ Manage all process khaganat Launch this prorgam in background and use clientManager to manipulate process Design + Manager + -> ManageCommand [NumCommand] | + -> read_output (thread) + -> ServerHttp -> khaganatHTTPServer | + -> ManageHttpRequest (each request are send on this class) [NumRequestHttp] http(s) command : [Method] /Path {json data} -------------------------- [POST] /SHUTDOWN : Stop all process and stop manager [POST] /STARTALL : Start all process [GET] /STATUSALL : Get status all process [POST] /STOPALL : Stop all process [POST] /START {'name': program} : Start one program [POST] /ACTION {'name': program, 'action' : action} : Send action one program (send to input program) [GET] /STATUS {'name': program} : Get status one program [POST] /STOP {'name': program} : Stop one program [GET] /LOG {'name': program, 'first-line': firstline } : Get log for one program Configuration File : This script need configuration file (see below for model) ------------------------------------------------------------------------------ [manager] # Define port listen (default 8000) port = 8000 # Generate key # openssl req -nodes -x509 -newkey rsa:2048 -keyout key.pem -out cert.pem -days 365 -subj "/C=FR/ST=France/L=Paris/O=khaganat/CN=khaganat.org" # your key keyfile = crt/key.pem # your certificate certfile = crt/cert.pem # certification to check signature ca_cert = /home/gameserver/ca/appli/certs/cachaincert.pem # address listen (default all port) address = # Admin Executor Service [aes] # command to launch the program command = ryzom_admin_service -A/home/gameserver/khanat/server -C/home/gameserver/khanat/server -L/home/gameserver/log/khanat --nobreak --fulladminname=admin_executor_service --shortadminname=AES # Path : where this program is launched path = /home/gameserver/khanat/server/ # size buffer log for each program launched (number line stdout) logsize = 1000 # buffer size (define value bufsize on subprocess.Popen, this buffer is use before read by manager) bufsize = 100 # bms_master : backup_service [bms_master] # command to launch the program command = ryzom_backup_service -A/home/gameserver/khanat/server -C/home/gameserver/khanat/server -L/home/gameserver/khanat/server/log --nobreak --writepid -P49990 # Path : where this program is launched path = /home/gameserver/khanat/server/ # we keep [logsize] last number line stdout logsize = 1000 # buffer size (define value bufsize on subprocess.Popen) bufsize = 100 ------------------------------------------------------------------------------ Example : nohup ./manage.py --log info --filelog /home/gameserver/log/manager.log -c khaganat.cfg 2>/dev/null 1>/dev/null 0 %s" % (command, name, result)) outjson={'state': result} self._set_headers() self.wfile.write(bytes(json.dumps(outjson), "utf-8")) def do_GET(self): # READ """ Manage request READ we can execute LOG, STATUS, LIST & STATUSALL """ logging.debug('get recieved : %s' % self.path) if self.path == '/LOG': self.command_log() elif self.path == '/STATUS': self.send_command("STATUS") elif self.path == '/LIST': self.send_list() elif self.path == '/STATUSALL': self.send_command_all("STATUS") else: self.send_error(400, 'Path unknown') logging.error("Path unknwon '%s'" % self.path) return def do_POST(self): # CREATE """ Manage request POST currently, we execute START, STOP, ACTION, SHUTDOWN, STARTALL & STOPALL """ logging.debug('post recieved : %s' % self.path) if self.path == '/START': self.send_command("START") elif self.path == '/STOP': self.send_command("STOP") elif self.path == '/ACTION': self.send_action() elif self.path == '/SHUTDOWN': self.send_shutdown() elif self.path == '/STARTALL': self.send_command_all("START") elif self.path == '/STOPALL': self.send_command_all("STOP") else: self.send_error(400,'Path unknown') logging.error("Path unknwon '%s'" % self.path) return def do_HEAD(self): """ request HEAD received """ logging.debug('head recieved : %s' % self.path) self.send_error(404,'File Not Found: %s' % self.path) def do_PUT(self): # UPDATE/REPLACE """ request PUT received """ logging.debug('put recieved!') self.send_error(404,'File Not Found: %s' % self.path) def do_PATCH(self): # UPDATE/MODIFY """ request PATCH received """ logging.debug('patch recieved!') self.send_error(404,'File Not Found: %s' % self.path) def do_DELETE(self): # DELETE """ request DELETE received """ logging.debug('delete recieved!') self.send_error(404,'File Not Found: %s' % self.path) class khaganatHTTPServer(http.server.HTTPServer): """ Class khaganatHTTPServer Redefine HTTPServer (adding queue input & queue output, use by ManageHttpRequest) """ def __init__(self, listQueueIn, listQueueOut, listEvent, server_address, RequestHandlerClass, bind_and_activate=True): http.server.HTTPServer.__init__(self, server_address, RequestHandlerClass, bind_and_activate) self.listQueueIn = listQueueIn self.listQueueOut = listQueueOut self.listEvent = listEvent class ServerHttp(multiprocessing.Process): """ Initialize server HTTPS * define Dictionnary queueIn & queueOut (with key as section's name in configuration) """ def __init__(self, keyfile, certfile, ca_cert, address = '', port=8000): multiprocessing.Process.__init__(self) self.listQueueIn = {} self.listQueueOut = {} self.listEvent = {} self.port = port self.key_file = keyfile self.cert_file = certfile self.ca_cert = ca_cert self.address = address def run(self): server_address = (self.address, self.port) httpd = khaganatHTTPServer(self.listQueueIn, self.listQueueOut, self.listEvent, server_address, ManageHttpRequest) if self.ca_cert: httpd.socket = ssl.wrap_socket(httpd.socket, keyfile = self.key_file, certfile = self.cert_file, ca_certs = self.ca_cert, cert_reqs = ssl.CERT_REQUIRED, ssl_version=ssl.PROTOCOL_TLSv1_2, server_side = True) else: httpd.socket = ssl.wrap_socket (httpd.socket, keyfile = self.key_file, certfile = self.cert_file, server_side = True) logging.info('https listen') httpd.serve_forever() def append(self, name, queueIn, queueOut, event): self.listQueueIn.setdefault(name, queueIn) self.listQueueOut.setdefault(name, queueOut) self.listEvent.setdefault(name, event) class ManageCommand(): """ Manage Command (only one) * start/stop/status/get log/send an action [stdin] for command (receive order with queueIn) * read output [in other thread] * communicate with ManageHttpRequest (with queueOut) """ def __init__(self, name, command, path, logsize, bufsize, queueIn, queueOut, event): self.process = None self.queueIn = queueIn self.queueOut = queueOut self.name = name self.command = command self.path = path self.log = [] self.poslastlog = 0 self.maxlog = logsize self.event = event self.bufsize = bufsize self.threadRead = None self.running = False self.state = multiprocessing.Queue() self.pipeIn, self.pipeOut = multiprocessing.Pipe() self.eventRunning = threading.Event() def read_output(self): """ Thread to read output (stdout) """ fl = fcntl.fcntl(self.process.stdout, fcntl.F_GETFL) fcntl.fcntl(self.process.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK) logging.debug("Start reader %s" % self.name) while self.eventRunning.is_set(): code = self.process.poll() if code is not None: logging.error("process %s down" % self.name) self.eventRunning.clear() continue try: line = self.process.stdout.readline() except AttributeError: logging.error("process %s down (not detected)" % self.name) self.eventRunning.clear() continue if not line: time.sleep(1) continue now = time.strftime('%Y/%m/%d %H:%M:%S %Z') logging.debug("line %s " % line) self.poslastlog += 1 while len(self.log) >= self.maxlog: self.log.pop(0) msg = line.decode().strip() self.log.append(now + ' ' + msg) logging.debug("recu: '%s'" %(msg)) logging.debug("End reader: '%s'" % self.name) def handler(self, signum, frame): """ Managed signal (not used) """ if self.process: #logging.debug("Send signal %d to '%s'" %(signum, self.name)) self.process.send_signal(signum) else: logging.error("Impossible to send signal %d to '%s'" %(signum, self.name)) raise IOError("signal received") def start(self): """ Start program """ logging.debug("start %s" % (self.name)) if self.process: logging.debug("%s already exist" % self.name) code = self.process.poll() if code is None: logging.debug("%s already exist" % self.name) return "already-started" else: logging.debug("%s crashed" % self.name) code = self.process.wait() logging.error("%s crashed (return code:%d) - restart program" % (self.name, code)) try: self.process = subprocess.Popen(self.command.split(), cwd = self.path, shell=False, bufsize=self.bufsize, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True) except FileNotFoundError as e: logging.error("Impossible to start %s (%s)" % (self.name, e)) return "crashed" self.eventRunning.set() if self.threadRead: self.eventRunning.clear() self.threadRead.join() self.threadRead = None self.running = True self.threadRead = threading.Thread(target=self.read_output) self.threadRead.start() return "started" def status(self): """ Get status of program """ logging.debug("status %s" % (self.name)) if self.process: logging.debug("status %s - check" % (self.name)) code = self.process.poll() if code is None: logging.debug("%s status" % (self.name)) return "started" else: logging.error("%s crashed (return code:%d)" % (self.name, code)) self.process = None return "stopped" else: logging.debug("%s status" % (self.name)) return "stopped" def list_thread(self): """ List number thrad (not used) """ logging.debug('list thread') #main_thread = threading.currentThread() for t in threading.enumerate(): logging.debug('thread %s', t.getName()) logging.debug("id %d" % t.ident) def stop(self): """ Stop program """ logging.debug("stop %s" % (self.name)) if not self.process: return "stopped" else: code = self.process.poll() loop = 10 while (code is None) and (loop > 0): logging.debug("stop process %s" , self.name) self.process.send_signal(15) time.sleep(1) code = self.process.poll() loop -= 1 loop = 10 while (code is None) and (loop > 0): logging.debug("terminate process %s" , self.name) self.process.terminate() time.sleep(1) code = self.process.poll() loop -= 1 loop = 10 while (code is None) and (loop > 0): logging.debug("kill process %s" , self.name) self.process.send_signal(9) time.sleep(1) code = self.process.poll() loop -= 1 code = self.process.wait() self.process = None if self.threadRead: self.eventRunning.clear() self.threadRead.join() self.threadRead = None logging.info("%s stopped (return code:%d)" % (self.name, code)) return "stopped" def getlog(self, firstline): """ Get log """ logging.debug("read log %d " % firstline) outjson = {} pos = self.poslastlog - len(self.log) + 1 firstlinefound = None for line in self.log: if pos >= firstline: outjson.setdefault(pos, line) if not firstlinefound: firstlinefound = pos pos += 1 outjson.setdefault('first-line', firstlinefound) outjson.setdefault('last-line', pos - 1) return json.dumps(outjson) def action(self, action): """ Send action to program (send input to stdin) """ logging.debug("ACTION '%s'" % action) if self.process: code = self.process.poll() if code is None: if action: self.process.stdin.write(bytes(action +'\n', 'UTF-8')) self.process.stdin.flush() return "ok" return "ko" def run(self): """ loop, run child (wait command) """ loop = True while loop: logging.debug('wait %s' % self.name) self.event.wait() logging.debug('received event %s' % self.name) try: msg = self.queueIn.get(timeout = 4) except queue.Empty: self.event.clear() logging.debug("pas de message recu pour %s" % self.name) return logging.debug("command : '%s'" % msg) command = msg.split()[0] if command == "SHUTDOWN": loop = False continue elif command == "START": self.queueOut.put(self.start()) elif command == "STATUS": self.queueOut.put(self.status()) elif command == "STOP": self.queueOut.put(self.stop()) elif command == "ACTION": data = msg.split(maxsplit=1)[1] self.queueOut.put(self.action(data)) elif command == "LOG": try: firstline = int(msg.split(maxsplit=1)[1]) except ValueError: firstline = 0 self.queueOut.put(self.getlog(firstline)) else: self.queueOut.put("error : command unknown") self.event.clear() self.stop() self.event.clear() logging.debug('end') class Manager(): """ Manage all services (read configuration, launch ManageCommand & launch ServerHttp & wait the end) * https service * all child to manage (it start ManageCommand by command define in configuration) """ def __init__(self, filecfg, launch_program): self.threadCommand = [] self.command = [] self.launch_program = launch_program self.param = {} config = configparser.ConfigParser() config.read_file(filecfg) logging.debug("Sections :%s" % config.sections()) for name in config.sections(): if name == 'config': logging.debug("read config '%s'" % name) try: self.port = int(config[name]['port']) except (TypeError, KeyError, ValueError): self.port = 8000 try: self.address = config[name]['address'] except (TypeError, KeyError): self.address = '' try: self.keyfile = config[name]['keyfile'] except (TypeError, KeyError): self.keyfile = 'crt/key.pem' try: self.certfile = config[name]['certfile'] except (TypeError, KeyError): self.certfile = 'crt/cert.pem' try: self.ca_cert = config[name]['ca_cert'] except (TypeError, KeyError): self.ca_cert = 'crt/ca_cert.crt' elif 'command' in config[name]: logging.debug("read command '%s'" % name) if 'path' in config[name]: path = config[name]['path'] else: path = None if 'logsize' in config[name]: try: logsize = int(config[name]['logsize']) except (TypeError, KeyError, ValueError): logsize = 100 logging.warning("Impossible to read param logsize (command:%s)", name) else: logsize = 100 if 'bufsize' in config[name]: try: bufsize = int(config[name]['bufsize']) except (TypeError, KeyError, ValueError): bufsize = 100 logging.warning("Impossible to read param bufsize (command:%s)", name) else: bufsize = 100 self.param.setdefault(name, {'command': config[name]['command'], 'path': path, 'logsize': logsize, 'bufsize': bufsize}) self.serverHttp = ServerHttp(self.keyfile, self.certfile, self.ca_cert, self.address, self.port) if filecfg is None: raise ValueError def runCommand(self, name, command, path, logsize, bufsize, queueIn, queueOut, event): """ Thread to manage khaganat program """ logging.debug("Initialize '%s'" % name) manageCommand = ManageCommand(name=name, command=command, path=path, logsize=logsize, bufsize=bufsize, queueIn=queueIn, queueOut=queueOut, event=event) manageCommand.run() def launch_server_http(self): """ Launch server https """ self.serverHttp.daemon = True self.serverHttp .start() def launch_command(self): """ Launch child to manage each program """ for name in self.param: logging.debug("Initialize '%s'" % name) queueIn = multiprocessing.Queue() queueOut = multiprocessing.Queue() event = multiprocessing.Event() self.serverHttp.append(name, queueIn, queueOut, event) threadCommand = multiprocessing.Process(target=self.runCommand, args=(name, self.param[name]['command'], self.param[name]['path'], self.param[name]['logsize'], self.param[name]['bufsize'], queueIn, queueOut, event)) threadCommand.start() if self.launch_program: event.set() queueIn.put("START") try: item = queueOut.get(timeout = 4) except queue.Empty: item = "" logging.debug("pas de message recu pour %s" % name) return logging.info("%s => %s" % (name, item)) self.threadCommand.append(threadCommand) def receive_signal(self, signum, frame): """ Managed signal """ for child in self.threadCommand: child.terminate() if self.serverHttp: self.serverHttp.terminate() def wait_children_commands(self): for child in self.threadCommand: child.join() def wait_child_server_http(self): self.serverHttp.terminate() self.serverHttp.join() def run(self): """ launch all """ self.launch_command() self.launch_server_http() logging.info('started') self.wait_children_commands() logging.info('end') signal.alarm(0) logging.info('wait thread http') time.sleep(1) self.wait_child_server_http() logging.info('end') def main(filecfg, fileLog, logLevel, launch_program, show_log_console): """ Main function """ # Manage log logging.getLogger('logging') numeric_level = getattr(logging, logLevel.upper(), None) if not isinstance(numeric_level, int): raise ValueError('Invalid log level: %s' % logLevel) handlers=[] if show_log_console: handlers.append(logging.StreamHandler()) if fileLog: handlers.append(logging.FileHandler(fileLog.name)) logging.basicConfig(handlers=handlers, level=numeric_level, format='%(asctime)s %(levelname)s [pid:%(process)d] [%(funcName)s:%(lineno)d] %(message)s') if filecfg is None: logging.error("Missing configuration file") raise ValueError manager = Manager(filecfg, launch_program) manager.run() if __name__ == '__main__': parser = argparse.ArgumentParser(description='Manage khaganat process') parser.add_argument('--version', action='version', version='%(prog)s 1.0') parser.add_argument('-c', '--conf', type=argparse.FileType('r'), default='khaganat.cfg', help='configuration file') parser.add_argument( '--show-log-console', action='store_true', help='show message in console', default=False) parser.add_argument('--filelog', type=argparse.FileType('wt'), default=None, help='log file') parser.add_argument('--log', default='INFO', help='log level [DEBUG, INFO, WARNING, ERROR') parser.add_argument( '--launch-program', action='store_true', help='launch program when start manager', default=False) args = parser.parse_args() main(filecfg=args.conf, fileLog=args.filelog, logLevel=args.log, launch_program=args.launch_program, show_log_console=args.show_log_console)