Browse Source

Merge pull request #21 from lnyemba/v2.2.0

V2.2.0
Steve L. Nyemba 1 month ago
parent
commit
d6f96ac1b2

+ 125 - 29
bin/transport

@@ -24,19 +24,28 @@ from multiprocessing import Process
 
 
 import os
 import os
 import transport
 import transport
-from transport import etl
+# from transport import etl
+from transport.iowrapper import IETL
 # from transport import providers
 # from transport import providers
 import typer
 import typer
 from typing_extensions import Annotated
 from typing_extensions import Annotated
 from typing import Optional
 from typing import Optional
 import time
 import time
 from termcolor import colored
 from termcolor import colored
+from enum import Enum
+from rich import print
+import plugin_ix as pix
+
 
 
 app = typer.Typer()
 app = typer.Typer()
+app_e = typer.Typer()   #-- handles etl (run, generate)
+app_x = typer.Typer()   #-- handles plugins (list,add, test)
+app_i = typer.Typer()   #-- handles information (version, license)
+app_r = typer.Typer()   #-- handles registry    
 REGISTRY_PATH=os.sep.join([os.environ['HOME'],'.data-transport'])
 REGISTRY_PATH=os.sep.join([os.environ['HOME'],'.data-transport'])
 REGISTRY_FILE= 'transport-registry.json'
 REGISTRY_FILE= 'transport-registry.json'
-CHECK_MARK = ' '.join(['[',colored(u'\u2713', 'green'),']'])
-TIMES_MARK= ' '.join(['[',colored(u'\u2717','red'),']'])
+CHECK_MARK = '[ [green]\u2713[/green] ]' #' '.join(['[',colored(u'\u2713', 'green'),']'])
+TIMES_MARK= '[ [red]\u2717[/red] ]' #' '.join(['[',colored(u'\u2717','red'),']'])
 # @app.command()
 # @app.command()
 def help() :     
 def help() :     
 	print (__doc__)
 	print (__doc__)
@@ -44,10 +53,15 @@ def wait(jobs):
     while jobs :
     while jobs :
         jobs = [thread for thread in jobs if thread.is_alive()]
         jobs = [thread for thread in jobs if thread.is_alive()]
         time.sleep(1)
         time.sleep(1)
+def wait (jobs):
+    while jobs :
+            jobs = [pthread for pthread in jobs if pthread.is_alive()]
 
 
-@app.command(name="apply")
+@app_e.command(name="run")
 def apply (path:Annotated[str,typer.Argument(help="path of the configuration file")],
 def apply (path:Annotated[str,typer.Argument(help="path of the configuration file")],
-        index:int = typer.Option(default= None, help="index of the item of interest, otherwise everything in the file will be processed")):
+        index:int = typer.Option(default= None, help="index of the item of interest, otherwise everything in the file will be processed"),
+        batch:int = typer.Option(default=5, help="The number of parallel processes to run at once")
+        ):
     """
     """
     This function applies data transport ETL feature to read data from one source to write it one or several others
     This function applies data transport ETL feature to read data from one source to write it one or several others
     """
     """
@@ -56,23 +70,34 @@ def apply (path:Annotated[str,typer.Argument(help="path of the configuration fil
         file = open(path)
         file = open(path)
         _config = json.loads (file.read() )
         _config = json.loads (file.read() )
         file.close()
         file.close()
-        if index :
+        if index is not None:            
             _config = [_config[ int(index)]]
             _config = [_config[ int(index)]]
-        jobs = []            
+        jobs = []          
         for _args in _config :
         for _args in _config :
-            pthread = etl.instance(**_args) #-- automatically starts the process
+            # pthread = etl.instance(**_args) #-- automatically starts the process
+            def bootup ():
+                _worker = IETL(**_args)
+                _worker.run()
+            pthread = Process(target=bootup)
+            pthread.start()
             jobs.append(pthread)
             jobs.append(pthread)
+            if len(jobs) == batch :
+                wait(jobs)
+                jobs = []
+        
+        if jobs :
+            wait (jobs)
         #
         #
-        # @TODO: Log the number of processes started and estimated time
-        while jobs :
-             jobs = [pthread for pthread in jobs if pthread.is_alive()]
-             time.sleep(1)
+        # @TODO: Log the number of processes started and estfrom transport impfrom transport impimated time
+        # while jobs :
+        #      jobs = [pthread for pthread in jobs if pthread.is_alive()]
+        #      time.sleep(1)
         #
         #
         # @TODO: Log the job termination here ...
         # @TODO: Log the job termination here ...
-@app.command(name="providers")
+@app_i.command(name="supported")
 def supported (format:Annotated[str,typer.Argument(help="format of the output, supported formats are (list,table,json)")]="table") :
 def supported (format:Annotated[str,typer.Argument(help="format of the output, supported formats are (list,table,json)")]="table") :
     """
     """
-    This function will print supported providers/vendors and their associated classifications
+    This function will print supported database technologies
     """
     """
     _df =  (transport.supported())
     _df =  (transport.supported())
     if format in ['list','json'] :
     if format in ['list','json'] :
@@ -80,17 +105,26 @@ def supported (format:Annotated[str,typer.Argument(help="format of the output, s
     else:
     else:
          print (_df)
          print (_df)
     print ()
     print ()
-
-@app.command()
-def version():
+@app_i.command(name="version")
+def version ():
+    """
+    This function will return the version of the data-transport
+    """
+    print()
+    print (f'[bold] {transport.__app_name__} ,[blue] {transport.__edition__} edition [/blue], version {transport.__version__}[/bold]')
+    print ()
+ 
+@app_i.command(name="license")
+def info():
     """
     """
     This function will display version and license information
     This function will display version and license information
     """
     """
-
-    print (transport.__app_name__,'version ',transport.__version__)
+    print()
+    print (f'[bold] {transport.__app_name__} ,{transport.__edition__}, version {transport.__version__}[/bold]')
+    print ()
     print (transport.__license__)
     print (transport.__license__)
 
 
-@app.command()
+@app_e.command()
 def generate (path:Annotated[str,typer.Argument(help="path of the ETL configuration file template (name included)")]):
 def generate (path:Annotated[str,typer.Argument(help="path of the ETL configuration file template (name included)")]):
     """
     """
     This function will generate a configuration template to give a sense of how to create one
     This function will generate a configuration template to give a sense of how to create one
@@ -99,45 +133,45 @@ def generate (path:Annotated[str,typer.Argument(help="path of the ETL configurat
             {
             {
                 "source":{"provider":"http","url":"https://raw.githubusercontent.com/codeforamerica/ohana-api/master/data/sample-csv/addresses.csv"},
                 "source":{"provider":"http","url":"https://raw.githubusercontent.com/codeforamerica/ohana-api/master/data/sample-csv/addresses.csv"},
                 "target":
                 "target":
-            [{"provider":"files","path":"addresses.csv","delimiter":","},{"provider":"sqlite","database":"sample.db3","table":"addresses"}]
+            [{"provider":"files","path":"addresses.csv","delimiter":","},{"provider":"sqlite3","database":"sample.db3","table":"addresses"}]
             }
             }
             ]
             ]
     file = open(path,'w')
     file = open(path,'w')
     file.write(json.dumps(_config))
     file.write(json.dumps(_config))
     file.close()
     file.close()
-    print (f"""{CHECK_MARK} Successfully generated a template ETL file at {path}""" )
+    print (f"""{CHECK_MARK} Successfully generated a template ETL file at [bold]{path}[/bold]""" )
     print ("""NOTE: Each line (source or target) is the content of an auth-file""")
     print ("""NOTE: Each line (source or target) is the content of an auth-file""")
 
 
 
 
 
 
-@app.command(name="init")
+@app_r.command(name="reset")
 def initregistry (email:Annotated[str,typer.Argument(help="email")],
 def initregistry (email:Annotated[str,typer.Argument(help="email")],
                   path:str=typer.Option(default=REGISTRY_PATH,help="path or location of the configuration file"), 
                   path:str=typer.Option(default=REGISTRY_PATH,help="path or location of the configuration file"), 
                   override:bool=typer.Option(default=False,help="override existing configuration or not")):
                   override:bool=typer.Option(default=False,help="override existing configuration or not")):
     """
     """
-    This functiion will initialize the registry and have both application and calling code loading the database parameters by a label
+    This functiion will initialize the data-transport registry and have both application and calling code loading the database parameters by a label
 
 
     """
     """
     try:
     try:
         transport.registry.init(email=email, path=path, override=override)
         transport.registry.init(email=email, path=path, override=override)
-        _msg = f"""{CHECK_MARK} Successfully wrote configuration to {path} from {email}"""
+        _msg = f"""{CHECK_MARK} Successfully wrote configuration to [bold]{path}[/bold] from [bold]{email}[/bold]"""
     except Exception as e:
     except Exception as e:
         _msg = f"{TIMES_MARK} {e}"
         _msg = f"{TIMES_MARK} {e}"
     print (_msg)
     print (_msg)
     print ()
     print ()
-@app.command(name="register")
+@app_r.command(name="add")
 def register (label:Annotated[str,typer.Argument(help="unique label that will be used to load the parameters of the database")],
 def register (label:Annotated[str,typer.Argument(help="unique label that will be used to load the parameters of the database")],
               auth_file:Annotated[str,typer.Argument(help="path of the auth_file")],
               auth_file:Annotated[str,typer.Argument(help="path of the auth_file")],
               default:bool=typer.Option(default=False,help="set the auth_file as default"),
               default:bool=typer.Option(default=False,help="set the auth_file as default"),
               path:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport registry file")):
               path:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport registry file")):
     """
     """
-    This function will register an auth-file i.e database connection and assign it a label, 
-    Learn more about auth-file at https://healthcareio.the-phi.com/data-transport
+    This function add  a database label for a given auth-file. which allows access to the database using a label of your choice.
+    
     """
     """
     try:
     try:
         if transport.registry.exists(path) :
         if transport.registry.exists(path) :
             transport.registry.set(label=label,auth_file=auth_file, default=default, path=path)
             transport.registry.set(label=label,auth_file=auth_file, default=default, path=path)
-            _msg = f"""{CHECK_MARK} Successfully added label "{label}" to data-transport registry"""
+            _msg = f"""{CHECK_MARK} Successfully added label [bold]"{label}"[/bold] to data-transport registry"""
         else:
         else:
             _msg = f"""{TIMES_MARK} Registry is not initialized, please initialize the registry (check help)"""
             _msg = f"""{TIMES_MARK} Registry is not initialized, please initialize the registry (check help)"""
     except Exception as e:
     except Exception as e:
@@ -145,6 +179,68 @@ def register (label:Annotated[str,typer.Argument(help="unique label that will be
     print (_msg)
     print (_msg)
     
     
     pass
     pass
+@app_x.command(name='add') 
+def register_plugs (
+    alias:Annotated[str,typer.Argument(help="unique function name within a file")],
+    path:Annotated[str,typer.Argument(help="path of the python file, that contains functions")],
+    folder:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport registry folder"),
+    
+    ):
+    """
+    This function will register a file and the functions within we are interested in using
+    """
+    if ',' in alias :
+        alias = [_name.strip() for _name in alias.split(',') if _name.strip() != '' ] 
+    else:
+        alias = [alias.strip()]
+    _pregistry  = pix.Registry(folder=folder,plugin_folder='plugins/code')
+    _log = _pregistry.set(path,alias)
+    # transport.registry.plugins.init()
+    # _log = transport.registry.plugins.add(alias,path)
+    _mark = TIMES_MARK if not _log else CHECK_MARK
+    _msg  = f"""Could NOT add the [bold]{alias}[/bold]to the registry""" if not _log else f""" successfully added {alias}, {_log} functions registered"""
+    print (f"""{_mark} {_msg}""")
+@app_x.command(name="list") 
+def registry_list (folder:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport configuration folder")):
+    """
+    This function will list all the plugins (python functions/files) that are registered and can be reused
+    """
+    _pregistry  = pix.Registry(folder=folder)
+    _df = _pregistry.stats()
+    if _df.empty :
+        print (f"{TIMES_MARK} registry at {folder} is not ready")
+    else:
+        print (_df)
+
+@app_x.command ("has")
+def registry_has (alias:Annotated[str,typer.Argument(help="alias of a function function@file or file.function")],
+                  folder:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport registry file")) :
+    _pregistry  = pix.Registry(folder=folder)
+    if _pregistry.has(alias) :
+        _msg = f"{CHECK_MARK} {alias} was [bold] found [/bold] in registry "
+    else:
+        _msg = f"{TIMES_MARK} {alias} was [bold] NOT found [/bold] in registry "
+    print (_msg)
+    
+@app_x.command(name="test") 
+def registry_test (alias:Annotated[str,typer.Argument(help="alias of a function function@file or file.function")],
+                  folder:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport registry folder")) :
+    _pregistry  = pix.Registry(folder=folder)
+    """
+    This function allows to test syntax for a plugin i.e in terms of alias@function
+    """
+    # _item = transport.registry.plugins.has(key=key)
+    _pointer = _pregistry.get(alias) if _pregistry.has(alias) else None
+        
+    if _pointer:
+        print (f"""{CHECK_MARK} successfully loaded [bold] {alias}[/bold] found in {folder}""")
+        
+    else:
+        print (f"{TIMES_MARK} unable to load {alias}. Make sure it is registered")
+app.add_typer(app_e,name='etl',help="This function will run etl or generate a template etl configuration file")
+app.add_typer(app_r,name='registry',help='This function allows labeling database access information')
+app.add_typer(app_i,name="info",help="This function will print either license or supported database technologies")
+app.add_typer(app_x, name="plugins",help="This function enables add/list/test of plugins in the registry")
 if __name__ == '__main__' :
 if __name__ == '__main__' :
      app()
      app()
 	
 	

+ 7 - 5
info/__init__.py

@@ -1,7 +1,8 @@
 __app_name__  = 'data-transport'
 __app_name__  = 'data-transport'
 __author__ = 'The Phi Technology'
 __author__ = 'The Phi Technology'
-__version__= '2.2.6'
+__version__= '2.2.12'
 __email__  = "info@the-phi.com"
 __email__  = "info@the-phi.com"
+__edition__= 'community'
 __license__=f"""
 __license__=f"""
 Copyright 2010 - 2024, Steve L. Nyemba
 Copyright 2010 - 2024, Steve L. Nyemba
 
 
@@ -13,9 +14,10 @@ THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR I
 
 
 """
 """
 
 
-__whatsnew__=f"""version {__version__}, focuses on collaborative environments like jupyter-base servers (apache zeppelin; jupyter notebook, jupyterlab, jupyterhub)
+__whatsnew__=f"""version {__version__}, 
+1. Added support for read/write logs as well as plugins (when applied)
+2. Bug fix with duckdb (adding readonly) for readers because there are issues with threads & processes
+3. support for streaming data, important to use this with large volumes of data
+
 
 
-    1. simpler syntax to create readers/writers
-    2. auth-file registry that can be referenced using a label
-    3. duckdb support
 """
 """

+ 138 - 0
notebooks/iceberg.ipynb

@@ -0,0 +1,138 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "#### Writing to Apache Iceberg\n",
+    "\n",
+    "1. Insure you have a Google Bigquery service account key on disk\n",
+    "2. The service key location is set as an environment variable **BQ_KEY**\n",
+    "3. The dataset will be automatically created within the project associated with the service key\n",
+    "\n",
+    "The cell below creates a dataframe that will be stored within Google Bigquery"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 15,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "['data transport version ', '2.4.0']\n"
+     ]
+    }
+   ],
+   "source": [
+    "#\n",
+    "# Writing to Google Bigquery database\n",
+    "#\n",
+    "import transport\n",
+    "from transport import providers\n",
+    "import pandas as pd\n",
+    "import os\n",
+    "\n",
+    "PRIVATE_KEY = os.environ['BQ_KEY'] #-- location of the service key\n",
+    "DATASET = 'demo'\n",
+    "_data = pd.DataFrame({\"name\":['James Bond','Steve Rogers','Steve Nyemba'],'age':[55,150,44]})\n",
+    "# bqw = transport.get.writer(provider=providers.ICEBERG,catalog='mz',database='edw.mz',table='friends')\n",
+    "bqw = transport.get.writer(provider=providers.ICEBERG,table='edw.mz.friends')\n",
+    "bqw.write(_data,if_exists='replace') #-- default is append\n",
+    "print (['data transport version ', transport.__version__])\n"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "#### Reading from Google Bigquery\n",
+    "\n",
+    "The cell below reads the data that has been written by the cell above and computes the average age within a Google Bigquery (simple query). \n",
+    "\n",
+    "- Basic read of the designated table (friends) created above\n",
+    "- Execute an aggregate SQL against the table\n",
+    "\n",
+    "**NOTE**\n",
+    "\n",
+    "By design **read** object are separated from **write** objects in order to avoid accidental writes to the database.\n",
+    "Read objects are created with **transport.get.reader** whereas write objects are created with **transport.get.writer**"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 14,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "           name  age\n",
+      "0    James Bond   55\n",
+      "1  Steve Rogers  150\n",
+      "2  Steve Nyemba   44\n",
+      "--------- STATISTICS ------------\n"
+     ]
+    }
+   ],
+   "source": [
+    "\n",
+    "import transport\n",
+    "from transport import providers\n",
+    "import os\n",
+    "PRIVATE_KEY=os.environ['BQ_KEY']\n",
+    "pgr = transport.get.reader(provider=providers.ICEBERG,database='edw.mz')\n",
+    "_df = pgr.read(table='friends')\n",
+    "_query = 'SELECT COUNT(*) _counts, AVG(age) from friends'\n",
+    "_sdf = pgr.read(sql=_query)\n",
+    "print (_df)\n",
+    "print ('--------- STATISTICS ------------')\n",
+    "# print (_sdf)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "An **auth-file** is a file that contains database parameters used to access the database. \n",
+    "For code in shared environments, we recommend \n",
+    "\n",
+    "1. Having the **auth-file** stored on disk \n",
+    "2. and the location of the file is set to an environment variable.\n",
+    "\n",
+    "To generate a template of the **auth-file** open the **file generator wizard** found at visit https://healthcareio.the-phi.com/data-transport"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": []
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python 3",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.9.7"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}

+ 6 - 2
setup.py

@@ -5,7 +5,7 @@ from setuptools import setup, find_packages
 import os
 import os
 import sys
 import sys
 # from version import __version__,__author__
 # from version import __version__,__author__
-from info import __version__, __author__,__app_name__,__license__
+from info import __version__, __author__,__app_name__,__license__,__edition__
 
 
 
 
 def read(fname):
 def read(fname):
@@ -19,9 +19,13 @@ args    = {
 
 
     "packages": find_packages(include=['info','transport', 'transport.*'])}
     "packages": find_packages(include=['info','transport', 'transport.*'])}
 args["keywords"]=['mongodb','duckdb','couchdb','rabbitmq','file','read','write','s3','sqlite']
 args["keywords"]=['mongodb','duckdb','couchdb','rabbitmq','file','read','write','s3','sqlite']
-args["install_requires"] = ['pyncclient','duckdb-engine','pymongo','sqlalchemy','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','termcolor','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python','numpy','pymssql']
+args["install_requires"] = ['pyncclient','duckdb-engine','pymongo','sqlalchemy','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','termcolor','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python','numpy','pymssql','pyspark','pydrill','sqlalchemy_drill','plugin-ix@git+https://github.com/lnyemba/plugins-ix']
 args["url"] =   "https://healthcareio.the-phi.com/git/code/transport.git"
 args["url"] =   "https://healthcareio.the-phi.com/git/code/transport.git"
 args['scripts'] = ['bin/transport']
 args['scripts'] = ['bin/transport']
+args['classifiers'] = ['Programming Language :: Python :: 3',
+                    'License :: OSI Approved :: MIT License',
+                     'Operating System :: OS Independent',
+                                ],
 # if sys.version_info[0] == 2 :
 # if sys.version_info[0] == 2 :
 #     args['use_2to3'] = True
 #     args['use_2to3'] = True
 #     args['use_2to3_exclude_fixers']=['lib2to3.fixes.fix_import']
 #     args['use_2to3_exclude_fixers']=['lib2to3.fixes.fix_import']

+ 67 - 26
transport/__init__.py

@@ -18,31 +18,36 @@ Source Code is available under MIT License:
 """
 """
 import numpy as np
 import numpy as np
 
 
-from transport import sql, nosql, cloud, other
+from transport import sql, nosql, cloud, other, warehouse
 import pandas as pd
 import pandas as pd
 import json
 import json
 import os
 import os
-from info import __version__,__author__,__email__,__license__,__app_name__,__whatsnew__
+from info import __version__,__author__,__email__,__license__,__app_name__,__whatsnew__,__edition__
 from transport.iowrapper import IWriter, IReader, IETL
 from transport.iowrapper import IWriter, IReader, IETL
 from transport.plugins import PluginLoader
 from transport.plugins import PluginLoader
 from transport import providers
 from transport import providers
 import copy 
 import copy 
 from transport import registry
 from transport import registry
-
+from transport.plugins import Plugin 
 PROVIDERS = {}
 PROVIDERS = {}
 
 
 def init():
 def init():
     global PROVIDERS
     global PROVIDERS
-    for _module in [cloud,sql,nosql,other] :
+    for _module in [cloud,sql,nosql,other,warehouse] :
         for _provider_name in dir(_module) :
         for _provider_name in dir(_module) :
             if _provider_name.startswith('__') or _provider_name == 'common':
             if _provider_name.startswith('__') or _provider_name == 'common':
                 continue
                 continue
             PROVIDERS[_provider_name] = {'module':getattr(_module,_provider_name),'type':_module.__name__}
             PROVIDERS[_provider_name] = {'module':getattr(_module,_provider_name),'type':_module.__name__}
-def _getauthfile (path) :
-    f = open(path)
-    _object = json.loads(f.read())
-    f.close()
-    return _object
+    #
+    # loading the registry
+    if not registry.isloaded() :
+        registry.load()
+
+# def _getauthfile (path) :
+#     f = open(path)
+#     _object = json.loads(f.read())
+#     f.close()
+#     return _object
 def instance (**_args):
 def instance (**_args):
     """
     """
     This function returns an object of to read or write from a supported database provider/vendor
     This function returns an object of to read or write from a supported database provider/vendor
@@ -52,16 +57,7 @@ def instance (**_args):
     kwargs      These are arguments that are provider/vendor specific
     kwargs      These are arguments that are provider/vendor specific
     """
     """
     global PROVIDERS
     global PROVIDERS
-    # if not registry.isloaded () :
-    #     if ('path' in _args and registry.exists(_args['path'] )) or registry.exists():
-    #         registry.load() if 'path' not in _args else registry.load(_args['path'])
-    #         print ([' GOT IT'])
-    # if 'label' in _args and registry.isloaded():
-    #     _info = registry.get(_args['label'])
-    #     if _info :
-    #         #
-    #         _args = dict(_args,**_info)
-
+    
     if 'auth_file' in _args:
     if 'auth_file' in _args:
         if os.path.exists(_args['auth_file']) :
         if os.path.exists(_args['auth_file']) :
             #
             #
@@ -78,7 +74,7 @@ def instance (**_args):
             filename = _args['auth_file']
             filename = _args['auth_file']
             raise Exception(f" {filename} was not found or is invalid")
             raise Exception(f" {filename} was not found or is invalid")
     if 'provider' not in _args and 'auth_file' not in _args :
     if 'provider' not in _args and 'auth_file' not in _args :
-        if not registry.isloaded () :
+        if not registry.isloaded () : 
             if ('path' in _args and registry.exists(_args['path'] )) or registry.exists():
             if ('path' in _args and registry.exists(_args['path'] )) or registry.exists():
                 registry.load() if 'path' not in _args else registry.load(_args['path'])
                 registry.load() if 'path' not in _args else registry.load(_args['path'])
         _info = {}
         _info = {}
@@ -87,8 +83,6 @@ def instance (**_args):
         else:
         else:
             _info = registry.get()    
             _info = registry.get()    
         if _info :
         if _info :
-            #
-            # _args = dict(_args,**_info)
             _args = dict(_info,**_args) #-- we can override the registry parameters with our own arguments
             _args = dict(_info,**_args) #-- we can override the registry parameters with our own arguments
 
 
     if 'provider' in _args and _args['provider'] in PROVIDERS :
     if 'provider' in _args and _args['provider'] in PROVIDERS :
@@ -119,8 +113,32 @@ def instance (**_args):
         #         for _delegate in _params :
         #         for _delegate in _params :
         #             loader.set(_delegate)
         #             loader.set(_delegate)
         
         
-        loader = None if 'plugins' not in _args else _args['plugins']
-        return IReader(_agent,loader) if _context == 'read' else IWriter(_agent,loader)
+        _plugins = None if 'plugins' not in _args else _args['plugins']
+        
+        # if registry.has('logger') :
+        #     _kwa = registry.get('logger')
+        #     _lmodule = getPROVIDERS[_kwa['provider']]
+            
+        if ( ('label' in _args and _args['label'] != 'logger') and registry.has('logger')):
+            #
+            # We did not request label called logger, so we are setting up a logger if it is specified in the registry
+            #
+            _kwargs = registry.get('logger')
+            _kwargs['context']  = 'write'
+            _kwargs['table']    =_module.__name__.split('.')[-1]+'_logs'
+            # _logger = instance(**_kwargs)
+            _module = PROVIDERS[_kwargs['provider']]['module']
+            _logger = getattr(_module,'Writer')
+            _logger = _logger(**_kwargs)
+        else:
+            _logger = None
+        
+        _kwargs = {'agent':_agent,'plugins':_plugins,'logger':_logger}
+        if 'args' in _args :
+            _kwargs['args'] = _args['args']
+        # _datatransport =  IReader(_agent,_plugins,_logger) if _context == 'read' else IWriter(_agent,_plugins,_logger)
+        _datatransport =  IReader(**_kwargs) if _context == 'read' else IWriter(**_kwargs)
+        return _datatransport
 
 
     else:
     else:
         #
         #
@@ -137,7 +155,14 @@ class get :
         if not _args or ('provider' not in _args and 'label' not in _args):
         if not _args or ('provider' not in _args and 'label' not in _args):
             _args['label'] = 'default'
             _args['label'] = 'default'
         _args['context'] = 'read'
         _args['context'] = 'read'
-        return instance(**_args)
+        # return instance(**_args)
+        # _args['logger'] = instance(**{'label':'logger','context':'write','table':'logs'})
+        
+        _handler =  instance(**_args)
+        # _handler.setLogger(get.logger())
+        return _handler
+
+    
     @staticmethod
     @staticmethod
     def writer(**_args):
     def writer(**_args):
         """
         """
@@ -146,10 +171,26 @@ class get :
         if not _args or ('provider' not in _args and 'label' not in _args):
         if not _args or ('provider' not in _args and 'label' not in _args):
             _args['label'] = 'default'
             _args['label'] = 'default'
         _args['context'] = 'write'
         _args['context'] = 'write'
-        return instance(**_args)
+        # _args['logger'] = instance(**{'label':'logger','context':'write','table':'logs'})
+
+        _handler =  instance(**_args)
+        #
+        # Implementing logging with the 'eat-your-own-dog-food' approach
+        # Using dependency injection to set the logger (problem with imports)
+        #
+        # _handler.setLogger(get.logger())
+        return _handler
+    @staticmethod
+    def logger ():
+        if registry.has('logger') :
+            _args = registry.get('logger')
+            _args['context']  = 'write'
+            return instance(**_args)
+        return None
     @staticmethod
     @staticmethod
     def etl (**_args):
     def etl (**_args):
         if 'source' in _args and 'target' in _args :
         if 'source' in _args and 'target' in _args :
+
             return IETL(**_args)
             return IETL(**_args)
         else:
         else:
             raise Exception ("Malformed input found, object must have both 'source' and 'target' attributes")
             raise Exception ("Malformed input found, object must have both 'source' and 'target' attributes")

+ 60 - 33
transport/iowrapper.py

@@ -5,33 +5,42 @@ NOTE: Plugins are converted to a pipeline, so we apply a pipeline when reading o
         - upon initialization we will load plugins
         - upon initialization we will load plugins
         - on read/write we apply a pipeline (if passed as an argument)
         - on read/write we apply a pipeline (if passed as an argument)
 """    
 """    
-from transport.plugins import plugin, PluginLoader
+from transport.plugins import Plugin, PluginLoader
 import transport
 import transport
 from transport import providers
 from transport import providers
 from multiprocessing import Process
 from multiprocessing import Process
 import time
 import time
 
 
+import plugin_ix 
 
 
 class IO:
 class IO:
     """
     """
     Base wrapper class for read/write and support for logs
     Base wrapper class for read/write and support for logs
     """
     """
-    def __init__(self,_agent,plugins):
+    def __init__(self,**_args):
+        _agent  = _args['agent']
+        plugins = _args['plugins'] if 'plugins' in _args else None
+
         self._agent = _agent
         self._agent = _agent
+        self._ixloader = plugin_ix.Loader () #--
         if plugins :
         if plugins :
-            self._init_plugins(plugins)
-        else:
-            self._plugins = None
+            self.init_plugins(plugins)
+        #     for _ref in plugins :
+        #         self._ixloader.set(_ref)
+        # if plugins :
+        #     self._init_plugins(plugins)
+        # else:
+        #     self._plugins = None
         
         
-    def _init_plugins(self,_args):
-        """
-        This function will load pipelined functions as a plugin loader
-        """
-        if 'path' in _args and 'names' in _args :
-            self._plugins = PluginLoader(**_args)
-        else:
-            self._plugins = PluginLoader()
-            [self._plugins.set(_pointer) for _pointer in _args]
+    # def _init_plugins(self,_args):
+    #     """
+    #     This function will load pipelined functions as a plugin loader
+    #     """
+    #     if 'path' in _args and 'names' in _args :
+    #         self._plugins = PluginLoader(**_args)
+    #     else:
+    #         self._plugins = PluginLoader()
+    #         [self._plugins.set(_pointer) for _pointer in _args]
         #
         #
         # @TODO: We should have a way to log what plugins are loaded and ready to use
         # @TODO: We should have a way to log what plugins are loaded and ready to use
     def meta (self,**_args):
     def meta (self,**_args):
@@ -42,12 +51,12 @@ class IO:
     def close(self):
     def close(self):
         if hasattr(self._agent,'close') :
         if hasattr(self._agent,'close') :
             self._agent.close()
             self._agent.close()
-    def apply(self):
-        """
-        applying pre/post conditions given a pipeline expression
-        """
-        for _pointer in self._plugins :
-            _data = _pointer(_data)
+    # def apply(self):
+    #     """
+    #     applying pre/post conditions given a pipeline expression
+    #     """
+    #     for _pointer in self._plugins :
+    #         _data = _pointer(_data)
     def apply(self,_query):
     def apply(self,_query):
         if hasattr(self._agent,'apply') :
         if hasattr(self._agent,'apply') :
             return self._agent.apply(_query)
             return self._agent.apply(_query)
@@ -59,30 +68,41 @@ class IO:
             pointer = getattr(self._agent,_name)
             pointer = getattr(self._agent,_name)
             return pointer(_query)
             return pointer(_query)
         return None
         return None
+    def init_plugins(self,plugins):
+        for _ref in plugins :
+            self._ixloader.set(_ref)
+
 class IReader(IO):
 class IReader(IO):
     """
     """
     This is a wrapper for read functionalities
     This is a wrapper for read functionalities
     """
     """
-    def __init__(self,_agent,pipeline=None):
-        super().__init__(_agent,pipeline)
+    def __init__(self,**_args):
+        super().__init__(**_args)
+        
     def read(self,**_args):
     def read(self,**_args):
         if 'plugins' in _args :
         if 'plugins' in _args :
-            self._init_plugins(_args['plugins'])
+            self.init_plugins(_args['plugins'])
+
         _data = self._agent.read(**_args)
         _data = self._agent.read(**_args)
-        if self._plugins and self._plugins.ratio() > 0 :
-            _data = self._plugins.apply(_data)
+        # if self._plugins and self._plugins.ratio() > 0 :
+        #     _data = self._plugins.apply(_data)
         #
         #
         # output data 
         # output data 
+        
+        #
+        # applying the the design pattern 
+        _data = self._ixloader.visitor(_data)
         return _data
         return _data
 class IWriter(IO):
 class IWriter(IO):
-    def __init__(self,_agent,pipeline=None):
-        super().__init__(_agent,pipeline)  
+    def __init__(self,**_args): #_agent,pipeline=None):
+        super().__init__(**_args) #_agent,pipeline)  
     def write(self,_data,**_args):
     def write(self,_data,**_args):
+        # if 'plugins' in _args :
+        #     self._init_plugins(_args['plugins'])
         if 'plugins' in _args :
         if 'plugins' in _args :
-            self._init_plugins(_args['plugins'])
-        if self._plugins and self._plugins.ratio() > 0 :
-            _data = self._plugins.apply(_data)
+            self.init_plugins(_args['plugins'])
 
 
+        self._ixloader.visitor(_data)
         self._agent.write(_data,**_args)
         self._agent.write(_data,**_args)
 
 
 #
 #
@@ -94,7 +114,7 @@ class IETL(IReader) :
     This class performs an ETL operation by ineriting a read and adding writes as pipeline functions
     This class performs an ETL operation by ineriting a read and adding writes as pipeline functions
     """
     """
     def __init__(self,**_args):
     def __init__(self,**_args):
-        super().__init__(transport.get.reader(**_args['source']))
+        super().__init__(agent=transport.get.reader(**_args['source']),plugins=None)
         if 'target' in _args:
         if 'target' in _args:
             self._targets = _args['target'] if type(_args['target']) == list else [_args['target']]
             self._targets = _args['target'] if type(_args['target']) == list else [_args['target']]
         else:
         else:
@@ -105,16 +125,23 @@ class IETL(IReader) :
         self._hasParentProcess = False if 'hasParentProcess' not in _args else _args['hasParentProcess']
         self._hasParentProcess = False if 'hasParentProcess' not in _args else _args['hasParentProcess']
     def read(self,**_args):
     def read(self,**_args):
         _data = super().read(**_args)
         _data = super().read(**_args)
-
+        _schema = super().meta()
         for _kwargs in self._targets :
         for _kwargs in self._targets :
+            if _schema :
+                _kwargs['schema'] = _schema
             self.post(_data,**_kwargs)
             self.post(_data,**_kwargs)
 
 
         return _data
         return _data
+    def run(self) :
+        return self.read()
     def post (self,_data,**_args) :
     def post (self,_data,**_args) :
         """
         """
         This function returns an instance of a process that will perform the write operation
         This function returns an instance of a process that will perform the write operation
         :_args  parameters associated with writer object
         :_args  parameters associated with writer object
         """
         """
         writer = transport.get.writer(**_args)
         writer = transport.get.writer(**_args)
-        writer.write(_data)
+        if 'schema' in _args :
+            writer.write(_data,schema=_args['schema'])
+        else:
+            writer.write(_data)
         writer.close()
         writer.close()

+ 72 - 40
transport/plugins/__init__.py

@@ -11,8 +11,10 @@ import importlib as IL
 import importlib.util
 import importlib.util
 import sys
 import sys
 import os
 import os
+import pandas as pd
+import time
 
 
-class plugin :
+class Plugin :
     """
     """
     Implementing function decorator for data-transport plugins (post-pre)-processing
     Implementing function decorator for data-transport plugins (post-pre)-processing
     """
     """
@@ -22,8 +24,9 @@ class plugin :
         :mode   restrict to reader/writer
         :mode   restrict to reader/writer
         :about  tell what the function is about    
         :about  tell what the function is about    
         """
         """
-        self._name = _args['name']
-        self._about = _args['about']
+        self._name = _args['name'] if 'name' in _args else None
+        self._version = _args['version'] if 'version' in _args else '0.1'
+        self._doc = _args['doc'] if 'doc' in _args else "N/A"
         self._mode = _args['mode'] if 'mode' in _args else 'rw'
         self._mode = _args['mode'] if 'mode' in _args else 'rw'
     def __call__(self,pointer,**kwargs):
     def __call__(self,pointer,**kwargs):
         def wrapper(_args,**kwargs):
         def wrapper(_args,**kwargs):
@@ -32,57 +35,67 @@ class plugin :
         # @TODO:
         # @TODO:
         # add attributes to the wrapper object
         # add attributes to the wrapper object
         #
         #
+        self._name = pointer.__name__ if not self._name else self._name
         setattr(wrapper,'transport',True)
         setattr(wrapper,'transport',True)
         setattr(wrapper,'name',self._name)
         setattr(wrapper,'name',self._name)
-        setattr(wrapper,'mode',self._mode)
-        setattr(wrapper,'about',self._about)
+        setattr(wrapper,'version',self._version)
+        setattr(wrapper,'doc',self._doc)
         return wrapper
         return wrapper
 
 
-
 class PluginLoader :
 class PluginLoader :
     """
     """
     This class is intended to load a plugin and make it available and assess the quality of the developed plugin
     This class is intended to load a plugin and make it available and assess the quality of the developed plugin
     """
     """
+   
     def __init__(self,**_args):
     def __init__(self,**_args):
         """
         """
-        :path   location of the plugin (should be a single file)
-        :_names of functions to load
         """
         """
-        _names = _args['names'] if 'names' in _args else None
-        path = _args['path'] if 'path' in _args else None
-        self._names = _names if type(_names) == list else [_names]
+        # _names = _args['names'] if 'names' in _args else None
+        # path = _args['path'] if 'path' in _args else None
+        # self._names = _names if type(_names) == list else [_names]
         self._modules = {}
         self._modules = {}
         self._names = []
         self._names = []
-        if path and os.path.exists(path) and _names:
-            for _name in self._names :
-                
-                spec = importlib.util.spec_from_file_location('private', path)
-                module = importlib.util.module_from_spec(spec)
-                spec.loader.exec_module(module) #--loads it into sys.modules
-                if hasattr(module,_name) :
-                    if self.isplugin(module,_name) :
-                        self._modules[_name] = getattr(module,_name)
-                    else:
-                        print ([f'Found {_name}', 'not plugin'])
-                else:
-                    #
-                    # @TODO: We should log this somewhere some how
-                    print (['skipping ',_name, hasattr(module,_name)])
-                    pass
-        else:
-            #
-            # Initialization is empty
-            self._names = []
+        self._registry = _args['registry']
+
         pass
         pass
-    def set(self,_pointer) :
+    def load (self,**_args):
+        """
+        This function loads a plugin
+        """
+        self._modules = {}
+        self._names = []
+        path = _args ['path']
+        if os.path.exists(path) :
+            _alias = path.split(os.sep)[-1]
+            spec = importlib.util.spec_from_file_location(_alias, path)
+            module = importlib.util.module_from_spec(spec)
+            spec.loader.exec_module(module) #--loads it into sys.modules
+            for _name in dir(module) :
+                if self.isplugin(module,_name) :
+                    self._module[_name] = getattr(module,_name)
+                    # self._names [_name]
+    def format (self,**_args):
+        uri = _args['alias'],_args['name']
+    # def set(self,_pointer) :
+    def set(self,_key) :
         """
         """
         This function will set a pointer to the list of modules to be called
         This function will set a pointer to the list of modules to be called
         This should be used within the context of using the framework as a library
         This should be used within the context of using the framework as a library
         """
         """
-        _name = _pointer.__name__
+        if type(_key).__name__ == 'function':
+            #
+            # The pointer is in the code provided by the user and loaded in memory
+            #
+            _pointer = _key
+            _key = 'inline@'+_key.__name__
+            # self._names.append(_key.__name__)
+        else:
+            _pointer = self._registry.get(key=_key)
+
+        if _pointer  :
+            self._modules[_key] = _pointer
+            self._names.append(_key)
         
         
-        self._modules[_name] = _pointer
-        self._names.append(_name)
     def isplugin(self,module,name):
     def isplugin(self,module,name):
         """
         """
         This function determines if a module is a recognized plugin
         This function determines if a module is a recognized plugin
@@ -107,12 +120,31 @@ class PluginLoader :
 
 
         _n = len(self._names)
         _n = len(self._names)
         return len(set(self._modules.keys()) & set (self._names)) / _n
         return len(set(self._modules.keys()) & set (self._names)) / _n
-    def apply(self,_data):
+    def apply(self,_data,_logger=[]):
+        _input= {}
+        
         for _name in self._modules :
         for _name in self._modules :
-            _pointer = self._modules[_name]
-            #
-            # @TODO: add exception handling
-            _data = _pointer(_data)
+            try:
+                _input = {'action':'plugin','object':_name,'input':{'status':'PASS'}}
+                _pointer = self._modules[_name]
+                if type(_data) == list :
+                    _data = pd.DataFrame(_data)
+                _brow,_bcol = list(_data.shape) 
+                
+                #
+                # @TODO: add exception handling
+                _data = _pointer(_data)
+                
+                _input['input']['shape'] = {'rows-dropped':_brow - _data.shape[0]}
+            except Exception as e:
+                _input['input']['status'] = 'FAILED'
+                print (e)
+            time.sleep(1)
+            if _logger:
+                try:
+                    _logger(**_input)
+                except Exception as e:
+                    pass    
         return _data
         return _data
     # def apply(self,_data,_name):
     # def apply(self,_data,_name):
     #     """
     #     """

+ 5 - 3
transport/providers/__init__.py

@@ -11,7 +11,7 @@ BIGQUERY	='bigquery'
 FILE 	= 'file'
 FILE 	= 'file'
 ETL = 'etl'
 ETL = 'etl'
 
 
-SQLITE = 'sqlite'
+SQLITE = 'sqlite3'
 SQLITE3= 'sqlite3'
 SQLITE3= 'sqlite3'
 DUCKDB = 'duckdb'
 DUCKDB = 'duckdb'
 
 
@@ -44,7 +44,9 @@ PGSQL	= POSTGRESQL
 
 
 AWS_S3  = 's3'
 AWS_S3  = 's3'
 RABBIT = RABBITMQ
 RABBIT = RABBITMQ
-
-
+ICEBERG='iceberg'
+APACHE_ICEBERG = 'iceberg'
+DRILL = 'drill'
+APACHE_DRILL = 'drill'
 # QLISTENER = 'qlistener'
 # QLISTENER = 'qlistener'
     
     

+ 167 - 7
transport/registry.py

@@ -3,6 +3,10 @@ import json
 from info import __version__
 from info import __version__
 import copy
 import copy
 import transport
 import transport
+import importlib
+import importlib.util
+import shutil
+
 
 
 """
 """
 This class manages data from the registry and allows (read only)
 This class manages data from the registry and allows (read only)
@@ -16,28 +20,182 @@ REGISTRY_PATH=os.sep.join([os.environ['HOME'],'.data-transport'])
 if 'DATA_TRANSPORT_REGISTRY_PATH' in os.environ :
 if 'DATA_TRANSPORT_REGISTRY_PATH' in os.environ :
     REGISTRY_PATH = os.environ['DATA_TRANSPORT_REGISTRY_PATH']
     REGISTRY_PATH = os.environ['DATA_TRANSPORT_REGISTRY_PATH']
 REGISTRY_FILE= 'transport-registry.json'
 REGISTRY_FILE= 'transport-registry.json'
-
 DATA = {}
 DATA = {}
+# class plugins:
+#     #
+#     # This is a utility function that should enable management of plugins-registry
+#     # The class allows to add/remove elements 
+#     #
+#     # @TODO: add read/write properties to the class (better design practice)
+#     #
+#     _data = {}
+#     FOLDER = os.sep.join([REGISTRY_PATH,'plugins'])
+#     CODE = os.sep.join([REGISTRY_PATH,'plugins','code'])
+#     FILE = os.sep.join([REGISTRY_PATH,'plugin-registry.json'])
+#     @staticmethod
+#     def init():
+        
+#         if not os.path.exists(plugins.FOLDER) :
+#             os.makedirs(plugins.FOLDER)
+#         if not os.path.exists(plugins.CODE):
+#             os.makedirs(plugins.CODE)
+#         if not os.path.exists(plugins.FILE):
+#             f = open(plugins.FILE,'w')
+#             f.write("{}")
+#             f.close()
+#         plugins._read() #-- will load data as a side effect
+
+#     @staticmethod
+#     def copy (path) :
+        
+#         shutil.copy2(path,plugins.CODE)
+#     @staticmethod
+#     def _read ():
+#         f = open(plugins.FILE)
+#         try:
+#             _data = json.loads(f.read())
+#             f.close()
+#         except Exception as e:
+#             print (f"Corrupted registry, resetting ...")
+#             _data = {}
+#             plugins._write(_data)
+            
+#         plugins._data = _data
+#     @staticmethod
+#     def _write (_data):
+#         f = open(plugins.FILE,'w')
+#         f.write(json.dumps(_data))
+#         f.close()
+#         plugins._data = _data
+
+#     @staticmethod
+#     def inspect (_path):
+#         _names = []
+        
+#         if os.path.exists(_path) :
+#             _filename = _path.split(os.sep)[-1]
+#             spec = importlib.util.spec_from_file_location(_filename, _path)
+#             module = importlib.util.module_from_spec(spec)
+#             spec.loader.exec_module(module)
+
+#             # _names = [{'name':getattr(getattr(module,_name),'name'),'pointer':getattr(module,_name)} for _name in dir(module) if type( getattr(module,_name)).__name__ == 'function']
+#             for _name in dir(module) :
+#                 _pointer = getattr(module,_name) 
+#                 if hasattr(_pointer,'transport') :
+#                     _item = {'real_name':_name,'name':getattr(_pointer,'name'),'pointer':_pointer,'version':getattr(_pointer,'version')}
+#                     _names.append(_item)
+
+            
+#         return _names
+#     @staticmethod
+#     def add (alias,path):
+#         """
+#         Add overwrite the registry entries
+#         """
+#         _names = plugins.inspect (path)
+#         _log = []
+        
+#         if _names :
+#             #
+#             # We should make sure we have all the plugins with the attributes (transport,name) set
+#             _names = [_item for _item in _names if hasattr(_item['pointer'],'transport') ]
+#             if _names :
+#                 plugins.copy(path)
+#                 _content = []
+                
+#                 for _item in _names :
+#                     _key = '@'.join([alias,_item['name']])
+#                     _log.append(_item['name'])
+#                 #
+#                 # Let us update the registry 
+#                 # 
+#                 plugins.update(alias,path,_log)        
+#         return _log
+    
+#     @staticmethod
+#     def update (alias,path,_log) :
+#         """
+#         updating the registry entries of the plugins (management data)
+#         """
+#         # f = open(plugins.FILE)
+#         # _data = json.loads(f.read())
+#         # f.close()
+#         _data = plugins._data
+#         # _log = plugins.add(alias,path)
+        
+#         if _log :
+#             _data[alias] = {'content':_log,'name':path.split(os.sep)[-1]}
+#             plugins._write(_data) #-- will update data as a side effect
+
+#         return _log
+#     @staticmethod
+#     def get(**_args) :
+#         # f = open(plugins.FILE)
+#         # _data = json.loads(f.read())
+#         # f.close()
+#         # if 'key' in _args :
+#         #     alias,name = _args['key'].split('.') if '.' in _args['key'] else _args['key'].split('@')
+#         # else :
+#         #     alias = _args['alias']
+#         #     name  = _args['name']
+        
+#         # if alias in _data :
+            
+#         #     _path = os.sep.join([plugins.CODE,_data[alias]['name']])
+#         #     _item = [_item for _item in plugins.inspect(_path) if name == _item['name']]
+            
+#         #     _item = _item[0] if _item else None
+#         #     if _item :
+                
+#         #         return _item['pointer'] 
+#         # return None
+#         _item = plugins.has(**_args)
+#         return _item['pointer'] if _item else None
+    
+#     @staticmethod
+#     def has (**_args):
+#         f = open(plugins.FILE)
+#         _data = json.loads(f.read())
+#         f.close()
+#         if 'key' in _args :
+#             alias,name = _args['key'].split('.') if '.' in _args['key'] else _args['key'].split('@')
+#         else :
+#             alias = _args['alias']
+#             name  = _args['name']
+        
+#         if alias in _data :
+            
+#             _path = os.sep.join([plugins.CODE,_data[alias]['name']])
+#             _item = [_item for _item in plugins.inspect(_path) if name == _item['name']]
+            
+#             _item = _item[0] if _item else None
+#             if _item :
+                
+#                 return copy.copy(_item)
+#         return None
+#     @staticmethod
+#     def synch():
+#         pass
 
 
 def isloaded ():
 def isloaded ():
     return DATA not in [{},None]
     return DATA not in [{},None]
-def exists (path=REGISTRY_PATH) :
+def exists (path=REGISTRY_PATH,_file=REGISTRY_FILE) :
     """
     """
     This function determines if there is a registry at all
     This function determines if there is a registry at all
     """
     """
     p = os.path.exists(path)
     p = os.path.exists(path)
-    q = os.path.exists( os.sep.join([path,REGISTRY_FILE]))
+    q = os.path.exists( os.sep.join([path,_file]))
     
     
     return p and q
     return p and q
-def load (_path=REGISTRY_PATH):
+def load (_path=REGISTRY_PATH,_file=REGISTRY_FILE):
     global DATA
     global DATA
     
     
     if exists(_path) :
     if exists(_path) :
-        path = os.sep.join([_path,REGISTRY_FILE])
+        path = os.sep.join([_path,_file])
         f = open(path)
         f = open(path)
         DATA = json.loads(f.read())
         DATA = json.loads(f.read())
         f.close()
         f.close()
-def init (email,path=REGISTRY_PATH,override=False):
+def init (email,path=REGISTRY_PATH,override=False,_file=REGISTRY_FILE):
     """
     """
     Initializing the registry and will raise an exception in the advent of an issue
     Initializing the registry and will raise an exception in the advent of an issue
     """
     """
@@ -47,7 +205,7 @@ def init (email,path=REGISTRY_PATH,override=False):
         _config = {"email":email,'version':__version__}
         _config = {"email":email,'version':__version__}
         if not os.path.exists(path):
         if not os.path.exists(path):
             os.makedirs(path)
             os.makedirs(path)
-        filename = os.sep.join([path,REGISTRY_FILE])
+        filename = os.sep.join([path,_file])
         if not os.path.exists(filename) or override == True :
         if not os.path.exists(filename) or override == True :
 
 
             f = open(filename,'w')
             f = open(filename,'w')
@@ -62,6 +220,8 @@ def init (email,path=REGISTRY_PATH,override=False):
 def lookup (label):
 def lookup (label):
     global DATA
     global DATA
     return label in DATA
     return label in DATA
+has = lookup 
+
 def get (label='default') :
 def get (label='default') :
     global DATA
     global DATA
     return copy.copy(DATA[label]) if label in DATA else {}
     return copy.copy(DATA[label]) if label in DATA else {}

+ 51 - 21
transport/sql/common.py

@@ -3,7 +3,7 @@ This file encapsulates common operations associated with SQL databases via SQLAl
 
 
 """
 """
 import sqlalchemy as sqa
 import sqlalchemy as sqa
-from sqlalchemy import text 
+from sqlalchemy import text , MetaData, inspect
 
 
 import pandas as pd
 import pandas as pd
 
 
@@ -13,7 +13,13 @@ class Base:
         self._port = None
         self._port = None
         self._database = _args['database']
         self._database = _args['database']
         self._table = _args['table'] if 'table' in _args else None
         self._table = _args['table'] if 'table' in _args else None
-        self._engine= sqa.create_engine(self._get_uri(**_args),future=True)
+        _uri = self._get_uri(**_args)
+        if type(_uri) == str :
+            self._engine= sqa.create_engine(_uri,future=True)
+        else:
+            
+            _uri,_kwargs = _uri
+            self._engine= sqa.create_engine(_uri,**_kwargs,future=True)
     def _set_uri(self,**_args) :
     def _set_uri(self,**_args) :
         """
         """
         :provider   provider
         :provider   provider
@@ -34,21 +40,33 @@ class Base:
         :table  optional name of the table (can be fully qualified)
         :table  optional name of the table (can be fully qualified)
         """
         """
         _table = self._table if 'table' not in _args else _args['table']
         _table = self._table if 'table' not in _args else _args['table']
+        _map = {'TINYINT':'INTEGER','BIGINT':'INTEGER','TEXT':'STRING','DOUBLE_PRECISION':'FLOAT','NUMERIC':'FLOAT','DECIMAL':'FLOAT','REAL':'FLOAT'}
         _schema = []
         _schema = []
-        if _table :
-            if sqa.__version__.startswith('1.') :
-                _handler = sqa.MetaData(bind=self._engine)
-                _handler.reflect()
-            else:
-                #
-                # sqlalchemy's version 2.+
-                _handler = sqa.MetaData()
-                _handler.reflect(bind=self._engine)
-            #
-            # Let us extract the schema with the native types
-            _map = {'BIGINT':'INTEGER','TEXT':'STRING','DOUBLE_PRECISION':'FLOAT','NUMERIC':'FLOAT','DECIMAL':'FLOAT','REAL':'FLOAT'}
-            _schema = [{"name":_attr.name,"type":_map.get(str(_attr.type),str(_attr.type))} for _attr in _handler.tables[_table].columns]
-        return _schema
+        # if _table :
+        #     if sqa.__version__.startswith('1.') :
+        #         _handler = sqa.MetaData(bind=self._engine)
+        #         _handler.reflect()
+        #     else:
+        #         #
+        #         # sqlalchemy's version 2.+
+        #         _handler = sqa.MetaData()
+        #         _handler.reflect(bind=self._engine)
+        #     #
+        #     # Let us extract the schema with the native types
+        #     _map = {'BIGINT':'INTEGER','TEXT':'STRING','DOUBLE_PRECISION':'FLOAT','NUMERIC':'FLOAT','DECIMAL':'FLOAT','REAL':'FLOAT'}
+        #     _schema = [{"name":_attr.name,"type":_map.get(str(_attr.type),str(_attr.type))} for _attr in _handler.tables[_table].columns]
+        #
+        try:
+            if _table :
+                _inspector = inspect(self._engine)
+                _columns = _inspector.get_columns(_table)
+                _schema = [{'name':column['name'],'type':_map.get(str(column['type']),str(column['type'])) } for column in _columns]
+                return _schema
+        except Exception as e:
+            pass
+
+        # else:
+        return []
     def  has(self,**_args):
     def  has(self,**_args):
         return self.meta(**_args)
         return self.meta(**_args)
     def apply(self,sql):
     def apply(self,sql):
@@ -58,8 +76,8 @@ class Base:
 
 
         @TODO: Execution of stored procedures
         @TODO: Execution of stored procedures
         """
         """
-        if sql.lower().startswith('select') or sql.lower().startswith('with') :
-
+        if sql.strip().lower().startswith('select') or sql.strip().lower().startswith('with') or sql.strip().startswith('show'):
+            
             return pd.read_sql(sql,self._engine) 
             return pd.read_sql(sql,self._engine) 
         else:
         else:
             _handler = self._engine.connect()
             _handler = self._engine.connect()
@@ -71,6 +89,7 @@ class Base:
 class SQLBase(Base):
 class SQLBase(Base):
     def __init__(self,**_args):
     def __init__(self,**_args):
         super().__init__(**_args)
         super().__init__(**_args)
+        self._schema = _args.get('schema',None)
     def get_provider(self):
     def get_provider(self):
         raise Exception ("Provider Needs to be set ...")
         raise Exception ("Provider Needs to be set ...")
     def get_default_port(self) :
     def get_default_port(self) :
@@ -94,7 +113,11 @@ class SQLBase(Base):
         # _uri = [_item.strip() for _item in _uri if _item.strip()]
         # _uri = [_item.strip() for _item in _uri if _item.strip()]
         # return '/'.join(_uri)
         # return '/'.join(_uri)
         return f'{_provider}://{_host}/{_database}' if _account == '' else f'{_provider}://{_account}{_host}/{_database}'
         return f'{_provider}://{_host}/{_database}' if _account == '' else f'{_provider}://{_account}{_host}/{_database}'
-
+    def close(self,) :
+        try:
+            self._engine.dispose()
+        except :
+            pass
 class BaseReader(SQLBase):
 class BaseReader(SQLBase):
     def __init__(self,**_args):
     def __init__(self,**_args):
         super().__init__(**_args)    
         super().__init__(**_args)    
@@ -106,6 +129,8 @@ class BaseReader(SQLBase):
             sql = _args['sql']
             sql = _args['sql']
         else:
         else:
             _table = _args['table'] if 'table' in _args else self._table
             _table = _args['table'] if 'table' in _args else self._table
+            if self._schema and type(self._schema) == str :
+                _table = f'{self._schema}.{_table}'
             sql = f'SELECT * FROM {_table}'
             sql = f'SELECT * FROM {_table}'
         return self.apply(sql)
         return self.apply(sql)
     
     
@@ -116,9 +141,11 @@ class BaseWriter (SQLBase):
     """
     """
     def __init__(self,**_args):
     def __init__(self,**_args):
         super().__init__(**_args)
         super().__init__(**_args)
+        
     def write(self,_data,**_args):
     def write(self,_data,**_args):
+        
         if type(_data) == dict :
         if type(_data) == dict :
-            _df = pd.DataFrame(_data)
+            _df = pd.DataFrame([_data])
         elif type(_data) == list :
         elif type(_data) == list :
             _df = pd.DataFrame(_data)
             _df = pd.DataFrame(_data)
         else:
         else:
@@ -135,5 +162,8 @@ class BaseWriter (SQLBase):
         #     _mode['schema'] = _args['schema']
         #     _mode['schema'] = _args['schema']
         # if 'if_exists' in _args :
         # if 'if_exists' in _args :
         #     _mode['if_exists'] = _args['if_exists']
         #     _mode['if_exists'] = _args['if_exists']
-
+        if 'schema' in _args and type(_args['schema']) == str:
+            self._schema = _args.get('schema',None)
+        if self._schema :
+           _mode['schema'] = self._schema
         _df.to_sql(_table,self._engine,**_mode)
         _df.to_sql(_table,self._engine,**_mode)

+ 3 - 1
transport/sql/duckdb.py

@@ -15,9 +15,11 @@ class Duck :
     def _get_uri(self,**_args):
     def _get_uri(self,**_args):
         return f"""duckdb:///{self.database}"""
         return f"""duckdb:///{self.database}"""
 class Reader(Duck,BaseReader) :
 class Reader(Duck,BaseReader) :
-    def __init__(self,**_args):
+    def __init__(self,**_args):        
         Duck.__init__(self,**_args)
         Duck.__init__(self,**_args)
         BaseReader.__init__(self,**_args)
         BaseReader.__init__(self,**_args)
+    def _get_uri(self,**_args):
+        return super()._get_uri(**_args),{'connect_args':{'read_only':True}}
 class Writer(Duck,BaseWriter):
 class Writer(Duck,BaseWriter):
     def __init__(self,**_args):
     def __init__(self,**_args):
         Duck.__init__(self,**_args)
         Duck.__init__(self,**_args)

+ 7 - 0
transport/warehouse/__init__.py

@@ -0,0 +1,7 @@
+"""
+This namespace/package is intended to handle read/writes against data warehouse solutions like :
+    - apache iceberg
+    - clickhouse (...)
+"""
+
+from . import iceberg, drill

+ 55 - 0
transport/warehouse/drill.py

@@ -0,0 +1,55 @@
+import sqlalchemy
+import pandas as pd
+from .. sql.common import BaseReader , BaseWriter
+import sqlalchemy as sqa
+
+class Drill :
+    __template = {'host':None,'port':None,'ssl':None,'table':None,'database':None}
+    def __init__(self,**_args):
+
+        self._host = _args['host'] if 'host' in _args else 'localhost'
+        self._port = _args['port'] if 'port' in _args else self.get_default_port()
+        self._ssl = False if 'ssl' not in _args else _args['ssl']
+        
+        self._table = _args['table'] if 'table' in _args else None
+        if self._table and '.' in self._table :
+            _seg = self._table.split('.')
+            if len(_seg) > 2 :
+                self._schema,self._database = _seg[:2]
+        else:
+            
+            self._database=_args['database']
+            self._schema = self._database.split('.')[0]
+        
+    def _get_uri(self,**_args):
+        return f'drill+sadrill://{self._host}:{self._port}/{self._database}?use_ssl={self._ssl}'
+    def get_provider(self):
+        return "drill+sadrill"
+    def get_default_port(self):
+        return "8047"
+    def meta(self,**_args):
+        _table = _args['table'] if 'table' in _args else self._table
+        if '.' in _table :
+            _schema = _table.split('.')[:2]
+            _schema = '.'.join(_schema)
+            _table = _table.split('.')[-1]
+        else:
+            _schema = self._schema
+        
+        # _sql = f"select COLUMN_NAME AS name, CASE WHEN DATA_TYPE ='CHARACTER VARYING' THEN 'CHAR ( 125 )' ELSE DATA_TYPE END AS type from INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA='{_schema}' and TABLE_NAME='{_table}'"
+        _sql = f"select COLUMN_NAME AS name, CASE WHEN DATA_TYPE ='CHARACTER VARYING' THEN 'CHAR ( '||COLUMN_SIZE||' )' ELSE DATA_TYPE END AS type from INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA='{_schema}' and TABLE_NAME='{_table}'"
+        try:
+            _df  = pd.read_sql(_sql,self._engine)
+            return _df.to_dict(orient='records')
+        except Exception as e:
+            print (e)
+            pass
+        return []
+class Reader (Drill,BaseReader) :
+    def __init__(self,**_args):
+        super().__init__(**_args)
+        self._chunksize = 0 if 'chunksize' not in _args else _args['chunksize']
+        self._engine= sqa.create_engine(self._get_uri(),future=True)
+class Writer(Drill,BaseWriter):
+    def __init__(self,**_args):
+        super().__init__(self,**_args)

+ 151 - 0
transport/warehouse/iceberg.py

@@ -0,0 +1,151 @@
+"""
+dependency:
+    - spark and SPARK_HOME environment variable must be set
+NOTE:
+    When using streaming option, insure that it is inline with default (1000 rows) or increase it in spark-defaults.conf
+
+"""
+from pyspark.sql import SparkSession
+from pyspark import SparkContext
+from pyspark.sql.types import *
+from pyspark.sql.functions import col, to_date, to_timestamp
+import copy
+
+class Iceberg :
+    def __init__(self,**_args):
+        """
+        providing catalog meta information (you must get this from apache iceberg)
+        """
+        #
+        # Turning off logging (it's annoying & un-professional)
+        #
+        # _spconf = SparkContext()
+        # _spconf.setLogLevel("ERROR")
+        #
+        # @TODO:
+        #   Make arrangements for additional configuration elements 
+        #
+        self._session = SparkSession.builder.appName("data-transport").getOrCreate()
+        self._session.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
+        # self._session.sparkContext.setLogLevel("ERROR")
+        self._catalog = self._session.catalog
+        self._table = _args['table'] if 'table' in _args else None
+        
+        if 'catalog' in _args :
+            #
+            # Let us set the default catalog
+            self._catalog.setCurrentCatalog(_args['catalog'])
+            
+        else:
+            # No current catalog has been set ...
+            pass
+        if 'database' in _args :
+            self._database = _args['database']
+            self._catalog.setCurrentDatabase(self._database)
+        else:
+            #
+            # Should we set the default as the first one if available ?
+            #
+            pass
+        self._catalogName = self._catalog.currentCatalog()
+        self._databaseName = self._catalog.currentDatabase()
+    def meta (self,**_args) :
+        """
+        This function should return the schema of a table (only)
+        """
+        _schema = []
+        try:
+            _table = _args['table'] if 'table' in _args else self._table
+            _tableName = self._getPrefix(**_args) + f".{_table}"
+            _tmp = self._session.table(_tableName).schema
+            _schema = _tmp.jsonValue()['fields']
+            for _item in _schema :
+                del _item['nullable'],_item['metadata']
+        except Exception as e:
+            
+            pass
+        return _schema
+    def _getPrefix (self,**_args):        
+        _catName = self._catalogName if 'catalog' not in _args else _args['catalog']
+        _datName = self._databaseName if 'database' not in _args else _args['database']
+        
+        return '.'.join([_catName,_datName])
+    def apply(self,_query):
+        """
+        sql query/command to run against apache iceberg
+        """
+        return self._session.sql(_query)
+    def has (self,**_args):
+        try:
+            _prefix = self._getPrefix(**_args)
+            if _prefix.endswith('.') :
+                return False
+            return _args['table'] in [_item.name for _item in self._catalog.listTables(_prefix)]
+        except Exception as e:
+            print (e)
+            return False
+    
+    def close(self):
+        self._session.stop()
+class Reader(Iceberg) :
+    def __init__(self,**_args):
+        super().__init__(**_args)
+    def read(self,**_args):
+        _table = self._table
+        _prefix = self._getPrefix(**_args)        
+        if 'table' in _args or _table:
+            _table = _args['table'] if 'table' in _args else _table
+            _table = _prefix + f'.{_table}'
+            return self._session.table(_table).toPandas()
+        else:
+            sql = _args['sql']
+            return self._session.sql(sql).toPandas()
+        pass
+class Writer (Iceberg):
+    """
+    Writing data to an Apache Iceberg data warehouse (using pyspark)
+    """
+    def __init__(self,**_args):
+        super().__init__(**_args)
+        self._mode = 'append' if 'mode' not in _args else _args['mode']
+        self._table = None if 'table' not in _args else _args['table']
+    def format (self,_schema) :
+        _iceSchema = StructType([])
+        _map = {'integer':IntegerType(),'float':DoubleType(),'double':DoubleType(),'date':DateType(),
+                'timestamp':TimestampType(),'datetime':TimestampType(),'string':StringType(),'varchar':StringType()}
+        for _item in _schema :
+            _name = _item['name']
+            _type = _item['type'].lower()
+            if _type not in _map :
+                _iceType = StringType()
+            else:
+                _iceType = _map[_type]
+            
+            _iceSchema.add (StructField(_name,_iceType,True))
+        return _iceSchema if len(_iceSchema) else []
+    def write(self,_data,**_args):
+        _prefix = self._getPrefix(**_args)
+        if 'table' not in _args and not self._table :
+            raise Exception (f"Table Name should be specified for catalog/database {_prefix}")
+        _schema = self.format(_args['schema']) if 'schema' in _args else []
+        if not _schema :
+            rdd = self._session.createDataFrame(_data,verifySchema=False)
+        else :
+            rdd = self._session.createDataFrame(_data,schema=_schema,verifySchema=True)
+        _mode = self._mode if 'mode' not in _args else _args['mode']
+        _table = self._table if 'table' not in _args else _args['table']
+        
+        # print (_data.shape,_mode,_table)
+        
+        if not self._session.catalog.tableExists(_table):
+        #     # @TODO:
+        #     # add partitioning information here 
+            rdd.writeTo(_table).using('iceberg').create()
+            
+        # #     _mode = 'overwrite'
+        # #     rdd.write.format('iceberg').mode(_mode).saveAsTable(_table)
+        else:
+            # rdd.writeTo(_table).append()
+        # #     _table = f'{_prefix}.{_table}'
+
+            rdd.coalesce(10).write.format('iceberg').mode('append').save(_table)