mongodb.py

  1. """
  2. Data Transport - 1.0
  3. Steve L. Nyemba, The Phi Technology LLC
  4. This file is a wrapper around mongodb for reading/writing content against a mongodb server and executing views (mapreduce)
  5. """
  6. from pymongo import MongoClient
  7. import bson
  8. from bson.objectid import ObjectId
  9. from bson.binary import Binary
  10. # import nujson as json
  11. from datetime import datetime
  12. import pandas as pd
  13. import numpy as np
  14. # import gridfs
  15. from gridfs import GridFS
  16. import sys
  17. import json
  18. import re
  19. from multiprocessing import Lock, RLock
  20. from transport.common import IEncoder
class Mongo :
    """
    Basic mongodb functions are captured here
    """
    lock = RLock()
    def __init__(self,**args):
        """
        :dbname     database name/identifier
        :host       host and port of the database, by default localhost:27017
        :username   username for authentication
        :password   password for the current user
        """
        self.host = 'localhost' if 'host' not in args else args['host']
        if ':' not in self.host and 'port' in args :
            self.host = ':'.join([self.host,str(args['port'])])
        self.mechanism = 'SCRAM-SHA-256' if 'mechanism' not in args else args['mechanism']
        # authSource=(args['authSource'] if 'authSource' in args else self.dbname)
        self._lock = False if 'lock' not in args else args['lock']
        self.dbname = None
        username = password = None
        if 'auth_file' in args :
            _info = json.loads((open(args['auth_file'])).read())
        else:
            _info = {}
        _args = dict(args,**_info)
        _map = {'dbname':'db','database':'db','table':'uid','collection':'uid','col':'uid','doc':'uid'}
        for key in _args :
            if key in ['username','password'] :
                username = _args['username'] if key == 'username' else username
                password = _args['password'] if key == 'password' else password
                continue
            value = _args[key]
            if key in _map :
                key = _map[key]
            self.setattr(key,value)
        #
        # Let us perform aliasing in order to remain backwards compatible
        self.dbname = self.db if hasattr(self,'db') else self.dbname
        self.collection = _args['table'] if 'table' in _args else (_args['doc'] if 'doc' in _args else (_args['collection'] if 'collection' in _args else None))

        if username and password :
            self.client = MongoClient(self.host,
                                      username=username,
                                      password=password,
                                      authSource=self.authSource,
                                      authMechanism=self.mechanism)
        else:
            self.client = MongoClient(self.host,maxPoolSize=10000)

        self.db = self.client[self.dbname]

    def isready(self):
        p = self.dbname in self.client.list_database_names()
        q = self.collection in self.client[self.dbname].list_collection_names()
        return p and q

    def setattr(self,key,value):
        _allowed = ['host','port','db','doc','collection','authSource','mechanism']
        if key in _allowed :
            setattr(self,key,value)
        pass

    def close(self):
        self.client.close()

    def meta(self,**_args):
        return []
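
# The sketch below illustrates how the constructor arguments above are typically
# combined. It is a hedged example, not part of the library: the host, database
# name ('analytics') and collection name ('logs') are assumptions made up for
# illustration, and it is kept in comments so importing this module stays free of
# side effects.
#
#   _mongo = Mongo(host='localhost',port=27017,db='analytics',collection='logs')
#   print(_mongo.isready())     # True only if both the database and collection exist
#   _mongo.close()
#
# Authenticated connections would additionally pass username/password (or an
# auth_file containing them) along with authSource, as handled in __init__ above.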

class Reader(Mongo):
    """
    This class will read from a mongodb data store and return the content of a document (not a collection)
    """
    def __init__(self,**args):
        Mongo.__init__(self,**args)

    def read(self,**args):

        if 'mongo' in args or 'cmd' in args or 'pipeline' in args:
            #
            # @TODO:
            cmd = {}
            if 'aggregate' not in cmd and 'aggregate' not in args:
                cmd['aggregate'] = self.collection
            elif 'aggregate' in args :
                cmd['aggregate'] = args['aggregate']
            if 'pipeline' in args :
                cmd['pipeline'] = args['pipeline']
            if 'pipeline' not in args or 'aggregate' not in cmd :
                cmd = args['mongo'] if 'mongo' in args else args['cmd']
            if "aggregate" in cmd :
                if "allowDiskUse" not in cmd :
                    cmd["allowDiskUse"] = True
                if "cursor" not in cmd :
                    cmd["cursor"] = {}
            r = []
            out = self.db.command(cmd)
            # @TODO: consider using a yield (generator), it works wonders
            while True :
                if 'values' in out :
                    r += out['values']
                if 'cursor' in out :
                    key = 'firstBatch' if 'firstBatch' in out['cursor'] else 'nextBatch'
                else:
                    key = 'n'
                if 'cursor' in out and out['cursor'][key] :
                    r += list(out['cursor'][key])
                elif key in out and out[key]:
                    r.append(out[key])
                # yield out['cursor'][key]
                if key not in ['firstBatch','nextBatch'] or ('cursor' in out and out['cursor']['id'] == 0) :
                    break
                else:
                    out = self.db.command({"getMore":out['cursor']['id'],"collection":out['cursor']['ns'].split(".")[-1]})

            return pd.DataFrame(r)
        else:
            if 'table' in args or 'collection' in args :
                if 'table' in args:
                    _uid = args['table']
                elif 'collection' in args :
                    _uid = args['collection']
                else:
                    _uid = self.collection
            else:
                _uid = self.collection
            collection = self.db[_uid]
            _filter = args['filter'] if 'filter' in args else {}
            _df = pd.DataFrame(collection.find(_filter))
            columns = _df.columns.tolist()[1:]
            return _df[columns]

    def view(self,**args):
        """
        This function is designed to execute a view (map/reduce) operation
        """
        pass
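
# A hedged usage sketch for Reader.read, kept as comments so the module has no
# import-time side effects. The database/collection names ('analytics', 'logs')
# and the field names in the filter and pipeline are assumptions for illustration;
# read() returns a pandas DataFrame in both forms shown.
#
#   _reader = Reader(host='localhost:27017',db='analytics',collection='logs')
#
#   # 1. Simple find() against the default collection, with an optional filter
#   _df = _reader.read(filter={'status':'active'})
#
#   # 2. Aggregation pipeline, executed through db.command({'aggregate':...})
#   _df = _reader.read(aggregate='logs',
#                      pipeline=[{'$group':{'_id':'$status','count':{'$sum':1}}}])
#
#   _reader.close()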

class Writer(Mongo):
    """
    This class is designed to write to a mongodb collection within a database
    """
    def __init__(self,**args):
        Mongo.__init__(self,**args)

    def upload(self,**args) :
        """
        This function will upload a file to the current database (using GridFS)
        :param data     binary stream/text to be stored
        :param filename filename to be used
        :param encoding content encoding (default utf-8)
        """
        if 'encoding' not in args :
            args['encoding'] = 'utf-8'
        gfs = GridFS(self.db)
        gfs.put(**args)
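
    # A hedged sketch of how upload() above is typically called; the filename and
    # payload are assumptions for illustration. GridFS accepts either bytes or a
    # file-like object for the data argument, plus arbitrary metadata keywords.
    #
    #   _writer = Writer(host='localhost:27017',db='analytics',collection='logs')
    #   _writer.upload(filename='report.json',
    #                  data=b'{"rows":[]}',
    #                  content_type='application/json')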

    def archive(self):
        """
        This function will archive the documents of the current collection into a GridFS file and empty the collection
        """
        collection = self.db[self.collection]
        rows = list(collection.find())
        for row in rows :
            if type(row['_id']) == ObjectId :
                row['_id'] = str(row['_id'])
        #
        # serialize the rows (not the collection object) before emptying the collection
        stream = Binary(json.dumps(rows,cls=IEncoder).encode())
        collection.delete_many({})
        _now = datetime.now()
        now = "-".join([str(_now.year),str(_now.month),str(_now.day)])
        name = ".".join([self.collection,'archive',now])+".json"
        description = " ".join([self.collection,'archive',str(len(rows))])
        self.upload(filename=name,data=stream,description=description,content_type='application/json')
        # gfs = GridFS(self.db)
        # gfs.put(filename=name,description=description,data=stream,encoding='utf-8')
        # self.write({"filename":name,"file":stream,"description":description})
        pass

    def write(self,info,**_args):
        """
        This function will write to a given collection i.e add a record to a collection (no updates)
        @param info new record(s) to be added to the collection (dict, list of dict, or pandas DataFrame)
        """
        # document = self.db[self.collection].find()
        # collection = self.db[self.collection]
        # if type(info) == list :
        #     self.db[self.collection].insert_many(info)
        # else:
        try:
            if 'table' in _args or 'collection' in _args :
                _uid = _args['table'] if 'table' in _args else _args['collection']
            else:
                _uid = self.collection if 'doc' not in _args else _args['doc']
            if self._lock :
                Mongo.lock.acquire()
            if type(info) == list or type(info) == pd.DataFrame :
                if type(info) == pd.DataFrame :
                    info = info.to_dict(orient='records')
                # info if type(info) == list else info.to_dict(orient='records')
                info = json.loads(json.dumps(info,cls=IEncoder))
                self.db[_uid].insert_many(info)
            else:
                #
                # sometimes a dictionary can have keys with arrays (odd shaped)
                #
                _keycount = len(info.keys())
                _arraycount = [len(info[key]) for key in info if type(info[key]) in (list,np.ndarray)]
                if _arraycount and len(_arraycount) == _keycount and np.max(_arraycount) == np.min(_arraycount) :
                    #
                    # In case an object with a consistent structure is passed, store it as records
                    #
                    self.write(pd.DataFrame(info),**_args)
                else:
                    self.db[_uid].insert_one(json.loads(json.dumps(info,cls=IEncoder)))
        finally:
            if self._lock :
                Mongo.lock.release()
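
    # A hedged sketch of the three input shapes write() above accepts; the names
    # and field values are made up for illustration and the snippet is kept in
    # comments so nothing runs at import time.
    #
    #   _writer = Writer(host='localhost:27017',db='analytics',collection='logs')
    #   _writer.write({'user':'alice','status':'active'})             # single document
    #   _writer.write([{'user':'alice'},{'user':'bob'}])              # list -> insert_many
    #   _writer.write(pd.DataFrame({'user':['alice','bob'],'status':['active','new']}))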

    def set(self,document):
        """
        If no identifier is provided the function will delete the entire collection and set the new document.
        Please use this function with great care (archive the content first before using it... for safety)
        """
        collection = self.db[self.collection]
        if collection.count_documents({}) > 0 and '_id' in document:
            id = document['_id']
            del document['_id']
            collection.find_one_and_replace({'_id':id},document)
        else:
            #
            # Nothing to be done if we did not find anything
            #
            pass
        # collection.delete_many({})
        # self.write(info)

    def close(self):
        Mongo.close(self)
        # collection.update_one({"_id":self.collection},document,True)
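
# Minimal end-to-end sketch, assuming a mongod instance on localhost:27017 and a
# throwaway database/collection ('demo', 'events') invented for illustration.
# It is guarded by __main__ so importing this module stays side effect free.
if __name__ == '__main__':
    _writer = Writer(host='localhost',port=27017,db='demo',collection='events')
    _writer.write([{'event':'login','user':'alice'},{'event':'logout','user':'alice'}])
    _writer.close()

    _reader = Reader(host='localhost',port=27017,db='demo',collection='events')
    # plain find() with a filter, returned as a DataFrame
    print(_reader.read(filter={'user':'alice'}))
    # aggregation pipeline executed through db.command
    print(_reader.read(aggregate='events',
                       pipeline=[{'$group':{'_id':'$event','count':{'$sum':1}}}]))
    _reader.close()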