# disk.py
  1. import os
  2. import sys
  3. if sys.version_info[0] > 2 :
  4. from transport.common import Reader, Writer #, factory
  5. else:
  6. from common import Reader,Writer
  7. # import nujson as json
  8. import json
  9. # from threading import Lock
  10. import sqlite3
  11. import pandas as pd
  12. from multiprocessing import Lock
  13. from transport.common import Reader, Writer, IEncoder
  14. import sqlalchemy
  15. from sqlalchemy import create_engine
  16. class DiskReader(Reader) :
  17. """
  18. This class is designed to read data from disk (location on hard drive)
  19. @pre : isready() == True
  20. """
  21. def __init__(self,**params):
  22. """
  23. @param path absolute path of the file to be read
  24. """
  25. Reader.__init__(self)
  26. self.path = params['path'] if 'path' in params else None
  27. self.delimiter = params['delimiter'] if 'delimiter' in params else ','
  28. def isready(self):
  29. return os.path.exists(self.path)
  30. def meta(self,**_args):
  31. return []
  32. def read(self,**args):
  33. _path = self.path if 'path' not in args else args['path']
  34. _delimiter = self.delimiter if 'delimiter' not in args else args['delimiter']
  35. return pd.read_csv(_path,delimiter=self.delimiter)
  36. def stream(self,**args):
  37. """
  38. This function reads the rows from a designated location on disk
  39. @param size number of rows to be read, -1 suggests all rows
  40. """
  41. size = -1 if 'size' not in args else int(args['size'])
  42. f = open(self.path,'rU')
  43. i = 1
  44. for row in f:
  45. i += 1
  46. if size == i:
  47. break
  48. if self.delimiter :
  49. yield row.split(self.delimiter)
  50. yield row
  51. f.close()
  52. class DiskWriter(Writer):
  53. """
  54. This function writes output to disk in a designated location. The function will write a text to a text file
  55. - If a delimiter is provided it will use that to generate a xchar-delimited file
  56. - If not then the object will be dumped as is
  57. """
  58. THREAD_LOCK = Lock()
  59. def __init__(self,**params):
  60. super().__init__()
  61. self._path = params['path']
  62. self._delimiter = params['delimiter'] if 'delimiter' in params else None
  63. self._mode = 'w' if 'mode' not in params else params['mode']
  64. # def meta(self):
  65. # return self.cache['meta']
  66. # def isready(self):
  67. # """
  68. # This function determines if the class is ready for execution or not
  69. # i.e it determines if the preconditions of met prior execution
  70. # """
  71. # return True
  72. # # p = self.path is not None and os.path.exists(self.path)
  73. # # q = self.name is not None
  74. # # return p and q
  75. # def format (self,row):
  76. # self.cache['meta']['cols'] += len(row) if isinstance(row,list) else len(row.keys())
  77. # self.cache['meta']['rows'] += 1
  78. # return (self.delimiter.join(row) if self.delimiter else json.dumps(row))+"\n"
  79. def write(self,info,**_args):
  80. """
  81. This function writes a record to a designated file
  82. @param label <passed|broken|fixed|stats>
  83. @param row row to be written
  84. """
  85. try:
  86. DiskWriter.THREAD_LOCK.acquire()
  87. _delim = self._delimiter if 'delimiter' not in _args else _args['delimiter']
  88. _path = self._path if 'path' not in _args else _args['path']
  89. _mode = self._mode if 'mode' not in _args else _args['mode']
  90. info.to_csv(_path,index=False,sep=_delim)
  91. pass
  92. except Exception as e:
  93. #
  94. # Not sure what should be done here ...
  95. pass
  96. finally:
  97. DiskWriter.THREAD_LOCK.release()
  98. class SQLite :
  99. def __init__(self,**_args) :
  100. self.path = _args['database'] if 'database' in _args else _args['path']
  101. self.conn = sqlite3.connect(self.path,isolation_level="IMMEDIATE")
  102. self.conn.row_factory = sqlite3.Row
  103. self.fields = _args['fields'] if 'fields' in _args else []
  104. def has (self,**_args):
  105. found = False
  106. try:
  107. if 'table' in _args :
  108. table = _args['table']
  109. sql = "SELECT * FROM :table limit 1".replace(":table",table)
  110. _df = pd.read_sql(sql,self.conn)
  111. found = _df.columns.size > 0
  112. except Exception as e:
  113. pass
  114. return found
  115. def close(self):
  116. try:
  117. self.conn.close()
  118. except Exception as e :
  119. print(e)
  120. def apply(self,sql):
  121. try:
  122. if not sql.lower().startswith('select'):
  123. cursor = self.conn.cursor()
  124. cursor.execute(sql)
  125. cursor.close()
  126. self.conn.commit()
  127. else:
  128. return pd.read_sql(sql,self.conn)
  129. except Exception as e:
  130. print (e)
  131. class SQLiteReader (SQLite,DiskReader):
  132. def __init__(self,**args):
  133. super().__init__(**args)
  134. # DiskReader.__init__(self,**args)
  135. # self.path = args['database'] if 'database' in args else args['path']
  136. # self.conn = sqlite3.connect(self.path,isolation_level=None)
  137. # self.conn.row_factory = sqlite3.Row
  138. self.table = args['table'] if 'table' in args else None
  139. def read(self,**args):
  140. if 'sql' in args :
  141. sql = args['sql']
  142. elif 'filter' in args :
  143. sql = "SELECT :fields FROM ",self.table, "WHERE (:filter)".replace(":filter",args['filter'])
  144. sql = sql.replace(":fields",args['fields']) if 'fields' in args else sql.replace(":fields","*")
  145. else:
  146. sql = ' '.join(['SELECT * FROM ',self.table])
  147. if 'limit' in args :
  148. sql = sql + " LIMIT "+args['limit']
  149. return pd.read_sql(sql,self.conn)
  150. def close(self):
  151. try:
  152. self.conn.close()
  153. except Exception as e :
  154. pass
  155. class SQLiteWriter(SQLite,DiskWriter) :
  156. connection = None
  157. LOCK = Lock()
  158. def __init__(self,**args):
  159. """
  160. :path
  161. :fields json|csv
  162. """
  163. # DiskWriter.__init__(self,**args)
  164. super().__init__(**args)
  165. self.table = args['table'] if 'table' in args else None
  166. path = self.path
  167. self._engine = create_engine(f'sqlite:///{path}')
  168. # self.conn = sqlite3.connect(self.path,isolation_level="IMMEDIATE")
  169. # self.conn.row_factory = sqlite3.Row
  170. # self.fields = args['fields'] if 'fields' in args else []
  171. if self.fields and not self.isready() and self.table:
  172. self.init(self.fields)
  173. SQLiteWriter.connection = self.conn
  174. def init(self,fields):
  175. self.fields = fields;
  176. sql = " ".join(["CREATE TABLE IF NOT EXISTS ",self.table," (", ",".join(self.fields),")"])
  177. cursor = self.conn.cursor()
  178. cursor.execute(sql)
  179. cursor.close()
  180. self.conn.commit()
  181. def isready(self):
  182. try:
  183. sql = "SELECT count(*) FROM sqlite_master where name=':table'"
  184. sql = sql.replace(":table",self.table)
  185. cursor = self.conn.cursor()
  186. r = cursor.execute(sql)
  187. r = r.fetchall()
  188. cursor.close()
  189. return r[0][0] != 0
  190. except Exception as e:
  191. pass
  192. return 0
  193. #
  194. # If the table doesn't exist we should create it
  195. #
  196. def write(self,_data,**_args):
  197. SQLiteWriter.LOCK.acquire()
  198. try:
  199. if type(_data) == dict :
  200. _data = [_data]
  201. _table = self.table if 'table' not in _args else _args['table']
  202. _df = pd.DataFrame(_data)
  203. _df.to_sql(_table,self._engine.connect(),if_exists='append',index=False)
  204. except Exception as e:
  205. print (e)
  206. SQLiteWriter.LOCK.release()
  207. def _write(self,info,**_args):
  208. """
  209. """
  210. #if not self.fields :
  211. # #if type(info) == pd.DataFrame :
  212. # # _columns = list(info.columns)
  213. # #self.init(list(info.keys()))
  214. if type(info) == dict :
  215. info = [info]
  216. elif type(info) == pd.DataFrame :
  217. info = info.fillna('')
  218. info = info.to_dict(orient='records')
  219. if not self.fields :
  220. _rec = info[0]
  221. self.init(list(_rec.keys()))
  222. SQLiteWriter.LOCK.acquire()
  223. try:
  224. cursor = self.conn.cursor()
  225. sql = " " .join(["INSERT INTO ",self.table,"(", ",".join(self.fields) ,")", "values(:values)"])
  226. for row in info :
  227. stream =["".join(["",value,""]) if type(value) == str else value for value in row.values()]
  228. stream = json.dumps(stream,cls=IEncoder)
  229. stream = stream.replace("[","").replace("]","")
  230. self.conn.execute(sql.replace(":values",stream) )
  231. # cursor.commit()
  232. self.conn.commit()
  233. # print (sql)
  234. except Exception as e :
  235. print (e)
  236. pass
  237. SQLiteWriter.LOCK.release()