__init__.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. """
  2. Data Transport, The Phi Technology LLC
  3. Steve L. Nyemba, steve@the-phi.com
  4. This library is designed to serve as a wrapper to a set of supported data stores :
  5. - couchdb
  6. - mongodb
  7. - Files (character delimited)
  8. - Queues (RabbitMQ)
  9. - Session (Flask)
  10. - s3
  11. - sqlite
  12. The supported operations are read/write and providing meta data to the calling code
  13. We separated reads from writes to mitigate accidents associated with writes.
  14. Source Code is available under MIT License:
  15. https://healthcareio.the-phi.com/data-transport
  16. https://hiplab.mc.vanderbilt.edu/git/hiplab/data-transport
  17. """
  18. import numpy as np
  19. #from transport import sql, nosql, cloud, other, warehouse
  20. from transport import sql
  21. try:
  22. from transport import nosql
  23. finally:
  24. pass
  25. try:
  26. from transport import cloud
  27. finally:
  28. pass
  29. try:
  30. from transport import warehouse
  31. finally:
  32. pass
  33. try:
  34. from transport import other
  35. finally:
  36. pass
  37. import pandas as pd
  38. import json
  39. import os
  40. from info import __version__,__author__,__email__,__license__,__app_name__,__whatsnew__,__edition__
  41. from transport.iowrapper import IWriter, IReader, IETL
  42. from transport.plugins import PluginLoader
  43. from transport import providers
  44. import copy
  45. from transport import registry
  46. from transport.plugins import Plugin
  47. PROVIDERS = {}
  48. def init():
  49. global PROVIDERS
  50. for _module in [cloud,sql,nosql,other,warehouse] :
  51. for _provider_name in dir(_module) :
  52. if _provider_name.startswith('__') or _provider_name == 'common':
  53. continue
  54. PROVIDERS[_provider_name] = {'module':getattr(_module,_provider_name),'type':_module.__name__}
  55. #
  56. # loading the registry
  57. if not registry.isloaded() :
  58. registry.load()
  59. # def _getauthfile (path) :
  60. # f = open(path)
  61. # _object = json.loads(f.read())
  62. # f.close()
  63. # return _object
  64. def instance (**_args):
  65. """
  66. This function returns an object of to read or write from a supported database provider/vendor
  67. @provider provider
  68. @context read/write (default is read)
  69. @auth_file: Optional if the database information provided is in a file. Useful for not sharing passwords
  70. kwargs These are arguments that are provider/vendor specific
  71. """
  72. global PROVIDERS
  73. if 'auth_file' in _args:
  74. if os.path.exists(_args['auth_file']) :
  75. #
  76. # @TODO: add encryption module and decryption to enable this to be secure
  77. #
  78. f = open(_args['auth_file'])
  79. #_args = dict (_args,** json.loads(f.read()) )
  80. #
  81. # we overrite file parameters with arguments passed
  82. _args = dict (json.loads(f.read()),**_args )
  83. f.close()
  84. else:
  85. filename = _args['auth_file']
  86. raise Exception(f" {filename} was not found or is invalid")
  87. if 'provider' not in _args and 'auth_file' not in _args :
  88. if not registry.isloaded () :
  89. if ('path' in _args and registry.exists(_args['path'] )) or registry.exists():
  90. registry.load() if 'path' not in _args else registry.load(_args['path'])
  91. _info = {}
  92. if 'label' in _args and registry.isloaded():
  93. _info = registry.get(_args['label'])
  94. else:
  95. _info = registry.get()
  96. if _info :
  97. _args = dict(_info,**_args) #-- we can override the registry parameters with our own arguments
  98. if 'provider' in _args and _args['provider'] in PROVIDERS :
  99. _info = PROVIDERS[_args['provider']]
  100. _module = _info['module']
  101. if 'context' in _args :
  102. _context = _args['context']
  103. else:
  104. _context = 'read'
  105. _pointer = getattr(_module,'Reader') if _context == 'read' else getattr(_module,'Writer')
  106. _agent = _pointer (**_args)
  107. #
  108. loader = None
  109. #
  110. # @TODO:
  111. # define a logger object here that will used by the wrapper
  112. # this would allow us to know what the data-transport is doing and where/how it fails
  113. #
  114. # if 'plugins' in _args :
  115. # _params = _args['plugins']
  116. # if 'path' in _params and 'names' in _params :
  117. # loader = PluginLoader(**_params)
  118. # elif type(_params) == list:
  119. # loader = PluginLoader()
  120. # for _delegate in _params :
  121. # loader.set(_delegate)
  122. _plugins = None if 'plugins' not in _args else _args['plugins']
  123. # if registry.has('logger') :
  124. # _kwa = registry.get('logger')
  125. # _lmodule = getPROVIDERS[_kwa['provider']]
  126. if ( ('label' in _args and _args['label'] != 'logger') and registry.has('logger')):
  127. #
  128. # We did not request label called logger, so we are setting up a logger if it is specified in the registry
  129. #
  130. _kwargs = registry.get('logger')
  131. _kwargs['context'] = 'write'
  132. _kwargs['table'] =_module.__name__.split('.')[-1]+'_logs'
  133. # _logger = instance(**_kwargs)
  134. _module = PROVIDERS[_kwargs['provider']]['module']
  135. _logger = getattr(_module,'Writer')
  136. _logger = _logger(**_kwargs)
  137. else:
  138. _logger = None
  139. _kwargs = {'agent':_agent,'plugins':_plugins,'logger':_logger}
  140. if 'args' in _args :
  141. _kwargs['args'] = _args['args']
  142. # _datatransport = IReader(_agent,_plugins,_logger) if _context == 'read' else IWriter(_agent,_plugins,_logger)
  143. _datatransport = IReader(**_kwargs) if _context == 'read' else IWriter(**_kwargs)
  144. return _datatransport
  145. else:
  146. #
  147. # We can handle the case for an ETL object
  148. #
  149. raise Exception ("Missing or Unknown provider")
  150. pass
  151. class get :
  152. """
  153. This class is just a wrapper to make the interface (API) more conversational and easy to understand
  154. """
  155. @staticmethod
  156. def reader (**_args):
  157. if not _args or ('provider' not in _args and 'label' not in _args):
  158. _args['label'] = 'default'
  159. _args['context'] = 'read'
  160. # return instance(**_args)
  161. # _args['logger'] = instance(**{'label':'logger','context':'write','table':'logs'})
  162. _handler = instance(**_args)
  163. # _handler.setLogger(get.logger())
  164. return _handler
  165. @staticmethod
  166. def writer(**_args):
  167. """
  168. This function is a wrapper that will return a writer to a database. It disambiguates the interface
  169. """
  170. if not _args or ('provider' not in _args and 'label' not in _args):
  171. _args['label'] = 'default'
  172. _args['context'] = 'write'
  173. # _args['logger'] = instance(**{'label':'logger','context':'write','table':'logs'})
  174. _handler = instance(**_args)
  175. #
  176. # Implementing logging with the 'eat-your-own-dog-food' approach
  177. # Using dependency injection to set the logger (problem with imports)
  178. #
  179. # _handler.setLogger(get.logger())
  180. return _handler
  181. @staticmethod
  182. def logger ():
  183. if registry.has('logger') :
  184. _args = registry.get('logger')
  185. _args['context'] = 'write'
  186. return instance(**_args)
  187. return None
  188. @staticmethod
  189. def etl (**_args):
  190. if 'source' in _args and 'target' in _args :
  191. return IETL(**_args)
  192. else:
  193. raise Exception ("Malformed input found, object must have both 'source' and 'target' attributes")
  194. def supported ():
  195. _info = {}
  196. for _provider in PROVIDERS :
  197. _item = PROVIDERS[_provider]
  198. if _item['type'] not in _info :
  199. _info[_item['type']] = []
  200. _info[_item['type']].append(_provider)
  201. _df = pd.DataFrame()
  202. for _id in _info :
  203. if not _df.shape[0] :
  204. _df = pd.DataFrame(_info[_id],columns=[_id.replace('transport.','')])
  205. else:
  206. _df = pd.DataFrame(_info[_id],columns=[_id.replace('transport.','')]).join(_df, how='outer')
  207. return _df.fillna('')
  208. class factory :
  209. pass
  210. factory.instance = instance
  211. init()