浏览代码

bug fix and enhancements

steve 6 年之前
父节点
当前提交
6ff91fd031
共有 1 个文件被更改,包括 17 次插入8 次删除
  1. 17 8
      src/risk.py

+ 17 - 8
src/risk.py

@@ -139,19 +139,23 @@ class risk :
         fields  = list(set(table['fields']) - set([key]))
         #-- We need to select n-fields max 64
         k = len(fields)        
-        n = np.random.randint(2,24)  #-- how many random fields are we processing
+        n = np.random.randint(2,64)  #-- how many random fields are we processing
         ii = np.random.choice(k,n,replace=False)
-        fields = list(np.array(fields)[ii])
+        stream = np.zeros(len(fields) + 1) 
+	stream[ii] = 1
+	stream = pd.DataFrame(stream.tolist()).T
+	stream.columns = args['table']['fields']
+	fields = list(np.array(fields)[ii])
 
         sql = """
-            SELECT COUNT(g_size) as group_count, SUM(g_size) as patient_count, COUNT(g_size)/SUM(g_size) as marketer, 1/ MIN(g_size) as prosecutor
+            SELECT COUNT(g_size) as group_count, COUNT( DISTINCT :key) as patient_count,SUM(g_size) as rec_count, COUNT(g_size)/SUM(g_size) as marketer, 1/ MIN(g_size) as prosecutor, :n as field_count
             FROM (
                 SELECT COUNT(*) as g_size,:key,:fields
                 FROM :full_name
                 GROUP BY :key,:fields
             )
         """.replace(":fields", ",".join(fields)).replace(":full_name",table['full_name']).replace(":key",key).replace(":n",str(n))
-        return sql
+        return {"sql":sql,"stream":stream}
     
 
         
@@ -195,14 +199,19 @@ if 'action' in SYS_ARGS and  SYS_ARGS['action'] in ['create','compute'] :
         #
         #
         tables = [tab for tab in tables if tab['name'] == SYS_ARGS['table'] ]  
+	limit = int(SYS_ARGS['limit']) if 'limit' in SYS_ARGS else 1
         if tables :            
             risk = risk()
             df = pd.DataFrame()
-            for i in range(0,10) :
-                sql = risk.get_sql(key=SYS_ARGS['key'],table=tables[0])
+	    dfs = pd.DataFrame()
+            for i in range(0,limit) :
+                r = risk.get_sql(key=SYS_ARGS['key'],table=tables[0])
+		sql = r['sql']
+		dfs = dfs.append(r['stream'])
                 df = df.append(pd.read_gbq(query=sql,private_key=path,dialect='standard'))
                 df.to_csv(SYS_ARGS['table']+'.csv')
-                print [i,' ** ',df.shape[0]]
+		dfs.to_csv(SYS_ARGS['table']+'_stream.csv') 
+                print [i,' ** ',df.shape[0],pd.DataFrame(r['stream']).shape]
                 time.sleep(2)
                 
     pass
@@ -223,4 +232,4 @@ else:
 # p = r.compute()
 # print p
 # p.to_csv("risk.csv")
-# r.write('foo.sql')
+# r.write('foo.sql')