Browse Source

bug fix:sqlitewriter

Steve Nyemba 4 years ago
parent
commit
617f829e1e
2 changed files with 85 additions and 6 deletions
  1. 1 1
      setup.py
  2. 84 5
      transport/disk.py

+ 1 - 1
setup.py

@@ -8,7 +8,7 @@ def read(fname):
     return open(os.path.join(os.path.dirname(__file__), fname)).read() 
 args    = {
     "name":"data-transport",
-    "version":"1.1.2",
+    "version":"1.1.6",
     "author":"The Phi Technology LLC","author_email":"info@the-phi.com",
     "license":"MIT",
     "packages":["transport"]}

+ 84 - 5
transport/disk.py

@@ -5,13 +5,15 @@ if sys.version_info[0] > 2 :
 else:
 	from common import Reader,Writer
 import json
+from threading import Lock
+import sqlite3
 
 class DiskReader(Reader) :
 	"""
 	This class is designed to read data from disk (location on hard drive)
 	@pre : isready() == True
 	"""
-
+	
 	def __init__(self,**params):
 		"""
 			@param	path	absolute path of the file to be read
@@ -40,12 +42,13 @@ class DiskReader(Reader) :
 			yield row
 		f.close()
 class DiskWriter(Writer):
+
 	"""
 		This function writes output to disk in a designated location. The function will write a text to a text file
 		- If a delimiter is provided it will use that to generate a xchar-delimited file
 		- If not then the object will be dumped as is
 	"""
-
+	THREAD_LOCK = Lock()
 	def __init__(self,**params):
 		Writer.__init__(self)
 		self.cache['meta'] = {'cols':0,'rows':0,'delimiter':None}
@@ -81,8 +84,84 @@ class DiskWriter(Writer):
 			@param	label	<passed|broken|fixed|stats>
 			@param	row	row to be written
 		"""
-		f = open(self.path,'a')
-		f.write(self.format(info))
-		f.close()
+		try:
+			DiskWriter.THREAD_LOCK.acquire()
+			f = open(self.path,'a')
+			if self.delimiter :
+				if type(info) == list :
+					for row in info :
+						f.write(self.format(row))
+				else:
+					f.write(self.format(info))
+			else:
+				if not type(info) == str :
+					f.write(json.dumps(info))
+				else:
+					f.write(info)
+			f.close()
+		except Exception as e:
+			#
+			# Not sure what should be done here ...
+			pass
+		finally:
+			DiskWriter.THREAD_LOCK.release()
 
+class SQLiteWriter(DiskWriter) :
+	def __init__(self,**args):
+		"""
+		:path
+		:fields json|csv
+		"""
+		DiskWriter.__init__(self,**args)
+		self.table = args['table']
+		
+		self.conn = sqlite3.connect(self.path,isolation_level=None)
+		self.conn.row_factory = sqlite3.Row
+		self.fields = args['fields'] if 'fields' in args else []
 		
+		if self.fields and not self.isready():
+			self.init(self.fields)
+			
+	def init(self,fields):
+		self.fields = fields;
+		sql = " ".join(["CREATE TABLE  IF NOT EXISTS ",self.table," (", ",".join(self.fields),")"])
+		
+		cursor = self.conn.cursor()
+		cursor.execute(sql)
+		cursor.close()
+		self.conn.commit()
+	def isready(self):
+		try:
+			sql = "SELECT count(*) FROM sqlite_master where name=':table'"
+			sql = sql.replace(":table",self.table)
+			cursor = self.conn.cursor()
+
+			r = cursor.execute(sql)
+			r = r.fetchall()
+			cursor.close()
+			
+			return r[0][0]
+		except Exception as e:
+			pass
+		return 0
+		#
+		# If the table doesn't exist we should create it
+		#
+	def write(self,info):
+		"""
+		"""
+		
+		if not self.fields :
+			self.init(list(info.keys()))
+		
+		if type(info) != list :
+			info = [info]
+		cursor = self.conn.cursor()
+		
+		
+		sql = " " .join(["INSERT INTO ",self.table,"(", ",".join(self.fields) ,")", "values(':values')"])
+		for row in info :
+			cursor.execute(sql.replace(":values",json.dumps(row)))
+			# self.conn.commit()
+				# print (sql)
+