소스 검색

fixes with plugin handler

Steve Nyemba 2 달 전
부모
커밋
469c6f89a2
3개의 변경된 파일212개의 추가작업 그리고 174개의 파일을 삭제
  1. 46 24
      bin/transport
  2. 34 18
      transport/iowrapper.py
  3. 132 132
      transport/registry.py

+ 46 - 24
bin/transport

@@ -34,6 +34,8 @@ import time
 from termcolor import colored
 from enum import Enum
 from rich import print
+import plugin_ix as pix
+
 
 app = typer.Typer()
 app_e = typer.Typer()   #-- handles etl (run, generate)
@@ -147,7 +149,7 @@ def initregistry (email:Annotated[str,typer.Argument(help="email")],
                   path:str=typer.Option(default=REGISTRY_PATH,help="path or location of the configuration file"), 
                   override:bool=typer.Option(default=False,help="override existing configuration or not")):
     """
-    This functiion will initialize the registry and have both application and calling code loading the database parameters by a label
+    This functiion will initialize the data-transport registry and have both application and calling code loading the database parameters by a label
 
     """
     try:
@@ -179,42 +181,62 @@ def register (label:Annotated[str,typer.Argument(help="unique label that will be
     pass
 @app_x.command(name='add') 
 def register_plugs (
-    alias:Annotated[str,typer.Argument(help="unique alias fo the file being registered")],
-    path:Annotated[str,typer.Argument(help="path of the python file, that contains functions")]
+    alias:Annotated[str,typer.Argument(help="unique function name within a file")],
+    path:Annotated[str,typer.Argument(help="path of the python file, that contains functions")],
+    folder:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport registry folder"),
+    
     ):
     """
-    This function will register a file and the functions within will be refrences <alias>.<function> in a configuration file
+    This function will register a file and the functions within we are interested in using
     """
-    transport.registry.plugins.init()
-    _log = transport.registry.plugins.add(alias,path)
+    if ',' in alias :
+        alias = [_name.strip() for _name in alias.split(',') if _name.strip() != '' ] 
+    else:
+        alias = [alias.strip()]
+    _pregistry  = pix.Registry(folder=folder,plugin_folder='plugins/code')
+    _log = _pregistry.set(path,alias)
+    # transport.registry.plugins.init()
+    # _log = transport.registry.plugins.add(alias,path)
     _mark = TIMES_MARK if not _log else CHECK_MARK
-    _msg  = f"""Could NOT add the [bold]{alias}[/bold]to the registry""" if not _log else f""" successfully added {alias}, {len(_log)} functions added"""
+    _msg  = f"""Could NOT add the [bold]{alias}[/bold]to the registry""" if not _log else f""" successfully added {alias}, {_log} functions registered"""
     print (f"""{_mark} {_msg}""")
 @app_x.command(name="list") 
-def registry_list ():
-
-    transport.registry.plugins.init()
-    _d = []
-    for _alias in transport.registry.plugins._data :
-        _data = transport.registry.plugins._data[_alias]
-        _d +=  [{'alias':_alias,"plugin-count":len(_data['content']),'e.g':'@'.join([_alias,_data['content'][0]]),'plugins':json.dumps(_data['content'])}]
-    if _d:
-        print (pd.DataFrame(_d))
+def registry_list (folder:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport configuration folder")):
+    """
+    This function will list all the plugins (python functions/files) that are registered and can be reused
+    """
+    _pregistry  = pix.Registry(folder=folder)
+    _df = _pregistry.stats()
+    if _df.empty :
+        print (f"{TIMES_MARK} registry at {folder} is not ready")
     else:
-        print (f"""{TIMES_MARK}, Plugin registry is not available or needs initialization""")
+        print (_df)
 
+@app_x.command ("has")
+def registry_has (alias:Annotated[str,typer.Argument(help="alias of a function function@file or file.function")],
+                  folder:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport registry file")) :
+    _pregistry  = pix.Registry(folder=folder)
+    if _pregistry.has(alias) :
+        _msg = f"{CHECK_MARK} {alias} was [bold] found [/bold] in registry "
+    else:
+        _msg = f"{TIMES_MARK} {alias} was [bold] NOT found [/bold] in registry "
+    print (_msg)
+    
 @app_x.command(name="test") 
-def registry_test (key):
+def registry_test (alias:Annotated[str,typer.Argument(help="alias of a function function@file or file.function")],
+                  folder:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport registry folder")) :
+    _pregistry  = pix.Registry(folder=folder)
     """
     This function allows to test syntax for a plugin i.e in terms of alias@function
     """
-    _item = transport.registry.plugins.has(key=key)
-    if _item :
-        del _item['pointer']
-        print (f"""{CHECK_MARK} successfully loaded \033[1m{key}\033[0m found, version {_item['version']}""")
-        print (pd.DataFrame([_item]))
+    # _item = transport.registry.plugins.has(key=key)
+    _pointer = _pregistry.get(alias) if _pregistry.has(alias) else None
+        
+    if _pointer:
+        print (f"""{CHECK_MARK} successfully loaded [bold] {alias}[/bold] found in {folder}""")
+        
     else:
-        print (f"{TIMES_MARK} unable to load \033[1m{key}\033[0m. Make sure it is registered")
+        print (f"{TIMES_MARK} unable to load {alias}. Make sure it is registered")
 app.add_typer(app_e,name='etl',help="This function will run etl or generate a template etl configuration file")
 app.add_typer(app_r,name='registry',help='This function allows labeling database access information')
 app.add_typer(app_i,name="info",help="This function will print either license or supported database technologies")

+ 34 - 18
transport/iowrapper.py

@@ -11,6 +11,7 @@ from transport import providers
 from multiprocessing import Process
 import time
 
+import plugin_ix 
 
 class IO:
     """
@@ -21,20 +22,25 @@ class IO:
         plugins = _args['plugins'] if 'plugins' not in _args else None
 
         self._agent = _agent
+        self._ixloader = plugin_ix.Loader () #--
         if plugins :
-            self._init_plugins(plugins)
-        else:
-            self._plugins = None
+            self.init_plugins(plugins)
+        #     for _ref in plugins :
+        #         self._ixloader.set(_ref)
+        # if plugins :
+        #     self._init_plugins(plugins)
+        # else:
+        #     self._plugins = None
         
-    def _init_plugins(self,_args):
-        """
-        This function will load pipelined functions as a plugin loader
-        """
-        if 'path' in _args and 'names' in _args :
-            self._plugins = PluginLoader(**_args)
-        else:
-            self._plugins = PluginLoader()
-            [self._plugins.set(_pointer) for _pointer in _args]
+    # def _init_plugins(self,_args):
+    #     """
+    #     This function will load pipelined functions as a plugin loader
+    #     """
+    #     if 'path' in _args and 'names' in _args :
+    #         self._plugins = PluginLoader(**_args)
+    #     else:
+    #         self._plugins = PluginLoader()
+    #         [self._plugins.set(_pointer) for _pointer in _args]
         #
         # @TODO: We should have a way to log what plugins are loaded and ready to use
     def meta (self,**_args):
@@ -62,6 +68,10 @@ class IO:
             pointer = getattr(self._agent,_name)
             return pointer(_query)
         return None
+    def init_plugins(self,plugins):
+        for _ref in plugins :
+            self._ixloader.set(_ref)
+
 class IReader(IO):
     """
     This is a wrapper for read functionalities
@@ -71,22 +81,28 @@ class IReader(IO):
         
     def read(self,**_args):
         if 'plugins' in _args :
-            self._init_plugins(_args['plugins'])
+            self.init_plugins(_args['plugins'])
+
         _data = self._agent.read(**_args)
-        if self._plugins and self._plugins.ratio() > 0 :
-            _data = self._plugins.apply(_data)
+        # if self._plugins and self._plugins.ratio() > 0 :
+        #     _data = self._plugins.apply(_data)
         #
         # output data 
+        
+        #
+        # applying the the design pattern 
+        _data = self._ixloader.visitor(_data)
         return _data
 class IWriter(IO):
     def __init__(self,**_args): #_agent,pipeline=None):
         super().__init__(**_args) #_agent,pipeline)  
     def write(self,_data,**_args):
+        # if 'plugins' in _args :
+        #     self._init_plugins(_args['plugins'])
         if 'plugins' in _args :
-            self._init_plugins(_args['plugins'])
-        if self._plugins and self._plugins.ratio() > 0 :
-            _data = self._plugins.apply(_data)
+            self.init_plugins(_args['plugins'])
 
+        self._ixloader.visitor(_data)
         self._agent.write(_data,**_args)
 
 #

+ 132 - 132
transport/registry.py

@@ -21,161 +21,161 @@ if 'DATA_TRANSPORT_REGISTRY_PATH' in os.environ :
     REGISTRY_PATH = os.environ['DATA_TRANSPORT_REGISTRY_PATH']
 REGISTRY_FILE= 'transport-registry.json'
 DATA = {}
-class plugins:
-    #
-    # This is a utility function that should enable management of plugins-registry
-    # The class allows to add/remove elements 
-    #
-    # @TODO: add read/write properties to the class (better design practice)
-    #
-    _data = {}
-    FOLDER = os.sep.join([REGISTRY_PATH,'plugins'])
-    CODE = os.sep.join([REGISTRY_PATH,'plugins','code'])
-    FILE = os.sep.join([REGISTRY_PATH,'plugin-registry.json'])
-    @staticmethod
-    def init():
+# class plugins:
+#     #
+#     # This is a utility function that should enable management of plugins-registry
+#     # The class allows to add/remove elements 
+#     #
+#     # @TODO: add read/write properties to the class (better design practice)
+#     #
+#     _data = {}
+#     FOLDER = os.sep.join([REGISTRY_PATH,'plugins'])
+#     CODE = os.sep.join([REGISTRY_PATH,'plugins','code'])
+#     FILE = os.sep.join([REGISTRY_PATH,'plugin-registry.json'])
+#     @staticmethod
+#     def init():
         
-        if not os.path.exists(plugins.FOLDER) :
-            os.makedirs(plugins.FOLDER)
-        if not os.path.exists(plugins.CODE):
-            os.makedirs(plugins.CODE)
-        if not os.path.exists(plugins.FILE):
-            f = open(plugins.FILE,'w')
-            f.write("{}")
-            f.close()
-        plugins._read() #-- will load data as a side effect
+#         if not os.path.exists(plugins.FOLDER) :
+#             os.makedirs(plugins.FOLDER)
+#         if not os.path.exists(plugins.CODE):
+#             os.makedirs(plugins.CODE)
+#         if not os.path.exists(plugins.FILE):
+#             f = open(plugins.FILE,'w')
+#             f.write("{}")
+#             f.close()
+#         plugins._read() #-- will load data as a side effect
 
-    @staticmethod
-    def copy (path) :
+#     @staticmethod
+#     def copy (path) :
         
-        shutil.copy2(path,plugins.CODE)
-    @staticmethod
-    def _read ():
-        f = open(plugins.FILE)
-        try:
-            _data = json.loads(f.read())
-            f.close()
-        except Exception as e:
-            print (f"Corrupted registry, resetting ...")
-            _data = {}
-            plugins._write(_data)
+#         shutil.copy2(path,plugins.CODE)
+#     @staticmethod
+#     def _read ():
+#         f = open(plugins.FILE)
+#         try:
+#             _data = json.loads(f.read())
+#             f.close()
+#         except Exception as e:
+#             print (f"Corrupted registry, resetting ...")
+#             _data = {}
+#             plugins._write(_data)
             
-        plugins._data = _data
-    @staticmethod
-    def _write (_data):
-        f = open(plugins.FILE,'w')
-        f.write(json.dumps(_data))
-        f.close()
-        plugins._data = _data
+#         plugins._data = _data
+#     @staticmethod
+#     def _write (_data):
+#         f = open(plugins.FILE,'w')
+#         f.write(json.dumps(_data))
+#         f.close()
+#         plugins._data = _data
 
-    @staticmethod
-    def inspect (_path):
-        _names = []
+#     @staticmethod
+#     def inspect (_path):
+#         _names = []
         
-        if os.path.exists(_path) :
-            _filename = _path.split(os.sep)[-1]
-            spec = importlib.util.spec_from_file_location(_filename, _path)
-            module = importlib.util.module_from_spec(spec)
-            spec.loader.exec_module(module)
+#         if os.path.exists(_path) :
+#             _filename = _path.split(os.sep)[-1]
+#             spec = importlib.util.spec_from_file_location(_filename, _path)
+#             module = importlib.util.module_from_spec(spec)
+#             spec.loader.exec_module(module)
 
-            # _names = [{'name':getattr(getattr(module,_name),'name'),'pointer':getattr(module,_name)} for _name in dir(module) if type( getattr(module,_name)).__name__ == 'function']
-            for _name in dir(module) :
-                _pointer = getattr(module,_name) 
-                if hasattr(_pointer,'transport') :
-                    _item = {'real_name':_name,'name':getattr(_pointer,'name'),'pointer':_pointer,'version':getattr(_pointer,'version')}
-                    _names.append(_item)
+#             # _names = [{'name':getattr(getattr(module,_name),'name'),'pointer':getattr(module,_name)} for _name in dir(module) if type( getattr(module,_name)).__name__ == 'function']
+#             for _name in dir(module) :
+#                 _pointer = getattr(module,_name) 
+#                 if hasattr(_pointer,'transport') :
+#                     _item = {'real_name':_name,'name':getattr(_pointer,'name'),'pointer':_pointer,'version':getattr(_pointer,'version')}
+#                     _names.append(_item)
 
             
-        return _names
-    @staticmethod
-    def add (alias,path):
-        """
-        Add overwrite the registry entries
-        """
-        _names = plugins.inspect (path)
-        _log = []
+#         return _names
+#     @staticmethod
+#     def add (alias,path):
+#         """
+#         Add overwrite the registry entries
+#         """
+#         _names = plugins.inspect (path)
+#         _log = []
         
-        if _names :
-            #
-            # We should make sure we have all the plugins with the attributes (transport,name) set
-            _names = [_item for _item in _names if hasattr(_item['pointer'],'transport') ]
-            if _names :
-                plugins.copy(path)
-                _content = []
+#         if _names :
+#             #
+#             # We should make sure we have all the plugins with the attributes (transport,name) set
+#             _names = [_item for _item in _names if hasattr(_item['pointer'],'transport') ]
+#             if _names :
+#                 plugins.copy(path)
+#                 _content = []
                 
-                for _item in _names :
-                    _key = '@'.join([alias,_item['name']])
-                    _log.append(_item['name'])
-                #
-                # Let us update the registry 
-                # 
-                plugins.update(alias,path,_log)        
-        return _log
+#                 for _item in _names :
+#                     _key = '@'.join([alias,_item['name']])
+#                     _log.append(_item['name'])
+#                 #
+#                 # Let us update the registry 
+#                 # 
+#                 plugins.update(alias,path,_log)        
+#         return _log
     
-    @staticmethod
-    def update (alias,path,_log) :
-        """
-        updating the registry entries of the plugins (management data)
-        """
-        # f = open(plugins.FILE)
-        # _data = json.loads(f.read())
-        # f.close()
-        _data = plugins._data
-        # _log = plugins.add(alias,path)
+#     @staticmethod
+#     def update (alias,path,_log) :
+#         """
+#         updating the registry entries of the plugins (management data)
+#         """
+#         # f = open(plugins.FILE)
+#         # _data = json.loads(f.read())
+#         # f.close()
+#         _data = plugins._data
+#         # _log = plugins.add(alias,path)
         
-        if _log :
-            _data[alias] = {'content':_log,'name':path.split(os.sep)[-1]}
-            plugins._write(_data) #-- will update data as a side effect
+#         if _log :
+#             _data[alias] = {'content':_log,'name':path.split(os.sep)[-1]}
+#             plugins._write(_data) #-- will update data as a side effect
 
-        return _log
-    @staticmethod
-    def get(**_args) :
-        # f = open(plugins.FILE)
-        # _data = json.loads(f.read())
-        # f.close()
-        # if 'key' in _args :
-        #     alias,name = _args['key'].split('.') if '.' in _args['key'] else _args['key'].split('@')
-        # else :
-        #     alias = _args['alias']
-        #     name  = _args['name']
+#         return _log
+#     @staticmethod
+#     def get(**_args) :
+#         # f = open(plugins.FILE)
+#         # _data = json.loads(f.read())
+#         # f.close()
+#         # if 'key' in _args :
+#         #     alias,name = _args['key'].split('.') if '.' in _args['key'] else _args['key'].split('@')
+#         # else :
+#         #     alias = _args['alias']
+#         #     name  = _args['name']
         
-        # if alias in _data :
+#         # if alias in _data :
             
-        #     _path = os.sep.join([plugins.CODE,_data[alias]['name']])
-        #     _item = [_item for _item in plugins.inspect(_path) if name == _item['name']]
+#         #     _path = os.sep.join([plugins.CODE,_data[alias]['name']])
+#         #     _item = [_item for _item in plugins.inspect(_path) if name == _item['name']]
             
-        #     _item = _item[0] if _item else None
-        #     if _item :
+#         #     _item = _item[0] if _item else None
+#         #     if _item :
                 
-        #         return _item['pointer'] 
-        # return None
-        _item = plugins.has(**_args)
-        return _item['pointer'] if _item else None
+#         #         return _item['pointer'] 
+#         # return None
+#         _item = plugins.has(**_args)
+#         return _item['pointer'] if _item else None
     
-    @staticmethod
-    def has (**_args):
-        f = open(plugins.FILE)
-        _data = json.loads(f.read())
-        f.close()
-        if 'key' in _args :
-            alias,name = _args['key'].split('.') if '.' in _args['key'] else _args['key'].split('@')
-        else :
-            alias = _args['alias']
-            name  = _args['name']
+#     @staticmethod
+#     def has (**_args):
+#         f = open(plugins.FILE)
+#         _data = json.loads(f.read())
+#         f.close()
+#         if 'key' in _args :
+#             alias,name = _args['key'].split('.') if '.' in _args['key'] else _args['key'].split('@')
+#         else :
+#             alias = _args['alias']
+#             name  = _args['name']
         
-        if alias in _data :
+#         if alias in _data :
             
-            _path = os.sep.join([plugins.CODE,_data[alias]['name']])
-            _item = [_item for _item in plugins.inspect(_path) if name == _item['name']]
+#             _path = os.sep.join([plugins.CODE,_data[alias]['name']])
+#             _item = [_item for _item in plugins.inspect(_path) if name == _item['name']]
             
-            _item = _item[0] if _item else None
-            if _item :
+#             _item = _item[0] if _item else None
+#             if _item :
                 
-                return copy.copy(_item)
-        return None
-    @staticmethod
-    def synch():
-        pass
+#                 return copy.copy(_item)
+#         return None
+#     @staticmethod
+#     def synch():
+#         pass
 
 def isloaded ():
     return DATA not in [{},None]