Browse Source

Merge pull request 'Adding nextcloud support' (#5) from nextcloud into dev

adding nextcloud support and minor bug fixes with sqlite
Reviewed-on: https://dev.the-phi.com/git/library/data-transport/pulls/5
Steve L. Nyemba 1 year ago
parent
commit
f1fa4b93e8
5 changed files with 106 additions and 16 deletions
  1. 1 1
      setup.py
  2. 10 7
      transport/disk.py
  3. 80 0
      transport/nextcloud.py
  4. 14 7
      transport/providers.py
  5. 1 1
      transport/version.py

+ 1 - 1
setup.py

@@ -17,7 +17,7 @@ args    = {
     "license":"MIT",
     "packages":["transport"]}
 args["keywords"]=['mongodb','couchdb','rabbitmq','file','read','write','s3','sqlite']
-args["install_requires"] = ['pymongo','sqlalchemy<2.0.0','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python']
+args["install_requires"] = ['pyncclient','pymongo','sqlalchemy<2.0.0','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python']
 args["url"] =   "https://healthcareio.the-phi.com/git/code/transport.git"
 args['scripts'] = ['bin/transport']
 if sys.version_info[0] == 2 :

+ 10 - 7
transport/disk.py

@@ -64,7 +64,7 @@ class DiskWriter(Writer):
 	def __init__(self,**params):
 		super().__init__()
 		self._path = params['path']
-		self._delimiter = params['delimiter']
+		self._delimiter = params['delimiter'] if 'delimiter' in params else None
 		self._mode = 'w' if 'mode' not in params else params['mode']
 	# def meta(self):
 	# 	return self.cache['meta']
@@ -209,17 +209,20 @@ class SQLiteWriter(SQLite,DiskWriter) :
 		"""
 		"""
 		
-		if not self.fields :
-			if type(info) == pd.DataFrame :
-				_columns = list(info.columns) 
-			self.init(list(info.keys()))
+		#if not self.fields :
+		#	#if type(info) == pd.DataFrame :
+		#	#	_columns = list(info.columns) 
+        #   #self.init(list(info.keys()))
 		
 		if type(info) == dict :
 			info = [info]
 		elif type(info) == pd.DataFrame :
 			info = info.fillna('')
 			info = info.to_dict(orient='records')
-		
+        if not self.fields :
+            _rec = info[0]
+            self.init(list(_rec.keys()))
+
 		SQLiteWriter.LOCK.acquire()
 		try:
 			
@@ -238,4 +241,4 @@ class SQLiteWriter(SQLite,DiskWriter) :
 		except Exception as e :
 			print (e)
 			pass
-		SQLiteWriter.LOCK.release()
+		SQLiteWriter.LOCK.release()

+ 80 - 0
transport/nextcloud.py

@@ -0,0 +1,80 @@
+"""
+We are implementing transport to and from nextcloud (just like s3)
+"""
+import os
+import sys
+from transport.common import Reader,Writer
+import pandas as pd
+from io import StringIO
+import json
+import nextcloud_client as nextcloud
+
+class Nextcloud :
+    def __init__(self,**_args):
+        pass
+        self._delimiter = None
+        self._handler = nextcloud.Client(_args['url'])
+        _uid = _args['uid']
+        _token = _args['token']
+        self._uri = _args['folder'] if 'folder' in _args else './'
+        if self._uri.endswith('/') :
+            self._uri = self._uri[:-1]
+        self._file = None if 'file' not in _args else _args['file']
+        self._handler.login(_uid,_token)
+    def close(self):
+        try:
+            self._handler.logout()
+        except Exception as e:
+            pass
+        
+    
+class NextcloudReader(Nextcloud,Reader):
+    def __init__(self,**_args):
+        # self._file = [] if 'file' not in _args else _args['file']
+        super().__init__(**_args)
+        pass
+    def read(self,**_args):
+        _filename = self._file if 'file' not in _args else _args['file']
+        #
+        # @TODO: if _filename is none, an exception should be raised
+        #
+        _uri = '/'.join([self._uri,_filename])
+        if self._handler.get_file(_uri) :
+            #
+            #
+            _info = self._handler.file_info(_uri)
+            _content = self._handler.get_file_contents(_uri).decode('utf8')
+            if _info.get_content_type() == 'text/csv' :
+                #
+                # @TODO: enable handling of csv, xls, parquet, pickles
+                _file = StringIO(_content)
+                return pd.read_csv(_file)
+            else:
+                #
+                # if it is neither a structured document like csv, we will return the content as is
+                return _content
+        return None     
+class NextcloudWriter (Nextcloud,Writer):
+    """
+    This class will write data to an instance of nextcloud
+    """
+    def __init__(self,**_args)    :
+        super().__init__(**_args)
+        self
+    def write(self,_data,**_args):
+        """
+        This function will upload a file to a given destination 
+        :file   has the uri of the location of the file
+        """
+        _filename = self._file if 'file' not in _args else _args['file']
+        _uri = '/'.join([self._uri,_filename])
+        if type(_data) == pd.DataFrame :
+            f = StringIO()
+            _data.to_csv(f,index=False)
+            _content = f.getvalue()
+        elif type(_data) == dict :
+            _content = json.dumps(_data)
+        else:
+            _content = str(_data)
+        self._handler.put_file_contents(_uri,_content)
+

+ 14 - 7
transport/providers.py

@@ -10,6 +10,7 @@ from transport import etl as etl
 from transport import qlistener
 from transport import bricks
 from transport import session
+from transport import nextcloud
 import psycopg2 as pg
 import mysql.connector as my
 from google.cloud import bigquery as bq
@@ -34,7 +35,7 @@ MARIADB  = 'mariadb'
 COUCHDB = 'couch'
 CONSOLE = 'console'
 ETL = 'etl'
-
+NEXTCLOUD = 'nextcloud'
 
 #
 # synonyms of the above
@@ -49,18 +50,19 @@ AWS_S3  = 's3'
 RABBIT = RABBITMQ
 
 QLISTENER = 'qlistener'
+QUEUE = QLISTENER
 DATABRICKS= 'databricks+connector'
 DRIVERS  = {PG:pg,REDSHIFT:pg,MYSQL:my,MARIADB:my,NETEZZA:nz,SQLITE:sqlite3}
-CATEGORIES ={'sql':[NETEZZA,PG,MYSQL,REDSHIFT,SQLITE,MARIADB],'nosql':[MONGODB,COUCHDB],'cloud':[BIGQUERY,DATABRICKS],'file':[FILE],
-             'queue':[RABBIT,QLISTENER],'memory':[CONSOLE,QLISTENER],'http':[HTTP]}
+CATEGORIES ={'sql':[NETEZZA,PG,MYSQL,REDSHIFT,SQLITE,MARIADB],'nosql':[MONGODB,COUCHDB],'cloud':[NEXTCLOUD,S3,BIGQUERY,DATABRICKS],'file':[FILE],
+             'queue':[RABBIT,QLISTENER],'memory':[CONSOLE,QUEUE],'http':[HTTP]}
 
 READ = {'sql':sql.SQLReader,'nosql':{MONGODB:mongo.MongoReader,COUCHDB:couch.CouchReader},
-        'cloud':{BIGQUERY:sql.BigQueryReader,DATABRICKS:bricks.BricksReader},
+        'cloud':{BIGQUERY:sql.BigQueryReader,DATABRICKS:bricks.BricksReader,NEXTCLOUD:nextcloud.NextcloudReader},
         'file':disk.DiskReader,'queue':{RABBIT:queue.QueueReader,QLISTENER:qlistener.qListener},
         # 'cli':{CONSOLE:Console},'memory':{CONSOLE:Console},'http':session.HttpReader
         }
 WRITE = {'sql':sql.SQLWriter,'nosql':{MONGODB:mongo.MongoWriter,COUCHDB:couch.CouchWriter},
-         'cloud':{BIGQUERY:sql.BigQueryWriter,DATABRICKS:bricks.BricksWriter},
+         'cloud':{BIGQUERY:sql.BigQueryWriter,DATABRICKS:bricks.BricksWriter,NEXTCLOUD:nextcloud.NextcloudWriter},
          'file':disk.DiskWriter,'queue':{RABBIT:queue.QueueWriter,QLISTENER:qlistener.qListener},
         #  'cli':{CONSOLE:Console},
         #  'memory':{CONSOLE:Console}, 'http':session.HttpReader
@@ -70,6 +72,7 @@ WRITE = {'sql':sql.SQLWriter,'nosql':{MONGODB:mongo.MongoWriter,COUCHDB:couch.Co
 PROVIDERS = {
     FILE:{'read':disk.DiskReader,'write':disk.DiskWriter},
     SQLITE:{'read':disk.SQLiteReader,'write':disk.SQLiteWriter,'driver':sqlite3},
+    'sqlite3':{'read':disk.SQLiteReader,'write':disk.SQLiteWriter,'driver':sqlite3},
     
     POSTGRESQL:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':pg,'default':{'host':'localhost','port':5432}},
     NETEZZA:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':nz,'default':{'port':5480}},
@@ -78,12 +81,16 @@ PROVIDERS = {
     
     MYSQL:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':my,'default':{'host':'localhost','port':3306}},
     MARIADB:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':my,'default':{'host':'localhost','port':3306}},
+    
     S3:{'read':s3.s3Reader,'write':s3.s3Writer},
     BIGQUERY:{'read':sql.BigQueryReader,'write':sql.BigQueryWriter},
+    DATABRICKS:{'read':bricks.BricksReader,'write':bricks.BricksWriter},
+    NEXTCLOUD:{'read':nextcloud.NextcloudReader,'write':nextcloud.NextcloudWriter},
+
     QLISTENER:{'read':qlistener.qListener,'write':qlistener.qListener,'default':{'host':'localhost','port':5672}},
     CONSOLE:{'read':qlistener.Console,"write":qlistener.Console},
     HTTP:{'read':session.HttpReader,'write':session.HttpWriter},
-    DATABRICKS:{'read':bricks.BricksReader,'write':bricks.BricksWriter},
+    
     MONGODB:{'read':mongo.MongoReader,'write':mongo.MongoWriter,'default':{'port':27017,'host':'localhost'}},
     COUCHDB:{'read':couch.CouchReader,'writer':couch.CouchWriter,'default':{'host':'localhost','port':5984}},
     ETL :{'read':etl.Transporter,'write':etl.Transporter}
@@ -92,4 +99,4 @@ DEFAULT = {PG:{'host':'localhost','port':5432},MYSQL:{'host':'localhost','port':
 DEFAULT[MONGODB] = {'port':27017,'host':'localhost'}
 DEFAULT[REDSHIFT] = DEFAULT[PG]
 DEFAULT[MARIADB] = DEFAULT[MYSQL]
-DEFAULT[NETEZZA] = {'port':5480}
+DEFAULT[NETEZZA] = {'port':5480}

+ 1 - 1
transport/version.py

@@ -1,2 +1,2 @@
 __author__ = 'The Phi Technology'
-__version__= '1.9.0'
+__version__= '1.9.2'