mongo.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. """
  2. Data Transport - 1.0
  3. Steve L. Nyemba, The Phi Technology LLC
  4. This file is a wrapper around mongodb for reading/writing content against a mongodb server and executing views (mapreduce)
  5. """
  6. from pymongo import MongoClient
  7. from bson.objectid import ObjectId
  8. from bson.binary import Binary
  9. # import nujson as json
  10. from datetime import datetime
  11. import pandas as pd
  12. import numpy as np
  13. import gridfs
  14. # from transport import Reader,Writer
  15. import sys
  16. if sys.version_info[0] > 2 :
  17. from transport.common import Reader, Writer, IEncoder
  18. else:
  19. from common import Reader, Writer
  20. import json
  21. import re
  22. from multiprocessing import Lock, RLock
  23. class Mongo :
  24. lock = RLock()
  25. """
  26. Basic mongodb functions are captured here
  27. """
  28. def __init__(self,**args):
  29. """
  30. :dbname database name/identifier
  31. :host host and port of the database by default localhost:27017
  32. :username username for authentication
  33. :password password for current user
  34. """
  35. self.mechanism= 'SCRAM-SHA-256' if 'mechanism' not in args else args['mechanism']
  36. # authSource=(args['authSource'] if 'authSource' in args else self.dbname)
  37. self._lock = False if 'lock' not in args else args['lock']
  38. self.dbname = None
  39. username = password = None
  40. if 'auth_file' in args :
  41. _info = json.loads((open(args['auth_file'])).read())
  42. else:
  43. _info = {}
  44. _args = dict(args,**_info)
  45. _map = {'dbname':'db','database':'db','table':'uid','collection':'uid','col':'uid','doc':'uid'}
  46. for key in _args :
  47. if key in ['username','password'] :
  48. username = _args['username'] if key=='username' else username
  49. password = _args['password'] if key == 'password' else password
  50. continue
  51. value = _args[key]
  52. if key in _map :
  53. key = _map[key]
  54. self.setattr(key,value)
  55. #
  56. # Let us perform aliasing in order to remain backwards compatible
  57. self.dbname = self.db if hasattr(self,'db')else self.dbname
  58. self.uid = _args['table'] if 'table' in _args else (_args['doc'] if 'doc' in _args else (_args['collection'] if 'collection' in _args else None))
  59. if username and password :
  60. self.client = MongoClient(self.host,
  61. username=username,
  62. password=password ,
  63. authSource=self.authSource,
  64. authMechanism=self.mechanism)
  65. else:
  66. self.client = MongoClient(self.host,maxPoolSize=10000)
  67. self.db = self.client[self.dbname]
  68. def isready(self):
  69. p = self.dbname in self.client.list_database_names()
  70. q = self.uid in self.client[self.dbname].list_collection_names()
  71. return p and q
  72. def setattr(self,key,value):
  73. _allowed = ['host','port','db','doc','authSource','mechanism']
  74. if key in _allowed :
  75. setattr(self,key,value)
  76. pass
  77. def close(self):
  78. self.client.close()
  79. def meta(self,**_args):
  80. return []
  81. class MongoReader(Mongo,Reader):
  82. """
  83. This class will read from a mongodb data store and return the content of a document (not a collection)
  84. """
  85. def __init__(self,**args):
  86. Mongo.__init__(self,**args)
  87. def read(self,**args):
  88. if 'mongo' in args or 'cmd' in args or 'pipeline' in args:
  89. #
  90. # @TODO:
  91. cmd = {}
  92. if 'pipeline' in args :
  93. cmd['pipeline']= args['pipeline']
  94. if 'aggregate' not in cmd :
  95. cmd['aggregate'] = self.uid
  96. if 'pipeline' not in args or 'aggregate' not in cmd :
  97. cmd = args['mongo'] if 'mongo' in args else args['cmd']
  98. if "aggregate" in cmd :
  99. if "allowDiskUse" not in cmd :
  100. cmd["allowDiskUse"] = True
  101. if "cursor" not in cmd :
  102. cmd["cursor"] = {}
  103. r = []
  104. out = self.db.command(cmd)
  105. #@TODO: consider using a yield (generator) works wonders
  106. while True :
  107. if 'values' in out :
  108. r += out['values']
  109. if 'cursor' in out :
  110. key = 'firstBatch' if 'firstBatch' in out['cursor'] else 'nextBatch'
  111. else:
  112. key = 'n'
  113. if 'cursor' in out and out['cursor'][key] :
  114. r += list(out['cursor'][key])
  115. elif key in out and out[key]:
  116. r.append (out[key])
  117. # yield out['cursor'][key]
  118. if key not in ['firstBatch','nextBatch'] or ('cursor' in out and out['cursor']['id'] == 0) :
  119. break
  120. else:
  121. out = self.db.command({"getMore":out['cursor']['id'],"collection":out['cursor']['ns'].split(".")[-1]})
  122. return pd.DataFrame(r)
  123. else:
  124. if 'table' in args or 'collection' in args :
  125. if 'table' in args:
  126. _uid = args['table']
  127. elif 'collection' in args :
  128. _uid = args['collection']
  129. else:
  130. _uid = self.uid
  131. else:
  132. _uid = self.uid
  133. collection = self.db[_uid]
  134. _filter = args['filter'] if 'filter' in args else {}
  135. _df = pd.DataFrame(collection.find(_filter))
  136. columns = _df.columns.tolist()[1:]
  137. return _df[columns]
  138. def view(self,**args):
  139. """
  140. This function is designed to execute a view (map/reduce) operation
  141. """
  142. pass
  143. class MongoWriter(Mongo,Writer):
  144. """
  145. This class is designed to write to a mongodb collection within a database
  146. """
  147. def __init__(self,**args):
  148. Mongo.__init__(self,**args)
  149. def upload(self,**args) :
  150. """
  151. This function will upload a file to the current database (using GridFS)
  152. :param data binary stream/text to be stored
  153. :param filename filename to be used
  154. :param encoding content_encoding (default utf-8)
  155. """
  156. if 'encoding' not in args :
  157. args['encoding'] = 'utf-8'
  158. gfs = GridFS(self.db)
  159. gfs.put(**args)
  160. def archive(self):
  161. """
  162. This function will archive documents to the
  163. """
  164. collection = self.db[self.uid]
  165. rows = list(collection.find())
  166. for row in rows :
  167. if type(row['_id']) == ObjectId :
  168. row['_id'] = str(row['_id'])
  169. stream = Binary(json.dumps(collection,cls=IEncoder).encode())
  170. collection.delete_many({})
  171. now = "-".join([str(datetime.now().year()),str(datetime.now().month), str(datetime.now().day)])
  172. name = ".".join([self.uid,'archive',now])+".json"
  173. description = " ".join([self.uid,'archive',str(len(rows))])
  174. self.upload(filename=name,data=stream,description=description,content_type='application/json')
  175. # gfs = GridFS(self.db)
  176. # gfs.put(filename=name,description=description,data=stream,encoding='utf-8')
  177. # self.write({{"filename":name,"file":stream,"description":descriptions}})
  178. pass
  179. def write(self,info,**_args):
  180. """
  181. This function will write to a given collection i.e add a record to a collection (no updates)
  182. @param info new record in the collection to be added
  183. """
  184. # document = self.db[self.uid].find()
  185. #collection = self.db[self.uid]
  186. # if type(info) == list :
  187. # self.db[self.uid].insert_many(info)
  188. # else:
  189. try:
  190. if 'table' in _args or 'collection' in _args :
  191. _uid = _args['table'] if 'table' in _args else _args['collection']
  192. else:
  193. _uid = self.uid if 'doc' not in _args else _args['doc']
  194. if self._lock :
  195. Mongo.lock.acquire()
  196. if type(info) == list or type(info) == pd.DataFrame :
  197. self.db[_uid].insert_many(info if type(info) == list else info.to_dict(orient='records'))
  198. else:
  199. self.db[_uid].insert_one(info)
  200. finally:
  201. if self._lock :
  202. Mongo.lock.release()
  203. def set(self,document):
  204. """
  205. if no identifier is provided the function will delete the entire collection and set the new document.
  206. Please use this function with great care (archive the content first before using it... for safety)
  207. """
  208. collection = self.db[self.uid]
  209. if collection.count_document() > 0 and '_id' in document:
  210. id = document['_id']
  211. del document['_id']
  212. collection.find_one_and_replace({'_id':id},document)
  213. else:
  214. collection.delete_many({})
  215. self.write(info)
  216. def close(self):
  217. Mongo.close(self)
  218. # collecton.update_one({"_id":self.uid},document,True)