|
- import pandas as pd
- import numpy as np
- import os
- import io
- import json
- from multiprocessing import Process
- import transport
- import sqlite3 as lite
- import numpy as np
- import transport
- import matplotlib.pyplot as plt
- import re, base64
- # from weasyprint import HTML, CSS
# Hex colour palette used by the chart builders in this file: they either slice it
# (COLORS[:n]) or index it by series position.
COLORS = ["#fbd1a2","#00b2ca","#1d4e89","#4682B4","#c5c3c6","#4c5c68","#1985a1","#f72585","#7209b7","#3a0ca3","#4361ee","#4cc9f0","#ff595e","#ffca3a","#8ac926","#1982c4","#6a4c93"]
class stdev:
    """
    Aggregate that accumulates values and reports their standard deviation.

    It exposes the step/finalize protocol expected by sqlite3's
    ``Connection.create_aggregate`` (see the commented-out registration
    further down in this file).
    """

    def __init__(self):
        # every non-NULL value handed to step() is kept until finalize()
        self.values = []

    def step(self, value):
        """
        Record one observation.

        :param value    value of the aggregated expression for the current row
        """
        # Only NULL (None) is skipped. Zero is a legitimate observation and
        # must be kept, otherwise the deviation is biased.
        if value is not None:
            self.values.append(value)

    def finalize(self):
        """
        :return the population standard deviation (numpy.std) of the recorded
                values as a plain float, or None when nothing was recorded
        """
        return float(np.std(self.values)) if self.values else None
- # conn = lite.connect("/home/steve/healthcare-io/healthcare-io.db3")
- # conn.create_aggregate("stdev",1,stdev)
- # df = pd.read_sql("select count(distinct (json_extract(data,'$.patient_id'))) as patient_count, avg(json_array_length(data,'$.procedures')) mean, stdev(json_array_length(data,'$.procedures')) stdev from claims",conn)
# NOTE(review): not referenced anywhere in the visible part of this file —
# presumably the output folder for generated statistics; confirm before removing.
ROOT_FOLDER = 'stats'
- # plt.gcf().subplots_adjust(bottom=0.15)
- # from matplotlib import rcParams
- # rcParams.update({'figure.autolayout': True})
class Chart:
    """
    Static helpers that turn a pipeline item into a matplotlib figure.

    An item is a dictionary with at least:
        data    data-frame holding the values to plot
        chart   specification with the column names under 'x' and 'y' and,
                optionally, axis titles under 'axis' ({'x':..., 'y':...})
    Every builder returns the figure after closing it in pyplot, so the caller
    is expected to save/serialize it rather than show it.
    """

    @staticmethod
    def _as_list(value):
        """Wrap a single column name into a list, leave lists untouched."""
        return value if isinstance(value, list) else [value]

    @staticmethod
    def remove_borders(axes, wedges, labels, item):
        """
        Hide the top/right spines, attach the legend and set the axis titles.

        :param axes     matplotlib axes to decorate
        :param wedges   legend handles (artists returned by the plotting call)
        :param labels   legend labels, one per handle
        :param item     pipeline item; item['chart']['axis'] is optional
        """
        axes.spines["top"].set_visible(False)
        axes.spines["right"].set_visible(False)
        axes.legend(wedges, labels, loc="upper right", fontsize=12,
                    bbox_to_anchor=(1, 0, 0.5, 1), fancybox=True, framealpha=0.2)
        if 'axis' in item['chart']:
            axes.set_ylabel(item['chart']['axis']['y'])
            axes.set_xlabel(item['chart']['axis']['x'])

    @staticmethod
    def donut(item, **args):
        """
        Render a donut chart: item['chart']['x'] names the values and
        item['chart']['y'] names the labels.

        :return matplotlib figure
        """
        df = item['data']
        x = item['chart']['x']
        labels = df[item['chart']['y']]
        figure, axes = plt.subplots()
        colors = COLORS[:len(labels)]
        total = df[x].sum()  # hoisted: autopct is invoked once per wedge
        wedges = axes.pie(
            df[x], labels=labels, wedgeprops=dict(width=0.3), colors=colors,
            autopct=lambda pct: "{:.2f}%\n({:.0f})".format(pct, int((pct / 100) * total)))
        Chart.remove_borders(axes, wedges[0], labels, item)
        # close this very figure (not whichever one pyplot considers current)
        plt.close(figure)
        return figure

    @staticmethod
    def barh(item, **args):
        """
        This function will return/render a bar chart (horizontal) which is conducive to showing distributions of things like diagnosis codes
        Only the first 9 rows of the data are plotted; one series is drawn per column named in item['chart']['x'].

        :return matplotlib figure
        """
        figure, axes = plt.subplots()
        y_label = Chart._as_list(item['chart']['y'])[0]
        # a single column name must not be iterated character by character
        x_labels = Chart._as_list(item['chart']['x'])
        df = item['data'].iloc[:9].copy()
        wedges = []
        for index, x_ in enumerate(x_labels):
            # wrap around the palette instead of failing past its last colour
            color = COLORS[index % len(COLORS)]
            wedges.append(axes.barh(df[y_label], df[x_], align='edge', label='counts', color=color))
        Chart.remove_borders(axes, wedges, [name.replace('_', ' ') for name in x_labels], item)
        plt.close(figure)
        return figure

    @staticmethod
    def spline(item, **args):
        """
        Render every column in item['chart']['y'] against every column in
        item['chart']['x'] as a line with markers, or as markers only when the
        keyword 'scatter' is passed.

        :return matplotlib figure
        """
        df = item['data']
        figure, axes = plt.subplots()
        wedges = []
        # kept from the original implementation: the specification is normalized in place
        item['chart']['x'] = Chart._as_list(item['chart']['x'])
        y_names = Chart._as_list(item['chart']['y'])
        for xl in item['chart']['x']:
            x = df[xl]
            for index, yl in enumerate(y_names):
                color = COLORS[index % len(COLORS)]
                # draw on the axes created above rather than on pyplot's implicit current axes
                if 'scatter' in args:
                    wedges += axes.plot(x, df[yl], 'o', color=color)
                else:
                    wedges += axes.plot(x, df[yl], color=color, marker='o')
        # positional flag: the keyword was renamed from 'b' to 'visible' across matplotlib versions
        axes.grid(False, which='major', axis='x')
        Chart.remove_borders(axes, wedges, [name.replace('_', ' ') for name in y_names], item)
        plt.close(figure)
        return figure

    @staticmethod
    def scatter(item, **args):
        """Same as spline but with markers only."""
        return Chart.spline(item, scatter=True)
class Apex:
    """
    This class will format a data-frame to work with Apex charting engine

    Every builder takes a pipeline item (dictionary with a 'data' data-frame
    and a 'chart' specification) and returns {'apex': options}, except scalar
    which returns {'html': markup}.
    """

    @staticmethod
    def apply(item, theme=None):
        """
        Dispatch to the builder named by item['chart']['type'] and decorate its output.

        :param item     pipeline item
        :param theme    Apex theme, defaults to {'mode':'light','palette':'palette6'}
        :return options dictionary, or None when the chart type has no builder
        """
        if theme is None:
            # built per call so callers editing the returned options never alter the default
            theme = {'mode': 'light', 'palette': 'palette6'}
        name = item['chart']['type']
        # the dispatcher and the label helper are not chart builders
        is_builder = isinstance(name, str) and name not in ('apply', 'get_labels') and not name.startswith('_')
        pointer = getattr(Apex, name, None) if is_builder else None
        if pointer is None:
            print("Apex: no builder for chart type", name)
            return None
        options = pointer(item)
        if 'apex' in options:
            # explicit colours are dropped so that the theme palette applies
            options['apex'].pop('colors', None)
            options['apex']['theme'] = theme
        options['responsive'] = [
            {
                'breakpoint': 1,
                'options': {
                    'plotOptions': item['plotOptions'] if 'plotOptions' in item else None,
                }
            }
        ]
        return options

    @staticmethod
    def radial(item):
        """Radial bars: axis 'x' names the values (rounded to 2 decimals), axis 'y' names the labels."""
        df = item['data']
        x = item['chart']['axis']['x']
        y = item['chart']['axis']['y']
        labels = df[y].tolist()
        values = [float(np.round(value, 2)) for value in df[x].tolist()]
        chart = {"type": "radialBar", "height": 200}
        option = {"chart": chart, "series": values, "labels": labels,
                  "plotOptions": {"radialBar": {"hollow": {"size": "70%"}}}}
        return {'apex': option}

    @staticmethod
    def scatter(item):
        """Same options as spline with the chart type switched to 'scatter'."""
        options = Apex.spline(item)
        options['apex']['chart']['type'] = 'scatter'
        return options

    @staticmethod
    def scalar(item):
        """
        Render the first cell of the first column as an html snippet.
        Numbers above 999 are abbreviated with K (thousands) or M (millions);
        the label is item['unit'] when provided, otherwise the column name.

        :return {'html': markup}
        """
        _df = item['data']
        value = '0'
        unit = ''
        html = '<div class="scalar"><div class="value">:value</div><div class="label">:label</div></div>'
        if _df.shape[0] > 0:
            name = _df.columns.tolist()[0]
            value = _df[name].values[0]
            # placeholders such as 'N/A' are not comparable to numbers and are rendered verbatim
            is_number = isinstance(value, (int, float, np.number))
            if is_number and 999 < value < 1000000:
                value = " ".join([str(np.divide(value, 1000).round(2)), "K"])
            elif is_number and value > 999999:
                #@ Think of considering the case of a billion ...
                value = " ".join([str(np.divide(value, 1000000).round(2)), "M"])
            else:
                value = str(value)
            unit = name.replace('_', ' ') if 'unit' not in item else item['unit']
        return {'html': html.replace(':value', value).replace(":label", unit)}

    @staticmethod
    def column(item):
        """
        Vertical bars: one series per column in axis 'y', categories taken from axis 'x'.
        At most the first 10 rows are sent to the chart.
        """
        df = item['data']
        N = min(df.shape[0], 10)
        axis = item['chart']['axis']
        x = axis['x'][0] if isinstance(axis['x'], list) else axis['x']
        # kept from the original implementation: the specification is normalized in place
        axis['y'] = axis['y'] if isinstance(axis['y'], list) else [axis['y']]
        series = [{"data": df[y].values.tolist()[:N], "name": y.upper().replace('_', ' ')} for y in axis['y']]
        xtitle, ytitle = Apex.get_labels(item)
        # NOTE(review): the "width:" key (trailing colon included) looks like a typo; left untouched
        options = {"chart": {"type": "bar"},
                   "plotOptions": {"bar": {"horizontal": False, "width:": 2, "color": ["transparent"]}},
                   "dataLabels": {"enabled": False}, "legend": {"position": "right"}}
        options['xaxis'] = {"categories": df[x].values.tolist()[:N], "title": xtitle['title']}
        options['yaxis'] = ytitle
        options['series'] = series
        options['colors'] = COLORS[:df[x].size]
        return {"apex": options}

    @staticmethod
    def get_labels(item):
        """
        Build the axis titles from item['chart']['labels'] when present,
        otherwise from item['chart']['axis']; lists contribute their first entry.

        :return (x title options, y title options)
        """
        source = item['chart']['labels'] if "labels" in item['chart'] else item['chart']['axis']
        xtitle, ytitle = source['x'], source['y']
        xtitle = xtitle[0] if isinstance(xtitle, list) else xtitle
        ytitle = ytitle[0] if isinstance(ytitle, list) else ytitle

        def _title(text):
            return {"title": {"text": text.lower().replace('_', ' '), "style": {"fontWeight": "lighter"}}}
        return _title(xtitle), _title(ytitle)

    @staticmethod
    def bar(item):
        """Alias of barh."""
        return Apex.barh(item)

    @staticmethod
    def barh(item):
        """
        rendering a horizontal bar chart assuming for now that only one series is involved
        One series is emitted per column in axis 'x', categories come from axis 'y' (first 10 rows at most).
        @TODO: alias this with bar (!= column)
        """
        df = item['data']
        N = min(df.shape[0], 10)
        axis = item['chart']['axis']
        y = axis['y'][0] if isinstance(axis['y'], list) else axis['y']
        # kept from the original implementation: the specification is normalized in place
        axis['x'] = axis['x'] if isinstance(axis['x'], list) else [axis['x']]
        series = [{"data": df[x].values.tolist()[:N], "name": x.upper().replace('_', ' ')} for x in axis['x']]
        xtitle, ytitle = Apex.get_labels(item)
        options = {"chart": {"type": "bar"}, "plotOptions": {"bar": {"horizontal": True}},
                   "dataLabels": {"enabled": False}, "legend": {"position": "right"}}
        options['xaxis'] = {"categories": df[y].values.tolist()[:N], "title": xtitle['title']}
        options['yaxis'] = ytitle
        options['series'] = series
        # one colour per row, without depending on a loop variable that is undefined for an empty axis
        options['colors'] = COLORS[:df.shape[0]]
        return {"apex": options}

    @staticmethod
    def spline(item):
        """
        Smooth line chart: one series per column in axis 'y', categories from axis 'x'
        (first 10 rows at most).
        """
        df = item['data']
        N = min(df.shape[0], 10)
        axis = item['chart']['axis']
        x = axis['x'][0] if isinstance(axis['x'], list) else axis['x']
        # a single column name must not be iterated character by character
        y_names = axis['y'] if isinstance(axis['y'], list) else [axis['y']]
        series = [{"data": df[y].values[:N].tolist(), "name": y.upper().replace('_', ' ')} for y in y_names]
        colors = COLORS[:len(y_names)]
        options = {"chart": {"type": "line"}, "series": series, "stroke": {"curve": "smooth"},
                   "colors": colors, "legend": {"position": "right"}}
        xtitle, ytitle = Apex.get_labels(item)
        options['xaxis'] = {"categories": df[x].values[:N].tolist(), "title": xtitle['title']}
        options['yaxis'] = ytitle
        return {"apex": options}

    @staticmethod
    def donut(item):
        """
        :pre data must have more than one item otherwise just make it a scalar
        here we will use the key as labels and the values as the values (obviously)
        labels are y-axis
        values are x-axis

        With a single row (or none) the numeric columns are used instead:
        their names become the labels and the first row provides the values.
        """
        df = item['data']
        if df.shape[0] > 1:
            y_col, x_col = item['chart']['axis']['y'], item['chart']['axis']['x']
            y_col = y_col[0] if isinstance(y_col, list) else y_col
            x_col = x_col[0] if isinstance(x_col, list) else x_col
            labels = df[y_col].values.tolist()
            values = df[x_col].values.round(2).tolist()
        else:
            # labels and values must line up and the series must be a flat list
            numeric = df.select_dtypes(include='number')
            labels = [name.upper().replace('_', ' ') for name in numeric.columns.tolist()]
            values = numeric.iloc[0].round(2).tolist() if numeric.shape[0] else []
        colors = COLORS[:len(values)]
        options = {"series": values, "colors": colors, "labels": labels,
                   "dataLabels": {"enabled": True, "style": {"colors": ["#000000"]}, "dropShadow": {"enabled": False}},
                   "chart": {"type": "donut", "width": 200},
                   "plotOptions": {"pie": {"customScale": .9}},
                   "legend": {"position": "right"}}
        return {"apex": options}
-
class engine:
    """
    This engine is designed to load the configuration and run the queries given they are remittance or claims
    @TODO:
        - make sure the readers of the queries are configurable i.e use data-transport
    """
    # aliases accepted for the 'type' argument, mapped to the key of the 'analytics' section
    _TABLES = {'claim': '837', 'claims': '837', 'remits': '835', 'remit': '835'}
    # keys of a pipeline item holding queries or data; filter() does not copy them
    _RESERVED = ("chart", "data", "mongo", "sql", "couch")

    def __init__(self, path):
        """
        Loading configuration file from a designated location ...

        :param path     path of a JSON file with a 'store' section (data-transport arguments)
                        and an 'analytics' section (pipelines keyed by '837'/'835')
        """
        with open(path) as f:
            _config = json.load(f)
        self.store_config = _config['store']
        self.info = _config['analytics']
        if 'type' not in self.store_config:
            #
            # This is the newer version of data-transport
            self.store_config['context'] = 'read'

    def _db_type(self):
        """Return 'mongo' when the store configuration designates a mongo backend, 'sql' otherwise."""
        # newer configurations carry 'provider'; older ones carry 'type'
        # NOTE(review): presumably a reader name such as 'mongo.MongoReader' — confirm against the deployed data-transport
        name = str(self.store_config.get('provider', self.store_config.get('type', ''))).lower()
        return 'mongo' if 'mongo' in name else 'sql'

    def _analytics_for(self, args):
        """Return the analytics of args['type'], narrowed to args['index'] when provided."""
        _analytics = self.info[self._TABLES[args['type']]]
        if 'index' in args:
            _analytics = [_analytics[int(args['index'])]]
        return _analytics

    def filter(self, **args):
        """
        Run the queries of the selected analytics and return them as a single flat pipeline.

        type:   claims or remits
        index   optional position of the analysis to run
        :return {"id": id of the first selected analysis (None if there is none),
                 "pipeline": [{"data": data-frame, "chart": ..., other item keys}, ...]}
        """
        _info = list(self._analytics_for(args))
        # the reader must exist before anything depends on the kind of store
        self.reader = transport.factory.instance(**self.store_config)
        DB_TYPE = self._db_type()
        r = []
        try:
            for row in _info:
                for item in row['pipeline']:
                    if not item.get(DB_TYPE):
                        # no query for this kind of store
                        continue
                    df = pd.DataFrame(self.reader.read(**{DB_TYPE: item[DB_TYPE]}))
                    pipe = {"data": df.fillna('N/A'), "chart": item['chart']}
                    pipe.update({key: item[key] for key in item if key not in self._RESERVED})
                    r.append(pipe)
        finally:
            # release the connection even when a query fails
            self.reader.close()
        return {"id": _info[0]['id'] if _info else None, 'pipeline': r}

    def apply(self, **args):
        """
        Run the queries of the selected analytics and attach the result to every pipeline item under 'data'.

        type:       claims or remits
        index       optional position of the analysis to run
        filter      optional identifier claims, procedures, taxonomy, ...
        serialize   when present the data is stored as a JSON string (list of records) instead of a data-frame
        :return the selected analytics (the configuration entries themselves, updated in place)
        """
        _analytics = self._analytics_for(args)
        if 'filter' in args:
            _info = [item for item in _analytics if args['filter'] == item['id']]
        else:
            _info = list(_analytics)
        self.reader = transport.factory.instance(**self.store_config)
        DB_TYPE = self._db_type()
        try:
            for row in _info:
                # every item of the pipeline is processed, not just the first one
                for item in row['pipeline']:
                    query = item.get(DB_TYPE)
                    if not query:
                        continue
                    _df = pd.DataFrame(self.reader.read(**{DB_TYPE: query}))
                    if 'serialize' in args:
                        item['data'] = json.dumps(_df.to_dict(orient='records'))
                    else:
                        item['data'] = _df
        finally:
            self.reader.close()
        return _info

    def _html(self, item):
        """
        Render a pipeline item as html fragments.

        :param item     dictionary with 'label', 'data' (data-frame), 'chart' and optionally 'info'
        :return list of html strings; charts are embedded as base64 png images next to a table,
                scalars are rendered as one block per column of the data
        """
        figure = None
        df = item['data']
        chart_type = item['chart']['type']
        label = ['<div class="label">', item['label'], '</div>']
        text = ['<div class="grid">', df.describe().iloc[:].round(2).to_html().replace('_', ' '), '</div>']
        info = ['<div class="info">', item['info'], '</div>'] if 'info' in item else []
        if chart_type in ['pie', 'donut', 'doughnut']:
            figure = Chart.donut(item)
            # a donut is shown next to its raw data rather than summary statistics
            text = ['<div class="grid">', df.to_html(index=False).replace('_', ' '), '</div>']
        elif chart_type == 'scatter':
            figure = Chart.scatter(item)
        elif chart_type == 'spline':
            figure = Chart.spline(item)
        elif chart_type in ['barh', 'hbar']:
            figure = Chart.barh(item)
        elif chart_type == 'scalar':
            figure = df.apply(
                lambda col: '<div class="scalar"><div class="value bold">' + str(col.values[0].round(2))
                + '</div><div class="value-text">' + col.name.replace('_', ' ') + '</div></div>').tolist()
            label = text = []
        if figure is not None and chart_type != 'scalar':
            stream = io.BytesIO()
            # 'quality' only ever applied to jpeg output and is no longer accepted by savefig
            figure.savefig(stream, format='png', dpi=300, bbox_inches="tight", transparent=True)
            stream.seek(0)
            stream = "data:image/png;base64," + base64.b64encode(stream.getvalue()).decode("utf-8")
            figure = ['<div class="figure"><img src="' + stream + '">', "</div>"]
        body = [" ".join(row) for row in [label, figure, text, info] if row]
        if chart_type != 'scalar':
            return ['<div class="frame"><div class="chart ' + chart_type + '">'] + body + ["</div></div>"]
        return body

    def _csv(self, item):
        """Not implemented."""
        pass

    def export(self, item, format):
        """
        We have a pipeline here and we should attempt to build a figure using seaborn within an html template using jinja2
        This is considered a page (or an item) of an analysis where we will have both data and rendering information with accompanying text

        For now only the heading and the optional 'info' text of every pipeline row are produced.
        :param item     dictionary with a 'pipeline' list
        :param format   currently unused
        :return list of html fragments
        """
        html = []
        for row in item['pipeline']:
            html += ["<h2>", row['label'].replace('_', ' '), "</h2>"]
            if 'info' in row:
                html += ["<div class='info'>", row['info'], '</div>']
        return html
class LogAnalytics:
    """
    Summary of a log file that holds one JSON document per line.
    """

    def __init__(self, path):
        """
        :param path     path of the log file; blank lines are ignored

        Sets self.remits to {"completed": number of records whose 'completed' is true,
                             "files": number of records}
        """
        with open(path) as f:
            logs = [json.loads(line) for line in f if line.strip() != '']
        self.remits = {
            # records without a 'completed' key count as not completed; a plain
            # int is returned even when the log is empty
            "completed": sum(1 for row in logs if row.get('completed') == True),
            "files": len(logs)
        }
-
- # m = LogAnalytics('/home/steve/healthcare-io/remits.log')
# HTML head fragment (meta tags, title and stylesheet) for the generated report.
# ':title' is a placeholder; the commented-out driver at the end of this file
# substitutes it with str.replace before writing the page.
css = """
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>HealthcareIO - :title </title>
<style>
body{
padding:8px;
padding-left:4%;
padding-right:4%;

}
.pane{
padding:4px;
display:grid;
gap:16px;
grid-template-columns:repeat(2,1fr) ;


}
.numbers {
display:grid;
grid-template-columns:repeat(2,1fr);
gap:16px;
/*padding:2px;*/
/*border:1px solid #CAD5E0;*/

}
.numbers .scalar {
padding:8px;
background-image: linear-gradient(to bottom, #f3f3f3,#d3d3d3, #ffffff);
border:1px solid #CAD5E0;
font-family:sans-serif;
text-transform:capitalize;
text-align:right;
font-size:12px;
display:grid;
grid-template-rows:auto 28px; gap:2px;

}
.numbers .scalar .value-text {
border-top:1px solid #CAD5E0;
padding:8px;
font-weight:bold;
align-items:center;
font-size:14px;
display:grid;


}
.numbers .scalar .value {
display:grid;
color:#004b79;
align-content:center;
font-size:48px; text-align:right; font-weight:bold;}
.frame {
background-image: linear-gradient(to bottom, #f3f3f3,#d3d3d3, #ffffff);
padding:2px;
border:1px solid #CAD5E0;

}
.figure {grid-area:figure; width:500px; height:350px; display:grid; align-items:center}
.info {height:28px; width:100%; grid-area:info;
display:grid;
align-items:center;
text-align:center; text-transform:capitalize; padding:4px; font-size:12px; font-family:sans-serif; border-top:1px solid #CAD5E0;}
.grid {grid-area:grid; }
.label {grid-area:label; font-weight:bold; font-size: 22px; text-align:center; text-transform:capitalize}
.chart {
padding:4px;
padding:8px;
display:grid; grid-template-areas:
"label label label"
"figure grid grid"
"info info info" ;

gap:2px;

}
img {height:auto; max-width:100% ;}
table {width:100%; border-collapse: collapse;}
table , TH, TD{ font-size:14px; padding:8px; font-family:sans-serif; border:1px; border:1px solid #CAD5E0;}
table thead, tbody th { padding:4px; text-transform:capitalize; background-color:#4682B4; color:#ffffff; text-align:center}
table thead tr th {text-align:center}
table tbody td {text-align:right; font-weight: lighter}
table tbody tr:nth-child(odd) {background: #95bce0}
table tbody tr:nth-child(even) {background: #c8e5ff}


</style>
"""
- # folder = '/home/steve/.healthcareio/config.json'
- # e = engine(path=folder)
- # p = e.apply(type='claims')
- # values = []
- # html = [css]
- # for row in p :
- # frame = []
- # for item in row['pipeline'] :
- # if row['pipeline'].index(item) == 0 :
- # if item['chart']['type'] != 'scalar' :
- # # frame = ['<div class="frame">']
- # pass
- # else:
- # frame = ['<div><div class="numbers">']
-
- # frame += e._html(item) #p[3]['pipeline'][0])
- # frame += ['</div></div>'] if item['chart']['type'] == 'scalar' else []
- # html += frame
- # html = '<div class="pane">' + "\n".join(html) + "</div></div>"
- # f = open('out.html','w')
- # f.write(html.replace(":title","Claims"))
- #
- # HTML(string=html).write_pdf('out.pdf',stylesheets=[CSS(string=css)])
- # x.write_pdf('./out.pdf')
- # print (p[2]['pipeline'][0]['data'])
- # e.export (p[0])
- # features = ['diagnosis.code']
- # split(folder = folder, features=features)
|