remove/adding some comment, and correct function to wait all thread

This commit is contained in:
Jerome Sagnole 2017-11-02 22:54:58 +01:00
parent bef18d2b52
commit 74f8cbab91
2 changed files with 72 additions and 55 deletions

View file

@ -17,19 +17,37 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
# ./client.py --server='172.17.0.2' """
Manipulate manager khaganat
# ./client.py --log="debug" --show-log-console --server='172.17.0.2' --program="aes" --command="START" We can end some command to manager.
# ./client.py --log="debug" --show-log-console --server='172.17.0.2' --program="aes" --command="STATUS" Global:
# ./client.py --log="debug" --show-log-console --server='172.17.0.2' --program="aes" --command="ACTION" --action="coucou" SHUTDOWN : Stop manager & Stop all programs
# ./client.py --log="debug" --show-log-console --server='172.17.0.2' --program="aes" --command="LOG" --firstline=0 STARTALL : Start all programs
# ./client.py --log="debug" --show-log-console --server='172.17.0.2' --program="aes" --command="STOP" STATUSALL : Get status of all programs
# ./client.py --log="debug" --show-log-console --server='172.17.0.2' --command="LIST" STOPALL : Stop all programs
# ./client.py --log="debug" --show-log-console --server='172.17.0.2' --command="SHUTDOWN" LIST : List all programs available
# ./client.py --log="debug" --show-log-console --server='172.17.0.2' --command="STARTALL" For one program :
# ./client.py --log="debug" --show-log-console --server='172.17.0.2' --command="STATUSALL" START : Start program
# ./client.py --key="/home/gameserver/khanat/key.pem" --cert="/home/gameserver/khanat/cert.pem" --log="debug" --show-log-console --command="STATUSALL" STOP : Stop program
STATUS : Get status
LOG : Get log
firstline : option to define first line we need send
ACTION : Send action (command) in stdin
action : option to define which action you need send to stdin
Example :
./client.py --log="debug" --show-log-console --server='172.17.0.2' --program="aes" --command="START"
./client.py --log="debug" --show-log-console --server='172.17.0.2' --program="aes" --command="STATUS"
./client.py --log="debug" --show-log-console --server='172.17.0.2' --program="aes" --command="ACTION" --action="coucou"
./client.py --log="debug" --show-log-console --server='172.17.0.2' --program="aes" --command="LOG" --firstline=0
./client.py --log="debug" --show-log-console --server='172.17.0.2' --program="aes" --command="STOP"
./client.py --log="debug" --show-log-console --server='172.17.0.2' --command="LIST"
./client.py --log="debug" --show-log-console --server='172.17.0.2' --command="SHUTDOWN"
./client.py --log="debug" --show-log-console --server='172.17.0.2' --command="STARTALL"
./client.py --log="debug" --show-log-console --server='172.17.0.2' --command="STATUSALL"
./client.py --key="/home/gameserver/khanat/key.pem" --cert="/home/gameserver/khanat/cert.pem" --log="debug" --show-log-console --command="STATUSALL"
"""
import argparse import argparse
import logging import logging
@ -37,16 +55,8 @@ import logging.config
import http.client import http.client
import json import json
#ip='localhost'
def send_command(command='GET', path='/', host='localhost', port=8000):
conn = http.client.HTTPSConnection(host=host, port=port, key_file='crt/key.pem', cert_file='crt/cert.pem' )
conn.putrequest(command, path)
conn.endheaders()
response = conn.getresponse()
print(response.read())
def cmp_to_key(): def cmp_to_key():
'Convert a cmp= function into a key= function' 'compare key (check if int or other)'
class K(object): class K(object):
def __init__(self, obj, *args): def __init__(self, obj, *args):
self.obj = obj self.obj = obj
@ -84,6 +94,7 @@ def cmp_to_key():
def send_json(jsonin={}, command='GET', path='/', host='localhost', port=8000, raw_data=False, remove_color=False, def send_json(jsonin={}, command='GET', path='/', host='localhost', port=8000, raw_data=False, remove_color=False,
key_file=None, cert_file=None): key_file=None, cert_file=None):
"send command with https & json format"
conn = http.client.HTTPSConnection(host=host, port=port, key_file=key_file, cert_file=cert_file ) conn = http.client.HTTPSConnection(host=host, port=port, key_file=key_file, cert_file=cert_file )
conn.putrequest(command, path) conn.putrequest(command, path)
out=json.dumps(jsonin) out=json.dumps(jsonin)
@ -129,8 +140,7 @@ def main(server, command, program, action, firstline, fileLog, logLevel, show_lo
handlers.append(logging.FileHandler(fileLog.name)) handlers.append(logging.FileHandler(fileLog.name))
logging.basicConfig(handlers=handlers, level=numeric_level, logging.basicConfig(handlers=handlers, level=numeric_level,
format='%(asctime)s %(levelname)s [pid:%(process)d] [%(funcName)s:%(lineno)d] %(message)s') format='%(asctime)s %(levelname)s [pid:%(process)d] [%(funcName)s:%(lineno)d] %(message)s')
#client(server, command, data) # Send command
#send_json({'name': 'aes', 'first-line': 0}, 'GET', '/LOG', server, port)
if command == 'START' or command == 'STOP': if command == 'START' or command == 'STOP':
send_json({'name': program}, 'POST', "/" + command, server, port) send_json({'name': program}, 'POST', "/" + command, server, port)
elif command == 'STATUS': elif command == 'STATUS':

View file

@ -77,14 +77,6 @@ Example :
""" """
# docker run -it -v $PWD:/opt/jsa servercontainer_khanat_debian_jessie_x86_64 /bin/bash
# ./manage.py --log debug --show-log-console -c test.cfg
# https://pymotw.com/2/multiprocessing/communication.html
# https://eli.thegreenplace.net/2012/01/24/distributed-computing-in-python-with-multiprocessing/
# https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
import subprocess import subprocess
import queue import queue
import threading import threading
@ -460,7 +452,7 @@ class ManageCommand():
self.eventRunning = threading.Event() self.eventRunning = threading.Event()
def read_output(self): def read_output(self):
""" Thread to read output (stdout) """
fl = fcntl.fcntl(self.process.stdout, fcntl.F_GETFL) fl = fcntl.fcntl(self.process.stdout, fcntl.F_GETFL)
fcntl.fcntl(self.process.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK) fcntl.fcntl(self.process.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
logging.debug("Start reader %s " % self.name) logging.debug("Start reader %s " % self.name)
@ -484,6 +476,7 @@ class ManageCommand():
logging.debug("End reader: '%s'" % self.name) logging.debug("End reader: '%s'" % self.name)
def handler(self, signum, frame): def handler(self, signum, frame):
""" Managed signal (not used) """
if self.process: if self.process:
#logging.debug("Send signal %d to '%s'" %(signum, self.name)) #logging.debug("Send signal %d to '%s'" %(signum, self.name))
self.process.send_signal(signum) self.process.send_signal(signum)
@ -492,6 +485,7 @@ class ManageCommand():
raise IOError("signal received") raise IOError("signal received")
def start(self): def start(self):
""" Start program """
logging.debug("start %s" % (self.name)) logging.debug("start %s" % (self.name))
if self.process: if self.process:
logging.debug("%s already exist" % self.name) logging.debug("%s already exist" % self.name)
@ -526,6 +520,7 @@ class ManageCommand():
return "started" return "started"
def status(self): def status(self):
""" Get status of program """
logging.debug("status %s" % (self.name)) logging.debug("status %s" % (self.name))
if self.process: if self.process:
logging.debug("status %s - check" % (self.name)) logging.debug("status %s - check" % (self.name))
@ -542,6 +537,7 @@ class ManageCommand():
return "stopped" return "stopped"
def list_thread(self): def list_thread(self):
""" List number thrad (not used) """
logging.debug('list thread') logging.debug('list thread')
#main_thread = threading.currentThread() #main_thread = threading.currentThread()
for t in threading.enumerate(): for t in threading.enumerate():
@ -550,6 +546,7 @@ class ManageCommand():
def stop(self): def stop(self):
""" Stop program """
logging.debug("stop %s" % (self.name)) logging.debug("stop %s" % (self.name))
if not self.process: if not self.process:
return "stopped" return "stopped"
@ -589,6 +586,7 @@ class ManageCommand():
return "stopped" return "stopped"
def getlog(self, firstline): def getlog(self, firstline):
""" Get log """
logging.debug("read log %d " % firstline) logging.debug("read log %d " % firstline)
outjson = {} outjson = {}
pos = self.poslastlog - len(self.log) + 1 pos = self.poslastlog - len(self.log) + 1
@ -604,6 +602,7 @@ class ManageCommand():
return json.dumps(outjson) return json.dumps(outjson)
def action(self, action): def action(self, action):
""" Send action to program (send input to stdin) """
logging.debug("ACTION '%s'" % action) logging.debug("ACTION '%s'" % action)
if self.process: if self.process:
code = self.process.poll() code = self.process.poll()
@ -615,6 +614,7 @@ class ManageCommand():
return "ko" return "ko"
def run(self): def run(self):
""" loop, run child (wait command) """
loop = True loop = True
while loop: while loop:
logging.debug('wait %s' % self.name) logging.debug('wait %s' % self.name)
@ -654,26 +654,13 @@ class ManageCommand():
logging.debug('end') logging.debug('end')
def runCommand(name, command, path, logsize, bufsize, queueIn, queueOut, event):
"""
Launch Manager
(thread to manage khaganat program)
"""
logging.debug("Initialize '%s'" % name)
manageCommand = ManageCommand(name=name,
command=command,
path=path,
logsize=logsize,
bufsize=bufsize,
queueIn=queueIn,
queueOut=queueOut,
event=event)
manageCommand.run()
class Manager(): class Manager():
""" Manage all services
* https service
* all child to manage each program
"""
def __init__(self, filecfg, launch_program): def __init__(self, filecfg, launch_program):
self.threadCommand = None self.threadCommand = []
self.command = [] self.command = []
self.launch_program = launch_program self.launch_program = launch_program
self.param = {} self.param = {}
@ -728,18 +715,35 @@ class Manager():
if filecfg is None: if filecfg is None:
raise ValueError raise ValueError
def runCommand(self, name, command, path, logsize, bufsize, queueIn, queueOut, event):
"""
Thread to manage khaganat program
"""
logging.debug("Initialize '%s'" % name)
manageCommand = ManageCommand(name=name,
command=command,
path=path,
logsize=logsize,
bufsize=bufsize,
queueIn=queueIn,
queueOut=queueOut,
event=event)
manageCommand.run()
def launch_server_http(self): def launch_server_http(self):
""" Launch server https """
self.serverHttp.daemon = True self.serverHttp.daemon = True
self.serverHttp .start() self.serverHttp .start()
def launch_command(self): def launch_command(self):
""" Launch child to manage each program """
for name in self.param: for name in self.param:
logging.debug("Initialize '%s'" % name) logging.debug("Initialize '%s'" % name)
queueIn = multiprocessing.Queue() queueIn = multiprocessing.Queue()
queueOut = multiprocessing.Queue() queueOut = multiprocessing.Queue()
event = multiprocessing.Event() event = multiprocessing.Event()
self.serverHttp.append(name, queueIn, queueOut, event) self.serverHttp.append(name, queueIn, queueOut, event)
self.threadCommand = multiprocessing.Process(target=runCommand, threadCommand = multiprocessing.Process(target=self.runCommand,
args=(name, args=(name,
self.param[name]['command'], self.param[name]['command'],
self.param[name]['path'], self.param[name]['path'],
@ -748,7 +752,7 @@ class Manager():
queueIn, queueIn,
queueOut, queueOut,
event)) event))
self.threadCommand.start() threadCommand.start()
if self.launch_program: if self.launch_program:
event.set() event.set()
queueIn.put("START") queueIn.put("START")
@ -759,20 +763,23 @@ class Manager():
logging.debug("pas de message recu pour %s" % name) logging.debug("pas de message recu pour %s" % name)
return return
logging.info("%s => %s" % (name, item)) logging.info("%s => %s" % (name, item))
self.threadCommand.append(threadCommand)
def receive_signal(self, signum, frame): def receive_signal(self, signum, frame):
if self.threadCommand: """ Managed signal """
print(dir(self.threadCommand)) for child in self.threadCommand:
self.threadCommand.terminate() child.terminate()
if self.serverHttp: if self.serverHttp:
self.serverHttp.terminate() self.serverHttp.terminate()
def run(self): def run(self):
""" launch all """
self.launch_command() self.launch_command()
self.launch_server_http() self.launch_server_http()
logging.info('started') logging.info('started')
self.threadCommand.join() for child in self.threadCommand:
child.join()
logging.info('end') logging.info('end')
signal.alarm(0) signal.alarm(0)
logging.info('wait thread http') logging.info('wait thread http')