
bug fix: sqlite and cursors and transport

Steve Nyemba committed 1 year ago
commit 2bb07aedec
3 changed files with 80 additions and 63 deletions
  1. bin/transport  (+20 -8)
  2. transport/disk.py  (+45 -42)
  3. transport/etl.py  (+15 -13)

+ 20 - 8
bin/transport

@@ -46,6 +46,7 @@ import time
 from multiprocessing import Process
 import typer
 import os
+import transport
 from transport import etl
 from transport import providers
 
@@ -88,7 +89,7 @@ def move (path,index=None):
             _config = _config[ int(index)]
             etl.instance(**_config)
         else:
-            etl.instance(_config)
+            etl.instance(config=_config)
                  
         #
         # if type(_config) == dict :
@@ -109,19 +110,30 @@ def move (path,index=None):
         #         jobs.append(thread())
         #         if _config.index(_args) == 0 :
         #             thread.join()
-            wait(jobs)
-
+            # wait(jobs)
+@app.command()
+def version():
+     print (transport.version.__version__)
 @app.command()
 def generate (path:str):
-	__doc__="""
-	
 	"""
-	_config = [{"source":{"provider":"http","url":"https://cdn.wsform.com/wp-content/uploads/2020/06/agreement.csv"},"target":{"provider":"file","path":"addresses.csv","delimiter":"csv"}}]
+	This function will generate a configuration template to give a sense of how to create one
+	"""
+	_config = [
+          {
+               "source":{"provider":"http","url":"https://raw.githubusercontent.com/codeforamerica/ohana-api/master/data/sample-csv/addresses.csv"},
+             "target":
+            [{"provider":"file","path":"addresses.csv","delimiter":"csv"},{"provider":"sqlite","database":"sample.db3","table":"addresses"}]
+            }
+            ]
 	file = open(path,'w')
 	file.write(json.dumps(_config))
 	file.close()
-	
-# if __name__ == '__main__' :
+@app.command()
+def usage():
+     print (__doc__)	
+if __name__ == '__main__' :
+     app()
 # 	#
 # 	# Load information from the file ...
 # 	if 'help' in SYS_ARGS :
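
For readers trying the new CLI surface, a minimal sketch of what the regenerated template and the etl.instance(config=...) keyword call look like end to end. This is not part of the commit: the sample.json file name is arbitrary, and the snippet assumes the transport package from this repository is installed as shown in the diff.

    # Hedged sketch: reproduce what `generate` writes and feed it back through
    # the ETL entry point the way `move` now does after this change.
    import json

    from transport import etl

    # One job: a single HTTP source fanned out to two targets (csv file + sqlite),
    # mirroring the template emitted by the reworked `generate` command.
    _config = [
        {
            "source": {
                "provider": "http",
                "url": "https://raw.githubusercontent.com/codeforamerica/ohana-api/master/data/sample-csv/addresses.csv",
            },
            "target": [
                {"provider": "file", "path": "addresses.csv", "delimiter": "csv"},
                {"provider": "sqlite", "database": "sample.db3", "table": "addresses"},
            ],
        }
    ]

    with open("sample.json", "w") as f:        # equivalent of the `generate` command
        f.write(json.dumps(_config))

    _config = json.loads(open("sample.json").read())
    etl.instance(config=_config)               # keyword form introduced in the hunk above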

+ 45 - 42
transport/disk.py

@@ -62,34 +62,25 @@ class DiskWriter(Writer):
 	"""
 	THREAD_LOCK = Lock()
 	def __init__(self,**params):
-		Writer.__init__(self)
-		self.cache['meta'] = {'cols':0,'rows':0,'delimiter':None}
-		if 'path' in params:
-			self.path = params['path']
-		else:
-			self.path = 'data-transport.log'
-		self.delimiter = params['delimiter'] if 'delimiter' in params else None
-		# if 'name' in params:
-		# 	self.name = params['name'];
-		# else:
-		# 	self.name = 'data-transport.log'
-		# if os.path.exists(self.path) == False:
-		# 	os.mkdir(self.path)
-	def meta(self):
-		return self.cache['meta']
-	def isready(self):
-		"""
-			This function determines if the class is ready for execution or not
-			i.e it determines if the preconditions of met prior execution
-		"""
-		return True
-		# p =  self.path is not None and os.path.exists(self.path)
-		# q = self.name is not None 
-		# return p and q
-	def format (self,row):
-		self.cache['meta']['cols'] += len(row) if isinstance(row,list) else len(row.keys())
-		self.cache['meta']['rows'] += 1
-		return (self.delimiter.join(row) if self.delimiter else json.dumps(row))+"\n"
+		super().__init__()
+		self._path = params['path']
+		self._delimiter = params['delimiter']
+		
+	# def meta(self):
+	# 	return self.cache['meta']
+	# def isready(self):
+	# 	"""
+	# 		This function determines if the class is ready for execution or not
+	# 		i.e it determines if the preconditions of met prior execution
+	# 	"""
+	# 	return True
+	# 	# p =  self.path is not None and os.path.exists(self.path)
+	# 	# q = self.name is not None 
+	# 	# return p and q
+	# def format (self,row):
+	# 	self.cache['meta']['cols'] += len(row) if isinstance(row,list) else len(row.keys())
+	# 	self.cache['meta']['rows'] += 1
+	# 	return (self.delimiter.join(row) if self.delimiter else json.dumps(row))+"\n"
 	def write(self,info,**_args):
 		"""
 			This function writes a record to a designated file
@@ -97,21 +88,30 @@ class DiskWriter(Writer):
 			@param	row	row to be written
 		"""
 		try:
+			
 			_mode = 'a' if 'overwrite' not in _args else 'w'
 			DiskWriter.THREAD_LOCK.acquire()
-			f = open(self.path,_mode)
-			if self.delimiter :
-				if type(info) == list :
-					for row in info :
-						f.write(self.format(row))
-				else:
-					f.write(self.format(info))
-			else:
-				if not type(info) == str :
-					f.write(json.dumps(info)+"\n")
-				else:
-					f.write(info)
-			f.close()
+			# # _path = _args['path'] if 'path' in _args else self.path
+			# # _delim= _args['delimiter'] if 'delimiter' in _args else self._delimiter
+			# # info.to_csv(_path,sep=_delim)
+			# info.to_csv(self.path)
+			# f = open(self.path,_mode)
+			# if self.delimiter :
+			# 	if type(info) == list :
+			# 		for row in info :
+			# 			f.write(self.format(row))
+			# 	else:
+			# 		f.write(self.format(info))
+			# else:
+			# 	if not type(info) == str :
+			# 		f.write(json.dumps(info)+"\n")
+			# 	else:
+			# 		f.write(info)
+			# f.close()
+			_delim = self._delimiter if 'delimiter' not in _args else _args['delimiter']
+			_path = self.path if 'path' not  in _args else _args['path']
+			info.to_csv(_path,index=False,sep=_delim)
+			pass
 		except Exception as e:
 			#
 			# Not sure what should be done here ...
@@ -220,16 +220,19 @@ class SQLiteWriter(SQLite,DiskWriter) :
 		#
 		# If the table doesn't exist we should create it
 		#
-	def write(self,info):
+	def write(self,info,**_args):
 		"""
 		"""
 		
 		if not self.fields :
+			if type(info) == pd.DataFrame :
+				_columns = list(info.columns) 
 			self.init(list(info.keys()))
 		
 		if type(info) == dict :
 			info = [info]
 		elif type(info) == pd.DataFrame :
+			info = info.fillna('')
 			info = info.to_dict(orient='records')
 		
 		SQLiteWriter.LOCK.acquire()
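
To make the reworked write path concrete, here is a hedged, self-contained sketch of what DiskWriter now boils down to: the caller is expected to hand in a pandas DataFrame, and the old record-by-record formatting gives way to a single DataFrame.to_csv call under the class-level lock. Note that __init__ stores the path as self._path while write reads self.path; the sketch assumes a single _path attribute and trims error handling.

    # Minimal sketch of the new DiskWriter behaviour (assumptions: DataFrame input,
    # one `_path` attribute, lock released in a finally block).
    from threading import Lock

    import pandas as pd


    class DiskWriter:
        THREAD_LOCK = Lock()

        def __init__(self, **params):
            self._path = params['path']
            self._delimiter = params['delimiter']

        def write(self, info: pd.DataFrame, **_args):
            """Serialize the frame to disk; per-call overrides win over constructor values."""
            _delim = self._delimiter if 'delimiter' not in _args else _args['delimiter']
            _path = self._path if 'path' not in _args else _args['path']
            try:
                DiskWriter.THREAD_LOCK.acquire()
                info.to_csv(_path, index=False, sep=_delim)
            finally:
                DiskWriter.THREAD_LOCK.release()


    if __name__ == '__main__':
        _df = pd.DataFrame([{'name': 'ada', 'city': 'london'}])
        DiskWriter(path='addresses.csv', delimiter=',').write(_df)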

+ 15 - 13
transport/etl.py

@@ -70,7 +70,7 @@ class Transporter(Process):
         # self._onerror = _args['onError']
         self._source = _args['source']
         self._target = _args['target']
-
+        
         #
         # Let's insure we can support multiple targets
         self._target = [self._target] if type(self._target) != list else self._target
@@ -90,16 +90,18 @@ class Transporter(Process):
         This function will write a data-frame to a designated data-store, The function is built around a delegation design pattern
         :data   data-frame or object to be written
         """
-        for _target in self._target :
-            if 'write' not in _target :
-                _target['context'] = 'write'
-                _target['lock'] = True
-            else:
-                _target['write']['lock'] = True
-            _writer = transport.factory.instance(**_target)
-            _writer.write(_data,**_args)
-            if hasattr(_writer,'close') :
-                _writer.close()
+        if _data.shape[0] > 0 :
+            for _target in self._target :
+                if 'write' not in _target :
+                    _target['context'] = 'write'
+                    # _target['lock'] = True
+                else:
+                    # _target['write']['lock'] = True
+                    pass
+                _writer = transport.factory.instance(**_target)
+                _writer.write(_data.copy(),**_args)
+                if hasattr(_writer,'close') :
+                    _writer.close()
         
     def write(self,_df,**_args):
         """
@@ -109,12 +111,12 @@ class Transporter(Process):
         # _df = self.read()
         _segments = np.array_split(np.range(_df.shape[0]),SEGMENT_COUNT) if _df.shape[0] > MAX_ROWS else np.array( [np.arange(_df.shape[0])])
         # _index = 0
-       
+        
         
         for _indexes in _segments :
             _fwd_args = {} if not _args else _args
             
-            self._delegate_write(_df.iloc[_indexes],**_fwd_args)
+            self._delegate_write(_df.iloc[_indexes])
             #
             # @TODO: Perhaps consider writing up each segment in a thread/process (speeds things up?)
             pass
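
As a quick illustration of the delegation pattern this hunk adjusts, a hedged sketch of the segmented write flow in isolation: the writer factory is stubbed since only the control flow matters here, and the SEGMENT_COUNT / MAX_ROWS values are illustrative rather than taken from the diff.

    # Sketch of Transporter's segmented write (stubbed writer; the real code resolves
    # each target via transport.factory.instance). Note the context line above calls
    # np.range, which would need to be np.arange to actually run.
    import numpy as np
    import pandas as pd

    SEGMENT_COUNT = 6          # illustrative value
    MAX_ROWS = 2_000_000       # illustrative value


    class _StubWriter:
        def write(self, df, **_args):
            print(f"writing {df.shape[0]} rows")

        def close(self):
            pass


    def _delegate_write(targets, _data: pd.DataFrame, **_args):
        if _data.shape[0] > 0:                        # empty frames are skipped, as in the hunk
            for _target in targets:
                _writer = _StubWriter()               # real code: transport.factory.instance(**_target)
                _writer.write(_data.copy(), **_args)  # each target gets its own copy
                if hasattr(_writer, 'close'):
                    _writer.close()


    def write(targets, _df: pd.DataFrame, **_args):
        _segments = (np.array_split(np.arange(_df.shape[0]), SEGMENT_COUNT)
                     if _df.shape[0] > MAX_ROWS else [np.arange(_df.shape[0])])
        for _indexes in _segments:
            _delegate_write(targets, _df.iloc[_indexes])


    if __name__ == '__main__':
        write([{"provider": "file"}], pd.DataFrame({'x': range(10)}))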