Browse Source

bug fix: ETL logging and rabbitmq-server listener

Steve Nyemba 3 years ago
parent
commit
8cd34d902a
5 changed files with 74 additions and 67 deletions
  1. +6  −4   transport/__init__.py
  2. +4  −4   transport/common.py
  3. +48 −44  transport/etl.py
  4. +11 −12  transport/rabbitmq.py
  5. +5  −3   transport/sql.py

+ 6 - 4
transport/__init__.py

@@ -68,11 +68,13 @@ class factory :
         "mariadb":{"port":3306,"host":"localhost","default":{"type":"VARCHAR(256)"},"driver":my},
 		"mongo":{"port":27017,"host":"localhost","class":{"read":mongo.MongoReader,"write":mongo.MongoWriter}},		
 		"couch":{"port":5984,"host":"localhost","class":{"read":couch.CouchReader,"write":couch.CouchWriter}},		
-        "netezza":{"port":5480,"driver":nz,"default":{"type":"VARCHAR(256)"}}}
+        "netezza":{"port":5480,"driver":nz,"default":{"type":"VARCHAR(256)"}},
+		"rabbitmq":{"port":5672,"host":"localhost","class":{"read":queue.QueueReader,"write":queue.QueueWriter,"listen":queue.QueueListener},"default":{"type":"application/json"}}}
 	#
 	# creating synonyms
 	PROVIDERS['mongodb'] = PROVIDERS['mongo']
 	PROVIDERS['couchdb'] = PROVIDERS['couch']
+	PROVIDERS['bq'] 	 = PROVIDERS['bigquery']
 	PROVIDERS['sqlite3'] = PROVIDERS['sqlite']
 	
 	@staticmethod
@@ -124,7 +126,7 @@ def instance(**_args):
 	
 	provider = _args['provider']
 	context = _args['context']if 'context' in _args else None
-	_id = context if context in ['read','write'] else 'read'
+	_id = context if context in list(factory.PROVIDERS[provider]['class'].keys()) else 'read'
 	if _id :
 		args = {'provider':_id}
 		for key in factory.PROVIDERS[provider] :
@@ -147,7 +149,7 @@ def instance(**_args):
 		try:
 			
 			host = ''
-			if provider not in ['bigquery','mongodb','couchdb','sqlite','console','etl','file'] :
+			if provider not in ['bigquery','mongodb','couchdb','sqlite','console','etl','file','rabbitmq'] :
 				#
 				# In these cases we are assuming RDBMS and thus would exclude NoSQL and BigQuery
 				username = args['username'] if 'username' in args else ''
@@ -165,7 +167,7 @@ def instance(**_args):
 				account = ''
 				host = ''
 				database = args['path'] if 'path' in args else args['database']
-			if provider not in ['mongodb','couchdb','bigquery','console','etl','file'] :
+			if provider not in ['mongodb','couchdb','bigquery','console','etl','file','rabbitmq'] :
 				uri = ''.join([provider,"://",account,host,'/',database])
 				
 				e = sqlalchemy.create_engine (uri,future=True)

+ 4 - 4
transport/common.py

@@ -98,15 +98,15 @@ class Console(Writer):
 		self.debug = self.write
 		self.log = self.write
 		pass
-	def write (self,info,**_args):
+	def write (self,**_args):
 		if self.lock :
 			Console.lock.acquire()
 		try:
-			if type(info) == list:
-				for row in info :
+			if type(_args) == list:
+				for row in _args :
 					print (row)
 			else:
-				print (info)
+				print (_args)
 		except Exception as e :
 			print (e)
 		finally:

+ 48 - 44
transport/etl.py

@@ -54,41 +54,46 @@ if len(sys.argv) > 1:
         i += 2
 
 class Post(Process):
-	def __init__(self,**args):
-		super().__init__()
-		
-		if 'provider' not in args['target'] :
-			self.PROVIDER = args['target']['type']
-			self.writer = 	transport.factory.instance(**args['target'])
-		else:
-			self.PROVIDER = args['target']['provider']
-			args['target']['context'] = 'write'
-			self.store = args['target']
-			# self.writer = transport.instance(**args['target'])
-		#
-		# If the table doesn't exists maybe create it ?
-		#
-		self.rows 	=	 args['rows'].fillna('')
+    def __init__(self,**args):
+        super().__init__()
+        
+        if 'provider' not in args['target'] :
+            self.PROVIDER = args['target']['type']
+            self.writer = 	transport.factory.instance(**args['target'])
+        else:
+            self.PROVIDER = args['target']['provider']
+            args['target']['context'] = 'write'
+            self.store = args['target']
+            self.store['lock'] = True
+            # self.writer = transport.instance(**args['target'])
+        #
+        # If the table doesn't exists maybe create it ?
+        #
+        self.rows 	=	 args['rows'].fillna('')
 		
+    def log(self,**_args) :
+        if ETL.logger :
+            ETL.logger.info(**_args)
 		
-	def run(self):
-		_info = {"values":self.rows} if 'couch' in self.PROVIDER else self.rows	
-		ltypes = self.rows.dtypes.values
-		columns = self.rows.dtypes.index.tolist()
-		# if not self.writer.has() :
+    def run(self):
+        _info = {"values":self.rows} if 'couch' in self.PROVIDER else self.rows	
+        ltypes = self.rows.dtypes.values
+        columns = self.rows.dtypes.index.tolist()
+        # if not self.writer.has() :
 
-			
-			# self.writer.make(fields=columns)
-			# ETL.logger.info(module='write',action='make-table',input={"name":self.writer.table})
-		for name in columns :
-			if _info[name].dtype in ['int32','int64','int','float','float32','float64'] :
-				value = 0
-			else:
-				value = ''
-			_info[name] = _info[name].fillna(value)
-		writer = transport.factory.instance(**self.store)
-		writer.write(_info)
-		writer.close()
+            
+            # self.writer.make(fields=columns)
+            # ETL.logger.info(module='write',action='make-table',input={"name":self.writer.table})
+        self.log(module='write',action='make-table',input={"schema":columns})
+        for name in columns :
+            if _info[name].dtype in ['int32','int64','int','float','float32','float64'] :
+                value = 0
+            else:
+                value = ''
+            _info[name] = _info[name].fillna(value)
+        writer = transport.factory.instance(**self.store)
+        writer.write(_info)
+        writer.close()
 
 		
 class ETL (Process):
@@ -115,8 +120,9 @@ class ETL (Process):
         self.jobs = []
 		# self.logger = transport.factory.instance(**_args['logger'])
     def log(self,**_args) :
-        _args['name']  = self.name
-        print (_args)
+        if ETL.logger :
+            ETL.logger.info(**_args)
+        
     def run(self):
         if self.cmd :
             idf = self.reader.read(**self.cmd)
@@ -126,7 +132,7 @@ class ETL (Process):
         # idf = idf.replace({np.nan: None}, inplace = True)
 
         idf.columns = [str(name).replace("b'",'').replace("'","").strip() for name in idf.columns.tolist()]
-        # ETL.logger.info(rows=idf.shape[0],cols=idf.shape[1],jobs=self.JOB_COUNT)
+        self.log(rows=idf.shape[0],cols=idf.shape[1],jobs=self.JOB_COUNT)
 
         #
         # writing the data to a designated data source 
@@ -134,7 +140,7 @@ class ETL (Process):
         try:
             
             
-            # ETL.logger.info(module='write',action='partitioning')
+            self.log(module='write',action='partitioning',jobs=self.JOB_COUNT)
             rows = np.array_split(np.arange(0,idf.shape[0]),self.JOB_COUNT)
             
             #
@@ -148,10 +154,10 @@ class ETL (Process):
                 proc = Post(target = self._oargs,rows = segment,name=str(i))
                 self.jobs.append(proc)
                 proc.start()
-
-                # ETL.logger.info(module='write',action='working',segment=str(id),table=self.name,rows=segment.shape[0])
-            # while poc :
-            # 	proc = [job for job in proc if job.is_alive()]
+                
+                self.log(module='write',action='working',segment=str(self.name),table=self.name,rows=segment.shape[0])
+            # while self.jobs :
+            # 	jobs = [job for job in proc if job.is_alive()]
             # 	time.sleep(1)
         except Exception as e:
             print (e)
@@ -166,9 +172,9 @@ def instance(**_args):
     """
     logger = _args['logger'] if 'logger' in _args else None
     _info = _args['info']
-    if logger :
+    if logger and type(logger) != str:
         ETL.logger = logger 
-    else:
+    elif logger == 'console':
         ETL.logger = transport.factory.instance(provider='console',lock=True)
     if type(_info) in [list,dict] :
         _config = _info if type(_info) != dict else [_info]
@@ -195,8 +201,6 @@ if __name__ == '__main__' :
             _config['source'] = {"type":"disk.DiskReader","args":{"path":SYS_ARGS['source'],"delimiter":","}}
 
         _config['jobs']  = 3 if 'jobs' not in SYS_ARGS else int(SYS_ARGS['jobs'])
-        print (_config)
-        print ()
         etl = ETL (**_config)
         if index is None:	
             

+ 11 - 12
transport/rabbitmq.py

@@ -222,22 +222,21 @@ class QueueListener(MessageQueue):
 	def __init__(self,**args):
 		MessageQueue.__init__(self,**args)
 		self.listen = self.read
-	# def init(self,qid):
-	# 	properties = pika.ConnectionParameters(host=self.host)
-	# 	self.connection = pika.BlockingConnection(properties)
-	# 	self.channel	= self.connection.channel()
-	# 	self.channel.exchange_declare(exchange=self.exchange,type='direct',durable=True )
-
-	# 	self.info = self.channel.queue_declare(passive=True,exclusive=True,queue=qid)
-		
-	# 	self.channel.queue_bind(exchange=self.exchange,queue=self.info.method.queue,routing_key=qid)
-		#self.callback = callback
+		self.apply = args['apply'] if 'apply' in args else print
 	
 	def finalize(self,channel,ExceptionReason):
 		pass
-	 
+	
 	def callback(self,channel,method,header,stream) :
-		raise Exception("....")
+		_info= {}
+		# if re.match("^\{|\[",stream) is not None:
+		
+		if stream.startswith(b"[") or stream.startswith(b"{"):
+			_info = json.loads(stream)
+		else:
+			
+			_info = stream
+		self.apply(_info)
 	def read(self):
     	
 		self.init(self.queue)

+ 5 - 3
transport/sql.py

@@ -312,9 +312,11 @@ class BigQuery:
         :param sql      sql query to be pulled,
         """
         table = _args['table'] 
-        
-        ref     = self.client.dataset(self.dataset).table(table)
-        return self.client.get_table(ref).schema
+        try:
+            ref     = self.client.dataset(self.dataset).table(table)
+            return self.client.get_table(ref).schema
+        except Exception as e:
+            return []
     def has(self,**_args):
         found = False
         try: