__init__.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. """
  2. Data Transport - 1.0
  3. Steve L. Nyemba, The Phi Technology LLC
  4. This module is designed to serve as a wrapper to a set of supported data stores :
  5. - couchdb
  6. - mongodb
  7. - Files (character delimited)
  8. - Queues (RabbitMQ)
  9. - Session (Flask)
  10. - s3
  11. The supported operations are read/write and providing meta data to the calling code
  12. Requirements :
  13. pymongo
  14. boto
  15. cloudant
  16. The configuration for the data-store is as follows :
  17. couchdb:
  18. {
  19. args:{
  20. url:<url>,
  21. username:<username>,
  22. password:<password>,
  23. dbname:<database>,
  24. doc:<document id>
  25. }
  26. }
  27. RabbitMQ:
  28. {
  29. }
  30. Mongodb:
  31. {
  32. args:{
  33. host:<url>, #localhost:27017
  34. username:<username>,
  35. password:<password>,
  36. dbname:<database>,
  37. doc:<document id>
  38. }
  39. }
  40. """
  41. __author__ = 'The Phi Technology'
  42. import numpy as np
  43. import json
  44. import importlib
  45. from common import Reader, Writer #, factory
  46. import disk
  47. import queue
  48. import couch
  49. import mongo
  50. import s3
  51. class factory :
  52. @staticmethod
  53. def instance(**args):
  54. """
  55. This class will create an instance of a transport when providing
  56. :type name of the type we are trying to create
  57. :args The arguments needed to create the instance
  58. """
  59. source = args['type']
  60. params = args['args']
  61. anObject = None
  62. if source in ['HttpRequestReader','HttpSessionWriter']:
  63. #
  64. # @TODO: Make sure objects are serializable, be smart about them !!
  65. #
  66. aClassName = ''.join([source,'(**params)'])
  67. else:
  68. stream = json.dumps(params)
  69. aClassName = ''.join([source,'(**',stream,')'])
  70. try:
  71. anObject = eval( aClassName)
  72. #setattr(anObject,'name',source)
  73. except Exception,e:
  74. print ['Error ',e]
  75. return anObject
  76. # class Reader:
  77. # def __init__(self):
  78. # self.nrows = 0
  79. # self.xchar = None
  80. # def row_count(self):
  81. # content = self.read()
  82. # return np.sum([1 for row in content])
  83. # def delimiter(self,sample):
  84. # """
  85. # This function determines the most common delimiter from a subset of possible delimiters.
  86. # It uses a statistical approach (distribution) to gauge the distribution of columns for a given delimiter
  87. # :sample sample string/content expecting matrix i.e list of rows
  88. # """
  89. # m = {',':[],'\t':[],'|':[],'\x3A':[]}
  90. # delim = m.keys()
  91. # for row in sample:
  92. # for xchar in delim:
  93. # if row.split(xchar) > 1:
  94. # m[xchar].append(len(row.split(xchar)))
  95. # else:
  96. # m[xchar].append(0)
  97. # #
  98. # # The delimiter with the smallest variance, provided the mean is greater than 1
  99. # # This would be troublesome if there many broken records sampled
  100. # #
  101. # m = {id: np.var(m[id]) for id in m.keys() if m[id] != [] and int(np.mean(m[id]))>1}
  102. # index = m.values().index( min(m.values()))
  103. # xchar = m.keys()[index]
  104. # return xchar
  105. # def col_count(self,sample):
  106. # """
  107. # This function returns the number of columns of a given sample
  108. # @pre self.xchar is not None
  109. # """
  110. # m = {}
  111. # i = 0
  112. # for row in sample:
  113. # row = self.format(row)
  114. # id = str(len(row))
  115. # #id = str(len(row.split(self.xchar)))
  116. # if id not in m:
  117. # m[id] = 0
  118. # m[id] = m[id] + 1
  119. # index = m.values().index( max(m.values()) )
  120. # ncols = int(m.keys()[index])
  121. # return ncols;
  122. # def format (self,row):
  123. # """
  124. # This function will clean records of a given row by removing non-ascii characters
  125. # @pre self.xchar is not None
  126. # """
  127. # if isinstance(row,list) == False:
  128. # #
  129. # # We've observed sometimes fields contain delimiter as a legitimate character, we need to be able to account for this and not tamper with the field values (unless necessary)
  130. # cols = self.split(row)
  131. # #cols = row.split(self.xchar)
  132. # else:
  133. # cols = row ;
  134. # return [ re.sub('[^\x00-\x7F,\n,\r,\v,\b,]',' ',col.strip()).strip().replace('"','') for col in cols]
  135. # def split (self,row):
  136. # """
  137. # This function performs a split of a record and tries to attempt to preserve the integrity of the data within i.e accounting for the double quotes.
  138. # @pre : self.xchar is not None
  139. # """
  140. # pattern = "".join(["(?:^|",self.xchar,")(\"(?:[^\"]+|\"\")*\"|[^",self.xchar,"]*)"])
  141. # return re.findall(pattern,row.replace('\n',''))
  142. # class Writer:
  143. # def format(self,row,xchar):
  144. # if xchar is not None and isinstance(row,list):
  145. # return xchar.join(row)+'\n'
  146. # elif xchar is None and isinstance(row,dict):
  147. # row = json.dumps(row)
  148. # return row
  149. # """
  150. # It is important to be able to archive data so as to insure that growth is controlled
  151. # Nothing in nature grows indefinitely neither should data being handled.
  152. # """
  153. # def archive(self):
  154. # pass
  155. # def flush(self):
  156. # pass
  157. # class factory :
  158. # @staticmethod
  159. # def instance(**args):
  160. # source = args['type']
  161. # params = args['args']
  162. # anObject = None
  163. # if source in ['HttpRequestReader','HttpSessionWriter']:
  164. # #
  165. # # @TODO: Make sure objects are serializable, be smart about them !!
  166. # #
  167. # aClassName = ''.join([source,'(**params)'])
  168. # else:
  169. # stream = json.dumps(params)
  170. # aClassName = ''.join([source,'(**',stream,')'])
  171. # try:
  172. # anObject = eval( aClassName)
  173. # #setattr(anObject,'name',source)
  174. # except Exception,e:
  175. # print ['Error ',e]
  176. # return anObject