mongo.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. """
  2. Data Transport - 1.0
  3. Steve L. Nyemba, The Phi Technology LLC
  4. This file is a wrapper around mongodb for reading/writing content against a mongodb server and executing views (mapreduce)
  5. """
  6. from pymongo import MongoClient
  7. from bson.objectid import ObjectId
  8. from bson.binary import Binary
  9. import json
  10. from datetime import datetime
  11. import pandas as pd
  12. import gridfs
  13. # from transport import Reader,Writer
  14. import sys
  15. if sys.version_info[0] > 2 :
  16. from transport.common import Reader, Writer
  17. else:
  18. from common import Reader, Writer
  19. import json
  20. import re
  21. from multiprocessing import Lock, RLock
  22. class Mongo :
  23. lock = RLock()
  24. """
  25. Basic mongodb functions are captured here
  26. """
  27. def __init__(self,**args):
  28. """
  29. :dbname database name/identifier
  30. :host host and port of the database by default localhost:27017
  31. :username username for authentication
  32. :password password for current user
  33. """
  34. host = args['host'] if 'host' in args else 'localhost:27017'
  35. if 'user' in args and 'password' in args:
  36. self.client = MongoClient(host,
  37. username=args['username'] ,
  38. password=args['password'] ,
  39. authMechanism='SCRAM-SHA-256')
  40. else:
  41. self.client = MongoClient(host,maxPoolSize=10000)
  42. self.uid = args['doc'] #-- document identifier
  43. self.dbname = args['dbname'] if 'dbname' in args else args['db']
  44. self.db = self.client[self.dbname]
  45. self._lock = False if 'lock' not in args else args['lock']
  46. def isready(self):
  47. p = self.dbname in self.client.list_database_names()
  48. q = self.uid in self.client[self.dbname].list_collection_names()
  49. return p and q
  50. def close(self):
  51. self.client.close()
  52. class MongoReader(Mongo,Reader):
  53. """
  54. This class will read from a mongodb data store and return the content of a document (not a collection)
  55. """
  56. def __init__(self,**args):
  57. Mongo.__init__(self,**args)
  58. def read(self,**args):
  59. if 'mongo' in args :
  60. #
  61. # @TODO:
  62. cmd = args['mongo']
  63. r = []
  64. out = self.db.command(cmd)
  65. #@TODO: consider using a yield (generator) works wonders
  66. while True :
  67. if 'values' in out :
  68. r += out['values']
  69. if 'cursor' in out :
  70. key = 'firstBatch' if 'firstBatch' in out['cursor'] else 'nextBatch'
  71. else:
  72. key = 'n'
  73. if 'cursor' in out and out['cursor'][key] :
  74. r += list(out['cursor'][key])
  75. elif key in out and out[key]:
  76. r.append (out[key])
  77. # yield out['cursor'][key]
  78. if key not in ['firstBatch','nextBatch'] or ('cursor' in out and out['cursor']['id'] == 0) :
  79. break
  80. else:
  81. out = self.db.command({"getMore":out['cursor']['id'],"collection":out['cursor']['ns'].split(".")[-1]})
  82. return pd.DataFrame(r)
  83. else:
  84. collection = self.db[self.uid]
  85. _filter = args['filter'] if 'filter' in args else {}
  86. return collection.find(_filter)
  87. def view(self,**args):
  88. """
  89. This function is designed to execute a view (map/reduce) operation
  90. """
  91. pass
  92. class MongoWriter(Mongo,Writer):
  93. """
  94. This class is designed to write to a mongodb collection within a database
  95. """
  96. def __init__(self,**args):
  97. Mongo.__init__(self,**args)
  98. def upload(self,**args) :
  99. """
  100. This function will upload a file to the current database (using GridFS)
  101. :param data binary stream/text to be stored
  102. :param filename filename to be used
  103. :param encoding content_encoding (default utf-8)
  104. """
  105. if 'encoding' not in args :
  106. args['encoding'] = 'utf-8'
  107. gfs = GridFS(self.db)
  108. gfs.put(**args)
  109. def archive(self):
  110. """
  111. This function will archive documents to the
  112. """
  113. collection = self.db[self.uid]
  114. rows = list(collection.find())
  115. for row in rows :
  116. if type(row['_id']) == ObjectId :
  117. row['_id'] = str(row['_id'])
  118. stream = Binary(json.dumps(collection).encode())
  119. collection.delete_many({})
  120. now = "-".join([str(datetime.now().year()),str(datetime.now().month), str(datetime.now().day)])
  121. name = ".".join([self.uid,'archive',now])+".json"
  122. description = " ".join([self.uid,'archive',str(len(rows))])
  123. self.upload(filename=name,data=stream,description=description,content_type='application/json')
  124. # gfs = GridFS(self.db)
  125. # gfs.put(filename=name,description=description,data=stream,encoding='utf-8')
  126. # self.write({{"filename":name,"file":stream,"description":descriptions}})
  127. pass
  128. def write(self,info):
  129. """
  130. This function will write to a given collection i.e add a record to a collection (no updates)
  131. @param info new record in the collection to be added
  132. """
  133. # document = self.db[self.uid].find()
  134. collection = self.db[self.uid]
  135. # if type(info) == list :
  136. # self.db[self.uid].insert_many(info)
  137. # else:
  138. try:
  139. if self._lock :
  140. Mongo.lock.acquire()
  141. if type(info) == list or type(info) == pd.DataFrame :
  142. self.db[self.uid].insert_many(info if type(info) == list else info.to_dict(orient='records'))
  143. else:
  144. self.db[self.uid].insert_one(info)
  145. finally:
  146. if self._lock :
  147. Mongo.lock.release()
  148. def set(self,document):
  149. """
  150. if no identifier is provided the function will delete the entire collection and set the new document.
  151. Please use this function with great care (archive the content first before using it... for safety)
  152. """
  153. collection = self.db[self.uid]
  154. if collection.count_document() > 0 and '_id' in document:
  155. id = document['_id']
  156. del document['_id']
  157. collection.find_one_and_replace({'_id':id},document)
  158. else:
  159. collection.delete_many({})
  160. self.write(info)
  161. def close(self):
  162. Mongo.close(self)
  163. # collecton.update_one({"_id":self.uid},document,True)