__init__.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. """
  2. This module contains code to load plugins. The idea of a plugin is that it lets the parser load functions written outside of the scope of the parser.
  3. This enables better parsing and the creation of a pipeline that can perform pre/post conditions, given that {x12} is organized in loops.
  4. Contract:
  5. Functions will take in an object (the row of the claim they are intended to parse), and prior rows (parsed)
  6. """
  7. import os
  8. import importlib as IL
  9. import sys
  10. DEFAULT_PLUGINS='healthcareio.x12.plugins.default'
  class MODE:
      """
      Validation levels applied to a plugin before it is used.
      The values are plain integers so callers may pass either the constant or the number.
      """
      TRUST = 0           # no check is selected for this level
      CHECK = 1           # check only
      TEST = 2            # test only
      TEST_AND_CHECK = 3  # check, then test
  def instance(**_args):
      """
      Placeholder with no implementation yet.
      Accepts any keyword arguments, ignores them and returns None.
      """
      return None
  def has(**_args):
      """
      This function will inspect if a function is valid as a plugin or not,
      i.e. whether an attribute with the given name exists in the plugin module.
      name : function name for a given file (required)
      path : python file to examine (optional)

      When path points to an existing file, that file is loaded as a module and searched.
      Otherwise the module registered in sys.modules under DEFAULT_PLUGINS is searched.
      Returns True if the name is found, False otherwise (including when no module
      could be resolved).
      Raises KeyError if 'name' is missing; errors raised while executing the plugin
      file are propagated to the caller.
      """
      # importlib.util is a submodule: import it explicitly rather than relying on
      # it having been loaded as a side effect of `import importlib`.
      import importlib.util

      _name = _args['name']
      # A missing or None path means "no file": os.path.exists(None) would raise TypeError.
      _pyfile = _args.get('path') or ''
      if os.path.exists(_pyfile):
          _spec = importlib.util.spec_from_file_location(_name, _pyfile)
          if _spec is None or _spec.loader is None:
              # The file exists but cannot be imported as python source.
              return False
          _module = importlib.util.module_from_spec(_spec)
          # NOTE: this executes the plugin file; only point it at trusted code.
          _spec.loader.exec_module(_module)
      else:
          # The default plugins are only visible once their package has been imported.
          _module = sys.modules.get(DEFAULT_PLUGINS)
      return _module is not None and hasattr(_module, _name)
  def get(**_args):
      """
      This function will return the plugin function matching a given name.
      name : function name for a given file (required)
      path : python file to examine (optional)

      When path points to an existing file, that file is loaded as a module and searched.
      Otherwise the module registered in sys.modules under DEFAULT_PLUGINS is searched.
      Returns the attribute found under that name, or None when the name (or the
      module) cannot be found.
      Raises KeyError if 'name' is missing; errors raised while executing the plugin
      file are propagated to the caller.
      """
      # importlib.util is a submodule: import it explicitly rather than relying on
      # it having been loaded as a side effect of `import importlib`.
      import importlib.util

      _name = _args['name']
      # A missing or None path means "no file": os.path.exists(None) would raise TypeError.
      _pyfile = _args.get('path') or ''
      if os.path.exists(_pyfile):
          _spec = importlib.util.spec_from_file_location(_name, _pyfile)
          if _spec is None or _spec.loader is None:
              # The file exists but cannot be imported as python source.
              return None
          _module = importlib.util.module_from_spec(_spec)
          # NOTE: this executes the plugin file; only point it at trusted code.
          _spec.loader.exec_module(_module)
      else:
          # The default plugins are only visible once their package has been imported.
          _module = sys.modules.get(DEFAULT_PLUGINS)
      if _module is None:
          return None
      return getattr(_module, _name, None)
  def test(**_args):
      """
      This function will test a plugin to insure the plugin conforms to the norm we are setting here
      :pointer function to call (optional)
      :name    function name, used with get() when no pointer is given
      :path    python file holding the function (optional, used with name)

      The plugin is called without arguments and is expected to return a list of field names.
      Returns the non-empty strings found in that result, or an empty list if the plugin
      cannot be resolved, cannot be called, or does not return an iterable.
      """
      try:
          if 'pointer' in _args:
              _caller = _args['pointer']
          else:
              # Pass '' (not None) when no path is given: get() probes the path with
              # os.path.exists, which raises TypeError on None.
              _caller = get(name=_args['name'], path=_args.get('path') or '')
          _params = _caller()
          #
          # the expected result is a list of field names [field_o,field_i]
          #
          return [_item for _item in _params if isinstance(_item, str) and _item != '']
      except Exception:
          # Deliberately best-effort: any failure means the plugin does not conform.
          return []
  def inspect(**_args):
      """
      This function will run the validation steps selected by a mode against a plugin
      :mode one of the MODE constants (required)
      :name function name (required)
      :path python file holding the function (optional)

      MODE.CHECK runs has, MODE.TEST runs test, MODE.TEST_AND_CHECK runs has then test.
      Any other mode (e.g. MODE.TRUST) runs nothing and the plugin is reported as loaded.
      Returns a dictionary with the plugin name, the outcome of every step that ran
      (keyed by the step's function name) and 'loaded', which is False as soon as a
      step returns a falsy outcome; the remaining steps are then skipped.
      """
      _mode = _args['mode']
      _name = _args['name']
      # path is optional for has/get/test, keep the same contract here
      _path = _args.get('path') or ''
      # Modes without an entry select no step, instead of leaving the list undefined.
      _doapply = {
          MODE.CHECK: [has],
          MODE.TEST: [test],
          MODE.TEST_AND_CHECK: [has, test],
      }.get(_mode, [])
      _status = True
      _plugin = {"name": _name}
      for _pointer in _doapply:
          _outcome = _pointer(name=_name, path=_path)
          _plugin[_pointer.__name__] = _outcome
          if not _outcome:
              _status = False
              break
      _plugin['loaded'] = _status
      return _plugin
  def load(**_args):
      """
      This function will load all the plugins given an set of arguments :
      path    file holding the functions (optional)
      names   list of functions to export (required)
      mode    1- CHECK ONLY, 2 - TEST ONLY, 3- TEST_AND_CHECK (default 3)
      inspect when truthy every function is validated with inspect() using mode

      Returns a dictionary keyed by function name; every entry holds 'name', 'loaded'
      and 'pointer' (the function, or None when the plugin was not loaded).
      Entries that were not inspected also carry an empty 'method' and are always
      flagged as loaded, so their 'pointer' is None if get() cannot find the function.
      """
      # path is optional for has/get, keep the same contract here
      _path = _args.get('path') or ''
      _names = _args['names']
      _mode = _args['mode'] if 'mode' in _args else MODE.TEST_AND_CHECK
      _validate = bool(_args.get('inspect'))
      _plugins = {}
      for _name in _names:
          if _validate:
              # inspect() picks the validation steps that match the mode
              _plugin = inspect(name=_name, mode=_mode, path=_path)
          else:
              _plugin = {"name": _name, "method": "", "loaded": True}
          _plugin['pointer'] = get(name=_name, path=_path) if _plugin['loaded'] else None
          _plugins[_name] = _plugin
      return _plugins
  125. def parse(**_args):
  126. """
  127. This function will apply a function against a given function, and data
  128. :row claim/remits pre-processed
  129. :plugins list of plugins
  130. :conifg configuration associated with
  131. """
  132. _row = _args['row']
  133. _document = _args['document']
  134. _config = _args['config']
  135. """
  136. "apply":"@path:name"
  137. """
  138. _info = _args['config']['apply']
  139. _plug_conf = _args['config']['plugin'] if 'plugin' in _args['config'] else {}
  140. if _info.startswith('@') :
  141. _path = '' #-- get this from general configuration
  142. elif _info.startswith('!'):
  143. _path = _info.split('!')[0][1:]
  144. _name = _info.split(':')[-1]
  145. _name = _args['config']['apply'].split(_path)