From 7042cdb6bd48d1e15f13844a96df33f58dac4c0b Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Mon, 1 Feb 2016 14:24:46 +0100 Subject: [PATCH 2/2] use hobo_natch in agent Service classes (fixes #9821) --- hobo/agent/common/management/hobo_batch_client.py | 51 +++++++++++++++++++++++ hobo/agent/worker/services.py | 37 ++++++++++++---- 2 files changed, 79 insertions(+), 9 deletions(-) create mode 100644 hobo/agent/common/management/hobo_batch_client.py diff --git a/hobo/agent/common/management/hobo_batch_client.py b/hobo/agent/common/management/hobo_batch_client.py new file mode 100644 index 0000000..f68b006 --- /dev/null +++ b/hobo/agent/common/management/hobo_batch_client.py @@ -0,0 +1,51 @@ +import os +import subprocess +import struct + + +class BatchClient(object): + def write_buffer(self, b): + self.process.stdin.write(b) + + def write_integer(self, i): + self.write_buffer(struct.pack('!I', i)) + + def write_string(self, s): + self.write_integer(len(s)) + self.write_buffer(s) + + def read_string(self): + length = self.read_integer() + return self.read_buffer(length) + + def read_buffer(self, length): + s = '' + while len(s) != length: + b = self.process.stdout.read(length - len(s)) + if not b: + raise IOError('EOF') + s += b + return s + + def read_fmt(self, fmt): + n = struct.calcsize(fmt) + s = self.read_buffer(n) + return struct.unpack(fmt, s) + + def read_integer(self): + return self.read_fmt('!I')[0] + + def __init__(self, command, environment=None): + env = os.environ.copy() + env.update(environment or {}) + self.process = subprocess.Popen(command, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE) + + def execute(self, args, stdin=None): + self.write_integer(len(args)) + for arg in args: + self.write_string(arg) + self.write_string(stdin or '') + self.process.stdin.flush() + result = self.read_integer() + output = self.read_string() + return result, output diff --git a/hobo/agent/worker/services.py b/hobo/agent/worker/services.py index 0774449..8db48f2 100644 --- a/hobo/agent/worker/services.py +++ b/hobo/agent/worker/services.py @@ -1,4 +1,5 @@ # hobo - portal to configure and deploy applications + irint action, data # Copyright (C) 2015 Entr'ouvert # # This program is free software: you can redistribute it and/or modify it @@ -14,6 +15,7 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . +import time import sys import ConfigParser import fnmatch @@ -27,8 +29,13 @@ import urlparse from . import settings +from hobo.agent.common.management.hobo_batch_client import BatchClient + class BaseService(object): + batch_support = True + _batch_process = None + def __init__(self, base_url, title, secret_key, **kwargs): self.base_url = base_url self.title = title @@ -74,6 +81,12 @@ class BaseService(object): stdout = cmd_process.communicate(input=json.dumps(environment)) @classmethod + def get_batch_process(cls): + if not cls._batch_process or cls._batch_process.process.returncode: + cls._batch_process = BatchClient(cls.service_manage_cmd + ' hobo_batch') + return cls._batch_process + + @classmethod def notify(cls, data): for audience in data.get('audience', []): if cls.is_for_us(audience): @@ -82,15 +95,21 @@ class BaseService(object): return if not os.path.exists(cls.service_manage_try_cmd): return - cmd = cls.service_manage_cmd + ' hobo_notify -' - try: - cmd_process = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, - stdout=subprocess.PIPE, stderr=subprocess.PIPE) - except OSError: - return - stdout, stderr = cmd_process.communicate(input=json.dumps(data)) - if cmd_process.returncode != 0: - raise RuntimeError('command "%s" failed: %r %r' % (cmd, stdout, stderr)) + if cls.batch_support: + batch = cls.get_batch_process() + returncode, stdout = batch.execute(['hobo_notify', '-'], json.dumps(data)) + if returncode != 0: + raise RuntimeError('command "hobo_notify -" failed: %r %r' % (returncode, stdout)) + else: + cmd = cls.service_manage_cmd + ' hobo_notify -' + try: + cmd_process = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + except OSError: + return + stdout, stderr = cmd_process.communicate(input=json.dumps(data)) + if cmd_process.returncode != 0: + raise RuntimeError('command "%s" failed: %r %r' % (cmd, stdout, stderr)) class Passerelle(BaseService): -- 2.1.4