disk.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. import os
  2. import sys
  3. if sys.version_info[0] > 2 :
  4. from transport.common import Reader, Writer #, factory
  5. else:
  6. from common import Reader,Writer
  7. # import nujson as json
  8. import json
  9. # from threading import Lock
  10. import sqlite3
  11. import pandas as pd
  12. from multiprocessing import Lock
  13. class DiskReader(Reader) :
  14. """
  15. This class is designed to read data from disk (location on hard drive)
  16. @pre : isready() == True
  17. """
  18. def __init__(self,**params):
  19. """
  20. @param path absolute path of the file to be read
  21. """
  22. Reader.__init__(self)
  23. self.path = params['path'] if 'path' in params else None
  24. self.delimiter = params['delimiter'] if 'delimiter' in params else ','
  25. def isready(self):
  26. return os.path.exists(self.path)
  27. def meta(self,**_args):
  28. return []
  29. def read(self,**args):
  30. _path = self.path if 'path' not in args else args['path']
  31. _delimiter = self.delimiter if 'delimiter' not in args else args['delimiter']
  32. return pd.read_csv(_path,delimiter=self.delimiter)
  33. def stream(self,**args):
  34. """
  35. This function reads the rows from a designated location on disk
  36. @param size number of rows to be read, -1 suggests all rows
  37. """
  38. size = -1 if 'size' not in args else int(args['size'])
  39. f = open(self.path,'rU')
  40. i = 1
  41. for row in f:
  42. i += 1
  43. if size == i:
  44. break
  45. if self.delimiter :
  46. yield row.split(self.delimiter)
  47. yield row
  48. f.close()
  49. class DiskWriter(Writer):
  50. """
  51. This function writes output to disk in a designated location. The function will write a text to a text file
  52. - If a delimiter is provided it will use that to generate a xchar-delimited file
  53. - If not then the object will be dumped as is
  54. """
  55. THREAD_LOCK = Lock()
  56. def __init__(self,**params):
  57. Writer.__init__(self)
  58. self.cache['meta'] = {'cols':0,'rows':0,'delimiter':None}
  59. if 'path' in params:
  60. self.path = params['path']
  61. else:
  62. self.path = 'data-transport.log'
  63. self.delimiter = params['delimiter'] if 'delimiter' in params else None
  64. # if 'name' in params:
  65. # self.name = params['name'];
  66. # else:
  67. # self.name = 'data-transport.log'
  68. # if os.path.exists(self.path) == False:
  69. # os.mkdir(self.path)
  70. def meta(self):
  71. return self.cache['meta']
  72. def isready(self):
  73. """
  74. This function determines if the class is ready for execution or not
  75. i.e it determines if the preconditions of met prior execution
  76. """
  77. return True
  78. # p = self.path is not None and os.path.exists(self.path)
  79. # q = self.name is not None
  80. # return p and q
  81. def format (self,row):
  82. self.cache['meta']['cols'] += len(row) if isinstance(row,list) else len(row.keys())
  83. self.cache['meta']['rows'] += 1
  84. return (self.delimiter.join(row) if self.delimiter else json.dumps(row))+"\n"
  85. def write(self,info,**_args):
  86. """
  87. This function writes a record to a designated file
  88. @param label <passed|broken|fixed|stats>
  89. @param row row to be written
  90. """
  91. try:
  92. _mode = 'a' if 'overwrite' not in _args else 'w'
  93. DiskWriter.THREAD_LOCK.acquire()
  94. f = open(self.path,_mode)
  95. if self.delimiter :
  96. if type(info) == list :
  97. for row in info :
  98. f.write(self.format(row))
  99. else:
  100. f.write(self.format(info))
  101. else:
  102. if not type(info) == str :
  103. f.write(json.dumps(info)+"\n")
  104. else:
  105. f.write(info)
  106. f.close()
  107. except Exception as e:
  108. #
  109. # Not sure what should be done here ...
  110. pass
  111. finally:
  112. DiskWriter.THREAD_LOCK.release()
  113. class SQLite :
  114. def __init__(self,**_args) :
  115. self.path = _args['database'] if 'database' in _args else _args['path']
  116. self.conn = sqlite3.connect(self.path,isolation_level="IMMEDIATE")
  117. self.conn.row_factory = sqlite3.Row
  118. self.fields = _args['fields'] if 'fields' in _args else []
  119. def has (self,**_args):
  120. found = False
  121. try:
  122. if 'table' in _args :
  123. table = _args['table']
  124. sql = "SELECT * FROM :table limit 1".replace(":table",table)
  125. _df = pd.read_sql(sql,self.conn)
  126. found = _df.columns.size > 0
  127. except Exception as e:
  128. pass
  129. return found
  130. def close(self):
  131. try:
  132. self.conn.close()
  133. except Exception as e :
  134. print(e)
  135. def apply(self,sql):
  136. try:
  137. if not sql.lower().startswith('select'):
  138. cursor = self.conn.cursor()
  139. cursor.execute(sql)
  140. cursor.close()
  141. self.conn.commit()
  142. else:
  143. return pd.read_sql(sql,self.conn)
  144. except Exception as e:
  145. print (e)
  146. class SQLiteReader (SQLite,DiskReader):
  147. def __init__(self,**args):
  148. super().__init__(**args)
  149. # DiskReader.__init__(self,**args)
  150. # self.path = args['database'] if 'database' in args else args['path']
  151. # self.conn = sqlite3.connect(self.path,isolation_level=None)
  152. # self.conn.row_factory = sqlite3.Row
  153. self.table = args['table'] if 'table' in args else None
  154. def read(self,**args):
  155. if 'sql' in args :
  156. sql = args['sql']
  157. elif 'filter' in args :
  158. sql = "SELECT :fields FROM ",self.table, "WHERE (:filter)".replace(":filter",args['filter'])
  159. sql = sql.replace(":fields",args['fields']) if 'fields' in args else sql.replace(":fields","*")
  160. else:
  161. sql = ' '.join(['SELECT * FROM ',self.table])
  162. if 'limit' in args :
  163. sql = sql + " LIMIT "+args['limit']
  164. return pd.read_sql(sql,self.conn)
  165. def close(self):
  166. try:
  167. self.conn.close()
  168. except Exception as e :
  169. pass
  170. class SQLiteWriter(SQLite,DiskWriter) :
  171. connection = None
  172. LOCK = Lock()
  173. def __init__(self,**args):
  174. """
  175. :path
  176. :fields json|csv
  177. """
  178. # DiskWriter.__init__(self,**args)
  179. super().__init__(**args)
  180. self.table = args['table'] if 'table' in args else None
  181. # self.conn = sqlite3.connect(self.path,isolation_level="IMMEDIATE")
  182. # self.conn.row_factory = sqlite3.Row
  183. # self.fields = args['fields'] if 'fields' in args else []
  184. if self.fields and not self.isready() and self.table:
  185. self.init(self.fields)
  186. SQLiteWriter.connection = self.conn
  187. def init(self,fields):
  188. self.fields = fields;
  189. sql = " ".join(["CREATE TABLE IF NOT EXISTS ",self.table," (", ",".join(self.fields),")"])
  190. cursor = self.conn.cursor()
  191. cursor.execute(sql)
  192. cursor.close()
  193. self.conn.commit()
  194. def isready(self):
  195. try:
  196. sql = "SELECT count(*) FROM sqlite_master where name=':table'"
  197. sql = sql.replace(":table",self.table)
  198. cursor = self.conn.cursor()
  199. r = cursor.execute(sql)
  200. r = r.fetchall()
  201. cursor.close()
  202. return r[0][0] != 0
  203. except Exception as e:
  204. pass
  205. return 0
  206. #
  207. # If the table doesn't exist we should create it
  208. #
  209. def write(self,info):
  210. """
  211. """
  212. if not self.fields :
  213. self.init(list(info.keys()))
  214. if type(info) == dict :
  215. info = [info]
  216. elif type(info) == pd.DataFrame :
  217. info = info.to_dict(orient='records')
  218. SQLiteWriter.LOCK.acquire()
  219. try:
  220. cursor = self.conn.cursor()
  221. sql = " " .join(["INSERT INTO ",self.table,"(", ",".join(self.fields) ,")", "values(:values)"])
  222. for row in info :
  223. stream =["".join(["",value,""]) if type(value) == str else value for value in row.values()]
  224. stream = json.dumps(stream).replace("[","").replace("]","")
  225. self.conn.execute(sql.replace(":values",stream) )
  226. # cursor.commit()
  227. self.conn.commit()
  228. # print (sql)
  229. except Exception as e :
  230. print (e)
  231. pass
  232. SQLiteWriter.LOCK.release()