# transport.py
  1. """
  2. This file implements data transport stuctures in order to allow data to be moved to and from anywhere
  3. We can thus read data from disk and write to the cloud,queue, or couchdb or SQL
  4. """
  5. from flask import request, session
  6. import os
  7. import pika
  8. import json
  9. import numpy as np
  10. from couchdbkit import Server
  11. import re
  12. from csv import reader
  13. from datetime import datetime
  14. import boto
  15. import botocore
  16. from smart_open import smart_open
  17. """
  18. @TODO: Write a process by which the class automatically handles reading and creating a preliminary sample and discovers the meta data
  19. """
  20. class Reader:
  21. def __init__(self):
  22. self.nrows = 0
  23. self.xchar = None
  24. def row_count(self):
  25. content = self.read()
  26. return np.sum([1 for row in content])
  27. """
  28. This function determines the most common delimiter from a subset of possible delimiters. It uses a statistical approach to guage the distribution of columns for a given delimiter
  29. """
  30. def delimiter(self,sample):
  31. m = {',':[],'\t':[],'|':[],'\x3A':[]}
  32. delim = m.keys()
  33. for row in sample:
  34. for xchar in delim:
  35. if row.split(xchar) > 1:
  36. m[xchar].append(len(row.split(xchar)))
  37. else:
  38. m[xchar].append(0)
  39. #
  40. # The delimiter with the smallest variance, provided the mean is greater than 1
  41. # This would be troublesome if there many broken records sampled
  42. #
  43. m = {id: np.var(m[id]) for id in m.keys() if m[id] != [] and int(np.mean(m[id]))>1}
  44. index = m.values().index( min(m.values()))
  45. xchar = m.keys()[index]
  46. return xchar
  47. """
  48. This function determines the number of columns of a given sample
  49. @pre self.xchar is not None
  50. """
  51. def col_count(self,sample):
  52. m = {}
  53. i = 0
  54. for row in sample:
  55. row = self.format(row)
  56. id = str(len(row))
  57. #id = str(len(row.split(self.xchar)))
  58. if id not in m:
  59. m[id] = 0
  60. m[id] = m[id] + 1
  61. index = m.values().index( max(m.values()) )
  62. ncols = int(m.keys()[index])
  63. return ncols;
  64. """
  65. This function will clean records of a given row by removing non-ascii characters
  66. @pre self.xchar is not None
  67. """
  68. def format (self,row):
  69. if isinstance(row,list) == False:
  70. #
  71. # We've observed sometimes fields contain delimiter as a legitimate character, we need to be able to account for this and not tamper with the field values (unless necessary)
  72. cols = self.split(row)
  73. #cols = row.split(self.xchar)
  74. else:
  75. cols = row ;
  76. return [ re.sub('[^\x00-\x7F,\n,\r,\v,\b,]',' ',col.strip()).strip().replace('"','') for col in cols]
  77. #if isinstance(row,list) == False:
  78. # return (self.xchar.join(r)).format('utf-8')
  79. #else:
  80. # return r
  81. """
  82. This function performs a split of a record and tries to attempt to preserve the integrity of the data within i.e accounting for the double quotes.
  83. @pre : self.xchar is not None
  84. """
  85. def split (self,row):
  86. pattern = "".join(["(?:^|",self.xchar,")(\"(?:[^\"]+|\"\")*\"|[^",self.xchar,"]*)"])
  87. return re.findall(pattern,row.replace('\n',''))
  88. class Writer:
  89. def format(self,row,xchar):
  90. if xchar is not None and isinstance(row,list):
  91. return xchar.join(row)+'\n'
  92. elif xchar is None and isinstance(row,dict):
  93. row = json.dumps(row)
  94. return row
  95. """
  96. It is important to be able to archive data so as to insure that growth is controlled
  97. Nothing in nature grows indefinitely neither should data being handled.
  98. """
  99. def archive(self):
  100. pass
  101. def flush(self):
  102. pass
  103. """
  104. This class is designed to read data from an Http request file handler provided to us by flask
  105. The file will be heald in memory and processed accordingly
  106. NOTE: This is inefficient and can crash a micro-instance (becareful)
  107. """
  108. class HttpRequestReader(Reader):
  109. def __init__(self,**params):
  110. self.file_length = 0
  111. try:
  112. #self.file = params['file']
  113. #self.file.seek(0, os.SEEK_END)
  114. #self.file_length = self.file.tell()
  115. #print 'size of file ',self.file_length
  116. self.content = params['file'].readlines()
  117. self.file_length = len(self.content)
  118. except Exception, e:
  119. print "Error ... ",e
  120. pass
  121. def isready(self):
  122. return self.file_length > 0
  123. def read(self,size =-1):
  124. i = 1
  125. for row in self.content:
  126. i += 1
  127. if size == i:
  128. break
  129. yield row
  130. """
  131. This class is designed to write data to a session/cookie
  132. """
  133. class HttpSessionWriter(Writer):
  134. """
  135. @param key required session key
  136. """
  137. def __init__(self,**params):
  138. self.session = params['queue']
  139. self.session['sql'] = []
  140. self.session['csv'] = []
  141. self.tablename = re.sub('..+$','',params['filename'])
  142. self.session['uid'] = params['uid']
  143. #self.xchar = params['xchar']
  144. def format_sql(self,row):
  145. values = "','".join([col.replace('"','').replace("'",'') for col in row])
  146. return "".join(["INSERT INTO :table VALUES('",values,"');\n"]).replace(':table',self.tablename)
  147. def isready(self):
  148. return True
  149. def write(self,**params):
  150. label = params['label']
  151. row = params ['row']
  152. if label == 'usable':
  153. self.session['csv'].append(self.format(row,','))
  154. self.session['sql'].append(self.format_sql(row))
  155. """
  156. This class is designed to read data from disk (location on hard drive)
  157. @pre : isready() == True
  158. """
  159. class DiskReader(Reader) :
  160. """
  161. @param path absolute path of the file to be read
  162. """
  163. def __init__(self,**params):
  164. Reader.__init__(self)
  165. self.path = params['path'] ;
  166. def isready(self):
  167. return os.path.exists(self.path)
  168. """
  169. This function reads the rows from a designated location on disk
  170. @param size number of rows to be read, -1 suggests all rows
  171. """
  172. def read(self,size=-1):
  173. f = open(self.path,'rU')
  174. i = 1
  175. for row in f:
  176. i += 1
  177. if size == i:
  178. break
  179. yield row
  180. f.close()
  181. """
  182. This function writes output to disk in a designated location
  183. """
  184. class DiskWriter(Writer):
  185. def __init__(self,**params):
  186. if 'path' in params:
  187. self.path = params['path']
  188. else:
  189. self.path = None
  190. if 'name' in params:
  191. self.name = params['name'];
  192. else:
  193. self.name = None
  194. if os.path.exists(self.path) == False:
  195. os.mkdir(self.path)
  196. """
  197. This function determines if the class is ready for execution or not
  198. i.e it determines if the preconditions of met prior execution
  199. """
  200. def isready(self):
  201. p = self.path is not None and os.path.exists(self.path)
  202. q = self.name is not None
  203. return p and q
  204. """
  205. This function writes a record to a designated file
  206. @param label <passed|broken|fixed|stats>
  207. @param row row to be written
  208. """
  209. def write(self,**params):
  210. label = params['label']
  211. row = params['row']
  212. xchar = None
  213. if 'xchar' is not None:
  214. xchar = params['xchar']
  215. path = ''.join([self.path,os.sep,label])
  216. if os.path.exists(path) == False:
  217. os.mkdir(path) ;
  218. path = ''.join([path,os.sep,self.name])
  219. f = open(path,'a')
  220. row = self.format(row,xchar);
  221. f.write(row)
  222. f.close()
  223. """
  224. This class hierarchy is designed to handle interactions with a queue server using pika framework (our tests are based on rabbitmq)
  225. """
  226. class MessageQueue:
  227. def __init__(self,**params):
  228. self.host= params['host']
  229. self.uid = params['uid']
  230. self.qid = params['qid']
  231. def isready(self):
  232. #self.init()
  233. resp = self.connection is not None and self.connection.is_open
  234. self.close()
  235. return resp
  236. def close(self):
  237. if self.connection.is_closed == False :
  238. self.channel.close()
  239. self.connection.close()
  240. """
  241. This class is designed to publish content to an AMQP (Rabbitmq)
  242. The class will rely on pika to implement this functionality
  243. We will publish information to a given queue for a given exchange
  244. """
  245. class QueueWriter(MessageQueue,Writer):
  246. def __init__(self,**params):
  247. #self.host= params['host']
  248. #self.uid = params['uid']
  249. #self.qid = params['queue']
  250. MessageQueue.__init__(self,**params);
  251. def init(self,label=None):
  252. properties = pika.ConnectionParameters(host=self.host)
  253. self.connection = pika.BlockingConnection(properties)
  254. self.channel = self.connection.channel()
  255. self.info = self.channel.exchange_declare(exchange=self.uid,type='direct',durable=True)
  256. if label is None:
  257. self.qhandler = self.channel.queue_declare(queue=self.qid,durable=True)
  258. else:
  259. self.qhandler = self.channel.queue_declare(queue=label,durable=True)
  260. self.channel.queue_bind(exchange=self.uid,queue=self.qhandler.method.queue)
  261. """
  262. This function writes a stream of data to the a given queue
  263. @param object object to be written (will be converted to JSON)
  264. @TODO: make this less chatty
  265. """
  266. def write(self,**params):
  267. xchar = None
  268. if 'xchar' in params:
  269. xchar = params['xchar']
  270. object = self.format(params['row'],xchar)
  271. label = params['label']
  272. self.init(label)
  273. _mode = 2
  274. if isinstance(object,str):
  275. stream = object
  276. _type = 'text/plain'
  277. else:
  278. stream = json.dumps(object)
  279. if 'type' in params :
  280. _type = params['type']
  281. else:
  282. _type = 'application/json'
  283. self.channel.basic_publish(
  284. exchange=self.uid,
  285. routing_key=label,
  286. body=stream,
  287. properties=pika.BasicProperties(content_type=_type,delivery_mode=_mode)
  288. );
  289. self.close()
  290. def flush(self,label):
  291. self.init(label)
  292. _mode = 1 #-- Non persistent
  293. self.channel.queue_delete( queue=label);
  294. self.close()
  295. """
  296. This class will read from a queue provided an exchange, queue and host
  297. @TODO: Account for security and virtualhosts
  298. """
  299. class QueueReader(MessageQueue,Reader):
  300. """
  301. @param host host
  302. @param uid exchange identifier
  303. @param qid queue identifier
  304. """
  305. def __init__(self,**params):
  306. #self.host= params['host']
  307. #self.uid = params['uid']
  308. #self.qid = params['qid']
  309. MessageQueue.__init__(self,**params);
  310. if 'durable' in params :
  311. self.durable = True
  312. else:
  313. self.durable = False
  314. self.size = -1
  315. self.data = {}
  316. def init(self,qid):
  317. properties = pika.ConnectionParameters(host=self.host)
  318. self.connection = pika.BlockingConnection(properties)
  319. self.channel = self.connection.channel()
  320. self.channel.exchange_declare(exchange=self.uid,type='direct',durable=True)
  321. self.info = self.channel.queue_declare(queue=qid,durable=True)
  322. """
  323. This is the callback function designed to process the data stream from the queue
  324. """
  325. def callback(self,channel,method,header,stream):
  326. r = []
  327. if re.match("^\{|\[",stream) is not None:
  328. r = json.loads(stream)
  329. else:
  330. r = stream
  331. qid = self.info.method.queue
  332. if qid not in self.data :
  333. self.data[qid] = []
  334. self.data[qid].append(r)
  335. #
  336. # We stop reading when the all the messages of the queue are staked
  337. #
  338. if self.size == len(self.data[qid]) or len(self.data[qid]) == self.info.method.message_count:
  339. self.close()
  340. """
  341. This function will read, the first message from a queue
  342. @TODO:
  343. Implement channel.basic_get in order to retrieve a single message at a time
  344. Have the number of messages retrieved be specified by size (parameter)
  345. """
  346. def read(self,size=-1):
  347. r = {}
  348. self.size = size
  349. #
  350. # We enabled the reader to be able to read from several queues (sequentially for now)
  351. # The qid parameter will be an array of queues the reader will be reading from
  352. #
  353. if isinstance(self.qid,basestring) :
  354. self.qid = [self.qid]
  355. for qid in self.qid:
  356. self.init(qid)
  357. # r[qid] = []
  358. if self.info.method.message_count > 0:
  359. self.channel.basic_consume(self.callback,queue=qid,no_ack=False);
  360. self.channel.start_consuming()
  361. else:
  362. pass
  363. #self.close()
  364. # r[qid].append( self.data)
  365. return self.data
class QueueListener(QueueReader):
    """
    Blocking listener: binds to an existing queue and consumes messages
    indefinitely through the inherited callback.
    """
    def init(self,qid):
        properties = pika.ConnectionParameters(host=self.host)
        self.connection = pika.BlockingConnection(properties)
        self.channel = self.connection.channel()
        self.channel.exchange_declare(exchange=self.uid,type='direct',durable=True )
        # passive: fail unless the queue already exists; exclusive to this connection
        self.info = self.channel.queue_declare(passive=True,exclusive=True,queue=qid)
        self.channel.queue_bind(exchange=self.uid,queue=self.info.method.queue,routing_key=qid)
        #self.callback = callback
    def read(self):
        # NOTE(review): blocks until the callback closes the connection
        # (no_ack=True) — presumably intended for daemon-style use; confirm.
        self.init(self.qid)
        self.channel.basic_consume(self.callback,queue=self.qid,no_ack=True);
        self.channel.start_consuming()
  379. """
  380. This class is designed to write output as sql insert statements
  381. The class will inherit from DiskWriter with minor adjustments
  382. @TODO: Include script to create the table if need be using the upper bound of a learner
  383. """
  384. class SQLDiskWriter(DiskWriter):
  385. def __init__(self,**args):
  386. DiskWriter.__init__(self,**args)
  387. self.tablename = re.sub('\..+$','',self.name).replace(' ','_')
  388. """
  389. @param label
  390. @param row
  391. @param xchar
  392. """
  393. def write(self,**args):
  394. label = args['label']
  395. row = args['row']
  396. if label == 'usable':
  397. values = "','".join([col.replace('"','').replace("'",'') for col in row])
  398. row = "".join(["INSERT INTO :table VALUES('",values,"');\n"]).replace(':table',self.tablename)
  399. args['row'] = row
  400. DiskWriter.write(self,**args)
  401. class Couchdb:
  402. """
  403. @param uri host & port reference
  404. @param uid user id involved
  405. @param dbname database name (target)
  406. """
  407. def __init__(self,**args):
  408. uri = args['uri']
  409. self.uid = args['uid']
  410. dbname = args['dbname']
  411. self.server = Server(uri=uri)
  412. self.dbase = self.server.get_db(dbname)
  413. if self.dbase.doc_exist(self.uid) == False:
  414. self.dbase.save_doc({"_id":self.uid})
  415. """
  416. Insuring the preconditions are met for processing
  417. """
  418. def isready(self):
  419. p = self.server.info() != {}
  420. if p == False or self.dbase.dbname not in self.server.all_dbs():
  421. return False
  422. #
  423. # At this point we are sure that the server is connected
  424. # We are also sure that the database actually exists
  425. #
  426. q = self.dbase.doc_exist(self.uid)
  427. if q == False:
  428. return False
  429. return True
  430. def view(self,id,**args):
  431. r =self.dbase.view(id,**args)
  432. r = r.all()
  433. return r[0]['value'] if len(r) > 0 else []
  434. """
  435. This function will read an attachment from couchdb and return it to calling code. The attachment must have been placed before hand (otherwise oops)
  436. @T: Account for security & access control
  437. """
  438. class CouchdbReader(Couchdb,Reader):
  439. """
  440. @param filename filename (attachment)
  441. """
  442. def __init__(self,**args):
  443. #
  444. # setting the basic parameters for
  445. Couchdb.__init__(self,**args)
  446. if 'filename' in args :
  447. self.filename = args['filename']
  448. else:
  449. self.filename = None
  450. def isready(self):
  451. #
  452. # Is the basic information about the database valid
  453. #
  454. p = Couchdb.isready(self)
  455. if p == False:
  456. return False
  457. #
  458. # The database name is set and correct at this point
  459. # We insure the document of the given user has the requested attachment.
  460. #
  461. doc = self.dbase.get(self.uid)
  462. if '_attachments' in doc:
  463. r = self.filename in doc['_attachments'].keys()
  464. else:
  465. r = False
  466. return r
  467. def stream(self):
  468. content = self.dbase.fetch_attachment(self.uid,self.filename).split('\n') ;
  469. i = 1
  470. for row in content:
  471. yield row
  472. if size > 0 and i == size:
  473. break
  474. i = i + 1
  475. def read(self,size=-1):
  476. if self.filename is not None:
  477. self.stream()
  478. else:
  479. return self.basic_read()
  480. def basic_read(self):
  481. document = self.dbase.get(self.uid)
  482. del document['_id'], document['_rev']
  483. return document
  484. """
  485. This class will write on a couchdb document provided a scope
  486. The scope is the attribute that will be on the couchdb document
  487. """
  488. class CouchdbWriter(Couchdb,Writer):
  489. """
  490. @param uri host & port reference
  491. @param uid user id involved
  492. @param filename filename (attachment)
  493. @param dbname database name (target)
  494. """
  495. def __init__(self,**args):
  496. Couchdb.__init__(self,**args)
  497. uri = args['uri']
  498. self.uid = args['uid']
  499. if 'filename' in args:
  500. self.filename = args['filename']
  501. else:
  502. self.filename = None
  503. dbname = args['dbname']
  504. self.server = Server(uri=uri)
  505. self.dbase = self.server.get_db(dbname)
  506. #
  507. # If the document doesn't exist then we should create it
  508. #
  509. """
  510. write a given attribute to a document database
  511. @param label scope of the row repair|broken|fixed|stats
  512. @param row row to be written
  513. """
  514. def write(self,**params):
  515. document = self.dbase.get(self.uid)
  516. label = params['label']
  517. row = params['row']
  518. if label not in document :
  519. document[label] = []
  520. document[label].append(row)
  521. self.dbase.save_doc(document)
  522. def flush(self,**params) :
  523. size = params['size'] if 'size' in params else 0
  524. has_changed = False
  525. document = self.dbase.get(self.uid)
  526. for key in document:
  527. if key not in ['_id','_rev','_attachments'] :
  528. content = document[key]
  529. else:
  530. continue
  531. if isinstance(content,list) and size > 0:
  532. index = len(content) - size
  533. content = content[index:]
  534. document[key] = content
  535. else:
  536. document[key] = {}
  537. has_changed = True
  538. self.dbase.save_doc(document)
  539. def archive(self,params=None):
  540. document = self.dbase.get(self.uid)
  541. content = {}
  542. _doc = {}
  543. for id in document:
  544. if id in ['_id','_rev','_attachments'] :
  545. _doc[id] = document[id]
  546. else:
  547. content[id] = document[id]
  548. content = json.dumps(content)
  549. document= _doc
  550. now = str(datetime.today())
  551. name = '-'.join([document['_id'] , now,'.json'])
  552. self.dbase.save_doc(document)
  553. self.dbase.put_attachment(document,content,name,'application/json')
  554. class s3 :
  555. """
  556. @TODO: Implement a search function for a file given a bucket??
  557. """
  558. def __init__(self,args) :
  559. """
  560. This function will extract a file or set of files from s3 bucket provided
  561. @param access_key
  562. @param secret_key
  563. @param path location of the file
  564. @param filter filename or filtering elements
  565. """
  566. try:
  567. self.s3 = boto.connect_s3(args['access_key'],args['secret_key'])
  568. self.bucket = self.s3.get_bucket(args['bucket'].strip(),validate=False) if 'bucket' in args else None
  569. # self.path = args['path']
  570. self.filter = args['filter'] if 'filter' in args else None
  571. self.filename = args['file'] if 'file' in args else None
  572. except Exception as e :
  573. self.s3 = None
  574. self.bucket = None
  575. print e
  576. def buckets(self):
  577. """
  578. This function is a wrapper around the bucket list of buckets for s3
  579. """
  580. return self.s3.get_all_buckets()
  581. class s3Reader(s3,Reader) :
  582. """
  583. Because s3 contains buckets and files, reading becomes a tricky proposition :
  584. - list files if file is None
  585. - stream content if file is Not None
  586. @TODO: support read from all buckets, think about it
  587. """
  588. def __init__(self,args) :
  589. s3.__init__(self,args)
  590. def files(self):
  591. r = []
  592. try:
  593. return [item.name for item in self.bucket if item.size > 0]
  594. except Exception as e:
  595. pass
  596. return r
  597. def stream(self,limit=-1):
  598. """
  599. At this point we should stream a file from a given bucket
  600. """
  601. key = self.bucket.get_key(self.filename.strip())
  602. if key is None :
  603. yield None
  604. else:
  605. count = 0
  606. with smart_open(key) as remote_file:
  607. for line in remote_file:
  608. if count == limit and limit > 0 :
  609. break
  610. yield line
  611. count += 1
  612. def read(self,limit=-1) :
  613. if self.filename is None :
  614. #
  615. # returning the list of files because no one file was specified.
  616. return self.files()
  617. else:
  618. return self.stream(10)
  619. """
  620. This class acts as a factory to be able to generate an instance of a Reader/Writer
  621. Against a Queue,Disk,Cloud,Couchdb
  622. The class doesn't enforce parameter validation, thus any error with the parameters sent will result in a null Object
  623. """
  624. class Factory:
  625. def instance(self,**args):
  626. source = args['type']
  627. params = args['args']
  628. anObject = None
  629. if source in ['HttpRequestReader','HttpSessionWriter']:
  630. #
  631. # @TODO: Make sure objects are serializable, be smart about them !!
  632. #
  633. aClassName = ''.join([source,'(**params)'])
  634. else:
  635. stream = json.dumps(params)
  636. aClassName = ''.join([source,'(**',stream,')'])
  637. try:
  638. anObject = eval( aClassName)
  639. #setattr(anObject,'name',source)
  640. except Exception,e:
  641. print ['Error ',e]
  642. return anObject
  643. class s3Writer(s3,Writer) :
  644. def __init__(self,args) :
  645. s3.__init__(self,args)
  646. """
  647. This class implements a data-source handler that is intended to be used within the context of data processing, it allows to read/write anywhere transparently.
  648. The class is a facade to a heterogeneous class hierarchy and thus simplifies how the calling code interacts with the class hierarchy
  649. """
  650. class DataSource:
  651. def __init__(self,sourceType='Disk',outputType='Disk',params={}):
  652. self.Input = DataSourceFactory.instance(type=sourceType,args=params)
  653. self.Output= DataSourceFactory.instance(type=outputType,args=params)
  654. def read(self,size=-1):
  655. return self.Input.read(size)
  656. def write(self,**args):
  657. self.Output.write(**args)
  658. conf = json.loads(open('config.json').read())
  659. #x = s3Reader( dict(conf,**{'bucket':'com.phi.sample.data','file':'Sample-Spreadsheet-5000-rows.csv'}))
  660. x = s3Reader(conf)
  661. print conf
  662. print x.bucket.get_all_keys()
  663. # r = x.read()
  664. # for item in r :
  665. # print item
  666. #print buckets[1].get_key('Sample-Spreadsheet-5000-rows.csv')