Skip to content

Instantly share code, notes, and snippets.

@brannn
Created January 14, 2025 18:16
Show Gist options
  • Select an option

  • Save brannn/8c4f9017ba64f33d0083e1da309d3ca8 to your computer and use it in GitHub Desktop.

Select an option

Save brannn/8c4f9017ba64f33d0083e1da309d3ca8 to your computer and use it in GitHub Desktop.
import json
import polars as pl
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import (
NestedField,
LongType,
StringType,
TimestampType,
DoubleType
)
import pyarrow as pa
from datetime import datetime
import random
def generate_sample_data(num_records=10, rng=None):
    """Return ``num_records`` random order records as JSON-compatible dicts.

    Each record carries the columns the Iceberg table expects:
    ``order_id`` in [1000, 9999] (drawn independently, so not guaranteed
    unique), ``product_name`` and ``region`` from small fixed lists,
    ``quantity`` in [1, 100], ``unit_price`` in [10.0, 500.0] rounded to
    two decimals, and ``timestamp`` as the current local time rendered as an
    ISO-8601 string.

    Args:
        num_records: Number of records to generate (0 yields ``[]``).
        rng: Optional ``random.Random`` instance for reproducible output.
            When omitted, the module-level ``random`` functions are used,
            exactly as before.

    Returns:
        A list of dicts with only ``int``/``float``/``str`` values.
    """
    # The ``random`` module exposes the same randint/choice/uniform API as a
    # ``random.Random`` instance, so either can be used interchangeably.
    rng = random if rng is None else rng
    products = ["Widget A", "Widget B", "Widget C", "Widget D"]
    regions = ["North", "South", "East", "West"]
    # Dict-literal values are evaluated left to right, so the sequence of
    # RNG calls matches the original per-record order.
    return [
        {
            "order_id": rng.randint(1000, 9999),
            "product_name": rng.choice(products),
            "quantity": rng.randint(1, 100),
            "unit_price": round(rng.uniform(10.0, 500.0), 2),
            "region": rng.choice(regions),
            "timestamp": datetime.now().isoformat(),
        }
        for _ in range(num_records)
    ]
# Iceberg table schema for the sample orders. Field IDs are assigned 1..6 in
# column order, and every column is declared required (non-nullable).
iceberg_schema = Schema(
    NestedField(field_id=1, name="order_id", field_type=LongType(), required=True),
    NestedField(field_id=2, name="product_name", field_type=StringType(), required=True),
    NestedField(field_id=3, name="quantity", field_type=LongType(), required=True),
    NestedField(field_id=4, name="unit_price", field_type=DoubleType(), required=True),
    NestedField(field_id=5, name="region", field_type=StringType(), required=True),
    NestedField(field_id=6, name="timestamp", field_type=TimestampType(), required=True),
)
def _coerce_timestamp(value):
    """Return *value* as a ``datetime``, parsing ISO-8601 strings; pass others through."""
    if isinstance(value, str):
        return datetime.fromisoformat(value)
    return value


def _records_to_arrow(records):
    """Build an Arrow table whose schema matches ``iceberg_schema`` exactly."""
    target_schema = iceberg_schema.as_arrow()
    # Iceberg's TimestampType needs real timestamps; left as ISO strings the
    # column would be inferred as a string column and rejected on append.
    df = pl.DataFrame(
        [
            {**record, "timestamp": _coerce_timestamp(record.get("timestamp"))}
            for record in records
        ]
    )
    # Put the columns in schema order, then cast. Polars emits nullable
    # (large_)string/int64/float64/timestamp columns, but every Iceberg field
    # is required. Table.cast applies the non-nullable field definitions and
    # raises if a required column actually contains nulls.
    return df.select(target_schema.names).to_arrow().cast(target_schema)


def write_to_iceberg(json_data, catalog_name, database_name, table_name, *, catalog_properties=None):
    """Append order records to an Iceberg table, creating the table if needed.

    Args:
        json_data: List of record dicts (e.g. from ``generate_sample_data``)
            whose keys match ``iceberg_schema``. ``timestamp`` may be an
            ISO-8601 string or a ``datetime``. An empty list only ensures the
            table exists.
        catalog_name: Name passed to ``load_catalog``.
        database_name: Namespace that holds the table.
        table_name: Table name within ``database_name``.
        catalog_properties: Optional catalog configuration mapping. Defaults to
            placeholder Hive-metastore/S3 settings. Pass real values
            (preferably loaded from the environment, not source code).

    Conversion errors from Polars/PyArrow (e.g. a record missing a schema
    column, or a null in a required column) propagate before the catalog is
    contacted.
    """
    # Convert first so malformed input fails before any network round-trip.
    arrow_table = _records_to_arrow(json_data) if json_data else None

    if catalog_properties is None:
        catalog_properties = {
            "uri": "thrift://localhost:9083",  # Hive metastore URI
            "warehouse": "s3://your-bucket/warehouse/",  # S3 warehouse location
            "s3.endpoint": "https://s3.amazonaws.com",
            # Placeholders only -- supply real credentials through the
            # catalog_properties argument instead of hard-coding them here.
            "s3.access-key-id": "your-access-key",
            "s3.secret-access-key": "your-secret-key",
        }
    catalog = load_catalog(catalog_name, **catalog_properties)

    identifier = f"{database_name}.{table_name}"
    # Derive the table location from the configured warehouse rather than
    # repeating the bucket path. Without a warehouse, let the catalog decide.
    warehouse = catalog_properties.get("warehouse")
    location = f"{warehouse.rstrip('/')}/{database_name}.db/{table_name}" if warehouse else None

    # A single create-or-load call avoids the exists/create race and the
    # extra load_table round-trip.
    table = catalog.create_table_if_not_exists(
        identifier=identifier,
        schema=iceberg_schema,
        location=location,
    )

    if arrow_table is not None:
        table.append(arrow_table)
def main():
    """Generate 100 sample orders and append them to the ``sales.orders`` table."""
    orders = generate_sample_data(100)
    target = {
        "catalog_name": "hive_catalog",
        "database_name": "sales",
        "table_name": "orders",
    }
    write_to_iceberg(orders, **target)
    print("Successfully wrote data to Iceberg table")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment