This project implements an abstraction of objects that can have access to a variety of data stores, implementing read/write with a simple and expressive interface. This abstraction works with NoSQL and SQL data stores and leverages pandas.
The supported data store providers are:
| Provider | Underlying Drivers | Description | 
|---|---|---|
| sqlite | Native SQLite | SQLite3 | 
| postgresql | psycopg2 | PostgreSQL | 
| redshift | psycopg2 | Amazon Redshift | 
| s3 | boto3 | Amazon Simple Storage Service | 
| netezza | nzpsql | IBM Netezza | 
| Files: CSV, TSV | pandas | pandas DataFrame | 
| Couchdb | cloudant | Couchbase/CouchDB | 
| mongodb | pymongo | MongoDB | 
| mysql | mysql | MySQL | 
| bigquery | google-bigquery | Google BigQuery | 
| mariadb | mysql | MariaDB | 
| rabbitmq | pika | RabbitMQ Publish/Subscribe | 
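The idea of selecting a driver by provider name can be illustrated with a minimal sketch. This is not data-transport's implementation: the `_DRIVERS` registry and the `open_store` function are hypothetical names, and only SQLite is wired in so that the example runs with the standard library alone.

```python
import sqlite3

# Hypothetical registry: maps a provider name to a function that opens a connection.
_DRIVERS = {
    "sqlite": lambda **args: sqlite3.connect(args["path"]),
}

def open_store(provider, **args):
    """Return a connection for the named provider (illustrative only)."""
    if provider not in _DRIVERS:
        raise ValueError(f"unsupported provider: {provider}")
    return _DRIVERS[provider](**args)

# The calling code names the provider; the driver stays hidden behind the registry.
conn = open_store("sqlite", path=":memory:")
conn.execute("CREATE TABLE logs (name TEXT, age INTEGER)")
conn.execute("INSERT INTO logs VALUES ('steve', 40)")
rows = conn.execute("SELECT name, age FROM logs").fetchall()
conn.close()
```

Supporting another store in this sketch would mean adding one entry to the registry, while the calling code stays the same.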
data-transport is mostly intended for data scientists who don't need to care about the underlying database and would like to manipulate data transparently.
Within a virtual environment, run the following:
pip install git+https://github.com/lnyemba/data-transport.git
Once installed, data-transport can be used as a library in code or through a command-line interface (CLI). As a CLI it is used for ETL and requires a configuration file.
Used as a library, its read/write functions make data-transport a good candidate for data science, data engineering, or anything else pertaining to data. It enables operations across multiple data stores (relational or not).
Embedded in Code
It is possible to perform ETL within custom code as follows :
    import transport
    import time

    # Each entry pairs a source with a target; add more source/target pairs to the list as needed.
    _info = [{
        'source':{'provider':'sqlite','path':'/home/me/foo.csv','table':'me','pipeline':{'pre':[],'post':[]}},
        'target':{'provider':'bigquery','private_key':'/home/me/key.json','table':'me','dataset':'mydataset'}
    }]
    procs = transport.factory.instance(provider='etl',info=_info)
    #
    # Wait until every ETL process has finished
    #
    while procs:
        procs = [pthread for pthread in procs if pthread.is_alive()]
        time.sleep(1)
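The waiting loop above relies only on each worker exposing `is_alive()`. Below is a minimal, self-contained sketch of the same polling pattern using Python's standard `threading` module. The `copy_job` function is a hypothetical stand-in for one transfer and is not part of data-transport; what the ETL factory actually returns is not specified here.

```python
import threading
import time

def copy_job(name, results):
    # Hypothetical stand-in for one source-to-target transfer.
    time.sleep(0.05)
    results.append(name)

results = []
procs = [threading.Thread(target=copy_job, args=(name, results)) for name in ("logs", "users")]
for proc in procs:
    proc.start()

# Same polling pattern as above: keep only the workers that are still running.
while procs:
    procs = [proc for proc in procs if proc.is_alive()]
    time.sleep(0.01)
```

When the loop exits, the list of running workers is empty and every job has recorded its result.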
The CLI program is called transport and it requires a configuration file. The program is intended to move data from one location to another. The supported data stores are listed in the table above.
[
    {
        "id":"logs",
        "source":{
            "provider":"postgresql","context":"read","database":"mydb",
            "cmd":{"sql":"SELECT * FROM logs limit 10"}
        },
        "target":{
            "provider":"bigquery","private_key":"/bgqdrive/account/bq-service-account-key.json",
            "dataset":"mydataset"
        }
    }
]
Assuming the above content is stored in a file called etl-config.json, we would perform the following in a terminal window:
[steve@data-transport]$ transport --config ./etl-config.json [--index <value>]
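Before running the CLI, it can help to confirm that the configuration parses as JSON and that every entry names a provider on both sides. The sketch below uses only the standard library; `check_config` is a hypothetical helper and not part of data-transport, and the rule it checks (each entry needs a `source` and a `target` with a `provider`) is an assumption drawn from the example above.

```python
import json

CONFIG_TEXT = """
[
    {
        "id": "logs",
        "source": {"provider": "postgresql", "context": "read", "database": "mydb",
                   "cmd": {"sql": "SELECT * FROM logs limit 10"}},
        "target": {"provider": "bigquery",
                   "private_key": "/bgqdrive/account/bq-service-account-key.json",
                   "dataset": "mydataset"}
    }
]
"""

def check_config(text):
    """Parse the configuration and return a list of problems found (hypothetical helper)."""
    problems = []
    for position, job in enumerate(json.loads(text)):
        for side in ("source", "target"):
            if "provider" not in job.get(side, {}):
                problems.append(f"entry {position}: {side} has no provider")
    return problems

problems = check_config(CONFIG_TEXT)
```

An empty list means the file is at least structurally sound. Note that `json.loads` raises an error on a trailing comma, which is a common mistake in hand-edited configuration files.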
Reading/Writing Mongodb
This example assumes we are tunneling through port 27018 and that there is no access control:
import transport
reader = transport.factory.instance(provider='mongodb',context='read',host='localhost',port='27018',db='example',doc='logs')
df = reader.read() #-- reads the entire collection
print(df.head())
#
#-- Applying mongodb command
PIPELINE = [{"$group":{"_id":None,"count":{"$sum":1}}}]
_command = {"cursor":{},"allowDiskUse":True,"aggregate":"logs","pipeline":PIPELINE}
df = reader.read(mongo=_command)
print(df.head())
reader.close()
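Building the command document by hand invites typos in its keys. The small helper below is a sketch of one way to assemble it; `aggregate_command` is a hypothetical name and not part of data-transport. It places the `aggregate` key first because MongoDB identifies a database command by its first field.

```python
def aggregate_command(collection, pipeline, allow_disk_use=True):
    """Wrap an aggregation pipeline in a MongoDB `aggregate` command document."""
    return {
        "aggregate": collection,
        "pipeline": pipeline,
        "cursor": {},
        "allowDiskUse": allow_disk_use,
    }

# Count the documents in the collection, as in the example above.
PIPELINE = [{"$group": {"_id": None, "count": {"$sum": 1}}}]
command = aggregate_command("logs", PIPELINE)
```

The resulting dictionary would then be passed to the reader in the same way as above, for example `reader.read(mongo=command)`.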
Scenario 1: Mongodb with security in place
Define an authentication file on disk
The semantics of the attributes are defined by MongoDB; please refer to the MongoDB documentation. In this example, the file is located at /transport/mongo.json
{
    "username":"me","password":"changeme",
    "mechanism":"SCRAM-SHA-1",
    "authSource":"admin"
}
Connecting to Mongodb
import transport
PIPELINE = ... #-- do this yourself
MONGO_KEY = '/transport/mongo.json'
mreader = transport.factory.instance(provider=transport.providers.MONGODB,auth_file=MONGO_KEY,context='read',db='mydb',doc='logs')
_aggregateDF = mreader.read(mongo=PIPELINE) #-- results of an aggregate pipeline
_collectionDF= mreader.read()
To enable writing, change the context attribute to 'write'.
| parameters | description | 
|---|---|
| db | Name of the database | 
| port | Port number to connect to | 
| doc | Name of the collection of documents | 
| username | Username | 
| password | Password | 
| authSource | User database that holds the authentication info | 
| mechanism | Mechanism used for authentication | 
NOTE
Arguments like db or doc can be placed in the authentication file
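One way to picture how an authentication file and keyword arguments could combine is a simple dictionary merge. The sketch below is an assumption about the behavior, not data-transport's code: `load_connection_args` is a hypothetical helper, and the rule that explicit arguments override values from the file is also assumed.

```python
import json
import os
import tempfile

def load_connection_args(auth_file, **args):
    """Merge the JSON auth file with keyword arguments; explicit arguments win (an assumption)."""
    with open(auth_file) as handle:
        merged = json.load(handle)
    merged.update(args)
    return merged

# Write a throwaway auth file that also carries db and doc.
auth = {"username": "me", "password": "changeme", "mechanism": "SCRAM-SHA-1",
        "authSource": "admin", "db": "mydb", "doc": "logs"}
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as handle:
    json.dump(auth, handle)
    auth_path = handle.name

connection_args = load_connection_args(auth_path, context="read")
os.remove(auth_path)
```

With `db` and `doc` stored in the file, the calling code only needs to supply the file path and the context.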
Limitations
Reads and writes aren't encapsulated in the same object. This is deliberate: the calling code must explicitly choose which action to perform, which should minimize accidents associated with data wrangling.
import transport
import pandas as pd
writer = transport.factory.instance(provider=transport.providers.MONGODB,context='write',host='localhost',port='27018',db='example',doc='logs')
df = pd.DataFrame({"names":["steve","nico"],"age":[40,30]})
writer.write(df)
writer.close()
#
# Reading from postgresql (replace the <...> placeholders with your own values)
pgreader     = transport.factory.instance(provider='postgresql',context='read',database=<database>,table=<table_name>)
pgreader.read()   #-- will read the table by executing a SELECT
pgreader.read(sql=<sql query>)
#
# Reading a document and executing a view
# (dreader and couchdb are assumed to be Couchdb objects created beforehand)
#
document    = dreader.read()    
result      = couchdb.view(id='<design_doc_id>',view_name='<view_name>',<key=value|keys=values>)
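The separation of reads and writes described under Limitations can be sketched with two small classes and a context switch. This is an illustration of the design choice only, built on the standard `sqlite3` module; the `Reader`, `Writer`, and `instance` names here are hypothetical and do not reproduce data-transport's classes.

```python
import sqlite3

class Reader:
    """Read-only handle: exposes read() and nothing else."""
    def __init__(self, conn, table):
        self._conn, self._table = conn, table

    def read(self):
        return self._conn.execute(f"SELECT * FROM {self._table}").fetchall()

class Writer:
    """Write-only handle: exposes write() and nothing else."""
    def __init__(self, conn, table):
        self._conn, self._table = conn, table

    def write(self, rows):
        self._conn.executemany(f"INSERT INTO {self._table} VALUES (?, ?)", rows)
        self._conn.commit()

def instance(context, conn, table):
    # Hypothetical factory: the context decides which kind of object is returned.
    return {"read": Reader, "write": Writer}[context](conn, table)

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE logs (names TEXT, age INTEGER)")
instance("write", conn, "logs").write([("steve", 40), ("nico", 30)])
rows = instance("read", conn, "logs").read()
```

Because a reader has no `write` method, code holding a reader cannot modify the store by accident, which is the intent the Limitations paragraph describes.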