__init__.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
"""
Data Transport - 1.0
Steve L. Nyemba, The Phi Technology LLC

This module is designed to serve as a wrapper around a set of supported data stores:
    - couchdb
    - mongodb
    - Files (character delimited)
    - Queues (RabbitMQ)
    - Session (Flask)
    - s3
The supported operations are read/write and providing metadata to the calling code.

Requirements:
    pymongo
    boto
    cloudant

The configuration for the data-store is as follows:
    couchdb:
    {
        args:{
            url:<url>,
            username:<username>,
            password:<password>,
            dbname:<database>,
            doc:<document id>
        }
    }
    RabbitMQ:
    {
    }
    Mongodb:
    {
        args:{
            host:<url>, #localhost:27017
            username:<username>,
            password:<password>,
            dbname:<database>,
            doc:<document id>
        }
    }
"""
  41. __author__ = 'The Phi Technology'
  42. import numpy as np
  43. import json
  44. import importlib
  45. import sys
  46. if sys.version_info[0] > 2 :
  47. from transport.common import Reader, Writer #, factory
  48. from transport import disk
  49. from transport import queue as queue
  50. from transport import couch as couch
  51. from transport import mongo as mongo
  52. from transport import s3 as s3
  53. else:
  54. from common import Reader, Writer #, factory
  55. import disk
  56. import queue
  57. import couch
  58. import mongo
  59. import s3
  60. class factory :
  61. @staticmethod
  62. def instance(**args):
  63. """
  64. This class will create an instance of a transport when providing
  65. :type name of the type we are trying to create
  66. :args The arguments needed to create the instance
  67. """
  68. source = args['type']
  69. params = args['args']
  70. anObject = None
  71. if source in ['HttpRequestReader','HttpSessionWriter']:
  72. #
  73. # @TODO: Make sure objects are serializable, be smart about them !!
  74. #
  75. aClassName = ''.join([source,'(**params)'])
  76. else:
  77. stream = json.dumps(params)
  78. aClassName = ''.join([source,'(**',stream,')'])
  79. try:
  80. anObject = eval( aClassName)
  81. #setattr(anObject,'name',source)
  82. except Exception as e:
  83. print(['Error ',e])
  84. return anObject
  85. # class Reader:
  86. # def __init__(self):
  87. # self.nrows = 0
  88. # self.xchar = None
  89. # def row_count(self):
  90. # content = self.read()
  91. # return np.sum([1 for row in content])
  92. # def delimiter(self,sample):
  93. # """
  94. # This function determines the most common delimiter from a subset of possible delimiters.
# It uses a statistical approach (distribution) to gauge the distribution of columns for a given delimiter
  96. # :sample sample string/content expecting matrix i.e list of rows
  97. # """
  98. # m = {',':[],'\t':[],'|':[],'\x3A':[]}
  99. # delim = m.keys()
  100. # for row in sample:
  101. # for xchar in delim:
  102. # if row.split(xchar) > 1:
  103. # m[xchar].append(len(row.split(xchar)))
  104. # else:
  105. # m[xchar].append(0)
  106. # #
  107. # # The delimiter with the smallest variance, provided the mean is greater than 1
# # This would be troublesome if there are many broken records sampled
  109. # #
  110. # m = {id: np.var(m[id]) for id in m.keys() if m[id] != [] and int(np.mean(m[id]))>1}
  111. # index = m.values().index( min(m.values()))
  112. # xchar = m.keys()[index]
  113. # return xchar
  114. # def col_count(self,sample):
  115. # """
# This function returns the number of columns of a given sample
  117. # @pre self.xchar is not None
  118. # """
  119. # m = {}
  120. # i = 0
  121. # for row in sample:
  122. # row = self.format(row)
  123. # id = str(len(row))
  124. # #id = str(len(row.split(self.xchar)))
  125. # if id not in m:
  126. # m[id] = 0
  127. # m[id] = m[id] + 1
  128. # index = m.values().index( max(m.values()) )
  129. # ncols = int(m.keys()[index])
  130. # return ncols;
  131. # def format (self,row):
  132. # """
  133. # This function will clean records of a given row by removing non-ascii characters
  134. # @pre self.xchar is not None
  135. # """
  136. # if isinstance(row,list) == False:
  137. # #
  138. # # We've observed sometimes fields contain delimiter as a legitimate character, we need to be able to account for this and not tamper with the field values (unless necessary)
  139. # cols = self.split(row)
  140. # #cols = row.split(self.xchar)
  141. # else:
  142. # cols = row ;
  143. # return [ re.sub('[^\x00-\x7F,\n,\r,\v,\b,]',' ',col.strip()).strip().replace('"','') for col in cols]
  144. # def split (self,row):
  145. # """
  146. # This function performs a split of a record and tries to attempt to preserve the integrity of the data within i.e accounting for the double quotes.
  147. # @pre : self.xchar is not None
  148. # """
  149. # pattern = "".join(["(?:^|",self.xchar,")(\"(?:[^\"]+|\"\")*\"|[^",self.xchar,"]*)"])
  150. # return re.findall(pattern,row.replace('\n',''))
  151. # class Writer:
  152. # def format(self,row,xchar):
  153. # if xchar is not None and isinstance(row,list):
  154. # return xchar.join(row)+'\n'
  155. # elif xchar is None and isinstance(row,dict):
  156. # row = json.dumps(row)
  157. # return row
  158. # """
# It is important to be able to archive data so as to ensure that growth is controlled
  160. # Nothing in nature grows indefinitely neither should data being handled.
  161. # """
  162. # def archive(self):
  163. # pass
  164. # def flush(self):
  165. # pass
  166. # class factory :
  167. # @staticmethod
  168. # def instance(**args):
  169. # source = args['type']
  170. # params = args['args']
  171. # anObject = None
  172. # if source in ['HttpRequestReader','HttpSessionWriter']:
  173. # #
  174. # # @TODO: Make sure objects are serializable, be smart about them !!
  175. # #
  176. # aClassName = ''.join([source,'(**params)'])
  177. # else:
  178. # stream = json.dumps(params)
  179. # aClassName = ''.join([source,'(**',stream,')'])
  180. # try:
  181. # anObject = eval( aClassName)
  182. # #setattr(anObject,'name',source)
  183. # except Exception,e:
  184. # print ['Error ',e]
  185. # return anObject