123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138 |
- """
- Data Transport - 1.0
- Steve L. Nyemba, The Phi Technology LLC
- This module is designed to serve as a wrapper to a set of supported data stores :
- - couchdb
- - mongodb
- - Files (character delimited)
- - Queues (RabbitMQ)
- - Session (Flask)
- - s3
- The supported operations are read/write and providing meta data to the calling code
- Requirements :
- pymongo
- boto
- cloudant
- @TODO:
- Enable read/writing to multiple reads/writes
- """
- __author__ = 'The Phi Technology'
- import numpy as np
- import json
- import importlib
- from multiprocessing import RLock
- import queue
- # import couch
- # import mongo
class IO:
    """
    Common base of the reader and writer abstractions defined in this module.
    """
    def init(self, **args):
        """
        Change, at runtime, the value of attributes that already exist on the instance.

        Keyword arguments whose name does not match an existing instance attribute are
        silently ignored: adding attributes requires sub-classing, otherwise the class
        would become unpredictable.
        """
        # Snapshot of the attribute names present before any assignment is made
        known = set(vars(self))
        for name, value in args.items():
            if name in known:
                setattr(self, name, value)
class Reader(IO):
    """
    This class is an abstraction of the read functionality of a data store.

    Sub-classes are expected to override meta() and read(); the versions defined
    here only signal that no implementation was provided.
    """
    def __init__(self):
        # No state is set up at this level
        pass

    def meta(self, **_args):
        """
        This function is intended to return meta-data associated with what has just been read

        @return object of meta data information associated with the content of the store
        @raise NotImplementedError when the sub-class does not override this function
        """
        # NotImplementedError derives from Exception, so callers catching Exception are unaffected
        raise NotImplementedError("meta function needs to be implemented")

    def read(self, **args):
        """
        This function is intended to read the content of a store, given parameters
        to be used at the discretion of the sub-class

        @raise NotImplementedError when the sub-class does not override this function
        """
        raise NotImplementedError("read function needs to be implemented")
class Writer(IO):
    """
    This class is an abstraction of the write functionality of a data store.

    Sub-classes are expected to override meta(), write() and archive().
    """
    def __init__(self):
        # Entries recorded by log(), indexed by identifier
        self.cache = {"default": []}

    def log(self, **args):
        """
        Keep the given keyword arguments in the cache.

        The entry is stored under the value of the 'id' keyword when one is given,
        and under "default" otherwise (the same default identifier meta() uses).
        """
        # The builtin function `id` used to be the key here (no variable of that
        # name was in scope), so every call landed on one meaningless entry.
        key = args.get("id", "default")
        self.cache[key] = args

    def meta(self, id="default", **args):
        """
        This function is intended to return meta-data about what was written

        @raise NotImplementedError when the sub-class does not override this function
        """
        # NotImplementedError derives from Exception, so callers catching Exception are unaffected
        raise NotImplementedError("meta function needs to be implemented")

    def format(self, row, xchar):
        """
        Turn a row into the text that will be written.

        @param row      list of columns, dictionary, or any other value
        @param xchar    column delimiter, or None when no delimiter applies
        @return the columns joined by xchar followed by a new line when row is a list
                and xchar is provided; the JSON text of row when row is a dictionary
                and xchar is None; row untouched in every other case
        """
        if xchar is not None and isinstance(row, list):
            # Columns are converted to text so that numbers or None do not break the join
            return xchar.join(map(str, row)) + '\n'
        if xchar is None and isinstance(row, dict):
            return json.dumps(row)
        return row

    def write(self, **args):
        """
        This function will write content to a store given parameters to be used at the discretion of the sub-class

        @raise NotImplementedError when the sub-class does not override this function
        """
        raise NotImplementedError("write function needs to be implemented")

    def archive(self):
        """
        It is important to be able to archive data so as to ensure that growth is controlled.
        Nothing in nature grows indefinitely, neither should data being handled.

        @raise NotImplementedError when the sub-class does not override this function
        """
        raise NotImplementedError("archive function needs to be implemented")

    def close(self):
        """
        This function will close the persistent storage connection/handler.
        Nothing is done at this level.
        """
        pass
class ReadWriter(Reader, Writer):
    """
    Aggregate of the read and write abstractions, for stores that support both.

    Nothing is defined at this level; everything is inherited from Reader and
    Writer, with Reader listed first among the base classes.
    """
class Console(Writer):
    """
    Writer that prints what it is given to the standard output.
    """
    # Lock shared by all instances; only acquired by instances created with a truthy 'lock'
    lock = RLock()

    def __init__(self, **_args):
        """
        @param lock     optional flag, when truthy every write is guarded by the shared lock
        """
        # The instance attribute is a flag that shadows the class-level RLock,
        # which is why the lock itself is always reached through Console.lock
        self.lock = _args.get('lock', False)
        # info, debug and log are aliases of write
        for alias in ('info', 'debug', 'log'):
            setattr(self, alias, self.write)

    def write(self, logs=None, **_args):
        """
        Print the given content, one line per item when it is a list.

        @param logs     content to print; when it is None and keyword arguments
                        are given, the keyword arguments are printed instead
        """
        if self.lock:
            Console.lock.acquire()
        try:
            if logs is None and _args:
                payload = _args
            else:
                payload = logs
            # A list is printed item by item, anything else is printed as is
            rows = payload if type(payload) is list else [payload]
            for row in rows:
                print(row)
        except Exception as error:
            # Printing is best effort: the error is reported instead of propagated
            print(error)
        finally:
            if self.lock:
                Console.lock.release()
- """
- @NOTE : Experimental !!
- """
class Proxy:
    """
    Forwards read/write calls to a function supplied by the calling code.
    """
    def __init__(self, **_args):
        """
        @param callback     function the calls are forwarded to (mandatory, a missing
                            'callback' keyword raises a KeyError)
        """
        self.callback = _args['callback']

    def read(self, **_args):
        """
        Call the callback with the given keyword arguments and return its result.

        When that call raises any Exception, the callback is called a second time
        without arguments and the result of that second call is returned.
        NOTE(review): the fallback also triggers on errors raised inside the
        callback itself, in which case the callback runs twice - confirm this is intended.
        """
        try:
            outcome = self.callback(**_args)
        except Exception:
            outcome = self.callback()
        return outcome

    def write(self, data, **_args):
        """
        Hand the data and the keyword arguments over to the callback; nothing is returned.
        """
        self.callback(data, **_args)
|