|
@@ -16,7 +16,7 @@ if sys.version_info[0] > 2 :
|
|
|
else:
|
|
|
from common import Reader, Writer
|
|
|
import json
|
|
|
-
|
|
|
+from multiprocessing import RLock
|
|
|
class MessageQueue:
|
|
|
"""
|
|
|
This class hierarchy is designed to handle interactions with a queue server using pika framework (our tests are based on rabbitmq)
|
|
@@ -29,12 +29,23 @@ class MessageQueue:
|
|
|
self.port= 5672 if 'port' not in params else params['port']
|
|
|
self.virtual_host = '/' if 'vhost' not in params else params['vhost']
|
|
|
self.exchange = params['exchange'] if 'exchange' in params else 'amq.direct' #-- exchange
|
|
|
- self.queue = params['queue']
|
|
|
+ self.queue = params['queue'] if 'queue' in params else 'demo'
|
|
|
self.connection = None
|
|
|
self.channel = None
|
|
|
|
|
|
- self.name = self.__class__.__name__.lower() if 'name' not in params else 'wtf'
|
|
|
+ self.name = self.__class__.__name__.lower() if 'name' not in params else params['name']
|
|
|
|
|
|
+ username = password = None
|
|
|
+ if 'username' in params :
|
|
|
+ username = params['username']
|
|
|
+ password = params['password']
|
|
|
+ if 'auth_file' in params :
|
|
|
+ _info = json.loads((open(params['auth_file'])).read())
|
|
|
+ username=_info['username']
|
|
|
+ password=_info['password']
|
|
|
+ self.virtual_host = _info['virtual_host'] if 'virtual_host' in _info else self.virtual_host
|
|
|
+ self.exchange = _info['exchange'] if 'exchange' in _info else self.exchange
|
|
|
+ self.queue = _info['queue'] if 'queue' in _info else self.queue
|
|
|
|
|
|
self.credentials= pika.PlainCredentials('guest','guest')
|
|
|
if 'username' in params :
|
|
@@ -44,7 +55,9 @@ class MessageQueue:
|
|
|
)
|
|
|
|
|
|
def init(self,label=None):
|
|
|
- properties = pika.ConnectionParameters(host=self.host,port=self.port,virtual_host=self.virtual_host,credentials=self.credentials)
|
|
|
+ properties = pika.ConnectionParameters(host=self.host,port=self.port,virtual_host=self.virtual_host,
|
|
|
+ client_properties={'connection_name':self.name},
|
|
|
+ credentials=self.credentials)
|
|
|
self.connection = pika.BlockingConnection(properties)
|
|
|
self.channel = self.connection.channel()
|
|
|
self.info = self.channel.exchange_declare(exchange=self.exchange,exchange_type='direct',durable=True)
|
|
@@ -93,23 +106,7 @@ class QueueWriter(MessageQueue,Writer):
|
|
|
@param object object to be written (will be converted to JSON)
|
|
|
@TODO: make this less chatty
|
|
|
"""
|
|
|
- # xchar = None
|
|
|
- # if 'xchar' in params:
|
|
|
- # xchar = params['xchar']
|
|
|
- # object = self.format(params['row'],xchar)
|
|
|
-
|
|
|
- # label = params['label']
|
|
|
- # self.init(label)
|
|
|
- # _mode = 2
|
|
|
- # if isinstance(object,str):
|
|
|
- # stream = object
|
|
|
- # _type = 'text/plain'
|
|
|
- # else:
|
|
|
- # stream = json.dumps(object)
|
|
|
- # if 'type' in params :
|
|
|
- # _type = params['type']
|
|
|
- # else:
|
|
|
- # _type = 'application/json'
|
|
|
+
|
|
|
stream = json.dumps(data) if isinstance(data,dict) else data
|
|
|
self.channel.basic_publish(
|
|
|
exchange=self.exchange,
|
|
@@ -143,10 +140,11 @@ class QueueReader(MessageQueue,Reader):
|
|
|
#self.queue = params['qid']
|
|
|
MessageQueue.__init__(self,**params);
|
|
|
# self.init()
|
|
|
- if 'durable' in params :
|
|
|
- self.durable = True
|
|
|
- else:
|
|
|
- self.durable = False
|
|
|
+ self.durable = False if 'durable' not in params else params['durable']
|
|
|
+ # if 'durable' in params :
|
|
|
+ # self.durable = True
|
|
|
+ # else:
|
|
|
+ # self.durable = False
|
|
|
self.size = -1
|
|
|
self.data = {}
|
|
|
# def init(self,qid):
|
|
@@ -166,7 +164,8 @@ class QueueReader(MessageQueue,Reader):
|
|
|
"""
|
|
|
|
|
|
r = []
|
|
|
- if re.match("^\{|\[",stream) is not None:
|
|
|
+ # if re.match("^\{|\[",stream) is not None:
|
|
|
+ if stream.startswith(b'{') or stream.startswith(b'['):
|
|
|
r = json.loads(stream)
|
|
|
else:
|
|
|
|
|
@@ -215,6 +214,7 @@ class QueueReader(MessageQueue,Reader):
|
|
|
|
|
|
return self.data
|
|
|
class QueueListener(MessageQueue):
|
|
|
+ lock = RLock()
|
|
|
"""
|
|
|
This class is designed to have an active listener (worker) against a specified Exchange/Queue
|
|
|
It is initialized as would any other object and will require a callback function to address the objects returned.
|
|
@@ -223,6 +223,7 @@ class QueueListener(MessageQueue):
|
|
|
MessageQueue.__init__(self,**args)
|
|
|
self.listen = self.read
|
|
|
self.apply = args['apply'] if 'apply' in args else print
|
|
|
+ self.lock = False if 'lock' not in args else args['lock']
|
|
|
|
|
|
def finalize(self,channel,ExceptionReason):
|
|
|
pass
|
|
@@ -231,12 +232,30 @@ class QueueListener(MessageQueue):
|
|
|
_info= {}
|
|
|
# if re.match("^\{|\[",stream) is not None:
|
|
|
|
|
|
+
|
|
|
if stream.startswith(b"[") or stream.startswith(b"{"):
|
|
|
_info = json.loads(stream)
|
|
|
else:
|
|
|
|
|
|
_info = stream
|
|
|
- self.apply(_info)
|
|
|
+ #
|
|
|
+ # At this point we should invoke the apply function with a lock if need be
|
|
|
+ # @TODO: Establish a vocabulary
|
|
|
+
|
|
|
+ if stream == b'QUIT' :
|
|
|
+ # channel.exit()
|
|
|
+ self.close()
|
|
|
+ if self.lock == True :
|
|
|
+ QueueListener.lock.acquire()
|
|
|
+ try:
|
|
|
+ #
|
|
|
+ # In case the user has not specified a function to apply the data against, it will simply be printed
|
|
|
+ #
|
|
|
+ self.apply(_info)
|
|
|
+ except Exception as e:
|
|
|
+ pass
|
|
|
+ if self.lock == True :
|
|
|
+ QueueListener.lock.release()
|
|
|
def read(self):
|
|
|
|
|
|
self.init(self.queue)
|
|
@@ -246,3 +265,15 @@ class QueueListener(MessageQueue):
|
|
|
|
|
|
|
|
|
|
|
|
+class Factory :
|
|
|
+ @staticmethod
|
|
|
+ def instance(**_args):
|
|
|
+ """
|
|
|
+ :param count number of workers
|
|
|
+ :param apply function workers
|
|
|
+ """
|
|
|
+ _apply = _args['apply']
|
|
|
+ _count = _args['count']
|
|
|
+ for i in np.arange(_count) :
|
|
|
+ _name = _args['name'] if 'name' in _args else 'worker_'+str(i)
|
|
|
+ transport.factory.instance(provider="rabbit",context="listener",apply=_apply,auth_file=_args['auth_file'])
|