|
| 1 | +### Spark Configuration (Best Practices) |
| 2 | + |
| 3 | +```python |
| 4 | +# Enable Fabric optimizations |
| 5 | +spark.conf.set("spark.sql.parquet.vorder.enabled", "true") |
| 6 | +spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "true") |
| 7 | +``` |
| 8 | + |
| 9 | +### Reading Data |
| 10 | + |
| 11 | +```python |
| 12 | +# Read CSV file |
| 13 | +df = spark.read.format("csv") \ |
| 14 | + .option("header", "true") \ |
| 15 | + .option("inferSchema", "true") \ |
| 16 | + .load("Files/bronze/data.csv") |
| 17 | + |
| 18 | +# Read JSON file |
| 19 | +df = spark.read.format("json").load("Files/bronze/data.json") |
| 20 | + |
| 21 | +# Read Parquet file |
| 22 | +df = spark.read.format("parquet").load("Files/bronze/data.parquet") |
| 23 | + |
| 24 | +# Read Delta table |
| 25 | +df = spark.read.table("my_delta_table") |
| 26 | + |
| 27 | +# Query a lakehouse table with Spark SQL (the SQL analytics endpoint itself is a separate, read-only TDS surface) |
| 28 | +df = spark.sql("SELECT * FROM lakehouse.my_table") |
| 29 | +``` |
| 30 | + |
| 31 | +### Writing Delta Tables |
| 32 | + |
| 33 | +```python |
| 34 | +# Write DataFrame as managed Delta table |
| 35 | +df.write.format("delta") \ |
| 36 | + .mode("overwrite") \ |
| 37 | + .saveAsTable("silver_customers") |
| 38 | + |
| 39 | +# Write with partitioning |
| 40 | +df.write.format("delta") \ |
| 41 | + .mode("overwrite") \ |
| 42 | + .partitionBy("year", "month") \ |
| 43 | + .saveAsTable("silver_transactions") |
| 44 | + |
| 45 | +# Append to existing table |
| 46 | +df.write.format("delta") \ |
| 47 | + .mode("append") \ |
| 48 | + .saveAsTable("silver_events") |
| 49 | +``` |
| 50 | + |
| 51 | +### Delta Table Operations (CRUD) |
| 52 | + |
| 53 | +```python |
| 54 | +# UPDATE |
| 55 | +spark.sql(""" |
| 56 | + UPDATE silver_customers |
| 57 | + SET status = 'active' |
| 58 | + WHERE last_login > '2024-01-01' -- Example date, adjust as needed |
| 59 | +""") |
| 60 | + |
| 61 | +# DELETE |
| 62 | +spark.sql(""" |
| 63 | + DELETE FROM silver_customers |
| 64 | + WHERE is_deleted = true |
| 65 | +""") |
| 66 | + |
| 67 | +# MERGE (Upsert) |
| 68 | +spark.sql(""" |
| 69 | + MERGE INTO silver_customers AS target |
| 70 | + USING staging_customers AS source |
| 71 | + ON target.customer_id = source.customer_id |
| 72 | + WHEN MATCHED THEN UPDATE SET * |
| 73 | + WHEN NOT MATCHED THEN INSERT * |
| 74 | +""") |
| 75 | +``` |
| 76 | + |
| 77 | +### Schema Definition |
| 78 | + |
| 79 | +```python |
| 80 | +from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, DecimalType |
| 81 | + |
| 82 | +schema = StructType([ |
| 83 | + StructField("id", IntegerType(), False), |
| 84 | + StructField("name", StringType(), True), |
| 85 | + StructField("email", StringType(), True), |
| 86 | + StructField("amount", DecimalType(18, 2), True), |
| 87 | + StructField("created_at", TimestampType(), True) |
| 88 | +]) |
| 89 | + |
| 90 | +df = spark.read.format("csv") \ |
| 91 | + .schema(schema) \ |
| 92 | + .option("header", "true") \ |
| 93 | + .load("Files/bronze/customers.csv") |
| 94 | +``` |
| 95 | + |
| 96 | +### SQL Magic in Notebooks |
| 97 | + |
| 98 | +```sql |
| 99 | +%%sql |
| 100 | +-- Query Delta table directly |
| 101 | +SELECT |
| 102 | + customer_id, |
| 103 | + COUNT(*) as order_count, |
| 104 | + SUM(amount) as total_amount |
| 105 | +FROM gold_orders |
| 106 | +GROUP BY customer_id |
| 107 | +ORDER BY total_amount DESC |
| 108 | +LIMIT 10 |
| 109 | +``` |
| 110 | + |
| 111 | +### V-Order Optimization |
| 112 | + |
| 113 | +```python |
| 114 | +# Enable V-Order for read optimization |
| 115 | +spark.conf.set("spark.sql.parquet.vorder.enabled", "true") |
| 116 | +``` |
| 117 | + |
| 118 | +### Table Optimization |
| 119 | + |
| 120 | +```sql |
| 121 | +%%sql |
| 122 | +-- Optimize table (compact small files) |
| 123 | +OPTIMIZE silver_transactions |
| 124 | + |
| 125 | +-- Optimize with Z-ordering on query columns |
| 126 | +OPTIMIZE silver_transactions ZORDER BY (customer_id, transaction_date) |
| 127 | + |
| 128 | +-- Vacuum old files (default 7 days retention) |
| 129 | +VACUUM silver_transactions |
| 130 | + |
| 131 | +-- Vacuum with explicit retention (note: 168 hours = 7 days, the same as the default) |
| 132 | +VACUUM silver_transactions RETAIN 168 HOURS |
| 133 | + |
| 134 | +``` |
| 135 | + |
| 136 | +### Incremental Load Pattern |
| 137 | + |
| 138 | +```python |
| 139 | +from pyspark.sql.functions import col |
| 140 | + |
| 141 | +# Get last processed watermark (returns None on the first run if the table is empty — handle that case) |
| 142 | +last_watermark = spark.sql(""" |
| 143 | + SELECT MAX(processed_timestamp) as watermark |
| 144 | + FROM silver_orders |
| 145 | +""").collect()[0]["watermark"] |
| 146 | + |
| 147 | +# Load only new records |
| 148 | +new_records = spark.read.format("delta") \ |
| 149 | + .table("bronze_orders") \ |
| 150 | + .filter(col("created_at") > last_watermark) |
| 151 | + |
| 152 | +# Merge new records |
| 153 | +new_records.createOrReplaceTempView("staging_orders") |
| 154 | +spark.sql(""" |
| 155 | + MERGE INTO silver_orders AS target |
| 156 | + USING staging_orders AS source |
| 157 | + ON target.order_id = source.order_id |
| 158 | + WHEN MATCHED THEN UPDATE SET * |
| 159 | + WHEN NOT MATCHED THEN INSERT * |
| 160 | +""") |
| 161 | +``` |
| 162 | + |
| 163 | +### SCD Type 2 Pattern |
| 164 | + |
| 165 | +```python |
| 166 | +from pyspark.sql.functions import current_timestamp |
| 167 | + |
| 168 | +# Close existing records |
| 169 | +spark.sql(""" |
| 170 | + UPDATE dim_customer |
| 171 | + SET is_current = false, end_date = current_timestamp() |
| 172 | + WHERE customer_id IN (SELECT customer_id FROM staging_customer) |
| 173 | + AND is_current = true |
| 174 | +""") |
| 175 | + |
| 176 | +# Insert new versions |
| 177 | +spark.sql(""" |
| 178 | + INSERT INTO dim_customer |
| 179 | + SELECT |
| 180 | + customer_id, |
| 181 | + name, |
| 182 | + email, |
| 183 | + address, |
| 184 | + current_timestamp() as start_date, |
| 185 | + null as end_date, |
| 186 | + true as is_current |
| 187 | + FROM staging_customer |
| 188 | +""") |
| 189 | +``` |
0 commit comments