  1. """
  2. dependency:
  3. - spark and SPARK_HOME environment variable must be set
  4. NOTE:
  5. When using streaming option, insure that it is inline with default (1000 rows) or increase it in spark-defaults.conf
  6. """
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import col, to_date, to_timestamp
import copy
class Iceberg :
    def __init__(self,**_args):
        """
        Provide the catalog meta-information (this must match your Apache Iceberg catalog configuration)
        """
        #
        # Turning off logging (it's annoying & un-professional)
        #
        # _spconf = SparkContext()
        # _spconf.setLogLevel("ERROR")
        #
        # @TODO:
        # Make arrangements for additional configuration elements
        #
        self._session = SparkSession.builder.appName("data-transport").getOrCreate()
        self._session.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
        # self._session.sparkContext.setLogLevel("ERROR")
        self._catalog = self._session.catalog
        self._table = _args['table'] if 'table' in _args else None
        if 'catalog' in _args :
            #
            # Let us set the default catalog
            self._catalog.setCurrentCatalog(_args['catalog'])
        else:
            # No current catalog has been set ...
            pass
        if 'database' in _args :
            self._database = _args['database']
            self._catalog.setCurrentDatabase(self._database)
        else:
            #
            # Should we set the default as the first one if available ?
            #
            pass
        self._catalogName = self._catalog.currentCatalog()
        self._databaseName = self._catalog.currentDatabase()
    def meta (self,**_args) :
        """
        Return the schema of a table (only), as a list of field descriptions
        """
        _schema = []
        try:
            _table = _args['table'] if 'table' in _args else self._table
            _tableName = self._getPrefix(**_args) + f".{_table}"
            _tmp = self._session.table(_tableName).schema
            _schema = _tmp.jsonValue()['fields']
            for _item in _schema :
                del _item['nullable'],_item['metadata']
        except Exception as e:
            pass
        return _schema
    def _getPrefix (self,**_args):
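        """
        Return the '<catalog>.<database>' prefix, allowing 'catalog' and/or 'database' to be overridden via keyword arguments
        """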
        _catName = self._catalogName if 'catalog' not in _args else _args['catalog']
        _datName = self._databaseName if 'database' not in _args else _args['database']
        return '.'.join([_catName,_datName])
    def apply(self,_query):
        """
        Run a SQL query/command against Apache Iceberg and return the result as a pandas data-frame
        """
        return self._session.sql(_query).toPandas()
    def has (self,**_args):
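        """
        Return True if the given table exists in the current (or provided) catalog/database, False otherwise
        """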
        try:
            _prefix = self._getPrefix(**_args)
            if _prefix.endswith('.') :
                return False
            return _args['table'] in [_item.name for _item in self._catalog.listTables(_prefix)]
        except Exception as e:
            print (e)
            return False
    def close(self):
        self._session.stop()
class Reader(Iceberg) :
    def __init__(self,**_args):
        super().__init__(**_args)
    def read(self,**_args):
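        """
        Read from Apache Iceberg: either the table provided here/at construction time, or an explicit 'sql' query; returns a pandas data-frame
        """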
        _table = self._table
        _prefix = self._getPrefix(**_args)
        if 'table' in _args or _table:
            _table = _args['table'] if 'table' in _args else _table
            _table = _prefix + f'.{_table}'
            return self._session.table(_table).toPandas()
        else:
            sql = _args['sql']
            return self._session.sql(sql).toPandas()
class Writer (Iceberg):
    """
    Writing data to an Apache Iceberg data warehouse (using pyspark)
    """
    def __init__(self,**_args):
        super().__init__(**_args)
        self._mode = 'append' if 'mode' not in _args else _args['mode']
        self._table = None if 'table' not in _args else _args['table']
    def format (self,_schema) :
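        """
        Build a pyspark StructType from a list of {'name','type'} field descriptions (the shape returned by Iceberg.meta); unknown types fall back to StringType
        """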
        _iceSchema = StructType([])
        _map = {'integer':IntegerType(),'float':DoubleType(),'double':DoubleType(),'date':DateType(),
                'timestamp':TimestampType(),'datetime':TimestampType(),'string':StringType(),'varchar':StringType()}
        for _item in _schema :
            _name = _item['name']
            _type = _item['type'].lower()
            if _type not in _map :
                _iceType = StringType()
            else:
                _iceType = _map[_type]
            _iceSchema.add (StructField(_name,_iceType,True))
        return _iceSchema if len(_iceSchema) else []
    def write(self,_data,**_args):
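        """
        Write a data-frame to the target Iceberg table; optional keyword arguments: 'table', 'schema' (list of {'name','type'}), 'mode' (append/overwrite)
        """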
        _prefix = self._getPrefix(**_args)
        if 'table' not in _args and not self._table :
            raise Exception (f"A table name must be specified for catalog/database {_prefix}")
        _schema = self.format(_args['schema']) if 'schema' in _args else []
        if not _schema :
            rdd = self._session.createDataFrame(_data,verifySchema=False)
        else :
            rdd = self._session.createDataFrame(_data,schema=_schema,verifySchema=True)
        _mode = self._mode if 'mode' not in _args else _args['mode']
        _table = self._table if 'table' not in _args else _args['table']
        # print (_data.shape,_mode,_table)
        if not self._session.catalog.tableExists(_table):
            # @TODO:
            # add partitioning information here
            rdd.writeTo(_table).using('iceberg').create()
            # _mode = 'overwrite'
            # rdd.write.format('iceberg').mode(_mode).saveAsTable(_table)
        else:
            # rdd.writeTo(_table).append()
            # _table = f'{_prefix}.{_table}'
            # honor the requested write mode (append by default) instead of hard-coding it
            rdd.coalesce(10).write.format('iceberg').mode(_mode).save(_table)
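
#
# Minimal usage sketch (illustrative): the catalog, database and table names
# below are placeholders and assume an Iceberg catalog configured in
# spark-defaults.conf as in the example at the top of this file.
#
#   _reader = Reader(catalog='local', database='demo', table='logs')
#   _df     = _reader.read()                                    # whole table as a pandas data-frame
#   _count  = _reader.apply('SELECT COUNT(*) AS n FROM local.demo.logs')
#
#   _writer = Writer(catalog='local', database='demo', table='logs', mode='append')
#   _writer.write(_df)
#   _writer.close()
#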