  1. """
  2. This file implements exporting data from a mongodb database to another data-store in relational format (csv). Any other use case will have to be performed with mongodb native tools
  3. target:
  4. File/SQLite
  5. PostgreSQL
  6. MySQL
  7. @TODO:
  8. - Insure to support both schemas and table prefixes
  9. Usage :
  10. License:
  11. Copyright 2019, The Phi Technology LLC
  12. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
  13. The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
  14. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  15. """
import transport
import numpy as np
import os
import json
import jsonmerge
import sys
# from airflow import DAG
from datetime import timedelta
# import healthcareio.export.workers as workers
from healthcareio.export import workers
import platform
from datetime import datetime
import copy
import requests
import time
PATH = os.sep.join([os.environ.get('HOME'),'.healthcareio','config.json'])
STORE_URI = 'http://healthcareio.the-phi.com/store/healthcareio'
#
# let us see if we have any custom configurations ...
PATH = os.sep.join([os.environ.get('HOME'),'.healthcareio','custom'])
CONFIG = {}
CUSTOM_CONFIG = {}
# if os.path.exists(PATH) and os.listdir(PATH) :
#     CONFIG = json.loads((open(PATH)).read())
#     PATH = os.sep.join([PATH,os.listdir(PATH)[0]])
#     CUSTOM_CONFIG = json.loads((open(PATH)).read())
# _args = dict(CONFIG['store'],**{'type':'mongo.MongoReader'})
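#
# Judging from the lookups further down, the loaded configuration is expected
# to expose at least the following (a sketch, not a full schema) :
#   CONFIG['owner']           registered email address
#   CONFIG['store']           mongodb connection arguments
#   CONFIG['parser'][TYPE]    parsing rules for a given EDI type ('835' or '837')
#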
def get_field_names (_map,label):
    fields = list(_map.keys())
    return fields if not label else [{label:fields}]

def get_field (entry):
    label = list(set(['label','field']) & set(entry.keys()))
    label = None if not label else entry[label[0]]
    if 'map' not in entry :
        return None
    _map = entry['map']
    return get_field_names(_map,label)
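#
# A minimal sketch of the helpers above, on a hypothetical map entry :
#   get_field({"label":"claim","map":{"CLM01":"claim_id","CLM02":"amount"}})
#   returns [{"claim":["CLM01","CLM02"]}] ; an entry without a label/field
#   key would yield the plain list ["CLM01","CLM02"] instead.
#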
#
# -- Get the fields to export that will go into the unwind ;
#
def meta(config) :
    """
    This function will return the metadata associated with a given configuration (835 or 837)
    :param config   configuration section
    """
    _info = []
    table_count = 1
    cached = {}
    for prefix in config :
        if 'cache' in config[prefix] :
            _cache = config[prefix]['cache']
            field = _cache['field']
            key = _cache['key']
            if 'map' in config[key]:
                config[key]['map'][field] = -100
    for prefix in config :
        # if 'map' in config[prefix] :
        #     label = list(set(['label','field']) & set(config[prefix].keys()))
        #     label = None if not label else config[prefix][label[0]]
        #     _map = config[prefix]['map']
        #     _info += (field_info(_map,label))
        if type(config[prefix]) != dict :
            continue
        if '@ref' in config[prefix] : #and set(['label','field','map']) & set(config[prefix]['@ref'].keys()):
            for subprefix in config[prefix]['@ref'] :
                _entry = config[prefix]['@ref'][subprefix]
                if 'map' in _entry :
                    _info += get_field(_entry)
                else:
                    _info += list(_entry.keys())
        if set(['label','field','map']) & set(config[prefix].keys()):
            _entry = config[prefix]
            if 'map' in _entry :
                _info += get_field(_entry)
    #
    # We need to organize the fields appropriately here
    #
    fields = {"main":[],"rel":{}}
    for row in _info :
        if type(row) == str :
            fields['main'] += [row]
            fields['main'] = list(set(fields['main']))
            fields['main'].sort()
        else :
            fields['rel'] = jsonmerge.merge(fields['rel'],row)
    return fields
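#
# For illustration (hypothetical field names), meta() folds the collected
# fields into a structure of the form :
#   {"main":["claim_id","payer_id", ...], "rel":{"procedures":["code","amount"]}}
# where "main" lists scalar fields and "rel" maps each sub-document to be
# unwound to its own field list.
#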
def create (**_args) :
    skip = [] if 'skip' not in _args else _args['skip']
    fields = ([_args['key']] if 'key' in _args else []) + _args['fields']
    fields = ['_id'] + fields
    table = _args['table']
    sql = ['CREATE TABLE :table ',"(",",\n".join(["\t".join(["\t",name,"VARCHAR(125)"]) for name in fields]),")"]
    return " ".join(sql)
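#
# e.g. (whitespace aside, and assuming the :table placeholder is substituted
# downstream by the worker that runs the statement) :
#   create(table='remits',key='claim_id',fields=['payer'])
#   -> CREATE TABLE :table ( _id VARCHAR(125), claim_id VARCHAR(125), payer VARCHAR(125) )
#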
def read (**_args) :
    """
    This function will read rows with a set number of files and store them into a data-store
    """
    files = _args['files']
    fields = _args['fields']
    name = _args['id']
    pipeline = {"find":name,"filter":{"name":{"$in":files}}}
    #
    # @TODO: Find a way to write the data into a data-store
    #   - use dbi interface with pandas or stream it in
    #
def init (**_args) :
    """
    This function is intended to determine the number of tables to be created, as well as their type.
    :param type {835,837}
    :param skip list of fields to be skipped
    """
    TYPE = _args['type']
    SKIP = _args['skip'] if 'skip' in _args else []
    _config = CONFIG['parser'][TYPE][0]
    if TYPE in CUSTOM_CONFIG :
        _config = jsonmerge.merge(_config,CUSTOM_CONFIG[TYPE])
    #
    # @TODO: implement fields to be skipped ...
    #
    TABLE_NAME = 'claims' if TYPE == '837' else 'remits'
    _info = meta(_config)
    # project = dict.fromkeys(["_id","claim_id"]+_info['main'],1)
    project = {}
    for field_name in _info['main'] :
        _name = "".join(["$",field_name])
        project[field_name] = {"$ifNull":[_name,""]}
    project["_id"] = 1
    project = {"$project":project}
    r = [{"table":TABLE_NAME,"mongo":{"aggregate":TABLE_NAME,"pipeline":[project],"cursor":{},"allowDiskUse":True},"sql":create(table=TABLE_NAME,fields=_info['main'])}]
    for table in _info['rel'] :
        #
        # NOTE: Adding _index to the fields
        fields = _info['rel'][table] + ["_index"]
        project = {"_id":1,"claim_id":1,"_index":1} # dict.fromkeys(["_id","claim_id"]+fields,[ ".".join([table,field_name]) for field_name in fields])
        for field_name in fields :
            # project[field_name] = "$"+".".join([table,field_name])
            _name = "$"+".".join([table,field_name])
            project[field_name] = {"$ifNull":[_name,""]} # {"$cond":[{"$eq":[_name,None]},"",_name]}
        project["_id"] = 1
        # pipeline = [{"$match":{"procedures":{"$nin":[None,'']}}},{"$unwind":"$"+table},{"$project":project}]
        pipeline = [{"$match": {table: {"$nin": [None, ""]}}},{"$unwind":"$"+table},{"$project":project}]
        r += [{"table":table,"mongo":{"aggregate":TABLE_NAME,"cursor":{},"pipeline":pipeline,"allowDiskUse":True},"sql":create(table=table,key='claim_id',fields=fields)}]
    return r
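#
# init(type='835') thus returns one job descriptor per table, roughly of the
# shape below (actual field names depend on the loaded configuration) :
#   {"table":"remits",
#    "mongo":{"aggregate":"remits","pipeline":[...],"cursor":{},"allowDiskUse":True},
#    "sql":"CREATE TABLE :table ( ... )"}
#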
class Factory:
    @staticmethod
    def license(**_args):
        body = {}
        body['email'] = _args['email']
        body['host'] = platform.node()
        body['date'] = {"month":datetime.now().month,"year":datetime.now().year,"day":datetime.now().day}
        headers = {'uid': body['email'],'content-type':'application/json'}
        uri = STORE_URI+'/init'
        http = requests.session()
        #
        # NOTE: the body is serialized explicitly since the content-type is json
        r = http.post(uri,headers=headers,data=json.dumps(body))
        return r.json() if r.status_code == 200 else {}
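    #
    # Usage sketch (hypothetical address) :
    #   _features = Factory.license(email='user@example.com')
    # returns the features granted to the account, or {} if the store
    # endpoint does not answer with a 200.
    #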
    @staticmethod
    def instance(**_args):
        """
        The creation process will only require a target store and a type (835,837)
        :param type         EDI type to be processed i.e 835 or 837
        :param write_store  target data-store (redshift, mariadb, mongodb ...)
        """
        global PATH
        global CONFIG
        global CUSTOM_CONFIG
        PATH = os.sep.join([os.environ.get('HOME'),'.healthcareio','config.json'])
        if os.path.exists(PATH):
            CONFIG = json.loads((open(PATH)).read())
        CUSTOM_PATH = os.sep.join([os.environ.get('HOME'),'.healthcareio','custom'])
        if os.path.exists(CUSTOM_PATH) and os.listdir(CUSTOM_PATH) :
            CUSTOM_PATH = os.sep.join([CUSTOM_PATH,os.listdir(CUSTOM_PATH)[0]])
            CUSTOM_CONFIG = json.loads((open(CUSTOM_PATH)).read())
        _features = Factory.license(email=CONFIG['owner'])
        store = copy.deepcopy(CONFIG['store'])
        store['type'] = 'mongo.MongoReader'
        wstore = _args['write_store'] #-- output data store
        TYPE = _args['type']
        PREFIX = 'clm_' if TYPE == '837' else 'era_'
        SCHEMA = '' if 'schema' not in wstore['args'] else wstore['args']['schema']
        _config = CONFIG['parser'][TYPE][0]
        if TYPE in CUSTOM_CONFIG :
            _config = jsonmerge.merge(_config,CUSTOM_CONFIG[TYPE])
        # _info = meta(_config)
        job_args = init(type=TYPE)
        # print (json.dumps(job_args))
        _jobs = []
        for row in job_args:
            # _store = json.loads(json.dumps(wstore))
            _store = copy.deepcopy(wstore)
            _store['args']['table'] = row['table']
            _pipe = [
                workers.CreateSQL(prefix=PREFIX,schema=SCHEMA,store=_store,sql=row['sql']),
                workers.Reader(prefix=PREFIX,schema=SCHEMA,store=store,mongo=row['mongo'],max_rows=250000,features=_features,table=row['table']),
                workers.Writer(prefix=PREFIX,schema=SCHEMA,store=_store)
            ]
            _jobs += [workers.Subject(observers=_pipe,name=row['table'])]
        return _jobs
# if __name__ == '__main__' :
#     pass
#     pipes = Factory.instance(type='835',write_store={"type":"sql.SQLWriter","args":{"provider":"postgresql","db":"sample",}}) #"inspect":0,"cast":0}})
#     # pipes[0].run()
#     for thread in pipes:
#         thread.start()
#         time.sleep(1)
#     while pipes :
#         pipes = [thread for thread in pipes if thread.is_alive()]
#         time.sleep(10)
#     print (Factory.license(email='steve@the-phi.com'))
#
# check account with basic information
#
# class Observerob:
#     def __init__(**_args) :
#
# -- Let us all flatten the table
#
# TYPE = '835'
# _config = jsonmerge.merge(CONFIG['parser'][TYPE][0],CUSTOM_CONFIG[TYPE])
# # f = meta(CONFIG['parser'][TYPE][0])
# # _f = meta(CUSTOM_CONFIG[TYPE])
# f = meta(_config)
# # print (json.dumps( (f)))
# print (json.dumps(init(type='835')))
# # print (create(fields=f['rel']['adjudicated'],table='adjudicated',key='claim_id'))