diff --git a/warp/hook.py b/warp/hook.py index be2dcbd..645f325 100644 --- a/warp/hook.py +++ b/warp/hook.py @@ -1,9 +1,11 @@ import sys + import io import argparse -from threading import Thread, Event +from threading import Thread, Event, Lock from multiprocessing import Queue +import queue from os import fdopen, path from . import argparser_wrapper @@ -41,19 +43,39 @@ class FlaskThread(Thread): def run(self): views.app.run(port=self.port, threaded=True, host=self.host) -class Output(): - def __init__(self, queue, restart): - self._queue = queue - self._restart = restart +class OutputThread(Thread): + def __init__(self, inqueue, restart): + super().__init__() + self.queue = inqueue + self.restart = restart self.cache = [] + self.clients = [] + self.sem = Lock() + self.stopped = False + + def stop(self): + self.stopped = True + + def run(self): + while not self.restart.is_set() or self.stopped: + item = self.queue.get() + self.cache.append(item) + self.sem.acquire() + for i, (outqueue, active) in enumerate(self.clients): + try: + outqueue.put(item, True, 1) + except queue.Full: + self.clients[i] = self.clients[i][0], False + self.sem.release() + + self.clients = [client for client in self.clients if client[1] == True] + + def add_client(self): + print("new connection. Current connections: %s" % len(self.clients)) + new_queue = queue.Queue(10) + self.clients.append((new_queue, True)) + return new_queue - def get_output(self): - def gen_output(): - while not self._restart.is_set(): - msg_type, line = self._queue.get() - self.cache.append((msg_type, line)) - yield (msg_type, line) - return gen_output() def start_module(name, is_module): views.app.restart.clear() @@ -63,7 +85,8 @@ def start_module(name, is_module): views.app.queue = Queue() ioout = QueuedOut("out", views.app.queue) ioerr = QueuedOut("err", views.app.queue) - views.app.output = Output(views.app.queue, views.app.restart) + views.app.output = OutputThread(views.app.queue, views.app.restart) + #Output(views.app.queue, views.app.restart) views.app.actionQueue = Queue() # This holds only one Argparser Object views.app.namespaceQueue = Queue() # This hold only one Namespace Object @@ -79,11 +102,13 @@ def start_module(name, is_module): is_module = is_module ) views.app.module_process.start() + views.app.output.start() views.app.mutex_groups, views.app.actions, name, views.app.desc = views.app.actionQueue.get() if name: views.app.name = name views.app.module_process.join() + views.app.output.stop() ioerr.write("Process stopped ({})\n".format(views.app.module_process.exitcode)) views.app.restart.wait() diff --git a/warp/static/css/app.css b/warp/static/css/app.css index 33a6da5..098ecf5 100644 --- a/warp/static/css/app.css +++ b/warp/static/css/app.css @@ -1,12 +1,27 @@ +.page { + height: 100vh; +} + +#main-content { + height: 100%; + display: block; +} + #output { - position: relative; + flex:1; background-color: #000000; color: #FFFFFF; font-weight: bold; font-family: "Lucida Console", Monaco, monospace; white-space:pre; - overflow-y: auto; - height: 100vh; + overflow-y: scroll; + height: 100%; +} + +@media (min-width:40em) { + #main-content { + display: flex; + } } ul { @@ -25,6 +40,7 @@ li.subparser { #arguments { padding: 0px; + overflow: scroll; } .tabs-panel { diff --git a/warp/static/js/app.js b/warp/static/js/app.js index 4258f69..d729e6b 100644 --- a/warp/static/js/app.js +++ b/warp/static/js/app.js @@ -228,6 +228,8 @@ window.onload = function() { oboe('/output.json').node('output.*', function(e){ if(e.type === "err") { printErr(e.line); + } else if (e.type === "sig") { + signal(e.line) } else { printOut(e.line); } @@ -245,52 +247,78 @@ function printErr(data){ $("#output").scrollTop($("#output")[0].scrollHeight); } +function signal(type) { + console.log(type); + if(type === "pause") { + pauseCallback(); + } else if(type === "resume") { + resumeCallback(); + } else if(type === "reload") { + reloadCallback(); + } else if(type === "stop") { + stopCallback(); + } else if(type === "start") { + startCallback(); + } else if(type === "clear") { + clearCallback(); + } +} + function resumeProcess() { + $.get({url: '/resume'}); +} + +function resumeCallback() { $("#resumeButton").css('display', 'none'); //$("#sendButton").css('display', 'inline-block'); $("#pauseButton").css('display', 'inline-block');//removeClass("disabled").prop('disabled', false); - $.get({url: '/resume'}); printErr("Process resumed") } function pauseProcess() { //$("#sendButton").css('display', 'none'); + $.get({url: '/pause'}); +} + +function pauseCallback() { $("#resumeButton").css('display', 'inline-block'); $("#pauseButton").css('display', 'none');//addClass("disabled").prop('disabled', true); - $.get({url: '/pause'}); printErr("Process paused") } function reloadProcess() { + $.get({url: '/reload'}); +} + +function reloadCallback() { $("#sendButton").removeClass("disabled").prop('disabled', false); $("#stopButton").addClass("disabled").prop('disabled', true);//css('display', 'inline-block'); $("#pauseButton").addClass("disabled").prop('disabled', true);//css('display', 'inline-block'); $("#reloadButton").addClass("disabled").prop('disabled', true);//css('display', 'inline-block'); - $.get({url: '/reload', - success: function() { - oboe('/output.json').node('output.*', function(e){ - if(e.type === "err") { - printErr(e.line); - } else { - printOut(e.line); - } - }); - }}); + oboe('/output.json').node('output.*', function(e){ + if(e.type === "err") { + printErr(e.line); + } else if (e.type === "sig") { + signal(e.line) + } else { + printOut(e.line); + } + }); } function stopProcess() { + $.get({url: '/stop'}); +} + +function stopCallback() { $("#sendButton").css('display', 'inline-block').prop('disabled', true).addClass('disabled'); $("#pauseButton").css('display', 'none'); $("#resumeButton").css('display', 'none'); $("#stopButton").addClass("disabled").prop('disabled', true);//css('display', 'none'); $("#reloadButton").removeClass("disabled").prop('disabled', false);//css('display', 'inline-block'); - $.get({url: '/stop'}); } function sendData() { - $("#sendButton").css('display', 'none');//addClass("disabled").prop('disabled', true); - $("#stopButton").removeClass("disabled").prop('disabled', false);//css('display', 'inline-block'); - $("#pauseButton").css('display', 'inline-block').removeClass("disabled").prop('disabled', false);//css('display', 'inline-block'); console.log(calcParams()); $.post({ url: '/arguments', @@ -299,6 +327,22 @@ function sendData() { }); } +function clearOutput() { + $.get({ + url: 'clear' + }); +} + +function clearCallback() { + $("#output").empty(); +} + +function startCallback() { + $("#sendButton").css('display', 'none');//addClass("disabled").prop('disabled', true); + $("#stopButton").removeClass("disabled").prop('disabled', false);//css('display', 'inline-block'); + $("#pauseButton").css('display', 'inline-block').removeClass("disabled").prop('disabled', false);//css('display', 'inline-block'); +} + function addParam(params, object) { params[$(object).data('name')] = []; if($(object).attr('type') === "checkbox") { diff --git a/warp/templates/index.html b/warp/templates/index.html index 98687cb..a1e809c 100644 --- a/warp/templates/index.html +++ b/warp/templates/index.html @@ -1,42 +1,45 @@ - - {{ name }} - - - - + + {{ name }} + + + + - - - + + + - - -
+ + +
+
+
-
- -
-
-
-
+
- - +
+
+
+
+
+ + diff --git a/warp/views.py b/warp/views.py index f182109..0e268e0 100644 --- a/warp/views.py +++ b/warp/views.py @@ -43,6 +43,7 @@ def fill_namespace(): setattr(namespace, action.name, value) app.namespaceQueue.put(namespace) + app.output.queue.put(("sig", "start")) return "OK" @@ -56,16 +57,24 @@ def get_arguments(): def stop(): os.kill(app.module_process.pid, signal.SIGCONT) app.module_process.terminate() + app.output.queue.put(("sig", "stop")) return "OK" @app.route("/resume") def resume(): os.kill(app.module_process.pid, signal.SIGCONT) + app.output.queue.put(("sig", "resume")) return "OK" @app.route("/pause") def pause(): os.kill(app.module_process.pid, signal.SIGSTOP) + app.output.queue.put(("sig", "pause")) + return "OK" + +@app.route('/clear') +def clear(): + app.output.queue.put(("sig", "clear")); return "OK" @app.route("/reload") @@ -73,11 +82,12 @@ def reload(): if app.module_process.is_alive(): return 403, "Process is still running" app.restart.set() + app.output.queue.put(("sig", "reload")) return "OK" @app.route("/download", methods=['GET']) def download(): - output = "\n".join([line for msg_type, line in app.output.cache]) + output = "\n".join([line for msg_type, line in app.output.cache if msg_type != 'sig']) response = make_response(output) response.headers["Content-Disposition"] = \ "attachment; filename=%s_%s.log" \ @@ -89,16 +99,20 @@ def download(): def output(): def generate(): yield '{"output":[' + app.output.sem.acquire() cache = app.output.cache + output = app.output.add_client() + app.output.sem.release() for msg_type, line in cache: - yield json.dumps({'type' : msg_type, 'line': line}) + yield json.dumps({'type': msg_type, 'line': line}) yield ',' - output = app.output.get_output() - for msg_type, line in output: + while not app.restart.is_set(): + msg_type, line = output.get() print("Send ({}): {} (length: {})".format(msg_type, line, len(line)), file=sys.__stdout__) yield json.dumps({'type' : msg_type, 'line': line}) yield ',' + yield '\{\}]}' return Response(generate(), mimetype="application/json") @@ -107,7 +121,5 @@ def output(): @app.route("/", methods=['GET']) def index(): - if not app.actionQueue.empty(): - app.mutex_groups, app.actions, app.name, app.desc = app.actionQueue.get() return render_template("index.html", mutex_groups=app.mutex_groups, actions=app.actions, name=app.name, description=app.desc)