adding multiprocess check

This commit is contained in:
AleaJactaEst 2018-02-09 23:29:36 +01:00
parent 4a5bd254e7
commit e5fe6e5388

View file

@ -25,6 +25,7 @@ import multiprocessing
import time
import re
import queue
import signal
from unittest.mock import patch
try:
@ -41,6 +42,12 @@ except ImportError:
# sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# import pymanager.certificate as cert
def handler(signum, frame):
print("TimeOut !")
raise Exception("end of time")
class TestManager(unittest.TestCase):
def setUp(self):
self.openssl = '/usr/bin/openssl'
@ -57,6 +64,7 @@ class TestManager(unittest.TestCase):
self.path = os.path.dirname(os.path.abspath(__file__))
self.program = os.path.join(self.path, 'simulate_program.py')
self.badprogram = os.path.join(self.path, 'test.cfg')
signal.signal(signal.SIGALRM, handler)
def test_load_config(self):
config = configparser.ConfigParser()
@ -407,77 +415,133 @@ class TestManager(unittest.TestCase):
config.flush()
Manager.main(['--conf=' + config.name])
# def test_run_manager_command(self):
# # Doesn't work (we need enable --concurrency=multiprocessing on coverage command but we need coverage 4.0)
# logsize = 10
# bufsize = 10
# queueIn = multiprocessing.Queue()
# queueOut = multiprocessing.Queue()
# event = multiprocessing.Event()
# threadCommand = multiprocessing.Process(target=self._runCommand,
# args=('test_run_manager_command',
# self.program,
# self.path,
# logsize,
# bufsize,
# queueIn,
# queueOut,
# event))
# threadCommand.start()
# event.set()
# queueIn.put("START")
# item = queueOut.get(timeout=4)
# self.assertEqual(item, "started", 'Error impossible to start program')
# time.sleep(1)
# event.set()
# queueIn.put("STATUS")
# item = queueOut.get(timeout=4)
# self.assertEqual(item, "started", 'Error impossible to start program')
# time.sleep(1)
# print("-" * 80, "shutdown" )
# event.set()
# queueIn.put("SHUTDOWN")
# with self.assertRaises(queue.Empty):
# item = queueOut.get(timeout=4)
# print("-" * 80, "wait thread" )
def test_run_manager_command(self):
# Enable timeout
signal.alarm(10)
class MockServerHttp:
def append(self, name, queueIn, queueOut, event):
pass
def terminate(self):
pass
def join(self):
pass
config = configparser.ConfigParser()
config.add_section('config:server')
config.add_section('command:test')
config.set('command:test', 'command', self.program)
manage = Manager.Manager(False)
manage.serverHttp = MockServerHttp()
manage._load_config(config)
manage.launch_command()
queueIn = manage.threadCommand[0]._args[5]
queueOut = manage.threadCommand[0]._args[6]
event = manage.threadCommand[0]._args[7]
queueIn.put("START")
event.set()
# Enable timeout
signal.alarm(10)
item = queueOut.get(timeout=4)
self.assertEqual(item, "started", 'Error impossible to start program')
signal.alarm(0)
time.sleep(1)
signal.alarm(10)
event.set()
queueIn.put("STATUS")
item = queueOut.get(timeout=4)
self.assertEqual(item, "started", 'Error impossible to read status')
time.sleep(1)
event.set()
queueIn.put("STDIN arg")
item = queueOut.get(timeout=4)
self.assertEqual(item, "ok", 'Error when send STDIN')
signal.alarm(0)
time.sleep(1)
signal.alarm(10)
event.set()
queueIn.put("STDOUT 4")
item = queueOut.get(timeout=4)
signal.alarm(0)
self.assertRegex(item,
'^[{](.*)("first-line": 4)(.*)[}]$',
'Error when read STDOUT (Missing first-line)')
self.assertRegex(item,
'^[{](.*)("last-line": 4)(.*)[}]$',
'Error when read STDOUT (Missing last-line)')
self.assertRegex(item,
'^[{](.*)(4 arg")(.*)[}]$',
'Error when read STDOUT (bad record)')
time.sleep(1)
signal.alarm(10)
event.set()
queueIn.put("BADCOMMAND")
item = queueOut.get(timeout=4)
self.assertEqual(item, "error : command unknown", 'Error impossible to read status')
signal.alarm(0)
time.sleep(1)
signal.alarm(10)
event.set()
queueIn.put("STOP")
item = queueOut.get(timeout=4)
self.assertEqual(item, "stopped", 'Error impossible to read status')
signal.alarm(0)
time.sleep(1)
signal.alarm(10)
event.set()
queueIn.put("SHUTDOWN")
with self.assertRaises(queue.Empty):
item = queueOut.get(timeout=4)
#threadCommand.join()
# self.assertTrue(True)
#
# def test_run_manager_command_2(self):
# manage = Manager.Manager(True)
# logsize = 10
# bufsize = 10
# queueIn = multiprocessing.Queue()
# queueOut = multiprocessing.Queue()
# event = multiprocessing.Event()
# manage.runCommand('test_run_manager_command',
# self.program,
# self.path,
# logsize,
# bufsize,
# queueIn,
# queueOut,
# event)
# event.set()
# queueIn.put("START")
# item = queueOut.get(timeout=4)
# self.assertEqual(item, "started", 'Error impossible to start program')
# time.sleep(1)
# event.set()
# queueIn.put("STATUS")
# item = queueOut.get(timeout=4)
# self.assertEqual(item, "started", 'Error impossible to start program')
# time.sleep(1)
# print("-" * 80, "shutdown" )
# event.set()
# queueIn.put("SHUTDOWN")
# with self.assertRaises(queue.Empty):
# item = queueOut.get(timeout=4)
# print("-" * 80, "wait thread" )
# #threadCommand.join()
# manage.receive_signal(15, 1)
# manage.wait_children_commands()
# self.assertTrue(True)
manage.receive_signal(15, 1)
manage.wait_children_commands()
#Disable timeout
signal.alarm(0)
self.assertTrue(True)
def test_run_manager_command_autostart(self):
# Enable timeout
signal.alarm(10)
class MockServerHttp:
def append(self, name, queueIn, queueOut, event):
pass
def terminate(self):
pass
def join(self):
pass
config = configparser.ConfigParser()
config.add_section('config:server')
config.add_section('command:test')
config.set('command:test', 'command', self.program)
manage = Manager.Manager(True)
manage.serverHttp = MockServerHttp()
manage._load_config(config)
manage.launch_command()
queueIn = manage.threadCommand[0]._args[5]
queueOut = manage.threadCommand[0]._args[6]
event = manage.threadCommand[0]._args[7]
signal.alarm(10)
event.set()
queueIn.put("STATUS")
item = queueOut.get(timeout=4)
self.assertEqual(item, "started", 'Error impossible to read status')
time.sleep(1)
signal.alarm(10)
event.set()
queueIn.put("SHUTDOWN")
with self.assertRaises(queue.Empty):
item = queueOut.get(timeout=4)
#threadCommand.join()
manage.receive_signal(15, 1)
manage.wait_children_commands()
#Disable timeout
signal.alarm(0)
self.assertTrue(True)
if __name__ == '__main__':
unittest.main()