Browse Source

bug fixes

Steve Nyemba 1 year ago
parent
commit
4320159f3d
2 changed files with 8 additions and 23 deletions
  1. 7 22
      transport/disk.py
  2. 1 1
      transport/etl.py

+ 7 - 22
transport/disk.py

@@ -65,7 +65,7 @@ class DiskWriter(Writer):
 		super().__init__()
 		self._path = params['path']
 		self._delimiter = params['delimiter']
-		
+		self._mode = 'w' if 'mode' not in params else params['mode']
 	# def meta(self):
 	# 	return self.cache['meta']
 	# def isready(self):
@@ -89,28 +89,13 @@ class DiskWriter(Writer):
 		"""
 		try:
 			
-			_mode = 'a' if 'overwrite' not in _args else 'w'
-			DiskWriter.THREAD_LOCK.acquire()
-			# # _path = _args['path'] if 'path' in _args else self.path
-			# # _delim= _args['delimiter'] if 'delimiter' in _args else self._delimiter
-			# # info.to_csv(_path,sep=_delim)
-			# info.to_csv(self.path)
-			# f = open(self.path,_mode)
-			# if self.delimiter :
-			# 	if type(info) == list :
-			# 		for row in info :
-			# 			f.write(self.format(row))
-			# 	else:
-			# 		f.write(self.format(info))
-			# else:
-			# 	if not type(info) == str :
-			# 		f.write(json.dumps(info)+"\n")
-			# 	else:
-			# 		f.write(info)
-			# f.close()
+			
+			DiskWriter.THREAD_LOCK.acquire()		
+			
 			_delim = self._delimiter if 'delimiter' not in _args else _args['delimiter']
-			_path = self.path if 'path' not  in _args else _args['path']
-			info.to_csv(_path,index=False,sep=_delim)
+			_path = self._path if 'path' not  in _args else _args['path']
+			_mode = self._mode if 'mode' not in _args else _args['mode']
+			info.to_csv(_path,index=False,sep=_delim, mode=_mode)
 			pass
 		except Exception as e:
 			#

+ 1 - 1
transport/etl.py

@@ -116,7 +116,7 @@ class Transporter(Process):
         for _indexes in _segments :
             _fwd_args = {} if not _args else _args
             
-            self._delegate_write(_df.iloc[_indexes])
+            self._delegate_write(_df.iloc[_indexes],**_fwd_args)
             #
             # @TODO: Perhaps consider writing up each segment in a thread/process (speeds things up?)
             pass