queue.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. """
  2. Data Transport - 1.0
  3. Steve L. Nyemba, The Phi Technology LLC
  4. This file is a wrapper around rabbitmq server for reading and writing content to a queue (exchange)
  5. """
  6. import pika
  7. from datetime import datetime
  8. import re
  9. import json
  10. import os
  11. import sys
  12. if sys.version_info[0] > 2 :
  13. from transport.common import Reader, Writer
  14. else:
  15. from common import Reader, Writer
  16. import json
  17. class MessageQueue:
  18. """
  19. This class hierarchy is designed to handle interactions with a queue server using pika framework (our tests are based on rabbitmq)
  20. :host
  21. :xid identifier of the exchange
  22. :qid identifier of the queue
  23. """
  24. def __init__(self,**params):
  25. self.host= 'localhost' if 'host' not in params else params['host'] #-- location of the queue server
  26. self.port= 5672 if 'port' not in params else params['port']
  27. self.virtual_host = '/' if 'vhost' not in params else params['vhost']
  28. self.exchange = params['exchange'] if 'exchange' in params else 'amq.direct' #-- exchange
  29. self.queue = params['queue']
  30. self.connection = None
  31. self.channel = None
  32. self.name = self.__class__.__name__.lower() if 'name' not in params else 'wtf'
  33. self.credentials= pika.PlainCredentials('guest','guest')
  34. if 'username' in params :
  35. self.credentials = pika.PlainCredentials(
  36. params['username'],
  37. ('' if 'password' not in params else params['password'])
  38. )
  39. def init(self,label=None):
  40. properties = pika.ConnectionParameters(host=self.host,port=self.port,virtual_host=self.virtual_host,credentials=self.credentials)
  41. self.connection = pika.BlockingConnection(properties)
  42. self.channel = self.connection.channel()
  43. self.info = self.channel.exchange_declare(exchange=self.exchange,exchange_type='direct',durable=True)
  44. if label is None:
  45. self.qhandler = self.channel.queue_declare(queue=self.queue,durable=True)
  46. else:
  47. self.qhandler = self.channel.queue_declare(queue=label,durable=True)
  48. self.channel.queue_bind(exchange=self.exchange,queue=self.qhandler.method.queue)
  49. def isready(self):
  50. #self.init()
  51. resp = self.connection is not None and self.connection.is_open
  52. # self.close()
  53. return resp
  54. def finalize(self):
  55. pass
  56. def close(self):
  57. if self.connection.is_closed == False :
  58. self.channel.close()
  59. self.connection.close()
  60. class QueueWriter(MessageQueue,Writer):
  61. """
  62. This class is designed to publish content to an AMQP (Rabbitmq)
  63. The class will rely on pika to implement this functionality
  64. We will publish information to a given queue for a given exchange
  65. """
  66. def __init__(self,**params):
  67. #self.host= params['host']
  68. #self.exchange = params['uid']
  69. #self.queue = params['queue']
  70. MessageQueue.__init__(self,**params);
  71. self.init()
  72. def write(self,data,_type='text/plain'):
  73. """
  74. This function writes a stream of data to the a given queue
  75. @param object object to be written (will be converted to JSON)
  76. @TODO: make this less chatty
  77. """
  78. # xchar = None
  79. # if 'xchar' in params:
  80. # xchar = params['xchar']
  81. # object = self.format(params['row'],xchar)
  82. # label = params['label']
  83. # self.init(label)
  84. # _mode = 2
  85. # if isinstance(object,str):
  86. # stream = object
  87. # _type = 'text/plain'
  88. # else:
  89. # stream = json.dumps(object)
  90. # if 'type' in params :
  91. # _type = params['type']
  92. # else:
  93. # _type = 'application/json'
  94. stream = json.dumps(data) if isinstance(data,dict) else data
  95. self.channel.basic_publish(
  96. exchange=self.exchange,
  97. routing_key=self.queue,
  98. body=stream,
  99. properties=pika.BasicProperties(content_type=_type,delivery_mode=2)
  100. );
  101. # self.close()
  102. def flush(self):
  103. self.init()
  104. _mode = 1 #-- Non persistent
  105. self.channel.queue_delete( queue=self.queue);
  106. self.close()
  107. class QueueReader(MessageQueue,Reader):
  108. """
  109. This class will read from a queue provided an exchange, queue and host
  110. @TODO: Account for security and virtualhosts
  111. """
  112. def __init__(self,**params):
  113. """
  114. @param host host
  115. @param uid exchange identifier
  116. @param qid queue identifier
  117. """
  118. #self.host= params['host']
  119. #self.exchange = params['uid']
  120. #self.queue = params['qid']
  121. MessageQueue.__init__(self,**params);
  122. # self.init()
  123. if 'durable' in params :
  124. self.durable = True
  125. else:
  126. self.durable = False
  127. self.size = -1
  128. self.data = {}
  129. # def init(self,qid):
  130. # properties = pika.ConnectionParameters(host=self.host)
  131. # self.connection = pika.BlockingConnection(properties)
  132. # self.channel = self.connection.channel()
  133. # self.channel.exchange_declare(exchange=self.exchange,type='direct',durable=True)
  134. # self.info = self.channel.queue_declare(queue=qid,durable=True)
  135. def callback(self,channel,method,header,stream):
  136. """
  137. This is the callback function designed to process the data stream from the queue
  138. """
  139. r = []
  140. if re.match("^\{|\[",stream) is not None:
  141. r = json.loads(stream)
  142. else:
  143. r = stream
  144. qid = self.qhandler.method.queue
  145. if qid not in self.data :
  146. self.data[qid] = []
  147. self.data[qid].append(r)
  148. #
  149. # We stop reading when the all the messages of the queue are staked
  150. #
  151. if self.size == len(self.data[qid]) or len(self.data[qid]) == self.info.method.message_count:
  152. self.close()
  153. def read(self,size=-1):
  154. """
  155. This function will read, the first message from a queue
  156. @TODO:
  157. Implement channel.basic_get in order to retrieve a single message at a time
  158. Have the number of messages retrieved be specified by size (parameter)
  159. """
  160. r = {}
  161. self.size = size
  162. #
  163. # We enabled the reader to be able to read from several queues (sequentially for now)
  164. # The qid parameter will be an array of queues the reader will be reading from
  165. #
  166. if isinstance(self.queue,str) :
  167. self.queue = [self.queue]
  168. for qid in self.queue:
  169. self.init(qid)
  170. # r[qid] = []
  171. if self.qhandler.method.message_count > 0:
  172. self.channel.basic_consume(queue=qid,on_message_callback=self.callback,auto_ack=False);
  173. self.channel.start_consuming()
  174. else:
  175. pass
  176. #self.close()
  177. # r[qid].append( self.data)
  178. return self.data
  179. class QueueListener(MessageQueue):
  180. """
  181. This class is designed to have an active listener (worker) against a specified Exchange/Queue
  182. It is initialized as would any other object and will require a callback function to address the objects returned.
  183. """
  184. def __init__(self,**args):
  185. MessageQueue.__init__(self,**args)
  186. self.listen = self.read
  187. # def init(self,qid):
  188. # properties = pika.ConnectionParameters(host=self.host)
  189. # self.connection = pika.BlockingConnection(properties)
  190. # self.channel = self.connection.channel()
  191. # self.channel.exchange_declare(exchange=self.exchange,type='direct',durable=True )
  192. # self.info = self.channel.queue_declare(passive=True,exclusive=True,queue=qid)
  193. # self.channel.queue_bind(exchange=self.exchange,queue=self.info.method.queue,routing_key=qid)
  194. #self.callback = callback
  195. def finalize(self,channel,ExceptionReason):
  196. pass
  197. def callback(self,channel,method,header,stream) :
  198. raise Exception("....")
  199. def read(self):
  200. self.init(self.queue)
  201. self.channel.basic_consume(self.queue,self.callback,auto_ack=True);
  202. self.channel.start_consuming()