浏览代码

bug fix, enhancement with pandas

Steve Nyemba 3 年之前
父节点
当前提交
79cdc0c0d0
共有 4 个文件被更改,包括 17 次插入和 7 次删除
  1. 3 2
      bin/transport
  2. 1 1
      setup.py
  3. 2 2
      transport/mongo.py
  4. 11 2
      transport/sql.py

+ 3 - 2
bin/transport

@@ -41,6 +41,7 @@ class Post(Process):
 		self.rows 	=	 args['rows']
 	def run(self):
 		_info = {"values":self.rows} if 'couch' in self.PROVIDER else self.rows
+		
 		self.writer.write(_info)
 		self.writer.close()
 
@@ -70,7 +71,7 @@ class ETL (Process):
 			rows = np.array_split(np.arange(idf.shape[0]),self.JOB_COUNT)
 			jobs = []
 			for i in rows :
-				segment = idf.loc[i,:].to_dict(orient='records')
+				segment = idf.loc[i,:] #.to_dict(orient='records')
 				proc = Post(target = self._oargs,rows = segment)
 				jobs.append(proc)
 				proc.start()
@@ -89,6 +90,6 @@ if __name__ == '__main__' :
 		if 'source' in SYS_ARGS :
 			_config['source'] = {"type":"disk.DiskReader","args":{"path":SYS_ARGS['source'],"delimiter":","}}
 
-		_config['jobs']  = 10 if 'jobs' not in SYS_ARGS else SYS_ARGS['jobs']
+		_config['jobs']  = 10 if 'jobs' not in SYS_ARGS else int(SYS_ARGS['jobs'])
 		etl = ETL (**_config)
 		etl.start()

+ 1 - 1
setup.py

@@ -8,7 +8,7 @@ def read(fname):
     return open(os.path.join(os.path.dirname(__file__), fname)).read() 
 args    = {
     "name":"data-transport",
-    "version":"1.3.9.0",
+    "version":"1.3.9.2",
     "author":"The Phi Technology LLC","author_email":"info@the-phi.com",
     "license":"MIT",
     "packages":["transport"]}

+ 2 - 2
transport/mongo.py

@@ -142,8 +142,8 @@ class MongoWriter(Mongo,Writer):
         # if type(info) == list :
         #     self.db[self.uid].insert_many(info)
         # else:
-        if (type(info) == list) :
-            self.db[self.uid].insert_many(info)
+        if type(info) == list or type(info) == pd.DataFrame :
+            self.db[self.uid].insert_many(info if type(info) == list else info.to_dict(orient='records'))
         else:
             self.db[self.uid].insert_one(info)
     def set(self,document):

+ 11 - 2
transport/sql.py

@@ -157,14 +157,23 @@ class SQLWriter(SQLRW,Writer):
         # inspect = False if 'inspect' not in _args else _args['inspect']
         # cast = False if 'cast' not in _args else _args['cast']
         if not self.fields :
-            _fields = info.keys() if type(info) == dict else info[0].keys()
+            if type(info) == list :
+                _fields = info[0].keys()
+            elif type(info) == dict :
+                _fields = info.keys()
+            elif type(info) == pd.DataFrame :
+                _fields = info.columns
+
+            # _fields = info.keys() if type(info) == dict else info[0].keys()
             _fields = list (_fields)
             self.init(_fields)
         #
         # @TODO: Use pandas/odbc ? Not sure b/c it requires sqlalchemy
         #
         if type(info) != list :
-            info = [info]        
+            #
+            # We are assuming 2 cases i.e dict or pd.DataFrame
+            info = [info]  if type(info) == dict else info.values.tolist()       
         cursor = self.conn.cursor()
         try:
             _sql = "INSERT INTO :table (:fields) VALUES (:values)".replace(":table",self.table) #.replace(":table",self.table).replace(":fields",_fields)