Browse Source

nextcloud handling

Steve Nyemba 1 year ago
parent
commit
5660d8ba59
4 changed files with 90 additions and 8 deletions
  1. 1 1
      transport/disk.py
  2. 76 0
      transport/nextcloud.py
  3. 12 6
      transport/providers.py
  4. 1 1
      transport/version.py

+ 1 - 1
transport/disk.py

@@ -64,7 +64,7 @@ class DiskWriter(Writer):
 	def __init__(self,**params):
 	def __init__(self,**params):
 		super().__init__()
 		super().__init__()
 		self._path = params['path']
 		self._path = params['path']
-		self._delimiter = params['delimiter']
+		self._delimiter = params['delimiter'] if 'delimiter' in params else None
 		self._mode = 'w' if 'mode' not in params else params['mode']
 		self._mode = 'w' if 'mode' not in params else params['mode']
 	# def meta(self):
 	# def meta(self):
 	# 	return self.cache['meta']
 	# 	return self.cache['meta']

+ 76 - 0
transport/nextcloud.py

@@ -0,0 +1,76 @@
+"""
+We are implementing transport to and from nextcloud (just like s3)
+"""
+import os
+import sys
+from transport.common import Reader,Writer
+import pandas as pd
+from io import StringIO
+import json
+import nextcloud_client as nextcloud
+
+class Nextcloud :
+    def __init__(self,**_args):
+        pass
+        self._delimiter = None
+        self._handler = nextcloud.Client(_args['url'])
+        _uid = _args['uid']
+        _token = _args['token']
+        self._uri = _args['folder'] if 'folder' in _args else './'
+        if self._uri.endswith('/') :
+            self._uri = self._uri[:-1]
+        self._file = None if 'file' not in _args else _args['file']
+        self._handler.login(_uid,_token)
+    def close(self):
+        try:
+            self._handler.logout()
+        except Exception as e:
+            pass
+        
+    
+class NextcloudReader(Nextcloud,Reader):
+    def __init__(self,**_args):
+        # self._file = [] if 'file' not in _args else _args['file']
+        super().__init__(**_args)
+        pass
+    def read(self,**_args):
+        _filename = self._file if 'file' not in _args else _args['file']
+        #
+        # @TODO: if _filename is none, an exception should be raised
+        #
+        _uri = '/'.join([self._uri,_filename])
+        if self._handler.get_file(_uri) :
+            #
+            #
+            _info = self._handler.file_info(_uri)
+            _content = self._handler.get_file_contents(_uri).decode('utf8')
+            if _info.get_content_type() == 'text/csv' :
+                _file = StringIO(_content)
+                return pd.read_csv(_file)
+            else:
+                return _content
+        return None     
+class NextcloudWriter (Nextcloud,Writer):
+    """
+    This class will write data to an instance of nextcloud
+    """
+    def __init__(self,**_args)    :
+        super().__init__(**_args)
+        self
+    def write(self,_data,**_args):
+        """
+        This function will upload a file to a given destination 
+        :file   has the uri of the location of the file
+        """
+        _filename = self._file if 'file' not in _args else _args['file']
+        _uri = '/'.join([self._uri,_filename])
+        if type(_data) == pd.DataFrame :
+            f = StringIO()
+            _data.to_csv(f,index=False)
+            _content = f.getvalue()
+        elif type(_data) == dict :
+            _content = json.dumps(_data)
+        else:
+            _content = str(_data)
+        self._handler.put_file_contents(_uri,_content)
+

+ 12 - 6
transport/providers.py

@@ -10,6 +10,7 @@ from transport import etl as etl
 from transport import qlistener
 from transport import qlistener
 from transport import bricks
 from transport import bricks
 from transport import session
 from transport import session
+from transport import nextcloud
 import psycopg2 as pg
 import psycopg2 as pg
 import mysql.connector as my
 import mysql.connector as my
 from google.cloud import bigquery as bq
 from google.cloud import bigquery as bq
@@ -34,7 +35,7 @@ MARIADB  = 'mariadb'
 COUCHDB = 'couch'
 COUCHDB = 'couch'
 CONSOLE = 'console'
 CONSOLE = 'console'
 ETL = 'etl'
 ETL = 'etl'
-
+NEXTCLOUD = 'nextcloud'
 
 
 #
 #
 # synonyms of the above
 # synonyms of the above
@@ -49,18 +50,19 @@ AWS_S3  = 's3'
 RABBIT = RABBITMQ
 RABBIT = RABBITMQ
 
 
 QLISTENER = 'qlistener'
 QLISTENER = 'qlistener'
+QUEUE = QLISTENER
 DATABRICKS= 'databricks+connector'
 DATABRICKS= 'databricks+connector'
 DRIVERS  = {PG:pg,REDSHIFT:pg,MYSQL:my,MARIADB:my,NETEZZA:nz,SQLITE:sqlite3}
 DRIVERS  = {PG:pg,REDSHIFT:pg,MYSQL:my,MARIADB:my,NETEZZA:nz,SQLITE:sqlite3}
-CATEGORIES ={'sql':[NETEZZA,PG,MYSQL,REDSHIFT,SQLITE,MARIADB],'nosql':[MONGODB,COUCHDB],'cloud':[BIGQUERY,DATABRICKS],'file':[FILE],
-             'queue':[RABBIT,QLISTENER],'memory':[CONSOLE,QLISTENER],'http':[HTTP]}
+CATEGORIES ={'sql':[NETEZZA,PG,MYSQL,REDSHIFT,SQLITE,MARIADB],'nosql':[MONGODB,COUCHDB],'cloud':[NEXTCLOUD,S3,BIGQUERY,DATABRICKS],'file':[FILE],
+             'queue':[RABBIT,QLISTENER],'memory':[CONSOLE,QUEUE],'http':[HTTP]}
 
 
 READ = {'sql':sql.SQLReader,'nosql':{MONGODB:mongo.MongoReader,COUCHDB:couch.CouchReader},
 READ = {'sql':sql.SQLReader,'nosql':{MONGODB:mongo.MongoReader,COUCHDB:couch.CouchReader},
-        'cloud':{BIGQUERY:sql.BigQueryReader,DATABRICKS:bricks.BricksReader},
+        'cloud':{BIGQUERY:sql.BigQueryReader,DATABRICKS:bricks.BricksReader,NEXTCLOUD:nextcloud.NextcloudReader},
         'file':disk.DiskReader,'queue':{RABBIT:queue.QueueReader,QLISTENER:qlistener.qListener},
         'file':disk.DiskReader,'queue':{RABBIT:queue.QueueReader,QLISTENER:qlistener.qListener},
         # 'cli':{CONSOLE:Console},'memory':{CONSOLE:Console},'http':session.HttpReader
         # 'cli':{CONSOLE:Console},'memory':{CONSOLE:Console},'http':session.HttpReader
         }
         }
 WRITE = {'sql':sql.SQLWriter,'nosql':{MONGODB:mongo.MongoWriter,COUCHDB:couch.CouchWriter},
 WRITE = {'sql':sql.SQLWriter,'nosql':{MONGODB:mongo.MongoWriter,COUCHDB:couch.CouchWriter},
-         'cloud':{BIGQUERY:sql.BigQueryWriter,DATABRICKS:bricks.BricksWriter},
+         'cloud':{BIGQUERY:sql.BigQueryWriter,DATABRICKS:bricks.BricksWriter,NEXTCLOUD:nextcloud.NextcloudWriter},
          'file':disk.DiskWriter,'queue':{RABBIT:queue.QueueWriter,QLISTENER:qlistener.qListener},
          'file':disk.DiskWriter,'queue':{RABBIT:queue.QueueWriter,QLISTENER:qlistener.qListener},
         #  'cli':{CONSOLE:Console},
         #  'cli':{CONSOLE:Console},
         #  'memory':{CONSOLE:Console}, 'http':session.HttpReader
         #  'memory':{CONSOLE:Console}, 'http':session.HttpReader
@@ -78,12 +80,16 @@ PROVIDERS = {
     
     
     MYSQL:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':my,'default':{'host':'localhost','port':3306}},
     MYSQL:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':my,'default':{'host':'localhost','port':3306}},
     MARIADB:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':my,'default':{'host':'localhost','port':3306}},
     MARIADB:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':my,'default':{'host':'localhost','port':3306}},
+    
     S3:{'read':s3.s3Reader,'write':s3.s3Writer},
     S3:{'read':s3.s3Reader,'write':s3.s3Writer},
     BIGQUERY:{'read':sql.BigQueryReader,'write':sql.BigQueryWriter},
     BIGQUERY:{'read':sql.BigQueryReader,'write':sql.BigQueryWriter},
+    DATABRICKS:{'read':bricks.BricksReader,'write':bricks.BricksWriter},
+    NEXTCLOUD:{'read':nextcloud.NextcloudReader,'write':nextcloud.NextcloudWriter},
+
     QLISTENER:{'read':qlistener.qListener,'write':qlistener.qListener,'default':{'host':'localhost','port':5672}},
     QLISTENER:{'read':qlistener.qListener,'write':qlistener.qListener,'default':{'host':'localhost','port':5672}},
     CONSOLE:{'read':qlistener.Console,"write":qlistener.Console},
     CONSOLE:{'read':qlistener.Console,"write":qlistener.Console},
     HTTP:{'read':session.HttpReader,'write':session.HttpWriter},
     HTTP:{'read':session.HttpReader,'write':session.HttpWriter},
-    DATABRICKS:{'read':bricks.BricksReader,'write':bricks.BricksWriter},
+    
     MONGODB:{'read':mongo.MongoReader,'write':mongo.MongoWriter,'default':{'port':27017,'host':'localhost'}},
     MONGODB:{'read':mongo.MongoReader,'write':mongo.MongoWriter,'default':{'port':27017,'host':'localhost'}},
     COUCHDB:{'read':couch.CouchReader,'writer':couch.CouchWriter,'default':{'host':'localhost','port':5984}},
     COUCHDB:{'read':couch.CouchReader,'writer':couch.CouchWriter,'default':{'host':'localhost','port':5984}},
     ETL :{'read':etl.Transporter,'write':etl.Transporter}
     ETL :{'read':etl.Transporter,'write':etl.Transporter}

+ 1 - 1
transport/version.py

@@ -1,2 +1,2 @@
 __author__ = 'The Phi Technology'
 __author__ = 'The Phi Technology'
-__version__= '1.9.0'
+__version__= '1.9.2'