Created
January 14, 2025 18:16
-
-
Save brannn/8c4f9017ba64f33d0083e1da309d3ca8 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import json | |
| import polars as pl | |
| from pyiceberg.catalog import load_catalog | |
| from pyiceberg.schema import Schema | |
| from pyiceberg.types import ( | |
| NestedField, | |
| LongType, | |
| StringType, | |
| TimestampType, | |
| DoubleType | |
| ) | |
| import pyarrow as pa | |
| from datetime import datetime | |
| import random | |
| # Generate sample JSON data | |
def generate_sample_data(num_records=10):
    """Return a list of randomly generated sales-order records.

    Each record is a plain dict with the keys ``order_id``,
    ``product_name``, ``quantity``, ``unit_price``, ``region`` and
    ``timestamp`` (the current local time as an ISO-8601 string).

    Args:
        num_records: How many records to generate (default 10).

    Returns:
        A list of ``num_records`` dicts; empty when ``num_records`` is 0.
    """
    product_names = ("Widget A", "Widget B", "Widget C", "Widget D")
    sales_regions = ("North", "South", "East", "West")

    def make_record():
        # Dict values are evaluated top to bottom, so the sequence of
        # random draws per record is fixed: id, product, quantity,
        # price, region.
        return {
            "order_id": random.randint(1000, 9999),
            "product_name": random.choice(product_names),
            "quantity": random.randint(1, 100),
            "unit_price": round(random.uniform(10.0, 500.0), 2),
            "region": random.choice(sales_regions),
            "timestamp": datetime.now().isoformat(),
        }

    return [make_record() for _ in range(num_records)]
| # Define Iceberg schema | |
# Iceberg schema for the sales-order table. Field ids are the stable
# column identifiers Iceberg tracks, so they must stay unique; every
# column is declared required (non-null).
iceberg_schema = Schema(
    # Order identifier.
    NestedField(field_id=1, name="order_id", field_type=LongType(), required=True),
    # Name of the product sold.
    NestedField(field_id=2, name="product_name", field_type=StringType(), required=True),
    # Number of units in the order.
    NestedField(field_id=3, name="quantity", field_type=LongType(), required=True),
    # Price per unit.
    NestedField(field_id=4, name="unit_price", field_type=DoubleType(), required=True),
    # Sales region label.
    NestedField(field_id=5, name="region", field_type=StringType(), required=True),
    # Order time; TimestampType carries no time zone.
    NestedField(field_id=6, name="timestamp", field_type=TimestampType(), required=True),
)
def write_to_iceberg(json_data, catalog_name, database_name, table_name):
    """Append JSON-style order records to an Iceberg table, creating it if absent.

    The records are loaded into a Polars DataFrame, converted to a PyArrow
    table whose schema is aligned with ``iceberg_schema`` (column order,
    types and non-nullability), and appended to
    ``<database_name>.<table_name>`` in the given catalog.

    Args:
        json_data: List of dicts with the keys ``order_id``,
            ``product_name``, ``quantity``, ``unit_price``, ``region`` and
            ``timestamp``. ``timestamp`` may be a naive ISO-8601 string or
            a ``datetime`` object.
        catalog_name: Name passed to ``load_catalog``.
        database_name: Iceberg namespace (database) that holds the table.
        table_name: Name of the table inside ``database_name``.

    Raises:
        ValueError: If a timestamp string is not valid ISO-8601.
        Exception: Errors from Polars/PyArrow when columns are missing or
            contain nulls, and any catalog or storage error from PyIceberg.
    """
    # Arrow schema mirroring iceberg_schema. The Iceberg fields are all
    # required, so the Arrow fields are declared non-nullable; Polars
    # emits nullable fields and large_string columns, which would
    # otherwise not line up with the table schema on append.
    arrow_schema = pa.schema(
        [
            pa.field("order_id", pa.int64(), nullable=False),
            pa.field("product_name", pa.string(), nullable=False),
            pa.field("quantity", pa.int64(), nullable=False),
            pa.field("unit_price", pa.float64(), nullable=False),
            pa.field("region", pa.string(), nullable=False),
            pa.field("timestamp", pa.timestamp("us"), nullable=False),
        ]
    )

    # JSON carries timestamps as ISO strings; parse them up front so the
    # column becomes a real datetime column instead of a string column.
    rows = [
        {**record, "timestamp": datetime.fromisoformat(record["timestamp"])}
        if isinstance(record.get("timestamp"), str)
        else record
        for record in json_data
    ]

    if rows:
        # select() fixes the column order expected by cast(); cast() then
        # applies the exact Arrow types and nullability declared above.
        arrow_table = (
            pl.DataFrame(rows)
            .select(arrow_schema.names)
            .to_arrow()
            .cast(arrow_schema)
        )
    else:
        # No records: keep the schema so the append below is a typed no-op.
        arrow_table = arrow_schema.empty_table()

    # Initialize Iceberg catalog.
    # NOTE(review): the values below are placeholders; real credentials
    # should come from the environment or PyIceberg configuration.
    catalog_properties = {
        "uri": "thrift://localhost:9083",  # Hive metastore URI
        "warehouse": "s3://your-bucket/warehouse/",  # S3 warehouse location
        "s3.endpoint": "s3.amazonaws.com",
        "s3.access-key-id": "your-access-key",
        "s3.secret-access-key": "your-secret-key",
    }
    catalog = load_catalog(catalog_name, **catalog_properties)

    identifier = f"{database_name}.{table_name}"

    if catalog.table_exists(identifier):
        table = catalog.load_table(identifier)
    else:
        # Derive the table location from the configured warehouse so the
        # two can never drift apart.
        warehouse_root = catalog_properties["warehouse"].rstrip("/")
        table = catalog.create_table(
            identifier=identifier,
            schema=iceberg_schema,
            location=f"{warehouse_root}/{database_name}.db/{table_name}",
        )

    # Write Arrow table to Iceberg
    table.append(arrow_table)
def main():
    """Generate 100 sample order records and append them to the Iceberg table.

    The target is the ``sales.orders`` table in the ``hive_catalog``
    catalog. A confirmation line is printed after the write returns.
    """
    sample_records = generate_sample_data(100)

    # Target table coordinates, passed by keyword for readability.
    write_to_iceberg(
        sample_records,
        catalog_name="hive_catalog",
        database_name="sales",
        table_name="orders",
    )

    print("Successfully wrote data to Iceberg table")


# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment