فهرست منبع

bug fix and enhancements

Steve L. Nyemba -- The Architect 6 سال پیش
والد
کامیت
11a247d0ea
2فایلهای تغییر یافته به همراه125 افزوده شده و 60 حذف شده
  1. 51 36
      src/pandas_risk.py
  2. 74 24
      src/risk.py

+ 51 - 36
src/pandas_risk.py

@@ -40,61 +40,76 @@ class deid :
         
         id  = args['id']
         if 'quasi_id' in args :
-		num_runs = 1
-		columns = list(set(args['quasi_id'])- set(id) )
-	else :
-		num_runs  = args['num_runs'] if 'num_runs' in args else 100
-		columns = list(set(self._df.columns) - set([id]))
-        r   = pd.DataFrame()
+            num_runs = 1
+            columns = list(set(args['quasi_id'])- set(id) )
+        else :
+            num_runs  = args['num_runs'] if 'num_runs' in args else 100
+            columns = list(set(self._df.columns) - set([id]))
         
+        r   = pd.DataFrame()    
         k = len(columns)
+        N = self._df.shape[0]
+        tmp = self._df.fillna(' ')
+        np.random.seed(1)
         for i in range(0,num_runs) :
+            
             #
             # let's chose a random number of columns and compute marketer and prosecutor risk
             # Once the fields are selected we run a groupby clause
             #
-	    if 'quasi_id' not in args :
-		    n   = np.random.randint(2,k) #-- number of random fields we are picking
-		    ii = np.random.choice(k,n,replace=False)
-		    cols = np.array(columns)[ii].tolist()
+            if 'quasi_id' not in args :
+                if 'field_count' in args :
+                    #
+                    # We chose to limit how many fields we passin
+                    n   = np.random.randint(2,int(args['field_count'])) #-- number of random fields we are picking    
+                else :
+                    n   = np.random.randint(2,k) #-- number of random fields we are picking
+                ii = np.random.choice(k,n,replace=False)
+                cols = np.array(columns)[ii].tolist()
+                policy = np.zeros(k)
+                policy [ii]  = 1
+                policy =  pd.DataFrame(policy).T
+
             else:
-	    	cols 	= columns
-		n 	= len(cols)
-	    x_ = self._df.groupby(cols).count()[id].values
+                cols 	= columns
+                policy = np.ones(k)
+                policy = pd.DataFrame(policy).T
+            n 	= len(cols)
+            policy.columns = columns
+            N = tmp.shape[0]
+
+            x_ = tmp.groupby(cols).size().values
+            # print [id,i,n,k,self._df.groupby(cols).count()]
             r = r.append(
                 pd.DataFrame(
                     [
                         {
-                            "selected":n,
+                            "group_count":x_.size,
+                            "patient_count":N,
+                            "field_count":n,
                             "marketer": x_.size / np.float64(np.sum(x_)),
                             "prosecutor":1 / np.float64(np.min(x_))
 
                         }
                     ]
-                )
+                ).join(policy)
             )
-            g_size = x_.size
-            n_ids = np.float64(np.sum(x_))
+            # g_size = x_.size
+            # n_ids = np.float64(np.sum(x_))  
+            # sql = """
+            #  SELECT COUNT(g_size) as group_count, :patient_count as patient_count,SUM(g_size) as rec_count, COUNT(g_size)/SUM(g_size) as marketer, 1/ MIN(g_size) as prosecutor, :n as field_count
+            #     FROM (
+            #     SELECT COUNT(*) as g_size,:key,:fields
+            #     FROM :full_name
+            #     GROUP BY :fields
+            # """.replace(":n",str(n)).replace(":fields",",".join(cols)).replace(":key",id).replace(":patient_count",str(N))
+            # r.append(self._df.query(sql.replace("\n"," ").replace("\r"," ") ))
 
         return r
 
 
-import pandas as pd
-import numpy as np
-from io import StringIO
-csv = """
-id,sex,age,profession,drug_test
-1,M,37,doctor,-
-2,F,28,doctor,+
-3,M,37,doctor,-
-4,M,28,doctor,+
-5,M,28,doctor,-
-6,M,37,doctor,-
-"""
-f = StringIO()
-f.write(unicode(csv))
-f.seek(0)
-df = pd.read_csv(f)     
-print df.deid.risk(id='id',num_runs=2)   
-print " *** "
-print df.deid.risk(id='id',quasi_id=['sex','age','profession'])
+# df = pd.read_gbq("select * from deid_risk.risk_30k",private_key='/home/steve/dev/google-cloud-sdk/accounts/curation-test.json')
+# r =  df.deid.risk(id='person_id',num_runs=200)
+# print r[['field_count','patient_count','marketer','prosecutor']]
+
+

+ 74 - 24
src/risk.py

@@ -51,9 +51,10 @@ class utils :
         r = []
         ref = client.dataset(dataset)
         tables = list(client.list_tables(ref))
+        TERMS = ['type','unit','count','refills','stop','supply','quantity']
         for table in tables :
             
-            if table.table_id.strip() in ['people_seed']:
+            if table.table_id.strip() in ['people_seed','measurement','drug_exposure','procedure_occurrence','visit_occurrence','condition_occurrence','device_exposure']:
                 print ' skiping ...'
                 continue
             ref = table.reference
@@ -62,12 +63,15 @@ class utils :
             rows = table.num_rows
             if rows == 0 :
                 continue
-            names = [f.name for f in schema]
+            
+            names = [f.name for f in schema if len (set(TERMS) & set(f.name.strip().split("_"))) == 0 ]
+            
             x = list(set(names) & set([key]))
             if x  :
                 full_name = ".".join([dataset,table.table_id])
                 r.append({"name":table.table_id,"fields":names,"row_count":rows,"full_name":full_name})
         return r
+    
     def get_field_name(self,alias,field_name,index):
         """
             This function will format the a field name given an index (the number of times it has occurred in projection)
@@ -82,6 +86,25 @@ class utils :
             return ".".join(name)+" AS :field_name:index".replace(":field_name",field_name).replace(":index",str(index))
         else:
             return ".".join(name)
+    def get_filtered_table(self,table,key):
+        """
+            This function will return a table with a single record per individual patient
+        """
+        return """
+            
+            SELECT :table.* FROM (
+                SELECT row_number() over () as top, * FROM :full_name ) as :table
+                
+            
+            INNER JOIN (
+            SELECT MAX(top) as top, :key FROM ( 
+                SELECT row_number() over () as top,:key from :full_name ) GROUP BY :key
+                
+            )as filter
+            ON filter.top = :table.top and filter.:key = :table.:key 
+
+        """.replace(":key",key).replace(":full_name",table['full_name']).replace(":table",table['name'])
+    
     def get_sql(self,**args):
         """
             This function will generate that will join a list of tables given a key and a limit of records
@@ -91,7 +114,7 @@ class utils :
         """
         tables  = args['tables'] 
         key     = args['key']
-        limit   = args['limit'] if 'limit' in args else 300000
+        limit   = args['limit'] if 'limit' in args else 10000
         limit   = str(limit) 
         SQL = [
             """ 
@@ -105,9 +128,10 @@ class utils :
             alias= table['name']
             index = tables.index(table)
             sql_ = """ 
-                (select * from :name limit :limit) as :alias
+                (select * from :name ) as :alias
             """.replace(":limit",limit)
-            sql_ = sql_.replace(":name",name).replace(":alias",alias)
+            # sql_ = " ".join(["(",self.get_filtered_table(table,key)," ) as :alias"])
+            sql_ = sql_.replace(":name",name).replace(":alias",alias).replace(":limit",limit)
             fields += [self.get_field_name(alias,field_name,index) for field_name in table['fields'] if field_name != key or  (field_name==key and  tables.index(table) == 0) ]
             if tables.index(table) > 0 :
                 join = """
@@ -139,20 +163,23 @@ class risk :
         fields  = list(set(table['fields']) - set([key]))
         #-- We need to select n-fields max 64
         k = len(fields)        
-        n = np.random.randint(2,64)  #-- how many random fields are we processing
+        if 'field_count' in args :
+            n   = np.random.randint(2, int(args['field_count']) ) #-- number of random fields we are picking
+        else:
+            n = np.random.randint(2,k)  #-- how many random fields are we processing
         ii = np.random.choice(k,n,replace=False)
         stream = np.zeros(len(fields) + 1) 
-	stream[ii] = 1
-	stream = pd.DataFrame(stream.tolist()).T
-	stream.columns = args['table']['fields']
-	fields = list(np.array(fields)[ii])
+        stream[ii] = 1
+        stream = pd.DataFrame(stream.tolist()).T
+        stream.columns = args['table']['fields']
+        fields = list(np.array(fields)[ii])
 
         sql = """
-            SELECT COUNT(g_size) as group_count, COUNT( DISTINCT :key) as patient_count,SUM(g_size) as rec_count, COUNT(g_size)/SUM(g_size) as marketer, 1/ MIN(g_size) as prosecutor, :n as field_count
+            SELECT COUNT(g_size) as group_count,SUM(g_size) as patient_count, COUNT(g_size)/SUM(g_size) as marketer, 1/ MIN(g_size) as prosecutor, :n as field_count
             FROM (
-                SELECT COUNT(*) as g_size,:key,:fields
+                SELECT COUNT(*) as g_size,:fields
                 FROM :full_name
-                GROUP BY :key,:fields
+                GROUP BY :fields
             )
         """.replace(":fields", ",".join(fields)).replace(":full_name",table['full_name']).replace(":key",key).replace(":n",str(n))
         return {"sql":sql,"stream":stream}
@@ -161,7 +188,7 @@ class risk :
         
 
 
-if 'action' in SYS_ARGS and  SYS_ARGS['action'] in ['create','compute'] :
+if 'action' in SYS_ARGS and  SYS_ARGS['action'] in ['create','compute','migrate'] :
 
     path = SYS_ARGS['path']
     client = bq.Client.from_service_account_json(path)
@@ -195,26 +222,49 @@ if 'action' in SYS_ARGS and  SYS_ARGS['action'] in ['create','compute'] :
             r = client.query(create_sql,location='US',job_config=job) 
             
             print [r.job_id,' ** ',r.state]
+    elif SYS_ARGS['action'] == 'migrate' :
+        #
+        #
+
+        o_dataset = SYS_ARGS['o_dataset']
+        for table in tables:
+            sql = " ".join(["SELECT ",",".join(table['fields']) ," FROM (",mytools.get_filtered_table(table,key),") as ",table['name']])
+            
+            job = bq.QueryJobConfig()
+            job.destination = client.dataset(o_dataset).table(table['name'])
+            job.use_query_cache = True
+            job.allow_large_results = True 
+            job.priority = 'INTERACTIVE'
+            job.time_partitioning = bq.table.TimePartitioning(type_=bq.table.TimePartitioningType.DAY)
+
+            r = client.query(sql,location='US',job_config=job) 
+            
+            print [table['full_name'],' ** ',r.job_id,' ** ',r.state]
+
+
+        pass
     else:
         #
         #
-        tables = [tab for tab in tables if tab['name'] == SYS_ARGS['table'] ]  
-	limit = int(SYS_ARGS['limit']) if 'limit' in SYS_ARGS else 1
+        tables  = [tab for tab in tables if tab['name'] == SYS_ARGS['table'] ]  
+        limit   = int(SYS_ARGS['limit']) if 'limit' in SYS_ARGS else 1
         if tables :            
-            risk = risk()
-            df = pd.DataFrame()
-	    dfs = pd.DataFrame()
+            risk= risk()
+            df  = pd.DataFrame()
+            dfs = pd.DataFrame()
+            np.random.seed(1)
             for i in range(0,limit) :
                 r = risk.get_sql(key=SYS_ARGS['key'],table=tables[0])
-		sql = r['sql']
-		dfs = dfs.append(r['stream'])
-                df = df.append(pd.read_gbq(query=sql,private_key=path,dialect='standard'))
+                sql = r['sql']
+                dfs = dfs.append(r['stream'],sort=True)
+                df = df.append(pd.read_gbq(query=sql,private_key=path,dialect='standard').join(dfs))
+                # df = df.join(dfs,sort=True)
                 df.to_csv(SYS_ARGS['table']+'.csv')
-		dfs.to_csv(SYS_ARGS['table']+'_stream.csv') 
+                # dfs.to_csv(SYS_ARGS['table']+'_stream.csv') 
                 print [i,' ** ',df.shape[0],pd.DataFrame(r['stream']).shape]
                 time.sleep(2)
                 
-    pass
+    
 else:
     print 'ERROR'
     pass