- """
- dependency:
- - spark and SPARK_HOME environment variable must be set
- NOTE:
- When using streaming option, insure that it is inline with default (1000 rows) or increase it in spark-defaults.conf
- """
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import col, to_date, to_timestamp
import copy
class Iceberg:
    def __init__(self, **_args):
        """
        Initialize the Spark session and resolve the catalog/database to use.
        The catalog metadata itself is expected to be configured on the Apache Iceberg / Spark side.
        """
        #
        # Turning down Spark's verbose logging
        #
        # _spconf = SparkContext()
        # _spconf.setLogLevel("ERROR")
        #
        # @TODO:
        # Make arrangements for additional configuration elements
        #
        self._session = SparkSession.builder.appName("data-transport").getOrCreate()
        self._session.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
        # self._session.sparkContext.setLogLevel("ERROR")
        self._catalog = self._session.catalog
        self._table = _args['table'] if 'table' in _args else None

        if 'catalog' in _args:
            #
            # Set the default catalog for this session
            self._catalog.setCurrentCatalog(_args['catalog'])
        else:
            # No current catalog has been set ...
            pass
        if 'database' in _args:
            self._database = _args['database']
            self._catalog.setCurrentDatabase(self._database)
        else:
            #
            # Should we set the default to the first database available?
            #
            pass
        self._catalogName = self._catalog.currentCatalog()
        self._databaseName = self._catalog.currentDatabase()
    def meta(self, **_args):
        """
        Return the schema of a table (field names and types only).
        """
        _schema = []
        try:
            _table = _args['table'] if 'table' in _args else self._table
            _tableName = self._getPrefix(**_args) + f".{_table}"
            _tmp = self._session.table(_tableName).schema
            _schema = _tmp.jsonValue()['fields']
            for _item in _schema:
                # keep only the field name and type
                del _item['nullable'], _item['metadata']
        except Exception as e:
            # the table could not be found or inspected; return an empty schema
            pass
        return _schema
    def _getPrefix(self, **_args):
        """
        Build the "<catalog>.<database>" prefix used to fully qualify a table name.
        """
        _catName = self._catalogName if 'catalog' not in _args else _args['catalog']
        _datName = self._databaseName if 'database' not in _args else _args['database']

        return '.'.join([_catName, _datName])
    def apply(self, _query):
        """
        Run a SQL query/command against Apache Iceberg and return the result as a pandas DataFrame.
        """
        return self._session.sql(_query).toPandas()
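    #
    # Illustrative use of apply() above (instance and table names are assumptions):
    #   _df = store.apply("SELECT COUNT(*) AS n FROM demo.events")
    #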
    def has(self, **_args):
        """
        Check whether a table exists in the given (or current) catalog/database.
        """
        try:
            _prefix = self._getPrefix(**_args)
            if _prefix.endswith('.'):
                # no database name could be resolved
                return False
            return _args['table'] in [_item.name for _item in self._catalog.listTables(_prefix)]
        except Exception as e:
            print(e)
            return False

    def close(self):
        """
        Stop the underlying Spark session.
        """
        self._session.stop()
class Reader(Iceberg):
    """
    Reading data from an Apache Iceberg data warehouse (using pyspark)
    """
    def __init__(self, **_args):
        super().__init__(**_args)

    def read(self, **_args):
        """
        Read a whole table (when a table name is available) or run an arbitrary SQL query,
        and return the result as a pandas DataFrame.
        """
        _table = self._table
        _prefix = self._getPrefix(**_args)
        if 'table' in _args or _table:
            _table = _args['table'] if 'table' in _args else _table
            _table = _prefix + f'.{_table}'
            return self._session.table(_table).toPandas()
        else:
            sql = _args['sql']
            return self._session.sql(sql).toPandas()
class Writer(Iceberg):
    """
    Writing data to an Apache Iceberg data warehouse (using pyspark)
    """
    def __init__(self, **_args):
        super().__init__(**_args)
        self._mode = 'append' if 'mode' not in _args else _args['mode']
        self._table = None if 'table' not in _args else _args['table']

    def format(self, _schema):
        """
        Convert a schema given as a list of {'name','type'} dictionaries into a Spark StructType;
        unknown types fall back to StringType.
        """
        _iceSchema = StructType([])
        _map = {'integer': IntegerType(), 'float': DoubleType(), 'double': DoubleType(), 'date': DateType(),
                'timestamp': TimestampType(), 'datetime': TimestampType(), 'string': StringType(), 'varchar': StringType()}
        for _item in _schema:
            _name = _item['name']
            _type = _item['type'].lower()
            _iceType = _map[_type] if _type in _map else StringType()
            _iceSchema.add(StructField(_name, _iceType, True))
        return _iceSchema if len(_iceSchema) else []
    def write(self, _data, **_args):
        """
        Write a dataframe to an Iceberg table, creating the table when it does not yet exist.
        """
        _prefix = self._getPrefix(**_args)
        if 'table' not in _args and not self._table:
            raise Exception(f"A table name should be specified for catalog/database {_prefix}")
        _schema = self.format(_args['schema']) if 'schema' in _args else []
        if not _schema:
            _df = self._session.createDataFrame(_data, verifySchema=False)
        else:
            _df = self._session.createDataFrame(_data, schema=_schema, verifySchema=True)
        _mode = self._mode if 'mode' not in _args else _args['mode']
        _table = self._table if 'table' not in _args else _args['table']

        # print (_data.shape,_mode,_table)

        if not self._session.catalog.tableExists(_table):
            # @TODO:
            # add partitioning information here
            _df.writeTo(_table).using('iceberg').create()
            # _mode = 'overwrite'
            # _df.write.format('iceberg').mode(_mode).saveAsTable(_table)
        else:
            # _df.writeTo(_table).append()
            # _table = f'{_prefix}.{_table}'
            _df.coalesce(10).write.format('iceberg').mode('append').save(_table)
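

if __name__ == '__main__':
    #
    # Minimal smoke-test sketch (illustrative only): the catalog, database and table
    # names below are assumptions and must match an existing Iceberg/Spark configuration.
    #
    import pandas as pd

    _df = pd.DataFrame({'id': [1, 2, 3], 'name': ['a', 'b', 'c']})
    _writer = Writer(catalog='local', database='demo')
    _writer.write(_df, table='sample')
    _reader = Reader(catalog='local', database='demo', table='sample')
    print(_reader.read())
    _reader.close()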