浏览代码

bug fix: ETL, Mongodb

Steve Nyemba 1 年之前
父节点
当前提交
e1763b1b19
共有 3 个文件被更改,包括 14 次插入3 次删除
  1. 7 1
      bin/transport
  2. 6 1
      transport/etl.py
  3. 1 1
      transport/nosql/mongodb.py

+ 7 - 1
bin/transport

@@ -62,8 +62,14 @@ def wait(jobs):
         time.sleep(1)
 
 @app.command(name="apply")
-def move (path,index=None):
+def apply (path,index=None):
+    """
+    This function applies data transport from one source to one or several others
 
+        :path   path of the configuration file
+        
+        :index  index of the _item of interest (otherwise everything will be processed)
+    """
     _proxy = lambda _object: _object.write(_object.read())
     if os.path.exists(path):
         file = open(path)

+ 6 - 1
transport/etl.py

@@ -83,7 +83,12 @@ class Transporter(Process):
         _reader = transport.factory.instance(**self._source)
         #
         # If arguments are provided then a query is to be executed (not just a table dump)
-        return _reader.read() if 'args' not in self._source else _reader.read(**self._source['args'])
+        if 'cmd' in self._source or 'query' in self._source :
+            _query = self._source['cmd'] if 'cmd' in self._source else self._source['query']
+            return _reader.read(**_query)
+        else:
+            return _reader.read()
+        # return _reader.read() if 'query' not in self._source else _reader.read(**self._source['query'])
         
     def _delegate_write(self,_data,**_args):
         """

+ 1 - 1
transport/nosql/mongodb.py

@@ -218,7 +218,7 @@ class Writer(Mongo):
                 if type(info) == pd.DataFrame :
                     info = info.to_dict(orient='records')
                 # info if type(info) == list else info.to_dict(orient='records')
-                info = json.loads(json.dumps(info)) 
+                info = json.loads(json.dumps(info,cls=IEncoder)) 
                 self.db[_uid].insert_many(info)
             else:
                 #