databricks-pipelines
Repository: databricks/databricks-agent-skills (public)
Description
Develop Lakeflow Spark Declarative Pipelines (formerly Delta Live Tables) on Databricks. Use when building batch or streaming data pipelines with Python or SQL. Invoke BEFORE starting implementation.
Details
Requires databricks CLI (>= v0.292.0)
- version: 0.1.0
Skill Files
# Lakeflow Spark Declarative Pipelines Development
**FIRST**: Use the parent `databricks` skill for CLI basics, authentication, profile selection, and data discovery commands.
## Decision Tree
Use this tree to determine which dataset type and features to use. Multiple features can apply to the same dataset — e.g., a Streaming Table can use Auto Loader for ingestion, Append Flows for fan-in, and Expectations for data quality. Choose the dataset type first, then layer on applicable features.
```
User request → What kind of output?
├── Intermediate/reusable logic (not persisted) → Temporary View
│   ├── Preprocessing/filtering before Auto CDC → Temporary View feeding CDC flow
│   ├── Shared intermediate streaming logic reused by multiple downstream tables
│   ├── Pipeline-private helper logic (not published to catalog)
│   └── Published to UC for external queries → Persistent View (SQL only)
├── Persisted dataset
│   ├── Source is streaming/incremental/continuously growing → Streaming Table
│   │   ├── File ingestion (cloud storage, Volumes) → Auto Loader
│   │   ├── Message bus (Kafka, Kinesis, Pub/Sub, Pulsar, Event Hubs) → streaming source read
│   │   ├── Existing streaming/Delta table → streaming read from table
│   │   ├── CDC / upserts / track changes / keep latest per key / SCD Type 1 or 2 → Auto CDC
│   │   ├── Multiple sources into one table → Append Flows (NOT union)
│   │   ├── Historical backfill + live stream → one-time Append Flow + regular flow
│   │   └── Windowed aggregation with watermark → stateful streaming
│   └── Source is batch/historical/full scan → Materialized View
│       ├── Aggregation/join across full dataset (GROUP BY, SUM, COUNT, etc.)
│       ├── Gold layer aggregation from streaming table → MV with batch read (spark.read / no STREAM)
│       ├── JDBC/Federation/external batch sources
│       └── Small static file load (reference data, no streaming read)
├── Output to external system (Python only) → Sink
│   ├── Existing external table not managed by this pipeline → Sink with format="delta"
│   │     (prefer fully-qualified dataset names if the pipeline should own the table; see Publishing Modes)
│   ├── Kafka / Event Hubs → Sink with format="kafka" + @dp.append_flow(target="sink_name")
│   ├── Custom destination not natively supported → Sink with custom format
│   ├── Custom merge/upsert logic per batch → ForEachBatch Sink (Public Preview)
│   └── Multiple destinations per batch → ForEachBatch Sink (Public Preview)
└── Data quality constraints → Expectations (on any dataset type)
```
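The first branching step of the tree can be read as a small lookup. The sketch below is a hypothetical helper: `choose_dataset_type` and its string labels are made up for illustration and are not part of any Databricks API. It mirrors only the top-level choice; features such as Auto Loader, Append Flows, or Expectations would still be layered on afterwards.

```python
def choose_dataset_type(output: str, source: str = "") -> str:
    """Mirror the first branches of the decision tree (illustrative only).

    output: "intermediate", "persisted", or "external" (hypothetical labels)
    source: "streaming" or "batch"; only consulted for persisted datasets
    """
    if output == "intermediate":
        return "Temporary View"
    if output == "external":
        return "Sink"  # Python only, per the tree
    if output == "persisted":
        if source == "streaming":
            return "Streaming Table"
        if source == "batch":
            return "Materialized View"
        # Without the source kind the dataset type is ambiguous, so ask first.
        raise ValueError("ask whether the source is streaming or batch")
    raise ValueError(f"unknown output kind: {output!r}")


print(choose_dataset_type("persisted", "streaming"))
print(choose_dataset_type("persisted", "batch"))
```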
## Common Traps
- **"Create a table"** without specifying type → ask whether the source is streaming or batch
- **Materialized View from streaming source** is an error → use a Streaming Table instead, or switch to a batch read
- **Streaming Table from batch source** is an error → use a Materialized View instead, or switch to a streaming read
- **Aggregation over streaming table** → use a Materialized View with batch read (`spark.read.table` / `SELECT FROM` without `STREAM`), NOT a Streaming Table. This is the correct pattern for Gold layer aggregation.
- **Aggregation over batch/historical data** → use a Materialized View, not a Streaming Table. MVs recompute or incrementally refresh aggregates to stay correct; STs are append-only and don't recompute when source data changes.
- **Preprocessing before Auto CDC** → use a Temporary View to filter/transform the source before feeding into the CDC flow. SQL: the CDC flow reads from the view via `STREAM(view_name)`. Python: use `spark.readStream.table("view_name")`.
- **Intermediate logic → default to Temporary View** → Use a Temporary View for intermediate/preprocessing logic, even when reused by multiple downstream tables. Only consider a Private MV/ST (`private=True` / `CREATE PRIVATE ...`) when the computation is expensive and materializing once would save significant reprocessing.
- **View vs Temporary View** → Persistent Views publish to Unity Catalog (SQL only), Temporary Views are pipeline-private
- **Union of streams** → use multiple Append Flows. Do NOT present UNION as an alternative — it is an anti-pattern for streaming sources.
- **Changing dataset type** → cannot change ST→MV or MV→ST without manually dropping the existing table first. Full refresh does NOT help. Rename the new dataset instead.
- **SQL `OR REFRESH`** → Prefer `CREATE OR REFRESH` over bare `CREATE` for SQL dataset definitions. Both work identically, but `OR REFRESH` is the idiomatic convention. For PRIVATE datasets: `CREATE OR REFRESH PRIVATE STREAMING TABLE` / `CREATE OR REFRESH PRIVATE MATERIALIZED VIEW`.
- **Kafka/Event Hubs sink serialization** → The `value` column is mandatory. Use `to_json(struct(*)) AS value` to serialize the entire row as JSON. Read the sink skill for details.
- **Multi-column sequencing** in Auto CDC → SQL: `SEQUENCE BY STRUCT(col1, col2)`. Python: `sequence_by=struct("col1", "col2")`. Read the auto-cdc skill for details.
- **Auto CDC supports TRUNCATE** (SCD Type 1 only) → SQL: `APPLY AS TRUNCATE WHEN condition`. Python: `apply_as_truncates=expr("condition")`. Do NOT say truncate is unsupported.
- **Python-only features** → Sinks, ForEachBatch Sinks, CDC from snapshots, and custom data sources are Python-only. When the user is working in SQL, explicitly clarify this and suggest switching to Python.
- **MV incremental refresh** → Materialized Views on **serverless** pipelines support automatic incremental refresh for aggregations. Mention the serverless requirement when discussing incremental refresh.
- **Recommend ONE clear approach** → Present a single recommended approach. Do NOT present anti-patterns or significantly inferior alternatives — it confuses users. Only mention alternatives if they are genuinely viable for different trade-offs.
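Multi-column sequencing orders events by the first struct field and uses later fields to break ties. Python tuples compare the same way, so a plain-Python sketch can show which event would be treated as the latest for a key. No Spark is involved here, and the field names `ts` and `seq_id` are invented for the example.

```python
events = [
    {"user_id": 1, "ts": 100, "seq_id": 2, "name": "b"},
    {"user_id": 1, "ts": 100, "seq_id": 5, "name": "c"},
    {"user_id": 1, "ts": 99, "seq_id": 9, "name": "a"},
]

# Tuples compare element by element: ts decides first, seq_id breaks the tie.
latest = max(events, key=lambda e: (e["ts"], e["seq_id"]))
print(latest["name"])  # prints "c"
```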
## Publishing Modes
Pipelines use a **default catalog and schema** configured in the pipeline settings. All datasets are published there unless overridden.
- **Fully-qualified names**: Use `catalog.schema.table` in the dataset name to write to a different catalog/schema than the pipeline default. The pipeline creates the dataset there directly — no Sink needed.
- **USE CATALOG / USE SCHEMA**: SQL commands that change the current catalog/schema for all subsequent definitions in the same file.
- **LIVE prefix**: Deprecated. Ignored in the default publishing mode.
- When reading or defining datasets within the pipeline, use the dataset name only — do NOT use fully-qualified names unless the pipeline already does so or the user explicitly requests a different target catalog/schema.
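As a rough mental model of the naming rules above, the sketch below expands a dataset name against the pipeline defaults. `resolve_name` is a hypothetical helper, not a Databricks API, and the handling of two-part names (`schema.table` resolved in the default catalog) is an assumption. It also ignores backtick-quoted identifiers that contain dots.

```python
def resolve_name(name: str, default_catalog: str, default_schema: str) -> str:
    """Expand a dataset name to catalog.schema.table (illustrative only)."""
    parts = name.split(".")
    if len(parts) == 1:
        # Bare name: published to the pipeline's default catalog and schema
        return f"{default_catalog}.{default_schema}.{name}"
    if len(parts) == 2:
        # Assumption: a two-part name is schema.table in the default catalog
        return f"{default_catalog}.{name}"
    if len(parts) == 3:
        # Fully-qualified name: used as written
        return name
    raise ValueError(f"unexpected dataset name: {name!r}")


print(resolve_name("orders", "main", "sales"))
print(resolve_name("analytics.finance.orders", "main", "sales"))
```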
## Comprehensive API Reference
**MANDATORY:** Before implementing, editing, or suggesting any code for a feature, you MUST read the linked reference file for that feature. NO exceptions — always look up the reference before writing code.
Some features require reading multiple skills together:
- **Auto Loader** → also read the streaming-table skill (Auto Loader produces a streaming DataFrame, so the target is a streaming table) and look up format-specific options for the file format being loaded
- **Auto CDC** → also read the streaming-table skill (Auto CDC always targets a streaming table)
- **Sinks** → also read the streaming-table skill (sinks use streaming append flows)
- **Expectations** → also read the corresponding dataset definition skill to ensure constraints are correctly placed
### Dataset Definition APIs
| Feature | Python (current) | Python (deprecated) | SQL (current) | SQL (deprecated) | Skill (Py) | Skill (SQL) |
| -------------------------- | ------------------------------------ | ------------------------------------- | ------------------------------------------- | ----------------------------- | ------------------------------------------------------------------------- | ------------------------------------------------------------------- |
| Streaming Table | `@dp.table()` returning streaming DF | `@dlt.table()` returning streaming DF | `CREATE OR REFRESH STREAMING TABLE` | `CREATE STREAMING LIVE TABLE` | [streaming-table-python](streaming-table/streaming-table-python.md) | [streaming-table-sql](streaming-table/streaming-table-sql.md) |
| Materialized View | `@dp.materialized_view()` | `@dlt.table()` returning batch DF | `CREATE OR REFRESH MATERIALIZED VIEW` | `CREATE LIVE TABLE` (batch) | [materialized-view-python](materialized-view/materialized-view-python.md) | [materialized-view-sql](materialized-view/materialized-view-sql.md) |
| Temporary View | `@dp.temporary_view()` | `@dlt.view()`, `@dp.view()` | `CREATE TEMPORARY VIEW` | `CREATE TEMPORARY LIVE VIEW` | [temporary-view-python](temporary-view/temporary-view-python.md) | [temporary-view-sql](temporary-view/temporary-view-sql.md) |
| Persistent View (UC) | N/A — SQL only | — | `CREATE VIEW` | — | — | [view-sql](view/view-sql.md) |
| Streaming Table (explicit) | `dp.create_streaming_table()` | `dlt.create_streaming_table()` | `CREATE OR REFRESH STREAMING TABLE` (no AS) | — | [streaming-table-python](streaming-table/streaming-table-python.md) | [streaming-table-sql](streaming-table/streaming-table-sql.md) |
### Flow and Sink APIs
| Feature | Python (current) | Python (deprecated) | SQL (current) | SQL (deprecated) | Skill (Py) | Skill (SQL) |
| ---------------------------- | ---------------------------- | ----------------------------- | -------------------------------------- | ---------------- | ---------------------------------------------------------------------------- | ------------------------------------------------------------- |
| Append Flow | `@dp.append_flow()` | `@dlt.append_flow()` | `CREATE FLOW ... INSERT INTO` | — | [streaming-table-python](streaming-table/streaming-table-python.md) | [streaming-table-sql](streaming-table/streaming-table-sql.md) |
| Backfill Flow | `@dp.append_flow(once=True)` | `@dlt.append_flow(once=True)` | `CREATE FLOW ... INSERT INTO ... ONCE` | — | [streaming-table-python](streaming-table/streaming-table-python.md) | [streaming-table-sql](streaming-table/streaming-table-sql.md) |
| Sink (Delta/Kafka/EH/custom) | `dp.create_sink()` | `dlt.create_sink()` | N/A — Python only | — | [sink-python](sink/sink-python.md) | — |
| ForEachBatch Sink | `@dp.foreach_batch_sink()` | — | N/A — Python only | — | [foreach-batch-sink-python](foreach-batch-sink/foreach-batch-sink-python.md) | — |
### CDC APIs
| Feature | Python (current) | Python (deprecated) | SQL (current) | SQL (deprecated) | Skill (Py) | Skill (SQL) |
| ---------------------------- | ----------------------------------------- | ------------------------------------------- | ------------------------------- | ------------------------------------ | ---------------------------------------------- | ---------------------------------------- |
| Auto CDC (streaming source) | `dp.create_auto_cdc_flow()` | `dlt.apply_changes()`, `dp.apply_changes()` | `AUTO CDC INTO ... FROM STREAM` | `APPLY CHANGES INTO ... FROM STREAM` | [auto-cdc-python](auto-cdc/auto-cdc-python.md) | [auto-cdc-sql](auto-cdc/auto-cdc-sql.md) |
| Auto CDC (periodic snapshot) | `dp.create_auto_cdc_from_snapshot_flow()` | `dlt.apply_changes_from_snapshot()` | N/A — Python only | — | [auto-cdc-python](auto-cdc/auto-cdc-python.md) | — |
### Data Quality APIs
| Feature | Python (current) | Python (deprecated) | SQL (current) | Skill (Py) | Skill (SQL) |
| ------------------ | ---------------------------- | ----------------------------- | ------------------------------------------------------ | ---------------------------------------------------------- | ---------------------------------------------------- |
| Expect (warn) | `@dp.expect()` | `@dlt.expect()` | `CONSTRAINT ... EXPECT (...)` | [expectations-python](expectations/expectations-python.md) | [expectations-sql](expectations/expectations-sql.md) |
| Expect or drop | `@dp.expect_or_drop()` | `@dlt.expect_or_drop()` | `CONSTRAINT ... EXPECT (...) ON VIOLATION DROP ROW` | [expectations-python](expectations/expectations-python.md) | [expectations-sql](expectations/expectations-sql.md) |
| Expect or fail | `@dp.expect_or_fail()` | `@dlt.expect_or_fail()` | `CONSTRAINT ... EXPECT (...) ON VIOLATION FAIL UPDATE` | [expectations-python](expectations/expectations-python.md) | [expectations-sql](expectations/expectations-sql.md) |
| Expect all (warn) | `@dp.expect_all({})` | `@dlt.expect_all({})` | Multiple `CONSTRAINT` clauses | [expectations-python](expectations/expectations-python.md) | [expectations-sql](expectations/expectations-sql.md) |
| Expect all or drop | `@dp.expect_all_or_drop({})` | `@dlt.expect_all_or_drop({})` | Multiple constraints with `DROP ROW` | [expectations-python](expectations/expectations-python.md) | [expectations-sql](expectations/expectations-sql.md) |
| Expect all or fail | `@dp.expect_all_or_fail({})` | `@dlt.expect_all_or_fail({})` | Multiple constraints with `FAIL UPDATE` | [expectations-python](expectations/expectations-python.md) | [expectations-sql](expectations/expectations-sql.md) |
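The three violation policies in the table differ only in what happens to rows that fail the constraint. The toy function below models that difference on a list of dicts. It is a sketch for intuition, not the Databricks implementation; `apply_expectation` and its `on_violation` values are hypothetical names.

```python
def apply_expectation(rows, predicate, on_violation="warn"):
    """Toy model of warn / drop / fail semantics on a list of dict rows."""
    if on_violation not in ("warn", "drop", "fail"):
        raise ValueError(f"unknown policy: {on_violation!r}")
    violations = [r for r in rows if not predicate(r)]
    if on_violation == "fail" and violations:
        # Corresponds to ON VIOLATION FAIL UPDATE: the whole update stops
        raise RuntimeError(f"{len(violations)} row(s) violated the expectation")
    if on_violation == "drop":
        # Corresponds to ON VIOLATION DROP ROW: invalid rows are removed
        kept = [r for r in rows if predicate(r)]
    else:
        # "warn" keeps invalid rows and only counts them
        kept = list(rows)
    return kept, len(violations)


def has_id(row):
    return row["id"] is not None


rows = [{"id": 1}, {"id": None}, {"id": 3}]
print(apply_expectation(rows, has_id, "warn"))
print(apply_expectation(rows, has_id, "drop"))
```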
### Reading Data APIs
| Feature | Python (current) | Python (deprecated) | SQL (current) | SQL (deprecated) | Skill (Py) | Skill (SQL) |
| --------------------------------- | ---------------------------------------------- | --------------------------------------------------- | ------------------------------------------------ | ---------------------------------- | ------------------------------------------------------------------- | ------------------------------------------------------------- |
| Batch read (pipeline dataset) | `spark.read.table("name")` | `dp.read("name")`, `dlt.read("name")` | `SELECT ... FROM name` | `SELECT ... FROM LIVE.name` | — | — |
| Streaming read (pipeline dataset) | `spark.readStream.table("name")` | `dp.read_stream("name")`, `dlt.read_stream("name")` | `SELECT ... FROM STREAM name` | `SELECT ... FROM STREAM LIVE.name` | — | — |
| Auto Loader (cloud files) | `spark.readStream.format("cloudFiles")` | — | `STREAM read_files(...)` | — | [auto-loader-python](auto-loader/auto-loader-python.md) | [auto-loader-sql](auto-loader/auto-loader-sql.md) |
| Kafka source | `spark.readStream.format("kafka")` | — | `STREAM read_kafka(...)` | — | — | — |
| Kinesis source | `spark.readStream.format("kinesis")` | — | `STREAM read_kinesis(...)` | — | — | — |
| Pub/Sub source | `spark.readStream.format("pubsub")` | — | `STREAM read_pubsub(...)` | — | — | — |
| Pulsar source | `spark.readStream.format("pulsar")` | — | `STREAM read_pulsar(...)` | — | — | — |
| Event Hubs source | `spark.readStream.format("kafka")` + EH config | — | `STREAM read_kafka(...)` + EH config | — | — | — |
| JDBC / Lakehouse Federation | `spark.read.format("postgresql")` etc. | — | Direct table ref via federation catalog | — | — | — |
| Custom data source | `spark.read[Stream].format("custom")` | — | N/A — Python only | — | — | — |
| Static file read (batch) | `spark.read.format("json"\|"csv"\|...).load()` | — | `read_files(...)` (no STREAM) | — | — | — |
| Skip upstream change commits | `.option("skipChangeCommits", "true")` | — | `read_stream("name", skipChangeCommits => true)` | — | [streaming-table-python](streaming-table/streaming-table-python.md) | [streaming-table-sql](streaming-table/streaming-table-sql.md) |
### Table/Schema Feature APIs
| Feature | Python (current) | SQL (current) | Skill (Py) | Skill (SQL) |
| ---------------------------- | ----------------------------------------------------- | --------------------------------------- | ------------------------------------------------------------------------- | ------------------------------------------------------------------- |
| Liquid clustering | `cluster_by=[...]` | `CLUSTER BY (col1, col2)` | [materialized-view-python](materialized-view/materialized-view-python.md) | [materialized-view-sql](materialized-view/materialized-view-sql.md) |
| Auto liquid clustering | `cluster_by_auto=True` | `CLUSTER BY AUTO` | [materialized-view-python](materialized-view/materialized-view-python.md) | [materialized-view-sql](materialized-view/materialized-view-sql.md) |
| Partition columns | `partition_cols=[...]` | `PARTITIONED BY (col1, col2)` | [materialized-view-python](materialized-view/materialized-view-python.md) | [materialized-view-sql](materialized-view/materialized-view-sql.md) |
| Table properties | `table_properties={...}` | `TBLPROPERTIES (...)` | [materialized-view-python](materialized-view/materialized-view-python.md) | [materialized-view-sql](materialized-view/materialized-view-sql.md) |
| Explicit schema | `schema="col1 TYPE, ..."` | `(col1 TYPE, ...) AS` | [materialized-view-python](materialized-view/materialized-view-python.md) | [materialized-view-sql](materialized-view/materialized-view-sql.md) |
| Generated columns | `schema="..., col TYPE GENERATED ALWAYS AS (expr)"` | `col TYPE GENERATED ALWAYS AS (expr)` | [materialized-view-python](materialized-view/materialized-view-python.md) | [materialized-view-sql](materialized-view/materialized-view-sql.md) |
| Row filter (Public Preview) | `row_filter="ROW FILTER fn ON (col)"` | `WITH ROW FILTER fn ON (col)` | [materialized-view-python](materialized-view/materialized-view-python.md) | [materialized-view-sql](materialized-view/materialized-view-sql.md) |
| Column mask (Public Preview) | `schema="..., col TYPE MASK fn USING COLUMNS (col2)"` | `col TYPE MASK fn USING COLUMNS (col2)` | [materialized-view-python](materialized-view/materialized-view-python.md) | [materialized-view-sql](materialized-view/materialized-view-sql.md) |
| Private dataset | `private=True` | `CREATE PRIVATE ...` | [materialized-view-python](materialized-view/materialized-view-python.md) | [materialized-view-sql](materialized-view/materialized-view-sql.md) |
### Import / Module APIs
| Current | Deprecated | Notes |
| ------------------------------------------------- | --------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------ |
| `from pyspark import pipelines as dp` | `import dlt` | Both work. Prefer `dp`. Do NOT change existing `dlt` imports. |
| `spark.read.table()` / `spark.readStream.table()` | `dp.read()` / `dp.read_stream()` / `dlt.read()` / `dlt.read_stream()` | Deprecated reads still work. Prefer `spark.*`. |
| — | `LIVE.` prefix | Fully deprecated. NEVER use. Causes errors in newer pipelines. |
| — | `CREATE LIVE TABLE` / `CREATE LIVE VIEW` | Fully deprecated. Use `CREATE STREAMING TABLE` / `CREATE MATERIALIZED VIEW` / `CREATE TEMPORARY VIEW`. |
## Language-specific guides
Lakeflow Spark Declarative Pipelines (formerly Delta Live Tables / DLT) is a framework for building batch and streaming data pipelines.
## Scaffolding a New Pipeline Project
Use `databricks bundle init` with a config file to scaffold non-interactively. This creates a project in the `<project_name>/` directory:
```bash
databricks bundle init lakeflow-pipelines --config-file <(echo '{"project_name": "my_pipeline", "language": "python", "serverless": "yes"}') --profile <PROFILE> < /dev/null
```
- `project_name`: letters, numbers, underscores only
- `language`: `python` or `sql`. Ask the user which they prefer:
  - SQL: Recommended for straightforward transformations (filters, joins, aggregations)
  - Python: Recommended for complex logic (custom UDFs, ML, advanced processing)
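When the config is generated from a script rather than typed inline, the constraints above can be checked before the CLI is called. The following is a minimal sketch: `build_init_config` is a hypothetical helper, and the `"no"` value for `serverless` is an assumption, since only `"yes"` appears in the command above.

```python
import json
import re


def build_init_config(project_name: str, language: str, serverless: bool = True) -> str:
    """Return a JSON config string for `databricks bundle init lakeflow-pipelines`."""
    if not re.fullmatch(r"[A-Za-z0-9_]+", project_name):
        raise ValueError("project_name: letters, numbers, underscores only")
    if language not in ("python", "sql"):
        raise ValueError("language must be 'python' or 'sql'")
    return json.dumps({
        "project_name": project_name,
        "language": language,
        # Assumption: "no" is the counterpart of the documented "yes"
        "serverless": "yes" if serverless else "no",
    })


print(build_init_config("my_pipeline", "python"))
```

The resulting string can be written to a file and passed with `--config-file`.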
After scaffolding, create `CLAUDE.md` and `AGENTS.md` in the project directory. These files are essential to provide agents with guidance on how to work with the project. Use this content:
```
# Databricks Asset Bundles Project
This project uses Databricks Asset Bundles for deployment.
## Prerequisites
Install the Databricks CLI (>= v0.288.0) if not already installed:
- macOS: `brew tap databricks/tap && brew install databricks`
- Linux: `curl -fsSL https://raw.githubusercontent.com/databricks/setup-cli/main/install.sh | sh`
- Windows: `winget install Databricks.DatabricksCLI`
Verify: `databricks -v`
## For AI Agents
Read the `databricks` skill for CLI basics, authentication, and deployment workflow.
Read the `databricks-pipelines` skill for pipeline-specific guidance.
If skills are not available, install them: `databricks experimental aitools skills install`
```
## Pipeline Structure
- Follow the medallion architecture pattern (Bronze → Silver → Gold) unless the user specifies otherwise
- Use the convention of 1 dataset per file, named after the dataset
- Place transformation files in a `src/` or `transformations/` folder
```
my-pipeline-project/
├── databricks.yml                      # Bundle configuration
├── resources/
│   ├── my_pipeline.pipeline.yml        # Pipeline definition
│   └── my_pipeline_job.job.yml         # Scheduling job (optional)
└── src/
    ├── my_table.py (or .sql)           # One dataset per file
    ├── another_table.py (or .sql)
    └── ...
```
## Scheduling Pipelines
To schedule a pipeline, add a job that triggers it in `resources/<name>.job.yml`:
```yaml
resources:
  jobs:
    my_pipeline_job:
      trigger:
        periodic:
          interval: 1
          unit: DAYS
      tasks:
        - task_key: refresh_pipeline
          pipeline_task:
            pipeline_id: ${resources.pipelines.my_pipeline.id}
```
## Running Pipelines
**You must deploy before running.** In local development, code changes only take effect after `databricks bundle deploy`. Always deploy before any run, dry run, or selective refresh.
- Selective refresh is preferred when you only need to run one table. For selective refresh it is important that dependencies are already materialized.
- **Full refresh is the most expensive and dangerous option, and can lead to data loss**, so use it only when really necessary. Offer it only as a follow-up option that the user must explicitly choose.
## Development Workflow
1. **Validate**: `databricks bundle validate --profile <profile>`
2. **Deploy**: `databricks bundle deploy -t dev --profile <profile>`
3. **Run pipeline**: `databricks bundle run <pipeline_name> -t dev --profile <profile>`
4. **Check status**: `databricks pipelines get --pipeline-id <id> --profile <profile>`
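If the workflow is driven from a script, the validate, deploy, run order can be kept explicit by building the commands as a list. The helper below is a hypothetical sketch that only assembles the argument lists from the steps above; it does not call the CLI itself.

```python
def workflow_commands(pipeline_name: str, profile: str, target: str = "dev"):
    """Return the validate, deploy, and run commands in the order listed above."""
    common = ["--profile", profile]
    return [
        ["databricks", "bundle", "validate", *common],
        ["databricks", "bundle", "deploy", "-t", target, *common],
        ["databricks", "bundle", "run", pipeline_name, "-t", target, *common],
    ]


for cmd in workflow_commands("my_pipeline", "DEFAULT"):
    # To execute, pass each list to subprocess.run(cmd, check=True);
    # check=True raises on failure, so a failed deploy never reaches the run step.
    print(" ".join(cmd))
```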
## Pipeline API Reference
Detailed reference guides for each pipeline API. **Read the relevant guide before writing pipeline code.**
- [Write Spark Declarative Pipelines](references/write-spark-declarative-pipelines.md) — Core syntax and rules ([Python](references/python-basics.md), [SQL](references/sql-basics.md))
- [Streaming Tables](references/streaming-table.md) — Continuous data stream processing ([Python](references/streaming-table-python.md), [SQL](references/streaming-table-sql.md))
- [Materialized Views](references/materialized-view.md) — Physically stored query results with incremental refresh ([Python](references/materialized-view-python.md), [SQL](references/materialized-view-sql.md))
- [Views](references/view.md) — Reusable query logic published to Unity Catalog ([SQL](references/view-sql.md))
- [Temporary Views](references/temporary-view.md) — Pipeline-private views ([Python](references/temporary-view-python.md), [SQL](references/temporary-view-sql.md))
- [Auto Loader](references/auto-loader.md) — Incrementally ingest files from cloud storage ([Python](references/auto-loader-python.md), [SQL](references/auto-loader-sql.md))
- [Auto CDC](references/auto-cdc.md) — Process Change Data Capture feeds, SCD Type 1 & 2 ([Python](references/auto-cdc-python.md), [SQL](references/auto-cdc-sql.md))
- [Expectations](references/expectations.md) — Define and enforce data quality constraints ([Python](references/expectations-python.md), [SQL](references/expectations-sql.md))
- [Sinks](references/sink.md) — Write to Kafka, Event Hubs, external Delta tables ([Python](references/sink-python.md))
- [ForEachBatch Sinks](references/foreach-batch-sink.md) — Custom streaming sink with per-batch Python logic ([Python](references/foreach-batch-sink-python.md))
Auto CDC in Spark Declarative Pipelines processes change data capture (CDC) events from streaming sources or snapshots.
**API Reference:**
**dp.create_auto_cdc_flow() / dp.apply_changes() / dlt.create_auto_cdc_flow() / dlt.apply_changes()**
Applies CDC operations (inserts, updates, deletes) from a streaming source to a target table. Supports SCD Type 1 (latest) and Type 2 (history). Does NOT return a value - call at top level without assignment.
```python
dp.create_auto_cdc_flow(
    target="<target-table>",
    source="<source-table-name>",
    keys=["key1", "key2"],
    sequence_by="<sequence-column>",
    ignore_null_updates=False,
    apply_as_deletes=None,
    apply_as_truncates=None,
    column_list=None,
    except_column_list=None,
    stored_as_scd_type=1,
    track_history_column_list=None,
    track_history_except_column_list=None,
    name=None,
    once=False
)
```
Parameters:
- `target` (str): Target table name (must exist, create with `dp.create_streaming_table()`). **Required.**
- `source` (str): Source table name with CDC events. **Required.**
- `keys` (list): Primary key columns for row identification. **Required.**
- `sequence_by` (str | Column): Column for ordering events (timestamp, version). **Required.** Accepts a string column name or a `Column` expression. For multi-column sequencing, use `struct("col1", "col2")` to order by multiple columns.
- `ignore_null_updates` (bool): If True, NULL values won't overwrite existing non-NULL values
- `apply_as_deletes` (str or Column): Expression identifying delete operations. Use `expr("op = 'D'")` (Column) or `"op = 'D'"` (string).
- `apply_as_truncates` (str or Column): Expression identifying truncate operations. Use `expr("op = 'TRUNCATE'")` (Column) or `"op = 'TRUNCATE'"` (string).
- `column_list` (list): Columns to include (mutually exclusive with `except_column_list`)
- `except_column_list` (list): Columns to exclude
- `stored_as_scd_type` (int): `1` for latest values (default), `2` for full history with `__START_AT`/`__END_AT` columns
- `track_history_column_list` (list): For SCD Type 2, columns to track history for (others use Type 1)
- `track_history_except_column_list` (list): For SCD Type 2, columns to exclude from history tracking
- `name` (str): Flow name (for multiple flows to same target)
- `once` (bool): Process once and stop (default: False)
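To build intuition for how `keys`, `sequence_by`, `apply_as_deletes`, and `ignore_null_updates` interact under SCD Type 1, here is a toy in-memory model. It is a plain-Python sketch under simplifying assumptions, not how Databricks implements Auto CDC: `apply_scd1` and the sample event fields are hypothetical.

```python
def apply_scd1(events, key, sequence_by, is_delete=None, ignore_null_updates=False):
    """Toy model of SCD Type 1: keep only the latest row per key."""
    table = {}     # key value -> current row
    last_seq = {}  # key value -> highest sequence value applied so far
    for event in events:
        k, seq = event[key], event[sequence_by]
        if k in last_seq and seq <= last_seq[k]:
            continue  # late or duplicate event: a newer change was already applied
        last_seq[k] = seq
        if is_delete is not None and is_delete(event):
            table.pop(k, None)
            continue
        row = dict(table.get(k, {}))
        for col, value in event.items():
            if value is None and ignore_null_updates and col in row:
                continue  # keep the existing value instead of overwriting with NULL
            row[col] = value
        table[k] = row
    return table


events = [
    {"user_id": 1, "name": "Ann", "email": "ann@example.com", "op": "U", "updated_at": 1},
    {"user_id": 1, "name": "Anna", "email": None, "op": "U", "updated_at": 3},
    {"user_id": 1, "name": "stale", "email": "old@example.com", "op": "U", "updated_at": 2},
    {"user_id": 2, "name": "Bob", "email": "bob@example.com", "op": "U", "updated_at": 1},
    {"user_id": 2, "name": None, "email": None, "op": "D", "updated_at": 2},
]
result = apply_scd1(
    events, "user_id", "updated_at",
    is_delete=lambda e: e["op"] == "D",
    ignore_null_updates=True,
)
print(result)
```

In this run, user 1 ends up with the name from sequence 3 and the email from sequence 1 (the NULL is ignored), the out-of-order event at sequence 2 is skipped, and user 2 is removed by the delete.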
**dp.create_auto_cdc_from_snapshot_flow() / dp.apply_changes_from_snapshot() / dlt.create_auto_cdc_from_snapshot_flow() / dlt.apply_changes_from_snapshot()**
Applies CDC from full snapshots by comparing to previous state. Automatically infers inserts, updates, deletes.
```python
dp.create_auto_cdc_from_snapshot_flow(
    target="<target-table>",
    source=<source-table-name-or-callable>,
    keys=["key1", "key2"],
    stored_as_scd_type=1,
    track_history_column_list=None,
    track_history_except_column_list=None
)
```
Parameters:
- `target` (str): Target table name (must exist). **Required.**
- `source` (str or callable): **Required.** Can be one of:
  - **String**: Source table name containing the full snapshot (most common)
  - **Callable**: Function for processing historical snapshots with type `SnapshotAndVersionFunction = Callable[[SnapshotVersion], SnapshotAndVersion]`
    - `SnapshotVersion = Union[int, str, float, bytes, datetime.datetime, datetime.date, decimal.Decimal]`
    - `SnapshotAndVersion = Optional[Tuple[DataFrame, SnapshotVersion]]`
    - Function receives the latest processed snapshot version (or None for first run)
    - Must return `None` when no more snapshots to process
    - Must return tuple of `(DataFrame, SnapshotVersion)` for next snapshot to process
    - Snapshot version is used to track progress and must be comparable/orderable
- `keys` (list): Primary key columns. **Required.**
- `stored_as_scd_type` (int): `1` for latest (default), `2` for history
- `track_history_column_list` (list): Columns to track history for (SCD Type 2)
- `track_history_except_column_list` (list): Columns to exclude from history tracking
**Use create_auto_cdc_flow when:** Processing streaming CDC events from transaction logs, Kafka, Delta change feeds
**Use create_auto_cdc_from_snapshot_flow when:** Processing periodic full snapshots (daily dumps, batch extracts)
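Snapshot-based CDC has no explicit change events, so changes are inferred by comparing consecutive snapshots on the key columns. The sketch below shows that comparison on plain lists of dicts. `diff_snapshots` is a hypothetical helper for intuition only and says nothing about how the pipeline performs the comparison internally.

```python
def diff_snapshots(previous, current, key):
    """Infer inserts, updates, and deletes between two full snapshots (toy model)."""
    prev = {row[key]: row for row in previous}
    curr = {row[key]: row for row in current}
    inserts = [curr[k] for k in curr if k not in prev]
    updates = [curr[k] for k in curr if k in prev and curr[k] != prev[k]]
    deletes = [prev[k] for k in prev if k not in curr]
    return inserts, updates, deletes


day1 = [{"product_id": 1, "price": 10}, {"product_id": 2, "price": 20}]
day2 = [{"product_id": 1, "price": 12}, {"product_id": 3, "price": 30}]

inserts, updates, deletes = diff_snapshots(day1, day2, "product_id")
print(inserts, updates, deletes)
```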
**Common Patterns:**
**Pattern 1: Basic CDC flow from streaming source**
```python
from pyspark import pipelines as dp

# Step 1: Create target table
dp.create_streaming_table(name="users")

# Step 2: Define CDC flow (source must be a table name)
dp.create_auto_cdc_flow(
    target="users",
    source="user_changes",
    keys=["user_id"],
    sequence_by="updated_at"
)
```
**Pattern 2: CDC flow with upstream transformation**
```python
from pyspark import pipelines as dp

# Step 1: Define view with transformation (source preprocessing)
@dp.temporary_view()
def filtered_user_changes():
    return (
        spark.readStream.table("raw_user_changes")
        .filter("user_id IS NOT NULL")
    )

# Step 2: Create target table
dp.create_streaming_table(name="users")

# Step 3: Define CDC flow using the view as source
dp.create_auto_cdc_flow(
    target="users",
    source="filtered_user_changes",  # References the view name
    keys=["user_id"],
    sequence_by="updated_at"
)
# Note: Use distinct names for view and target for clarity
# Note: If "raw_user_changes" is defined in the pipeline and no additional transformations or expectations are needed,
# source="raw_user_changes" can be used directly
```
**Pattern 3: CDC with explicit deletes and truncates**
```python
from pyspark import pipelines as dp
from pyspark.sql.functions import expr

dp.create_streaming_table(name="orders")

dp.create_auto_cdc_flow(
    target="orders",
    source="order_events",
    keys=["order_id"],
    sequence_by="event_timestamp",
    apply_as_deletes=expr("operation = 'DELETE'"),
    apply_as_truncates=expr("operation = 'TRUNCATE'"),  # SCD Type 1 only
    ignore_null_updates=True
)
```
**Pattern 4: SCD Type 2 (Historical tracking)**
```python
from pyspark import pipelines as dp

dp.create_streaming_table(name="customer_history")

dp.create_auto_cdc_flow(
    target="customer_history",
    source="source.customer_changes",
    keys=["customer_id"],
    sequence_by="changed_at",
    stored_as_scd_type=2  # Track full history
)
# Target will include __START_AT and __END_AT columns
```
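The role of `__START_AT` and `__END_AT` is easier to see on a tiny example. The toy model below builds history rows from an ordered list of events. It is a sketch under stated assumptions (events already sorted by the sequence column, every event counts as a change), not the Databricks implementation; `apply_scd2` and the sample data are hypothetical.

```python
def apply_scd2(events, key, sequence_by):
    """Toy model of SCD Type 2 history rows.

    Assumes events arrive already ordered by `sequence_by` and that every
    event represents a real change.
    """
    history = []   # all versions, open and closed
    open_row = {}  # key value -> currently open version
    for event in events:
        k, seq = event[key], event[sequence_by]
        current = open_row.get(k)
        if current is not None:
            current["__END_AT"] = seq  # close the previous version
        version = {**event, "__START_AT": seq, "__END_AT": None}
        history.append(version)
        open_row[k] = version
    return history


events = [
    {"customer_id": 7, "city": "Oslo", "changed_at": 10},
    {"customer_id": 7, "city": "Rome", "changed_at": 20},
]
history = apply_scd2(events, "customer_id", "changed_at")
for row in history:
    print(row["city"], row["__START_AT"], row["__END_AT"])
```

The open (current) version is the one whose `__END_AT` is still empty.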
**Pattern 5: Snapshot-based CDC (Simple - table source)**
```python
from pyspark import pipelines as dp

dp.create_streaming_table(name="products")

# Batch read of the latest full snapshot
@dp.materialized_view(name="product_snapshot")
def product_snapshot():
    return spark.read.table("source.daily_product_dump")

dp.create_auto_cdc_from_snapshot_flow(
    target="products",
    source="product_snapshot",  # String table name - most common
    keys=["product_id"],
    stored_as_scd_type=1
)
```
**Pattern 6: Snapshot-based CDC (Advanced - callable for historical snapshots)**
```python
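from typing import Optional, Tuple

from pyspark import pipelines as dp
from pyspark.sql import DataFrame

dp.create_streaming_table(name="products")

# Define a callable to process historical snapshots sequentially
def next_snapshot_and_version(
    latest_snapshot_version: Optional[int],
) -> Optional[Tuple[DataFrame, int]]:
    if latest_snapshot_version is None:
        # First run: load the initial snapshot as version 1
        # (explicit CSV format; assumes the file has a header row)
        return (spark.read.format("csv").option("header", "true").load("products.csv"), 1)
    # No more snapshots to process
    return None

dp.create_auto_cdc_from_snapshot_flow(
    target="products",
    source=next_snapshot_and_version,  # Callable function for historical processing
    keys=["product_id"],
    stored_as_scd_type=1
)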
```
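The contract for the callable (receive the latest processed version, return the next snapshot and its version, return `None` when done) can be exercised without Spark. The sketch below uses lists of dicts as stand-ins for DataFrames and a hypothetical driver, `drain_snapshots`, that keeps calling the function until it returns `None`. How the pipeline actually schedules these calls is an assumption here.

```python
from typing import Callable, List, Optional, Tuple

Snapshot = List[dict]  # stand-in for a DataFrame in this sketch


def drain_snapshots(
    next_snapshot_and_version: Callable[[Optional[int]], Optional[Tuple[Snapshot, int]]],
) -> List[int]:
    """Call the function until it returns None; return the versions seen."""
    versions: List[int] = []
    latest: Optional[int] = None
    while True:
        result = next_snapshot_and_version(latest)
        if result is None:
            break
        _snapshot, version = result
        if latest is not None and version <= latest:
            raise ValueError("snapshot versions must increase")
        versions.append(version)
        latest = version
    return versions


# Hypothetical historical snapshots keyed by version number
SNAPSHOTS = {1: [{"product_id": 1}], 2: [{"product_id": 1}, {"product_id": 2}]}


def next_from_dict(latest: Optional[int]) -> Optional[Tuple[Snapshot, int]]:
    nxt = 1 if latest is None else latest + 1
    return (SNAPSHOTS[nxt], nxt) if nxt in SNAPSHOTS else None


print(drain_snapshots(next_from_dict))  # prints [1, 2]
```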
**Pattern 7: Selective column tracking**
```python
from pyspark import pipelines as dp

dp.create_streaming_table(name="accounts")

dp.create_auto_cdc_flow(
    target="accounts",
    source="account_changes",
    keys=["account_id"],
    sequence_by="modified_at",
    stored_as_scd_type=2,
    track_history_column_list=["balance", "status"],  # Only track history for these columns
    ignore_null_updates=True
)
```
**KEY RULES:**
- Create target with `dp.create_streaming_table()` before defining CDC flow
- `dp.create_auto_cdc_flow()` does NOT return a value - call it at top level without assigning to a variable
- `source` must be a table name (string) - use `@dp.temporary_view()` to preprocess/filter/transform data before CDC processing. A temporary view is the **preferred** approach for source preprocessing (not a streaming table)
- SCD Type 2 adds `__START_AT` and `__END_AT` columns for validity tracking
- When specifying the schema of the target table for SCD Type 2, you must also include the `__START_AT` and `__END_AT` columns with the same data type as the `sequence_by` field
- Legacy names (`apply_changes`, `apply_changes_from_snapshot`) are equivalent but deprecated - prefer `create_auto_cdc_*` variants
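To make the difference between SCD Type 1 and Type 2 concrete, here is a minimal plain-Python sketch of the two storage modes. It is a simplified model, not the engine's implementation: it ignores deletes, truncates, and `ignore_null_updates`, and the helper names `apply_scd1` and `apply_scd2` are hypothetical.

```python
from typing import Any, Dict, List, Optional

Event = Dict[str, Any]


def apply_scd1(events: List[Event], key: str, seq: str) -> Dict[Any, Event]:
    """Keep only the latest event per key, ordered by the sequence column."""
    latest: Dict[Any, Event] = {}
    for event in sorted(events, key=lambda e: e[seq]):
        latest[event[key]] = event
    return latest


def apply_scd2(events: List[Event], key: str, seq: str) -> List[Event]:
    """Keep every version; close the previous version when a newer one arrives."""
    history: List[Event] = []
    open_row: Dict[Any, Event] = {}
    for event in sorted(events, key=lambda e: e[seq]):
        previous: Optional[Event] = open_row.get(event[key])
        if previous is not None:
            previous["__END_AT"] = event[seq]  # validity of the old version ends here
        row = {**event, "__START_AT": event[seq], "__END_AT": None}
        history.append(row)
        open_row[event[key]] = row
    return history


# Events arrive out of order; the sequence column decides which one is newest.
events = [
    {"customer_id": 1, "city": "Lyon", "changed_at": 2},
    {"customer_id": 1, "city": "Paris", "changed_at": 1},
]
current = apply_scd1(events, "customer_id", "changed_at")
history = apply_scd2(events, "customer_id", "changed_at")
```

In this model `current` holds one row per key, while `history` holds one row per version, with the open version marked by a null `__END_AT`. Both validity columns take their values from the sequence column, which is consistent with the rule that they share its data type.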
Auto CDC in Declarative Pipelines processes change data capture (CDC) events from streaming sources.
**API Reference:**
**CREATE FLOW ... AS AUTO CDC INTO**
Applies CDC operations (inserts, updates, deletes) from a streaming source to a target table. Supports SCD Type 1 (latest) and Type 2 (history). Must be used with a pre-created streaming table.
```sql
CREATE OR REFRESH STREAMING TABLE <target_table>;
CREATE FLOW <flow_name> AS AUTO CDC INTO <target_table>
FROM <source>
KEYS (<key1>, <key2>)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN <condition>]
[APPLY AS TRUNCATE WHEN <condition>]
SEQUENCE BY <sequence_column>
[COLUMNS {<column_list> | * EXCEPT (<except_column_list>)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {<column_list> | * EXCEPT (<except_column_list>)}]
```
Parameters:
- `target_table` (identifier): Target table name (must exist, create with `CREATE OR REFRESH STREAMING TABLE`). **Required.**
- `flow_name` (identifier): Identifier for the created flow. **Required.**
- `source` (identifier or expression): Streaming source with CDC events. Use `STREAM(<table_name>)` to read with streaming semantics. **Required.**
- `KEYS` (column list): Primary key columns for row identification. **Required.**
- `IGNORE NULL UPDATES` (optional): If specified, NULL values won't overwrite existing non-NULL values
- `APPLY AS DELETE WHEN` (optional): Condition identifying delete operations (e.g., `operation = 'DELETE'`)
- `APPLY AS TRUNCATE WHEN` (optional): Condition identifying truncate operations (supported only for SCD Type 1)
- `SEQUENCE BY` (column or struct): Column for ordering events (timestamp, version). **Required.** For multi-column sequencing, use `SEQUENCE BY STRUCT(timestamp_col, id_col)` to order by the first field first, then break ties with subsequent fields.
- `COLUMNS` (optional): Columns to include or exclude (use `column1, column2` or `* EXCEPT (column1, column2)`)
- `STORED AS` (optional): `SCD TYPE 1` for latest values (default), `SCD TYPE 2` for full history with `__START_AT`/`__END_AT` columns
- `TRACK HISTORY ON` (optional): For SCD Type 2, columns to track history for (others use Type 1)
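The tie-breaking behavior described for `SEQUENCE BY STRUCT(...)` is analogous to comparing tuples field by field. The following plain-Python sketch illustrates that ordering only; the event values are made up, and it does not call any pipeline API.

```python
from typing import List, Tuple

# Each event is (event_timestamp, event_id, payload). Values are hypothetical.
events: List[Tuple[int, int, str]] = [
    (100, 2, "second write at t=100"),
    (100, 1, "first write at t=100"),
    (90, 7, "earlier write"),
]

# Tuples compare on the first field, then break ties with the next field,
# which mirrors ordering by STRUCT(event_timestamp, event_id).
ordered = sorted(events, key=lambda e: (e[0], e[1]))
winner = ordered[-1]  # the event treated as most recent
```

With a timestamp alone, the two events at `t=100` would be indistinguishable; adding a second field gives a deterministic winner.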
**Common Patterns:**
**Pattern 1: Basic CDC flow from streaming source**
```sql
-- Step 1: Create target table
CREATE OR REFRESH STREAMING TABLE users;
-- Step 2: Define CDC flow using STREAM() for streaming semantics
CREATE FLOW user_flow AS AUTO CDC INTO users
FROM STREAM(user_changes)
KEYS (user_id)
SEQUENCE BY updated_at;
```
**Pattern 2: CDC with source filtering via temporary view**
```sql
-- Step 1: Create temporary view to filter/transform source data
CREATE OR REFRESH TEMPORARY VIEW filtered_changes AS
SELECT * FROM source_table WHERE status = 'active';
-- Step 2: Create target table
CREATE OR REFRESH STREAMING TABLE active_records;
-- Step 3: Define CDC flow reading from the temporary view
CREATE FLOW active_flow AS AUTO CDC INTO active_records
FROM STREAM(filtered_changes)
KEYS (record_id)
SEQUENCE BY updated_at;
```
**Pattern 3: CDC with explicit deletes**
```sql
CREATE OR REFRESH STREAMING TABLE orders;
CREATE FLOW order_flow AS AUTO CDC INTO orders
FROM STREAM(order_events)
KEYS (order_id)
IGNORE NULL UPDATES
APPLY AS DELETE WHEN operation = 'DELETE'
SEQUENCE BY event_timestamp;
```
**Pattern 4: SCD Type 2 (Historical tracking)**
```sql
CREATE OR REFRESH STREAMING TABLE customer_history;
CREATE FLOW customer_flow AS AUTO CDC INTO customer_history
FROM STREAM(customer_changes)
KEYS (customer_id)
SEQUENCE BY changed_at
STORED AS SCD TYPE 2;
-- Target will include __START_AT and __END_AT columns
```
**Pattern 5: Multi-column sequencing**
```sql
CREATE OR REFRESH STREAMING TABLE events;
CREATE FLOW event_flow AS AUTO CDC INTO events
FROM STREAM(event_changes)
KEYS (event_id)
SEQUENCE BY STRUCT(event_timestamp, event_id)
STORED AS SCD TYPE 1;
```
**Pattern 6: Selective column inclusion**
```sql
CREATE OR REFRESH STREAMING TABLE accounts;
CREATE FLOW account_flow AS AUTO CDC INTO accounts
FROM STREAM(account_changes)
KEYS (account_id)
SEQUENCE BY modified_at
COLUMNS account_id, balance, status
STORED AS SCD TYPE 1;
```
**Pattern 7: Selective column exclusion**
```sql
CREATE OR REFRESH STREAMING TABLE products;
CREATE FLOW product_flow AS AUTO CDC INTO products
FROM STREAM(product_changes)
KEYS (product_id)
SEQUENCE BY updated_at
COLUMNS * EXCEPT (internal_notes, temp_field);
```
**Pattern 8: SCD Type 2 with selective history tracking**
```sql
CREATE OR REFRESH STREAMING TABLE accounts;
CREATE FLOW account_flow AS AUTO CDC INTO accounts
FROM STREAM(account_changes)
KEYS (account_id)
IGNORE NULL UPDATES
SEQUENCE BY modified_at
STORED AS SCD TYPE 2
TRACK HISTORY ON balance, status;
-- Only balance and status changes create new history records
```
**Pattern 9: SCD Type 2 with history tracking exclusion**
```sql
CREATE OR REFRESH STREAMING TABLE accounts;
CREATE FLOW account_flow AS AUTO CDC INTO accounts
FROM STREAM(account_changes)
KEYS (account_id)
SEQUENCE BY modified_at
STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (last_login, view_count);
-- Track history on all columns except last_login and view_count
```
**Pattern 10: Truncate support (SCD Type 1 only)**
```sql
CREATE OR REFRESH STREAMING TABLE inventory;
CREATE FLOW inventory_flow AS AUTO CDC INTO inventory
FROM STREAM(inventory_events)
KEYS (product_id)
APPLY AS TRUNCATE WHEN operation = 'TRUNCATE'
SEQUENCE BY event_timestamp
STORED AS SCD TYPE 1;
```
**KEY RULES:**
- Create target with `CREATE OR REFRESH STREAMING TABLE` before defining CDC flow
- `source` must be a streaming source for safe CDC change processing. Use `STREAM(<table_name>)` to read an existing table/view with streaming semantics
- The `STREAM()` function accepts ONLY a table/view identifier - NOT a subquery. Define source data as a separate streaming table or temporary view first, then reference it in the flow
- SCD Type 2 adds `__START_AT` and `__END_AT` columns for validity tracking
- When specifying the schema of the target table for SCD Type 2, you must also include the `__START_AT` and `__END_AT` columns with the same data type as the `SEQUENCE BY` field
- Legacy `APPLY CHANGES INTO` API is equivalent but deprecated - prefer `AUTO CDC INTO`
- `AUTO CDC FROM SNAPSHOT` is only available in Python, not in SQL. SQL only supports `AUTO CDC INTO` for processing CDC events from streaming sources.
# Auto CDC (apply_changes) in Spark Declarative Pipelines
The `apply_changes` API enables processing Change Data Capture (CDC) feeds to automatically handle inserts, updates, and deletes in target tables.
## Key Concepts
Auto CDC in Spark Declarative Pipelines:
- Automatically processes CDC operations (INSERT, UPDATE, DELETE)
- Supports SCD Type 1 (update in place) and Type 2 (historical tracking)
- Handles ordering of changes via sequence columns
- Deduplicates CDC records
## Language-Specific Implementations
For detailed implementation guides:
- **Python**: [auto-cdc-python.md](auto-cdc-python.md)
- **SQL**: [auto-cdc-sql.md](auto-cdc-sql.md)
**Note**: The API is also known as `applyChanges` in some contexts.
Auto Loader (`cloudFiles`) is recommended for ingesting from cloud storage.
**Basic Syntax:**
```python
@dp.table()
def my_table():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")  # or csv, parquet, etc.
        .load("s3://bucket/path")
    )
```
**Critical Spark Declarative Pipelines + Auto Loader Rules:**
- Databricks automatically manages `cloudFiles.schemaLocation` and checkpoint - do NOT specify these
- Auto Loader returns a streaming DataFrame - general API guidelines for `streamingTable` apply (MANDATORY to look up `streamingTable` guide)
- Can be used in either a streaming `@dp.table()` / `@dlt.table()` or via `@dp.append_flow()` / `@dlt.append_flow()`
- Use `spark.readStream` not `spark.read` for streaming ingestion
- If manually specifying a schema, include the rescued data column (default `_rescued_data STRING`, configurable via `rescuedDataColumn` option)
- Common Schema Options:
  - `cloudFiles.inferColumnTypes`: Enable type inference (default: strings for JSON/CSV/XML)
  - `cloudFiles.schemaHints`: Optionally specify known column types (e.g., `"id int, name string"`)
- File detection: File notification mode recommended for scalability
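One way to enforce the "do NOT specify" rule in code is to assemble Auto Loader options through a small helper that rejects pipeline-managed keys. This is a sketch under stated assumptions: `build_autoloader_options` and `MANAGED_OPTIONS` are hypothetical names, not part of any Databricks API, and the managed set lists only the option this guide names explicitly.

```python
from typing import Dict

# Options that this guide says the pipeline manages; the set is illustrative.
MANAGED_OPTIONS = {"cloudFiles.schemaLocation"}


def build_autoloader_options(file_format: str, **extra: str) -> Dict[str, str]:
    """Assemble cloudFiles options, rejecting ones the pipeline manages."""
    options = {"cloudFiles.format": file_format}
    for name, value in extra.items():
        # Keyword names cannot contain dots, so callers pass e.g. schemaHints=...
        key = f"cloudFiles.{name}"
        if key in MANAGED_OPTIONS:
            raise ValueError(f"{key} is managed by the pipeline; do not set it")
        options[key] = value
    return options


options = build_autoloader_options(
    "json",
    inferColumnTypes="true",
    schemaHints="id int, name string",
)
# In a pipeline, the dictionary would be passed to the reader:
# spark.readStream.format("cloudFiles").options(**options).load("s3://bucket/path")
```

Centralizing option construction keeps the managed-option check in one place instead of relying on each dataset definition to remember it.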
**Common Auto Loader Options**
The tables below list the format-agnostic options for Auto Loader.
Common Auto Loader Options
| Option | Type | Notes |
| ---------------------------------------- | --------------- | ---------------------------------- |
| cloudFiles.allowOverwrites | Boolean | |
| cloudFiles.backfillInterval | Interval String | |
| cloudFiles.cleanSource | String | |
| cloudFiles.cleanSource.retentionDuration | Interval String | |
| cloudFiles.cleanSource.moveDestination | String | |
| cloudFiles.format | String | |
| cloudFiles.includeExistingFiles | Boolean | |
| cloudFiles.inferColumnTypes | Boolean | |
| cloudFiles.maxBytesPerTrigger | Byte String | |
| cloudFiles.maxFileAge | Interval String | |
| cloudFiles.maxFilesPerTrigger | Integer | |
| cloudFiles.partitionColumns | String | |
| cloudFiles.schemaEvolutionMode | String | |
| cloudFiles.schemaHints | String | |
| cloudFiles.schemaLocation | String | DO NOT SET - managed automatically |
| cloudFiles.useStrictGlobber | Boolean | |
| cloudFiles.validateOptions | Boolean | |
Directory Listing Options
| Option | Type |
| -------------------------------- | ------ |
| cloudFiles.useIncrementalListing | String |
File Notification Options
| Option | Type |
| ------------------------------- | ------------------- |
| cloudFiles.fetchParallelism | Integer |
| cloudFiles.pathRewrites | JSON String |
| cloudFiles.resourceTag | Map(String, String) |
| cloudFiles.useManagedFileEvents | Boolean |
| cloudFiles.useNotifications | Boolean |
AWS-Specific Options
| Option | Type |
| ---------------------------- | ------ |
| cloudFiles.region | String |
| cloudFiles.queueUrl | String |
| cloudFiles.awsAccessKey | String |
| cloudFiles.awsSecretKey | String |
| cloudFiles.roleArn | String |
| cloudFiles.roleExternalId | String |
| cloudFiles.roleSessionName | String |
| cloudFiles.stsEndpoint | String |
| databricks.serviceCredential | String |
Azure-Specific Options
| Option | Type |
| ---------------------------- | ------ |
| cloudFiles.resourceGroup | String |
| cloudFiles.subscriptionId | String |
| cloudFiles.clientId | String |
| cloudFiles.clientSecret | String |
| cloudFiles.connectionString | String |
| cloudFiles.tenantId | String |
| cloudFiles.queueName | String |
| databricks.serviceCredential | String |
GCP-Specific Options
| Option | Type |
| ---------------------------- | ------ |
| cloudFiles.projectId | String |
| cloudFiles.client | String |
| cloudFiles.clientEmail | String |
| cloudFiles.privateKey | String |
| cloudFiles.privateKeyId | String |
| cloudFiles.subscription | String |
| databricks.serviceCredential | String |
Generic File Format Options
| Option | Type |
| -------------------------------- | ---------------- |
| ignoreCorruptFiles | Boolean |
| ignoreMissingFiles | Boolean |
| modifiedAfter | Timestamp String |
| modifiedBefore | Timestamp String |
| pathGlobFilter / fileNamePattern | String |
| recursiveFileLookup | Boolean |
Format-Specific Options
For detailed format-specific options, refer to these files:
- **[JSON Options](options-json.md)**: Options for reading JSON files
- **[CSV Options](options-csv.md)**: Options for reading CSV files
- **[Parquet Options](options-parquet.md)**: Options for reading Parquet files
- **[Avro Options](options-avro.md)**: Options for reading Avro files
- **[ORC Options](options-orc.md)**: Options for reading ORC files
- **[XML Options](options-xml.md)**: Options for reading XML files
- **[Text Options](options-text.md)**: Options for reading text files
See the linked format option files for specific documentation.
**Auto Loader documentation:**
MANDATORY: Look up the official Databricks documentation for detailed information on any specific cloudFiles (Auto Loader) option before use. Each option has extensive documentation. No exceptions.
Auto Loader with SQL (`read_files`) is recommended for ingesting from cloud storage.
**Basic Syntax:**
```sql
-- Using Auto Loader with CREATE STREAMING TABLE
CREATE OR REFRESH STREAMING TABLE my_table
AS SELECT * FROM STREAM(read_files(
  's3://bucket/path',
  format => 'json'
));
-- Using Auto Loader directly with CREATE FLOW (no intermediate table needed)
CREATE STREAMING TABLE target_table;
CREATE FLOW ingest_flow
AS INSERT INTO target_table BY NAME
SELECT * FROM STREAM(read_files(
  's3://bucket/path',
  format => 'json'
));
```
**Critical Spark Declarative Pipelines + Auto Loader Rules:**
- **MUST use `STREAM` keyword with `read_files` in streaming contexts** (e.g., `SELECT * FROM STREAM read_files(...)`)
- `inferColumnTypes` defaults to `true` - column types are automatically inferred, no need to specify unless setting to `false`
- Schema inference: Samples data initially to determine structure, then adapts as new data is encountered
- Use `schemaHints` to specify known column types (e.g., `schemaHints => 'id int, name string'`)
- Use `schemaEvolutionMode` to control how schema adapts when encountering new columns
- Unity Catalog pipelines must use external locations when loading files
**Common read_files Options**
The tables below list the format-agnostic options for `read_files`.
Basic Options
| Option | Type |
| ------------------ | ------- |
| `format` | String |
| `inferColumnTypes` | Boolean |
| `partitionColumns` | String |
| `schemaHints` | String |
| `useStrictGlobber` | Boolean |
Generic File Format Options
| Option | Type |
| ------------------------------------ | ---------------- |
| `ignoreCorruptFiles` | Boolean |
| `ignoreMissingFiles` | Boolean |
| `modifiedAfter` | Timestamp String |
| `modifiedBefore` | Timestamp String |
| `pathGlobFilter` / `fileNamePattern` | String |
| `recursiveFileLookup` | Boolean |
Streaming Options
| Option | Type |
| ---------------------- | ----------- |
| `allowOverwrites` | Boolean |
| `includeExistingFiles` | Boolean |
| `maxBytesPerTrigger` | Byte String |
| `maxFilesPerTrigger` | Integer |
| `schemaEvolutionMode` | String |
| `schemaLocation` | String |
Format-Specific Options
For detailed format-specific options, refer to these files:
- **[JSON Options](options-json.md)**: Options for reading JSON files
- **[CSV Options](options-csv.md)**: Options for reading CSV files
- **[Parquet Options](options-parquet.md)**: Options for reading Parquet files
- **[Avro Options](options-avro.md)**: Options for reading Avro files
- **[ORC Options](options-orc.md)**: Options for reading ORC files
- **[XML Options](options-xml.md)**: Options for reading XML files
- **[Text Options](options-text.md)**: Options for reading text files
See the linked format option files for specific documentation.
**Auto Loader documentation:**
MANDATORY: Look up the official Databricks documentation for detailed information on any specific read_files (Auto Loader) option before use. Each option has extensive documentation. No exceptions.
# Auto Loader (cloudFiles)
Auto Loader is the recommended approach for incrementally ingesting data from cloud storage into Delta Lake tables. It automatically processes new files as they arrive in cloud storage.
## Key Concepts
Auto Loader (`cloudFiles`) provides:
- Automatic file discovery and processing
- Schema inference and evolution
- Exactly-once processing guarantees
- Scalable incremental ingestion
- Support for various file formats
## Language-Specific Implementations
For detailed implementation guides:
- **Python**: [auto-loader-python.md](auto-loader-python.md)
- **SQL**: [auto-loader-sql.md](auto-loader-sql.md)
## Format-Specific Options
For format-specific configuration options, refer to:
- **JSON**: [options-json.md](options-json.md)
- **CSV**: [options-csv.md](options-csv.md)
- **XML**: [options-xml.md](options-xml.md)
- **Parquet**: [options-parquet.md](options-parquet.md)
- **Avro**: [options-avro.md](options-avro.md)
- **Text**: [options-text.md](options-text.md)
- **ORC**: [options-orc.md](options-orc.md)
Expectations apply data quality constraints to Lakeflow Spark Declarative Pipelines tables and views in Python. They use SQL Boolean expressions to validate each record and take actions when constraints are violated.
## When to Use Expectations
- Apply to `@dp.materialized_view()`/`@dp.table()`/`@dlt.table()`/`@dp.temporary_view()`/`@dp.view()`/`@dlt.view()` decorated functions
- Use on streaming tables, materialized views, or temporary views
- Stack multiple expectation decorators above the dataset function
## Decorator Types
### Single Expectation Decorators
**@dp.expect(description, constraint)** (or **@dlt.expect(description, constraint)**)
- Logs violations but allows invalid records to pass through
- Collects metrics for monitoring
**@dp.expect_or_drop(description, constraint)** (or **@dlt.expect_or_drop(description, constraint)**)
- Removes invalid records before writing to target
- Logs dropped record metrics
**@dp.expect_or_fail(description, constraint)** (or **@dlt.expect_or_fail(description, constraint)**)
- Stops pipeline execution immediately on violation
- Requires manual intervention to resolve
### Multiple Expectations Decorators
**@dp.expect_all({description: constraint, ...})** (or **@dlt.expect_all({description: constraint, ...})**)
- Applies multiple warn-level expectations
- Takes dictionary of description-constraint pairs
**@dp.expect_all_or_drop({description: constraint, ...})** (or **@dlt.expect_all_or_drop({description: constraint, ...})**)
- Applies multiple drop-level expectations
- Records dropped if any constraint fails
**@dp.expect_all_or_fail({description: constraint, ...})** (or **@dlt.expect_all_or_fail({description: constraint, ...})**)
- Applies multiple fail-level expectations
- Pipeline stops if any constraint fails
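The three actions (warn, drop, fail) differ only in what happens to records that violate the constraint. The plain-Python sketch below models that behavior so the difference is easy to see; it is a simplified illustration, not how the engine evaluates expectations. The function `run_expectation` is hypothetical, and a Python callable stands in for the SQL Boolean expression.

```python
from typing import Any, Callable, Dict, List, Tuple

Row = Dict[str, Any]
Predicate = Callable[[Row], bool]


def run_expectation(rows: List[Row], check: Predicate, action: str) -> Tuple[List[Row], int]:
    """Return (rows kept, violation count) for action 'warn', 'drop', or 'fail'."""
    failed = [row for row in rows if not check(row)]
    if action == "fail" and failed:
        # Models expect_or_fail: the update stops on the first violating batch.
        raise ValueError(f"{len(failed)} record(s) violated the expectation")
    if action == "drop":
        # Models expect_or_drop: violating rows never reach the target.
        return [row for row in rows if check(row)], len(failed)
    # Models expect: keep everything and only count violations.
    return rows, len(failed)


rows = [{"price": 10}, {"price": -1}]


def valid_price(row: Row) -> bool:
    """Stand-in for the SQL constraint "price >= 0"."""
    return row["price"] >= 0
```

In every mode the violation count is still produced, which corresponds to the metrics the decorators collect.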
## Parameters
**description** (str, required)
- Unique identifier for the constraint within the dataset
- Should clearly communicate what is being validated
- Can be reused across different datasets
**constraint** (str, required)
- SQL Boolean expression evaluated per record
- Must return true or false
- Cannot contain Python functions or UDFs, external calls, or subqueries
## Usage Examples
All variants below work with the `table`, `materialized_view`, and `view` decorators.
### Basic Single Expectation
```python
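@dp.materialized_view()
@dp.expect("valid_price", "price >= 0")
def sales_data():
    return spark.read.table("raw_sales")

# The same expectation on @dp.table(). This is an alternative to the definition
# above, not an addition: a pipeline should define each dataset name only once.
@dp.table()
@dp.expect("valid_price", "price >= 0")
def sales_data():
    return spark.read.table("raw_sales")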
```
### Drop Invalid Records
```python
@dp.materialized_view()
@dp.expect_or_drop("valid_email", "email IS NOT NULL AND email LIKE '%@%'")
def customer_contacts():
    return spark.read.table("raw_contacts")
```
### Fail on Critical Violations
```python
@dp.materialized_view()
@dp.expect_or_fail("required_id", "customer_id IS NOT NULL")
def customer_master():
    return spark.read.table("raw_customers")
```
### Multiple Expectations
```python
@dp.materialized_view()
@dp.expect_all({
    "valid_age": "age >= 0 AND age <= 120",
    "valid_country": "country_code IN ('US', 'CA', 'MX')",
    "recent_date": "created_date >= '2020-01-01'"
})
def validated_customers():
    return spark.read.table("raw_customers")
```
### Stacking Multiple Decorators
```python
@dp.materialized_view(
    comment="Clean customer data with quality checks"
)
@dp.expect_or_drop("valid_email", "email LIKE '%@%'")
@dp.expect_or_fail("required_id", "id IS NOT NULL")
@dp.expect("valid_age", "age BETWEEN 0 AND 120")
def customers_clean():
    return spark.read.table("raw_customers")
```
### With Views
```python
from pyspark.sql import functions as F

@dp.view(
    name="high_value_customers",
    comment="Customers with total purchases over $1000"
)
@dp.expect("valid_total", "total_purchases > 0")
def high_value_view():
    return (
        spark.read.table("orders")
        .groupBy("customer_id")
        .agg(F.sum("amount").alias("total_purchases"))  # Spark's sum, not Python's built-in
        .filter("total_purchases > 1000")
    )
```
## Monitoring
- View metrics in pipeline UI
- Query the event log for detailed analytics
- Metrics unavailable if pipeline fails or no updates occur
## Best Practices
- Use unique, descriptive names for each expectation
- Apply `expect_or_fail` for critical business constraints
- Use `expect_or_drop` for data cleansing operations
- Use `expect` for monitoring optional quality metrics
- Keep constraint logic simple and SQL-based only
- Group related expectations using `expect_all` variants
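When many datasets share the same checks, one possible arrangement is to keep the rules in a single list and derive the dictionaries that the `expect_all` variants take. The sketch below is only one way to organize this; `Rule`, `RULES`, and `rules_for` are hypothetical names, and the constraints are borrowed from the examples above.

```python
from typing import Dict, List, NamedTuple


class Rule(NamedTuple):
    name: str        # expectation description, unique within a dataset
    constraint: str  # SQL Boolean expression
    action: str      # "warn", "drop", or "fail"


RULES: List[Rule] = [
    Rule("valid_age", "age BETWEEN 0 AND 120", "warn"),
    Rule("valid_email", "email LIKE '%@%'", "drop"),
    Rule("required_id", "id IS NOT NULL", "fail"),
]


def rules_for(action: str) -> Dict[str, str]:
    """Collect the description -> constraint pairs for one action level."""
    return {rule.name: rule.constraint for rule in RULES if rule.action == action}


# In a pipeline these dictionaries would feed the grouped decorators, e.g.:
# @dp.expect_all(rules_for("warn"))
# @dp.expect_all_or_drop(rules_for("drop"))
# @dp.expect_all_or_fail(rules_for("fail"))
```

Keeping the severity next to each rule makes it straightforward to promote a check from warn to drop or fail without touching the dataset definitions.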
Expectations apply data quality constraints to Lakeflow Spark Declarative Pipelines tables and views in SQL. They use SQL Boolean expressions to validate each record and take actions when constraints are violated.
## When to Use Expectations
- Apply within `CREATE OR REFRESH STREAMING TABLE`, `CREATE OR REFRESH MATERIALIZED VIEW`, or `CREATE LIVE VIEW` statements
- Use as optional clauses in table/view creation statements
- Stack multiple CONSTRAINT clauses (comma-separated) in a single statement
**Note on Temporary Views**: Use `CREATE LIVE VIEW` syntax when you need to include expectations with temporary views. The newer `CREATE TEMPORARY VIEW` syntax does not support CONSTRAINT clauses. `CREATE LIVE VIEW` is retained specifically for this use case, even though `CREATE TEMPORARY VIEW` is otherwise preferred for temporary views without expectations.
## Constraint Syntax
### Single Expectation (Warn)
**CONSTRAINT constraint_name EXPECT (condition)**
- Logs violations but allows invalid records to pass through
- Collects metrics for monitoring
- Invalid records are retained in target dataset
### Single Expectation (Drop)
**CONSTRAINT constraint_name EXPECT (condition) ON VIOLATION DROP ROW**
- Removes invalid records before writing to target
- Logs dropped record metrics
- Invalid records are excluded from target
### Single Expectation (Fail)
**CONSTRAINT constraint_name EXPECT (condition) ON VIOLATION FAIL UPDATE**
- Stops pipeline execution immediately on violation
- Requires manual intervention to resolve
- Transaction rolls back atomically
### Multiple Expectations
Multiple CONSTRAINT clauses can be stacked in a single CREATE statement using commas:
```sql
CREATE OR REFRESH STREAMING TABLE table_name(
CONSTRAINT name1 EXPECT (condition1),
CONSTRAINT name2 EXPECT (condition2) ON VIOLATION DROP ROW,
CONSTRAINT name3 EXPECT (condition3) ON VIOLATION FAIL UPDATE
) AS SELECT ...
```
## Parameters
**constraint_name** (required)
- Unique identifier for the constraint within the dataset
- Should clearly communicate what is being validated
- Can be reused across different datasets
**condition** (required)
- SQL Boolean expression evaluated per record
- Must return true or false
- Can include SQL functions (e.g., year(), date(), CASE statements)
- Cannot contain Python functions or UDFs, external calls, or subqueries
## Usage Examples
### Basic Single Expectation
```sql
CREATE OR REFRESH STREAMING TABLE sales_data(
CONSTRAINT valid_price EXPECT (price >= 0)
) AS
SELECT * FROM STREAM(raw_sales);
```
### Drop Invalid Records
```sql
CREATE OR REFRESH STREAMING TABLE customer_contacts(
CONSTRAINT valid_email EXPECT (
email IS NOT NULL AND email LIKE '%@%'
) ON VIOLATION DROP ROW
) AS
SELECT * FROM STREAM(raw_contacts);
```
### Fail on Critical Violations
```sql
CREATE OR REFRESH MATERIALIZED VIEW customer_master(
CONSTRAINT required_id EXPECT (customer_id IS NOT NULL) ON VIOLATION FAIL UPDATE
) AS
SELECT * FROM raw_customers;
```
### Multiple Expectations
```sql
CREATE OR REFRESH STREAMING TABLE validated_customers(
CONSTRAINT valid_age EXPECT (age >= 0 AND age <= 120),
CONSTRAINT valid_country EXPECT (country_code IN ('US', 'CA', 'MX')),
CONSTRAINT recent_date EXPECT (created_date >= '2020-01-01')
) AS
SELECT * FROM STREAM(raw_customers);
```
### Stacking Multiple Constraints with Different Actions
```sql
CREATE OR REFRESH STREAMING TABLE customers_clean
(
CONSTRAINT valid_email EXPECT (email LIKE '%@%') ON VIOLATION DROP ROW,
CONSTRAINT required_id EXPECT (id IS NOT NULL) ON VIOLATION FAIL UPDATE,
CONSTRAINT valid_age EXPECT (age BETWEEN 0 AND 120)
)
COMMENT "Clean customer data with quality checks" AS
SELECT * FROM STREAM(raw_customers);
```
### With SQL Functions
```sql
CREATE OR REFRESH STREAMING TABLE transactions(
CONSTRAINT valid_date EXPECT (year(transaction_date) >= 2020),
CONSTRAINT non_negative_price EXPECT (price >= 0),
CONSTRAINT valid_purchase_date EXPECT (transaction_date <= current_date())
) AS
SELECT * FROM STREAM(raw_transactions);
```
### Complex Business Logic
```sql
CREATE OR REFRESH MATERIALIZED VIEW active_subscriptions(
CONSTRAINT valid_subscription_dates EXPECT (
start_date <= end_date
AND end_date <= current_date()
AND start_date >= '2020-01-01'
) ON VIOLATION DROP ROW
) AS
SELECT * FROM subscriptions WHERE status = 'active';
```
### With Temporary Views
```sql
CREATE LIVE VIEW high_value_customers(
CONSTRAINT valid_total EXPECT (total_purchases > 0)
)
COMMENT "Customers with total purchases over $1000" AS
SELECT
customer_id,
SUM(amount) AS total_purchases
FROM orders
GROUP BY customer_id
HAVING total_purchases > 1000;
```
## Monitoring
- View metrics in pipeline UI under the **Data quality** tab
- Query the event log for detailed analytics
- Metrics available for `warn` and `drop` actions
- Metrics unavailable if pipeline fails or no updates occur
## Best Practices
- Use unique, descriptive names for each constraint
- Apply `ON VIOLATION FAIL UPDATE` for critical business constraints
- Use `ON VIOLATION DROP ROW` for data cleansing operations
- Use default (warn) behavior for monitoring optional quality metrics
- Keep constraint logic simple
# Expectations (Data Quality) in Spark Declarative Pipelines
Expectations enable you to define and enforce data quality constraints on your pipeline tables.
## Key Concepts
Expectations in Spark Declarative Pipelines:
- Define constraints on data quality
- Can drop, fail, or track invalid records
- Support complex validation logic
- Integrated with pipeline monitoring
## Language-Specific Implementations
For detailed implementation guides:
- **Python**: [expectations-python.md](expectations-python.md)
- **SQL**: [expectations-sql.md](expectations-sql.md)
ForEachBatch sinks in Spark Declarative Pipelines process a stream as micro-batches with custom Python logic. **Public Preview** — this API may change.
**When to use:** Use ForEachBatch when built-in sink formats (`delta`, `kafka`) are insufficient:
- Custom merge/upsert logic into a Delta table
- Writing to multiple destinations per batch
- Writing to unsupported streaming sinks (e.g., JDBC targets)
- Custom per-batch transformations
**API Reference:**
**@dp.foreach_batch_sink()**
Decorator that defines a ForEachBatch sink. The decorated function is called for each micro-batch.
```python
@dp.foreach_batch_sink(name="<name>")
def my_sink(df, batch_id):
    # df: Spark DataFrame with micro-batch data
    # batch_id: integer ID for the micro-batch (0 = start of stream or full refresh)
    # Access SparkSession via df.sparkSession
    pass
```
Parameters:
- `name` (str): Optional. Unique name for the sink within the pipeline. Defaults to function name.
The decorated function receives:
- `df` (DataFrame): Spark DataFrame containing data for the current micro-batch
- `batch_id` (int): Integer ID of the micro-batch. Spark increments this for each trigger interval. `0` means start of stream or beginning of a full refresh — the handler should properly handle a full refresh for downstream data sources.
The handler does not need to return a value.
**Writing to a ForEachBatch Sink:**
Use `@dp.append_flow()` with the `target` parameter matching the sink name:
```python
@dp.append_flow(target="my_sink")
def my_flow():
    return spark.readStream.table("source_table")
```
**Common Patterns:**
**Pattern 1: Merge/upsert into a Delta table**
The target table must already exist before the MERGE runs. Create it externally or handle creation in the handler.
```python
@dp.foreach_batch_sink(name="upsert_sink")
def upsert_sink(df, batch_id):
    # Expose the micro-batch to SQL, then merge it into the existing target table
    df.createOrReplaceTempView("batch_data")
    df.sparkSession.sql("""
        MERGE INTO target_catalog.schema.target_table AS target
        USING batch_data AS source
        ON target.id = source.id
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)
    return

@dp.append_flow(target="upsert_sink")
def upsert_flow():
    return spark.readStream.table("source_events")
```
**Pattern 2: Write to multiple destinations with idempotent writes**
Use `txnVersion`/`txnAppId` for idempotent Delta writes — if a batch partially fails and retries, already-completed writes are safely skipped.
```python
app_id = "my-app-name"  # must be unique per application writing to the same table

@dp.foreach_batch_sink(name="multi_target_sink")
def multi_target_sink(df, batch_id):
    # txnVersion/txnAppId make the Delta append idempotent across retries
    df.write.format("delta").mode("append") \
        .option("txnVersion", batch_id).option("txnAppId", app_id) \
        .saveAsTable("my_catalog.my_schema.table_a")
    # txnVersion/txnAppId are Delta writer options, so they are omitted here;
    # the JSON write is not deduplicated if the batch is retried
    df.write.format("json").mode("append") \
        .save("/tmp/json_target")
    return

@dp.append_flow(target="multi_target_sink")
def multi_target_flow():
    return spark.readStream.table("processed_events")
```
When writing to multiple destinations, use `df.persist()` or `df.cache()` inside the handler to read the source data only once instead of once per destination.
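The effect of `txnVersion`/`txnAppId` on a retried batch can be modeled in a few lines of plain Python. This is a simplified sketch, assuming the table records the last committed version per application ID and skips any write at or below it. `FakeDeltaTable` is a hypothetical in-memory stand-in, not a Delta Lake API.

```python
from typing import Dict, List


class FakeDeltaTable:
    """Hypothetical stand-in for a table that honors txnAppId/txnVersion."""

    def __init__(self) -> None:
        self.rows: List[dict] = []
        self.last_version: Dict[str, int] = {}

    def append(self, rows: List[dict], app_id: str, version: int) -> bool:
        """Append rows unless this app already committed this version or a later one."""
        if self.last_version.get(app_id, -1) >= version:
            return False  # duplicate write from a retry: skip it
        self.rows.extend(rows)
        self.last_version[app_id] = version
        return True


table_a, table_b = FakeDeltaTable(), FakeDeltaTable()
batch = [{"id": 1}]

# First attempt of batch 0: the write to table_a commits ...
table_a.append(batch, "my-app-name", 0)
# ... then suppose the write to table_b fails and the whole batch is retried.
retry_a = table_a.append(batch, "my-app-name", 0)  # skipped, already committed
retry_b = table_b.append(batch, "my-app-name", 0)  # applied for the first time
```

After the retry each table holds the batch exactly once, which is the outcome the idempotent-write options are meant to provide for Delta targets.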
**Pattern 3: Enrich and write to an external Delta table**
```python
from pyspark.sql.functions import current_timestamp

@dp.foreach_batch_sink(name="enriched_sink")
def enriched_sink(df, batch_id):
    enriched = df.withColumn("processed_timestamp", current_timestamp())
    enriched.write.format("delta").mode("append") \
        .saveAsTable("my_catalog.my_schema.enriched_events")
    return

@dp.append_flow(target="enriched_sink")
def enriched_flow():
    return spark.readStream.table("source_events")
```
**KEY RULES:**
- ForEachBatch sinks are **Python only** and in **Public Preview**
- Designed for streaming queries (`append_flow`) only — not for batch-only pipelines or Auto CDC semantics
- The pipeline does NOT track data written from a ForEachBatch sink — you manage downstream data and retention
- On full refresh, checkpoints reset and `batch_id` restarts from 0. Data in your target is NOT automatically cleaned up — you must manually drop or truncate target tables/locations if a clean slate is needed
- Multiple `@dp.append_flow()` decorators can target the same sink — each flow maintains its own checkpoint
- To access SparkSession inside the handler, use `df.sparkSession` (not `spark`)
- ForEachBatch supports all Unity Catalog features — you can write to UC managed or external tables and volumes
- When writing to multiple destinations, use `df.persist()` or `df.cache()` to avoid multiple source reads, and `txnVersion`/`txnAppId` for idempotent Delta writes
- Keep the handler function concise — avoid threading, heavy library dependencies, or large in-memory data manipulations
- **databricks-connect compatibility**: If your pipeline may run on databricks-connect, the handler function must be serializable and must not use `dbutils`. Avoid referencing local objects, classes, or unpickleable resources — use pure Python modules. Move `dbutils` calls (e.g., `dbutils.widgets.get()`) outside the handler and capture values in variables. The pipeline raises a warning in the event log for non-serializable UDFs but does not fail the pipeline. However, non-serializable logic can break at runtime in databricks-connect contexts
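Because the pipeline does not clean up the target on a full refresh, one possible approach is to reset the target inside the handler when `batch_id` is `0`. The sketch below models that idea in plain Python under strong assumptions: the sink is fed by a single flow and owns its target exclusively. With several flows, each flow has its own checkpoint and its own batch `0`, so this reset would wipe data written by the others. `ExternalTarget` and `handle_batch` are hypothetical names.

```python
from typing import Dict, List


class ExternalTarget:
    """Hypothetical external store that the pipeline does not manage."""

    def __init__(self) -> None:
        self.rows: List[Dict[str, int]] = []

    def truncate(self) -> None:
        self.rows.clear()

    def append(self, rows: List[Dict[str, int]]) -> None:
        self.rows.extend(rows)


def handle_batch(target: ExternalTarget, rows: List[Dict[str, int]], batch_id: int) -> None:
    """Model of a handler that starts from a clean target on batch 0."""
    if batch_id == 0:
        # Start of stream or full refresh: discard rows left by a previous run.
        target.truncate()
    target.append(rows)


target = ExternalTarget()
handle_batch(target, [{"id": 1}], 0)  # initial run
handle_batch(target, [{"id": 2}], 1)
rows_before_refresh = list(target.rows)
handle_batch(target, [{"id": 3}], 0)  # full refresh restarts batch_id at 0
```

If truncating inside the handler is too risky for a given target, dropping or truncating it manually before the full refresh achieves the same clean slate.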
# ForEachBatch Sinks in Spark Declarative Pipelines
> **Public Preview**: This API may change.
ForEachBatch sinks process a stream as a series of micro-batches, each handled by a custom Python function. Use when built-in sink formats (Delta, Kafka) are insufficient.
## When to Use
- Custom merge/upsert into a Delta table
- Writing to multiple destinations per batch
- Unsupported streaming sinks (e.g., JDBC targets)
- Custom per-batch transformations
## Language Support
- **Python only**: SQL does not support ForEachBatch sinks.
## Implementation Guide
- **Python**: [foreach-batch-sink-python.md](foreach-batch-sink-python.md)
Materialized Views in Spark Declarative Pipelines enable batch processing of data with full refresh or incremental computation.
**NOTE:** This guide focuses on materialized views. For details on streaming tables (incremental processing with `spark.readStream`), use the API guide for `streamingTable` instead.
**API Reference:**
**@dp.materialized_view() (Recommended)**
Decorator to define a materialized view. This is the recommended approach for creating materialized views.
```python
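from pyspark import pipelines as dp

# Signature reference: every parameter is optional. In practice, choose either
# partition_cols or cluster_by (partitioning and liquid clustering are mutually exclusive).
@dp.materialized_view(
    name="<name>",
    comment="<comment>",
    spark_conf={"<key>": "<value>"},
    table_properties={"<key>": "<value>"},
    path="<storage-location-path>",
    partition_cols=["<partition-column>"],
    cluster_by_auto=True,
    cluster_by=["<clustering-column>"],
    schema="schema-definition",
    row_filter="row-filter-clause",
    private=False
)
def my_materialized_view():
    return spark.read.table("source.data")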
```
**@dp.table() / @dlt.table() (Alternative for Materialized Views)**
In the older `dlt` module, the `@dlt.table` decorator was used to create both streaming tables and materialized views. The `@dp.table()` decorator in the `pyspark.pipelines` module still works in this way, but Databricks recommends using the `@dp.materialized_view()` decorator to create materialized views. Note that `@dp.table()` remains the standard decorator for streaming tables.
```python
# Still works, but @dp.materialized_view() is preferred for materialized views
@dp.table(
    name="<name>",
    comment="<comment>",
    spark_conf={"<key>": "<value>"},
    table_properties={"<key>": "<value>"},
    path="<storage-location-path>",
    partition_cols=["<partition-column>"],
    cluster_by_auto=True,
    cluster_by=["<clustering-column>"],
    schema="schema-definition",
    row_filter="row-filter-clause",
    private=False
)
def my_materialized_view():
    return spark.read.table("source.data")
```
Parameters:
- `name` (str): Table name (defaults to function name)
- `comment` (str): Description for the table
- `spark_conf` (dict): Spark configurations for query execution
- `table_properties` (dict): Delta table properties
- `path` (str): Storage location for table data (defaults to managed location)
- `partition_cols` (list): Columns to partition the table by
- `cluster_by_auto` (bool): Enable automatic liquid clustering
- `cluster_by` (list): Columns to use as clustering keys for liquid clustering
- `schema` (str or StructType): Schema definition (SQL DDL string or StructType)
  - Supports generated columns: `"order_datetime STRING, order_day STRING GENERATED ALWAYS AS (dayofweek(order_datetime))"`
  - Supports constraints: Primary keys, foreign keys
  - Supports column masks: `"ssn STRING MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)"`
- `row_filter` (str): (Public Preview) A row filter clause that filters rows when fetched from the table.
  - Must use syntax: `"ROW FILTER func_name ON (column_name [, ...])"` where `func_name` is a SQL UDF returning `BOOLEAN`. The UDF can be defined in Unity Catalog.
  - Rows are filtered out when the function returns `FALSE` or `NULL`.
  - You can pass table columns or constant literals (`STRING`, numeric, `BOOLEAN`, `INTERVAL`, `NULL`) as arguments.
  - The filter is applied as soon as rows are fetched from the data source.
  - The function runs with pipeline owner's rights during refresh and invoker's rights during queries (allowing user-context functions like `CURRENT_USER()` and `IS_MEMBER()` for data security).
  - Note: Using row filters on source tables forces full refresh of downstream materialized views.
  - Note: It is NOT possible to call `CREATE FUNCTION` within a Spark Declarative Pipeline.
- `private` (bool): Restricts table to pipeline scope; prevents metastore publication
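The `row_filter` string format described above can be assembled programmatically. The helper below is a hypothetical convenience, not part of the `pyspark.pipelines` API; it only builds the clause text and performs a basic check on the function name.

```python
import re

# Accepts one-, two-, or three-part names such as my_catalog.my_schema.filter_by_dept
_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*){0,2}$")


def row_filter_clause(func_name, arguments):
    """Build the string expected by the row_filter parameter."""
    if not _IDENTIFIER.match(func_name):
        raise ValueError(f"unexpected function name: {func_name!r}")
    # Arguments are column names or constant literals, already rendered as SQL text
    return f"ROW FILTER {func_name} ON ({', '.join(arguments)})"


clause = row_filter_clause("my_catalog.my_schema.filter_by_dept", ["dept"])
print(clause)  # ROW FILTER my_catalog.my_schema.filter_by_dept ON (dept)
```

The result can be passed as `row_filter=clause` to the decorator. The UDF itself still has to exist in Unity Catalog before the pipeline runs.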
**Materialized View vs Streaming Table:**
- **Materialized View**: Use `@dp.materialized_view()` decorator with function returning `spark.read...` (batch DataFrame)
- **Streaming Table**: Use `@dp.table()` decorator with function returning `spark.readStream...` (streaming DataFrame) - see the `streamingTable` API guide
Note: When using `@dp.table()` with a batch DataFrame return type, a materialized view is created. However, `@dp.materialized_view()` is preferred for this use case. The `@dp.table()` decorator remains the standard approach for streaming tables (with streaming DataFrame return type).
**Incremental Refresh for Materialized Views:**
Materialized views on **serverless pipelines** support automatic incremental refresh, which processes only changes in underlying data since the last refresh rather than recomputing everything. This significantly reduces compute costs.
**How it works:**
- Lakeflow Spark Declarative Pipelines uses a cost model to determine whether to perform incremental refresh or full recompute
- Incremental refresh processes delta changes and appends to the table
- If incremental refresh is not feasible or more expensive, the system falls back to full recompute automatically
**Requirements for incremental refresh:**
- Must run on **serverless pipelines** (not classic compute)
- Source tables must be Delta tables, materialized views, or streaming tables
- Row-tracking must be enabled on source tables for certain operations (see Notes column)
**Supported SQL operations for incremental refresh (use PySpark DataFrame API equivalents in Python):**
| SQL Operation | Support | Notes |
| --------------------------- | ------- | ------------------------------------------------------------------------------------------------------- |
| SELECT expressions | Yes | Deterministic built-in functions and immutable UDFs. Requires row tracking |
| GROUP BY | Yes | — |
| WITH | Yes | Common table expressions |
| UNION ALL | Yes | Requires row tracking |
| FROM | Yes | Supported base tables include Delta tables, materialized views, and streaming tables |
| WHERE, HAVING | Yes | Requires row tracking |
| INNER JOIN | Yes | Requires row tracking |
| LEFT OUTER JOIN | Yes | Requires row tracking |
| FULL OUTER JOIN | Yes | Requires row tracking |
| RIGHT OUTER JOIN | Yes | Requires row tracking |
| OVER (Window functions) | Yes | Must specify PARTITION BY columns |
| QUALIFY | Yes | — |
| EXPECTATIONS | Partial | Generally supported; exceptions for views with expectations and DROP expectations with NOT NULL columns |
| Non-deterministic functions | Limited | Time functions like `current_date()` supported in WHERE clauses only |
| Non-Delta sources | No | Volumes, external locations, foreign catalogs unsupported |
**Limitations:**
- Falls back to full recompute when incremental is more expensive or query uses unsupported expressions
**Best practices:**
- Enable deletion vectors, row tracking, and change data feed on source tables for optimal incremental refresh
- Design queries with supported operations to leverage incremental refresh
- For exactly-once processing semantics (Kafka, Auto Loader), use streaming tables instead
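As a sketch of the first best practice, the three features map to Delta table properties that can be set on a source table with `ALTER TABLE`. The helper name below is hypothetical, and the generated statement is meant to be run outside the pipeline (for example in a notebook or SQL editor) against a Delta source table you control.

```python
# Delta table properties corresponding to the three features named above
INCREMENTAL_REFRESH_PROPERTIES = {
    "delta.enableDeletionVectors": "true",
    "delta.enableRowTracking": "true",
    "delta.enableChangeDataFeed": "true",
}


def enable_incremental_refresh_sql(table_name):
    """Return an ALTER TABLE statement that sets the properties on a source table."""
    assignments = ", ".join(
        f"'{key}' = '{value}'" for key, value in INCREMENTAL_REFRESH_PROPERTIES.items()
    )
    return f"ALTER TABLE {table_name} SET TBLPROPERTIES ({assignments})"


print(enable_incremental_refresh_sql("raw.orders"))
```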
**Common Patterns:**
**Pattern 1: Simple batch transformation**
```python
from pyspark import pipelines as dp

@dp.materialized_view()
def bronze_batch():
    return spark.read.format("parquet").load("/path/to/data")

@dp.materialized_view()
def silver_batch():
    return spark.read.table("bronze_batch").filter("id IS NOT NULL")
```
**Pattern 2: Schema with generated columns**
```python
@dp.materialized_view(
    schema="""
        order_datetime STRING,
        order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
        customer_id BIGINT,
        amount DECIMAL(10,2)
    """,
    cluster_by=["order_day_of_week", "customer_id"]
)
def orders_with_day():
    return spark.read.table("raw.orders")
```
**Pattern 3: Row filters for data security**
```python
# Assumes filter_by_dept is a SQL UDF defined in Unity Catalog that returns BOOLEAN
@dp.materialized_view(
    name="employees",
    schema="emp_id INT, emp_name STRING, dept STRING, salary DECIMAL(10,2)",
    row_filter="ROW FILTER my_catalog.my_schema.filter_by_dept ON (dept)"
)
def employees():
    return spark.read.table("source.employees")
```
**Pattern 4: Column masking for sensitive data**
```python
@dp.materialized_view(
    schema="""
        user_id BIGINT,
        ssn STRING MASK catalog.schema.ssn_mask_fn USING COLUMNS (region),
        region STRING
    """
)
def users_with_masked_ssn():
    return spark.read.table("raw.users")
```
**KEY RULES:**
- Use `@dp.materialized_view()` for materialized views (preferred over `@dp.table()`)
- Materialized views use `spark.read` (batch reads)
- Streaming tables use `spark.readStream` (streaming reads) - see the `streamingTable` API guide
- Never use `.write`, `.save()`, `.saveAsTable()`, or `.toTable()` - Databricks manages writes automatically
- Generated columns, constraints, and masks require schema definition
- Row filters force full refresh of downstream materialized views
Materialized Views in Lakeflow Spark Declarative Pipelines enable batch processing of data with full refresh or incremental computation.
**NOTE:** This guide focuses on materialized views. For details on streaming tables (incremental processing with streaming reads), use the API guide for `streamingTable` instead.
**SQL Syntax:**
**CREATE MATERIALIZED VIEW**
Creates a materialized view for batch data processing. For streaming tables, see the `CREATE STREAMING TABLE` guide.
```sql
CREATE OR REFRESH [PRIVATE] MATERIALIZED VIEW
  view_name
  [ column_list ]
  [ view_clauses ]
  AS query

column_list
  ( { column_name column_type column_properties } [, ...]
    [ column_constraint ] [, ...]
    [ , table_constraint ] [...] )

column_properties
  { NOT NULL | COMMENT column_comment | column_constraint | MASK clause } [ ... ]

view_clauses
  { USING DELTA |
    PARTITIONED BY (col [, ...]) |
    CLUSTER BY clause |
    LOCATION path |
    COMMENT view_comment |
    TBLPROPERTIES clause |
    WITH { ROW FILTER clause } } [...]
```
**Parameters:**
- `PRIVATE`: Restricts table to pipeline scope; prevents metastore publication
- `view_name`: Unique identifier for the view (fully qualified name including catalog and schema must be unique unless marked PRIVATE)
- `column_list`: Optional schema definition with column names, types, and properties
  - `column_name`: Name of the column
  - `column_type`: Data type (STRING, BIGINT, DECIMAL, etc.)
  - `column_properties`: Column attributes:
    - `NOT NULL`: Column cannot contain null values
    - `COMMENT column_comment`: Description for the column
    - `column_constraint`: Data quality constraints; consult the `expectations` API guide for details
    - `MASK clause`: Column masking syntax `MASK catalog.schema.mask_fn USING COLUMNS (other_column)` (Public Preview)
  - `table_constraint`: Informational table-level constraints (Unity Catalog only, **not enforced** by Databricks):
    - Look up exact documentation when using
    - Note: Constraints are informational metadata for documentation and query optimization hints; data validation must be performed independently
- `view_clauses`: Optional clauses for view configuration:
  - `USING DELTA`: Optional format specification (only DELTA supported, can be omitted)
  - `PARTITIONED BY (col [, ...])`: Columns for traditional partitioning, mutually exclusive with CLUSTER BY
  - `CLUSTER BY clause`: Columns for liquid clustering (optimized query performance)
  - `LOCATION path`: Storage path (Hive metastore only)
  - `COMMENT view_comment`: Description for the view
  - `TBLPROPERTIES clause`: Custom table properties `(key = value [, ...])`
  - `WITH ROW FILTER clause`: Row-level security filtering
    - Syntax: `ROW FILTER func_name ON (column_name [, ...])` (Public Preview)
    - `func_name` must be a SQL UDF returning BOOLEAN (can be defined in Unity Catalog)
    - Rows are filtered out when function returns FALSE or NULL
    - Accepts table columns or constant literals (STRING, numeric, BOOLEAN, INTERVAL, NULL)
    - Filter applies when rows are fetched from the data source
    - Runs with pipeline owner's rights during refresh and invoker's rights during queries
    - Note: Using row filters on source tables forces full refresh of downstream materialized views
    - Note: It is NOT possible to call `CREATE FUNCTION` within a Spark Declarative Pipeline.
- `query`: A Spark SQL query that defines the dataset for the table
**Incremental Refresh for Materialized Views:**
Materialized views on **serverless pipelines** support automatic incremental refresh, which processes only changes in underlying data since the last refresh rather than recomputing everything. This significantly reduces compute costs.
**How it works:**
- Lakeflow Spark Declarative Pipelines uses a cost model to determine whether to perform incremental refresh or full recompute
- Incremental refresh processes delta changes and appends to the table
- If incremental refresh is not feasible or more expensive, the system falls back to full recompute automatically
**Requirements for incremental refresh:**
- Must run on **serverless pipelines** (not classic compute)
- Source tables must be Delta tables, materialized views, or streaming tables
- Row-tracking must be enabled on source tables for certain operations (see Notes column)
**Supported SQL operations for incremental refresh:**
| SQL Operation | Support | Notes |
| --------------------------- | ------- | ------------------------------------------------------------------------------------------------------- |
| SELECT expressions | Yes | Deterministic built-in functions and immutable UDFs. Requires row tracking |
| GROUP BY | Yes | — |
| WITH | Yes | Common table expressions |
| UNION ALL | Yes | Requires row tracking |
| FROM | Yes | Supported base tables include Delta tables, materialized views, and streaming tables |
| WHERE, HAVING | Yes | Requires row tracking |
| INNER JOIN | Yes | Requires row tracking |
| LEFT OUTER JOIN | Yes | Requires row tracking |
| FULL OUTER JOIN | Yes | Requires row tracking |
| RIGHT OUTER JOIN | Yes | Requires row tracking |
| OVER (Window functions) | Yes | Must specify PARTITION BY columns |
| QUALIFY | Yes | — |
| EXPECTATIONS | Partial | Generally supported; exceptions for views with expectations and DROP expectations with NOT NULL columns |
| Non-deterministic functions | Limited | Time functions like `current_date()` supported in WHERE clauses only |
| Non-Delta sources | No | Volumes, external locations, foreign catalogs unsupported |
**Best practices:**
- Enable deletion vectors, row tracking, and change data feed on source tables for optimal incremental refresh
- Design queries with supported operations to leverage incremental refresh
- For exactly-once processing semantics (Kafka, Auto Loader), use streaming tables instead
**Common Patterns:**
**Pattern 1: Simple batch transformation**
```sql
CREATE MATERIALIZED VIEW bronze_batch
AS SELECT * FROM delta.`/path/to/data`;
CREATE MATERIALIZED VIEW silver_batch
AS SELECT * FROM bronze_batch WHERE id IS NOT NULL;
```
**Pattern 2: Schema with generated columns**
```sql
CREATE MATERIALIZED VIEW orders_with_day (
  order_datetime STRING,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
  customer_id BIGINT,
  amount DECIMAL(10,2)
)
CLUSTER BY (order_day_of_week, customer_id)
AS SELECT order_datetime, customer_id, amount FROM raw.orders;
```
**Pattern 3: Row filters for data security**
```sql
-- Assumes filter_by_dept is a SQL UDF defined in Unity Catalog that returns BOOLEAN
CREATE MATERIALIZED VIEW employees (
  emp_id INT,
  emp_name STRING,
  dept STRING,
  salary DECIMAL(10,2)
)
WITH ROW FILTER my_catalog.my_schema.filter_by_dept ON (dept)
AS SELECT * FROM source.employees;
```
**Pattern 4: Column masking for sensitive data**
```sql
CREATE MATERIALIZED VIEW users_with_masked_ssn (
  user_id BIGINT,
  ssn STRING MASK catalog.schema.ssn_mask_fn USING COLUMNS (region),
  region STRING
)
AS SELECT user_id, ssn, region FROM raw.users;
```
**Pattern 5: Aggregations with liquid clustering**
```sql
CREATE MATERIALIZED VIEW daily_sales_summary
CLUSTER BY (sale_date, region)
AS
SELECT
  DATE(order_timestamp) AS sale_date,
  region,
  COUNT(*) AS order_count,
  SUM(amount) AS total_revenue
FROM raw.orders
GROUP BY DATE(order_timestamp), region;
```
**KEY RULES:**
- Materialized views perform batch processing of data
- Streaming tables perform incremental streaming processing - see the `streamingTable` guide
- Identity columns and default columns are not supported
- Row filters force full refresh of downstream materialized views
- Sum aggregates over nullable columns return zero instead of NULL when only nulls remain (when last non-NULL value is removed)
- Non-column expressions require explicit aliases (column references do not need aliases)
- PRIMARY KEY requires explicit NOT NULL specification to be valid
- OPTIMIZE and VACUUM commands are unavailable; Lakeflow Spark Declarative Pipelines handles maintenance automatically
- `CLUSTER BY` is recommended over `PARTITIONED BY` for most use cases
- Table renaming and ownership changes prohibited
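
The SUM rule above is easier to see with a toy model. The sketch below is not the engine's implementation. It only contrasts a SQL-style full recompute with a hypothetical running total maintained from inserts and deletes, which is the kind of difference the rule describes.

```python
def full_recompute_sum(values):
    """SQL-style SUM: NULL (None) when there is no non-NULL input."""
    non_null = [v for v in values if v is not None]
    return sum(non_null) if non_null else None


class RunningSum:
    """Toy incrementally maintained SUM that only applies inserted and deleted values."""

    def __init__(self):
        self.total = 0

    def insert(self, value):
        if value is not None:
            self.total += value

    def delete(self, value):
        if value is not None:
            self.total -= value


rows = [None, 5]
running = RunningSum()
for value in rows:
    running.insert(value)

# Remove the last non-NULL value; only a NULL row remains
rows.remove(5)
running.delete(5)

print(full_recompute_sum(rows))  # None
print(running.total)             # 0
```

If downstream logic distinguishes between 0 and NULL totals, it is worth handling that case explicitly rather than relying on either result.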
# Materialized Views in Spark Declarative Pipelines

Materialized views store the results of a query physically, enabling faster query performance for expensive transformations and aggregations.

## Key Concepts

Materialized views in Spark Declarative Pipelines:

- Physically store query results
- Are incrementally refreshed when source data changes
- Support complex transformations and aggregations
- Published to Unity Catalog

## Language-Specific Implementations

For detailed implementation guides:

- **Python**: [materialized-view-python.md](materialized-view-python.md)
- **SQL**: [materialized-view-sql.md](materialized-view-sql.md)

**AVRO-Specific Options**

| Option              | Type    |
| ------------------- | ------- |
| avroSchema          | String  |
| datetimeRebaseMode  | String  |
| mergeSchema         | Boolean |
| readerCaseSensitive | Boolean |
| rescuedDataColumn   | String  |

**CSV-Specific Options**

| Option                    | Type    |
| ------------------------- | ------- |
| badRecordsPath            | String  |
| charToEscapeQuoteEscaping | Char    |
| columnNameOfCorruptRecord | String  |
| comment                   | Char    |
| dateFormat                | String  |
| emptyValue                | String  |
| encoding / charset        | String  |
| enforceSchema             | Boolean |
| escape                    | Char    |
| header                    | Boolean |
| ignoreLeadingWhiteSpace   | Boolean |
| ignoreTrailingWhiteSpace  | Boolean |
| inferSchema               | Boolean |
| lineSep                   | String  |
| locale                    | String  |
| maxCharsPerColumn         | Int     |
| maxColumns                | Int     |
| mergeSchema               | Boolean |
| mode                      | String  |
| multiLine                 | Boolean |
| nanValue                  | String  |
| negativeInf               | String  |
| nullValue                 | String  |
| parserCaseSensitive       | Boolean |
| positiveInf               | String  |
| preferDate                | Boolean |
| quote                     | Char    |
| readerCaseSensitive       | Boolean |
| rescuedDataColumn         | String  |
| sep / delimiter           | String  |
| skipRows                  | Int     |
| timestampFormat           | String  |
| timeZone                  | String  |
| unescapedQuoteHandling    | String  |

**JSON-Specific Options**

| Option                             | Type    |
| ---------------------------------- | ------- |
| allowBackslashEscapingAnyCharacter | Boolean |
| allowComments                      | Boolean |
| allowNonNumericNumbers             | Boolean |
| allowNumericLeadingZeros           | Boolean |
| allowSingleQuotes                  | Boolean |
| allowUnquotedControlChars          | Boolean |
| allowUnquotedFieldNames            | Boolean |
| badRecordsPath                     | String  |
| columnNameOfCorruptRecord          | String  |
| dateFormat                         | String  |
| dropFieldIfAllNull                 | Boolean |
| encoding / charset                 | String  |
| inferTimestamp                     | Boolean |
| lineSep                            | String  |
| locale                             | String  |
| mode                               | String  |
| multiLine                          | Boolean |
| prefersDecimal                     | Boolean |
| primitivesAsString                 | Boolean |
| readerCaseSensitive                | Boolean |
| rescuedDataColumn                  | String  |
| singleVariantColumn                | String  |
| timestampFormat                    | String  |
| timeZone                           | String  |

**ORC-Specific Options**

| Option      | Type    |
| ----------- | ------- |
| mergeSchema | Boolean |

**PARQUET-Specific Options**

| Option              | Type    |
| ------------------- | ------- |
| datetimeRebaseMode  | String  |
| int96RebaseMode     | String  |
| mergeSchema         | Boolean |
| readerCaseSensitive | Boolean |
| rescuedDataColumn   | String  |

**TEXT-Specific Options**

| Option    | Type    |
| --------- | ------- |
| encoding  | String  |
| lineSep   | String  |
| wholeText | Boolean |

**XML-Specific Options**

| Option                    | Type    |
| ------------------------- | ------- |
| rowTag                    | String  |
| samplingRatio             | Double  |
| excludeAttribute          | Boolean |
| mode                      | String  |
| inferSchema               | Boolean |
| columnNameOfCorruptRecord | String  |
| attributePrefix           | String  |
| valueTag                  | String  |
| encoding                  | String  |
| ignoreSurroundingSpaces   | Boolean |
| rowValidationXSDPath      | String  |
| ignoreNamespace           | Boolean |
| timestampFormat           | String  |
| timestampNTZFormat        | String  |
| dateFormat                | String  |
| locale                    | String  |
| rootTag                   | String  |
| declaration               | String  |
| arrayElementName          | String  |
| nullValue                 | String  |
| compression               | String  |
| validateName              | Boolean |
| readerCaseSensitive       | Boolean |
| rescuedDataColumn         | String  |
| singleVariantColumn       | String  |

#### Setup
- `from pyspark import pipelines as dp` (preferred) or `import dlt` (deprecated but still works) is required at the top of every Python source file. Prefer the `dp` import style unless `dlt` is already imported; don't change existing imports unless explicitly asked.
- The SparkSession object (`spark`) is already available, so there is no need to import or create it, except in utility files
#### Core Decorators
- `@dp.materialized_view()` - Materialized views (batch processing, recommended for materialized views)
- `@dp.table()` - Streaming tables (when returning streaming DataFrame) or materialized views (legacy, when returning batch DataFrame)
- `@dp.temporary_view()` - Temporary views (non-materialized, private to pipeline)
- `@dp.expect*()` - Data quality constraints (expect, expect_or_drop, expect_or_fail, expect_all, expect_all_or_drop, expect_all_or_fail)
#### Core Functions
- `dp.create_streaming_table()` - Continuous processing
- `dp.create_auto_cdc_flow()` - Change data capture
- `dp.create_auto_cdc_from_snapshot_flow()` - Change data capture from database snapshots
- `dp.create_sink()` - Write to alternative targets (Kafka, Event Hubs, external Delta tables)
- `@dp.foreach_batch_sink()` - Custom streaming sink with per-batch Python logic (Public Preview)
- `dp.append_flow()` - Append-only patterns
- `dp.read()`/`dp.read_stream()` - Read from other pipeline datasets (deprecated - always use `spark.read.table()` or `spark.readStream.table()` instead)
#### Critical Rules
- ✅ Dataset functions MUST return Spark DataFrames
- ✅ Use `spark.read.table`/`spark.readStream.table` (NOT dp.read* and NOT dlt.read*)
- ✅ Use `auto_cdc` API (NOT apply_changes)
- ✅ Look up documentation for decorator/function parameters when unsure
- ❌ Do not use star imports
- ❌ NEVER use .collect(), .count(), .toPandas(), .save(), .saveAsTable(), .start(), .toTable()
- ❌ AVOID custom monitoring in dataset definitions
- ✅ Keep functions pure (they may be evaluated multiple times)
- ❌ NEVER use the "LIVE." prefix when reading other datasets (deprecated)
- ❌ No arbitrary Python logic in dataset definitions - focus on DataFrame operations only
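The purity rule can be illustrated without Spark. The toy below uses plain strings in place of DataFrames and a loop as a stand-in for the pipeline evaluating a dataset function more than once. All names are hypothetical.

```python
audit_log = []


def impure_definition():
    # Side effect: runs again on every evaluation of the function
    audit_log.append("loaded bronze_customers")
    return "plan: read bronze_customers"


def pure_definition():
    # No side effects: repeated evaluation always yields the same plan
    return "plan: read bronze_customers"


# Stand-in for the pipeline evaluating each definition several times
for _ in range(3):
    impure_definition()
    pure_definition()

print(len(audit_log))  # 3
```

The impure version records three entries for what is logically one dataset definition, which is why logging, counters, and writes belong outside dataset functions.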
#### Python-Specific Considerations
**Reading Pipeline Datasets:**
When reading from other datasets defined in the pipeline, use the **dataset name directly** - NEVER use the `LIVE.` prefix:
```python
# ✅ CORRECT - use the dataset name directly (it defaults to the function name)
customers = spark.read.table("bronze_customers")
transactions = spark.readStream.table("bronze_transactions")
# ❌ WRONG - do NOT use "LIVE." prefix (deprecated)
customers = spark.read.table("LIVE.bronze_customers")
transactions = spark.readStream.table("LIVE.bronze_transactions")
```
The `LIVE.` prefix is deprecated and should never be used. The pipeline automatically resolves dataset references by dataset name.
**Streaming vs. Batch Semantics:**
- Use `spark.read.table()` (or deprecated `dp.read()`/`dlt.read()`) for batch processing (materialized views with full refresh or incremental computation)
- Use `spark.readStream.table()` (or deprecated `dp.read_stream()`/`dlt.read_stream()`) for streaming tables to enable continuous incremental processing
- **Materialized views**: Use `@dp.materialized_view()` decorator (recommended) with batch DataFrame (`spark.read`)
- **Streaming tables**: Use `@dp.table()` decorator with streaming DataFrame (`spark.readStream`)
- Note: The `@dp.table()` decorator can create both batch and streaming tables based on return type, but `@dp.materialized_view()` is preferred for materialized views
#### skipChangeCommits
When a downstream streaming table reads from an upstream streaming table that has updates or deletes (e.g., GDPR compliance, Auto CDC targets), use `skipChangeCommits` to ignore those change commits:
```python
@dp.table()
def downstream():
    return spark.readStream.option("skipChangeCommits", "true").table("upstream_table")
```
Sinks enable writing pipeline data to alternative targets like event streaming services (Apache Kafka, Azure Event Hubs), external Delta tables, or custom data sources using Python code. Sinks are Python-only and work exclusively with streaming append flows.
## Creating Sinks
**dp.create_sink() / dlt.create_sink()**
Defines a sink for writing to alternative targets (Kafka, Event Hubs, external Delta tables). Call at top level before using in append flows.
```python
dp.create_sink(
    name="<sink-name>",
    format="<format>",
    options={"<key>": "<value>"}
)
```
Parameters:
- `name` (str): Unique identifier for the sink within the pipeline. Used to reference the sink in append flows. **Required.**
- `format` (str): Output format (`"kafka"`, `"delta"`, or custom format). Determines required options. **Required.**
- `options` (dict): Configuration dictionary with format-specific key-value pairs. Required options depend on the format. **Required.**
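Because the required options depend on the format, a small pre-flight check can catch a missing key before the pipeline starts. The helper below is a hypothetical sketch, not part of the pipelines API. The key lists are taken from the "Supported Sink Formats" section of this guide and should be treated as assumptions to verify against current documentation.

```python
# Required option keys for Kafka/Event Hubs sinks, as listed in this guide.
# Delta sinks need either "tableName" or "path", so that case is checked separately.
REQUIRED_KAFKA_OPTIONS = ("kafka.bootstrap.servers", "topic", "databricks.serviceCredential")


def missing_sink_options(sink_format, options):
    """Return a list describing the required options that are absent."""
    if sink_format == "kafka":
        return [key for key in REQUIRED_KAFKA_OPTIONS if key not in options]
    if sink_format == "delta":
        if "tableName" in options or "path" in options:
            return []
        return ["tableName or path"]
    return []  # custom formats: nothing to check in this sketch


print(missing_sink_options("kafka", {"topic": "customer_events"}))
print(missing_sink_options("delta", {"tableName": "main.sales.transactions"}))
```

Calling the check right before `dp.create_sink()` and raising on a non-empty result keeps configuration mistakes out of the streaming run.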
## Writing to Sinks
After creating a sink, use `@dp.append_flow()` (or `@dlt.append_flow()`) decorator to write streaming data to it. The `target` parameter specifies which sink to write to (must match a sink name created with `dp.create_sink()`).
For complete documentation on append flows, see [streaming-table-python.md](../streaming-table/streaming-table-python.md).
## Supported Sink Formats
### Delta Sinks
Write to Unity Catalog external/managed tables or file paths.
**Options for Unity Catalog tables:**
```python
{
"tableName": "catalog_name.schema_name.table_name" # Fully qualified table name
}
```
**Options for file paths:**
```python
{
"path": "/Volumes/catalog_name/schema_name/path/to/data"
}
```
**Example:**
```python
from pyspark import pipelines as dp

# Create Delta sink with table name
dp.create_sink(
    name="delta_sink",
    format="delta",
    options={"tableName": "main.sales.transactions"}
)

# Write to sink using append flow
@dp.append_flow(name="write_to_delta", target="delta_sink")
def write_transactions():
    return (
        spark.readStream.table("bronze_transactions")
        .select("transaction_id", "customer_id", "amount", "timestamp")
    )
```
### Kafka and Azure Event Hubs Sinks
Write to Apache Kafka or Azure Event Hubs topics for real-time event streaming.
**Important**: This code works for both Apache Kafka and Azure Event Hubs sinks.
**Required options:**
```python
{
"kafka.bootstrap.servers": "host:port", # Kafka/Event Hubs endpoint
"topic": "topic_name", # Target topic
"databricks.serviceCredential": "credential_name" # Unity Catalog service credential
}
```
**Authentication**: Use `databricks.serviceCredential` to reference a Unity Catalog service credential for connecting to external cloud services.
**Data format requirements**:
- The `value` parameter is mandatory for Kafka and Azure Event Hubs sinks
- Optional parameters: `key`, `partition`, `headers`, and `topic`
**Example (works for both Kafka and Event Hubs):**
```python
from pyspark import pipelines as dp

# Define credentials and connection details
credential_name = "<service-credential>"
bootstrap_servers = "kafka-broker:9092"  # or "{eh-namespace}.servicebus.windows.net:9093" for Event Hubs
topic_name = "customer_events"

# Create Kafka/Event Hubs sink
dp.create_sink(
    name="kafka_sink",
    format="kafka",
    options={
        "databricks.serviceCredential": credential_name,
        "kafka.bootstrap.servers": bootstrap_servers,
        "topic": topic_name
    }
)

# Write to sink with required value parameter
@dp.append_flow(name="stream_to_kafka", target="kafka_sink")
def kafka_flow():
    return (
        spark.readStream.table("customer_events")
        .selectExpr(
            "cast(customer_id as string) as key",
            "to_json(struct(*)) AS value"
        )
    )
```
## Limitations and Considerations
- Sinks only work with streaming queries and cannot be used with batch DataFrames
- Only compatible with `@dp.append_flow()` decorator
- Full refresh updates don't clean existing sink data
  - Reprocessed data will be appended to the sink
  - Consider idempotency: design for duplicate writes, since a full refresh appends data again
- Delta sink table names must be fully qualified (`catalog.schema.table`); use three-part names for Unity Catalog tables
  - Volume file paths are supported as an alternative
- Pipeline expectations cannot be applied to sinks
  - Apply data quality checks before writing to sinks
  - Validate data in upstream tables/views instead
- Sinks are Python-only in Spark Declarative Pipelines; SQL does not support sink creation or usage
- Handle serialization: for Kafka/Event Hubs, convert data to JSON or another appropriate format
# Sinks in Spark Declarative Pipelines

Sinks enable writing pipeline data to alternative targets beyond Databricks-managed Delta tables, including event streaming services and external tables.

## Key Concepts

Sinks in Spark Declarative Pipelines:

- Write to event streaming services (Apache Kafka, Azure Event Hubs)
- Write to externally-managed Delta tables (Unity Catalog external/managed tables)
- Enable reverse ETL into systems outside Databricks
- Support custom Python data sources
- Work exclusively with streaming queries and append flows

## Language-Specific Implementations

For detailed implementation guides:

- **Python**: [sink-python.md](sink-python.md)

**Important**: Sinks are only available in Python. SQL does not support sinks in Spark Declarative Pipelines.

#### Core SQL Statements
- `CREATE MATERIALIZED VIEW` - Batch processing with full refresh or incremental computation
- `CREATE STREAMING TABLE` - Continuous incremental processing
- `CREATE TEMPORARY VIEW` - Non-materialized views (pipeline lifetime only)
- `CREATE VIEW` - Non-materialized catalog views (Unity Catalog only)
- `AUTO CDC INTO` - Change data capture flows
- `CREATE FLOW` - Define flows or backfills for streaming tables
#### Message Bus Ingestion Functions
- `read_kafka(bootstrapServers => '...', subscribe => '...')` - Apache Kafka
- `read_kinesis(streamName => '...', region => '...')` - AWS Kinesis
- `read_pubsub(subscriptionId => '...', topicId => '...')` - Google Cloud Pub/Sub
- `read_pulsar(serviceUrl => '...', topics => '...')` - Apache Pulsar
- Event Hubs: Use `read_kafka()` with Kafka-compatible Event Hubs config
#### Critical Rules
- ✅ Prefer `CREATE OR REFRESH` syntax for defining datasets (bare `CREATE` also works, but `OR REFRESH` is the idiomatic convention)
- ✅ Use `STREAM` keyword when reading sources for streaming tables
- ✅ Use `read_files()` function for Auto Loader (cloud storage ingestion)
- ✅ Look up documentation for statement parameters when unsure
- ❌ NEVER use `LIVE.` prefix when reading other datasets (deprecated)
- ❌ NEVER use `CREATE LIVE TABLE` or `CREATE LIVE VIEW` (deprecated - use `CREATE STREAMING TABLE`, `CREATE MATERIALIZED VIEW`, or `CREATE TEMPORARY VIEW` instead)
- ❌ Do not use `PIVOT` clause (unsupported)
#### SQL-Specific Considerations
**Streaming vs. Batch Semantics:**
- Omit `STREAM` keyword for materialized views (batch processing)
- Use `STREAM` keyword for streaming tables to enable streaming semantics
**GROUP BY Best Practices:**
- Prefer `GROUP BY ALL` over explicitly listing individual columns unless the user specifically requests explicit grouping
- Benefits: more maintainable when adding/removing columns, less verbose, reduces risk of missing columns in the GROUP BY clause
- Example: `SELECT category, region, SUM(sales) FROM table GROUP BY ALL` instead of `GROUP BY category, region`
**Python UDFs:**
- You can use Python user-defined functions (UDFs) in SQL queries
- UDFs must be defined in Python files before calling them in SQL source files
**Configuration:**
- Use `SET` statements and `${}` string interpolation for dynamic values and Spark configurations
#### skipChangeCommits
When a downstream streaming table reads from an upstream streaming table that has updates or deletes, use `skipChangeCommits` to ignore change commits:
```sql
CREATE OR REFRESH STREAMING TABLE downstream
AS SELECT * FROM STREAM read_stream("upstream_table", skipChangeCommits => true)
```
Streaming Tables in Spark Declarative Pipelines enable incremental processing of continuously arriving data.
**NOTE:** This guide focuses on streaming tables. For details on materialized views (batch processing with `spark.read`), use the API guide for `materializedView` instead.
**API Reference:**
**@dp.table() / @dlt.table()**
Decorator to define a streaming table or materialized view. It produces a streaming table when the function returns a streaming DataFrame (read with `spark.readStream`). For materialized views using `spark.read`, see the `materializedView` API guide.
```python
@dp.table(
    name="<name>",
    comment="<comment>",
    spark_conf={"<key>": "<value>"},
    table_properties={"<key>": "<value>"},
    path="<storage-location-path>",
    partition_cols=["<partition-column>"],
    cluster_by_auto=True,
    cluster_by=["<clustering-column>"],
    schema="schema-definition",
    row_filter="row-filter-clause",
    private=False
)
def my_streaming_table():
    # Returning a streaming DataFrame makes this a streaming table
    return spark.readStream.table("source.data")
```
Parameters:
- `name` (str): Table name (defaults to function name)
- `comment` (str): Description for the table
- `spark_conf` (dict): Spark configurations for query execution
- `table_properties` (dict): Delta table properties
- `path` (str): Storage location for table data (defaults to managed location)
- `partition_cols` (list): Columns to partition the table by
- `cluster_by_auto` (bool): Enable automatic liquid clustering
- `cluster_by` (list): Columns to use as clustering keys for liquid clustering
- `schema` (str or StructType): Schema definition (SQL DDL string or StructType)
  - Supports generated columns: `"order_datetime STRING, order_day STRING GENERATED ALWAYS AS (dayofweek(order_datetime))"`
  - Supports constraints: Primary keys, foreign keys
  - Supports column masks: `"ssn STRING MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)"`
- `row_filter` (str): (Public Preview) A row filter clause that filters rows when fetched from the table.
  - Must use syntax: `"ROW FILTER func_name ON (column_name [, ...])"` where `func_name` is a SQL UDF returning `BOOLEAN`. The UDF can be defined in Unity Catalog.
  - Rows are filtered out when the function returns `FALSE` or `NULL`.
  - You can pass table columns or constant literals (`STRING`, numeric, `BOOLEAN`, `INTERVAL`, `NULL`) as arguments.
  - The filter is applied as soon as rows are fetched from the data source.
  - The function runs with pipeline owner's rights during refresh and invoker's rights during queries (allowing user-context functions like `CURRENT_USER()` and `IS_MEMBER()` for data security).
  - Note: Using row filters on source tables forces full refresh of downstream materialized views.
  - Note: It is NOT possible to call `CREATE FUNCTION` within a Spark Declarative Pipeline.
- `private` (bool): Restricts table to pipeline scope; prevents metastore publication
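To make the `row_filter` clause syntax and the `FALSE`/`NULL` rule concrete, here is a small pure-Python sketch. Both `row_filter_clause` and `visible_rows` are hypothetical helpers, not part of the pipelines API; the second one only models the documented behaviour, with a Python function standing in for the SQL UDF and `None` standing in for `NULL`.

```python
from typing import Callable, Dict, Iterable, List, Optional


def row_filter_clause(func_name: str, columns: Iterable[str]) -> str:
    """Build the string passed as row_filter=... (hypothetical helper)."""
    cols = ", ".join(columns)
    if not cols:
        raise ValueError("a row filter needs at least one column or literal argument")
    return f"ROW FILTER {func_name} ON ({cols})"


def visible_rows(
    rows: List[Dict], predicate: Callable[[Dict], Optional[bool]]
) -> List[Dict]:
    """Toy model: a row stays visible only when the filter returns TRUE.

    FALSE and NULL (None here) both hide the row.
    """
    return [row for row in rows if predicate(row) is True]


def in_eng(row: Dict) -> Optional[bool]:
    # Stand-in for a SQL UDF: comparing NULL yields NULL, modelled as None.
    return None if row["dept"] is None else row["dept"] == "eng"


clause = row_filter_clause("my_catalog.my_schema.filter_by_dept", ["dept"])
employees = [
    {"emp_id": 1, "dept": "eng"},
    {"emp_id": 2, "dept": "sales"},
    {"emp_id": 3, "dept": None},
]
print(clause)
print(visible_rows(employees, in_eng))
```

Only the first employee survives the filter: the `sales` row returns `FALSE` and the row with a missing department returns `NULL`, and both outcomes hide the row.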
**dp.create_streaming_table() / dlt.create_streaming_table()**
Creates an empty streaming table as target for CDC flows or append flows. Does NOT return a value - call at top level without assignment.
```python
dp.create_streaming_table(
    name="<table-name>",
    comment="<comment>",
    spark_conf={"<key>": "<value>"},
    table_properties={"<key>": "<value>"},
    path="<storage-location-path>",
    partition_cols=["<partition-column>"],
    cluster_by_auto=True,
    cluster_by=["<clustering-column>"],
    schema="schema-definition",
    expect_all={"<key>": "<value>"},
    expect_all_or_drop={"<key>": "<value>"},
    expect_all_or_fail={"<key>": "<value>"},
    row_filter="row-filter-clause"
)
```
Parameters: Same as @dp.table() except `private`, plus:
- `expect_all` (dict): Data quality expectations (warn on failure, include in target)
- `expect_all_or_drop` (dict): Expectations that drop failing rows from target
- `expect_all_or_fail` (dict): Expectations that fail pipeline on violation
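The three expectation modes differ only in what happens to a violating row. The sketch below is a toy model in plain Python, not the pipelines runtime: `apply_expectations` is a hypothetical function, and it takes Python callables as checks, whereas the real parameters take dictionaries of expectation names mapped to SQL boolean expressions.

```python
from typing import Callable, Dict, List, Optional, Tuple

Row = Dict[str, object]
Check = Callable[[Row], bool]


def apply_expectations(
    rows: List[Row],
    expect_all: Optional[Dict[str, Check]] = None,
    expect_all_or_drop: Optional[Dict[str, Check]] = None,
    expect_all_or_fail: Optional[Dict[str, Check]] = None,
) -> Tuple[List[Row], Dict[str, int]]:
    """Return (rows written to the target, violation counts per expectation)."""
    violations: Dict[str, int] = {}
    kept: List[Row] = []
    for row in rows:
        # or_fail: a single violation aborts the whole update.
        for name, check in (expect_all_or_fail or {}).items():
            if not check(row):
                raise ValueError(f"expectation {name!r} failed for {row}")
        # expect_all: record the violation but keep the row.
        for name, check in (expect_all or {}).items():
            if not check(row):
                violations[name] = violations.get(name, 0) + 1
        # or_drop: record the violation and drop the row.
        dropped = False
        for name, check in (expect_all_or_drop or {}).items():
            if not check(row):
                violations[name] = violations.get(name, 0) + 1
                dropped = True
        if not dropped:
            kept.append(row)
    return kept, violations


rows = [
    {"id": 1, "amount": 10},
    {"id": None, "amount": 5},
    {"id": 3, "amount": -2},
]
kept, violations = apply_expectations(
    rows,
    expect_all={"non_negative_amount": lambda r: r["amount"] >= 0},
    expect_all_or_drop={"valid_id": lambda r: r["id"] is not None},
)
print(kept)
print(violations)
```

In this run the row with a missing `id` is dropped, while the row with a negative `amount` is kept and only counted as a violation.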
**@dp.append_flow() / @dlt.append_flow()**
Decorator to define a flow that appends data from a source to an existing target table. Multiple append flows can write to the same target table.
```python
@dp.append_flow(
    target="<target-table-name>",
    name="<flow-name>",  # optional, defaults to function name
    once=False,  # optional, defaults to False
    spark_conf={"<key>": "<value>"},  # optional
    comment="<comment>"  # optional
)
def my_append_flow():
    # For once=False (streaming): return a streaming DataFrame
    return spark.readStream.table("source.data")
    # For once=True (batch): return a batch DataFrame instead:
    # return spark.read.table("source.data")
```
Parameters:
- `target` (str): The name of the target streaming table where data will be appended. Target must exist (created with `dp.create_streaming_table()`). **Required.**
- `name` (str): The name of the flow. If not specified, defaults to the function name. Use distinct names when multiple flows target the same table.
- `once` (bool): Controls whether the flow runs continuously or once:
  - **False (default)**: Flow continuously processes new data as it arrives in streaming mode. **Must return a streaming DataFrame using `spark.readStream`**, CAN use `cloudFiles` (Auto Loader).
  - **True**: Flow processes data only once during pipeline execution and then stops. **Must return a batch DataFrame using `spark.read`**. Do NOT use `cloudFiles` (Auto Loader) with `once=True` - use regular batch reads like `spark.read.format("<format>")` instead.
- `spark_conf` (dict): A dictionary of Spark configuration key-value pairs to apply specifically to this flow's query execution (e.g., `{"spark.sql.shuffle.partitions": "10"}`).
- `comment` (str): A description of the flow that appears in the pipeline metadata and documentation.
**Two Ways to Define Streaming Tables:**
1. **@dp.table decorator (MOST COMMON)**
- Returns a streaming DataFrame using `spark.readStream`
- Automatically inferred as a streaming table when returning a streaming DataFrame
```python
@dp.table(name="events_stream")
def events_stream():
    return spark.readStream.table("source_catalog.schema.events")
```
2. **dp.create_streaming_table()**
- Creates an empty streaming table target
- Required as target for Auto CDC flows and append flows
- Does NOT return a value (do not assign to a variable)
```python
dp.create_streaming_table(
    name="users",
    schema="user_id INT, name STRING, updated_at TIMESTAMP"
)
```
**WHEN TO USE WHICH:**
Use **@dp.table with readStream** when:
- Reading and transforming streaming data
- Creating streaming tables from sources (Auto Loader, Delta tables, etc.)
- This is the standard pattern for most streaming use cases
Use **dp.create_streaming_table()** when:
- Creating a target table for `dp.create_auto_cdc_flow()`
- Creating a target table for `@dp.append_flow` from multiple sources
- Need to explicitly define table schema before data flows in
**Common Patterns:**
**Pattern 1: Simple streaming transformation**
```python
@dp.table()
def bronze():
    # Auto Loader ingestion from cloud storage
    return spark.readStream.format("cloudFiles") \
        .option("cloudFiles.format", "json") \
        .load("/path/to/data")

@dp.table()
def silver():
    return spark.readStream.table("bronze").filter("id IS NOT NULL")
```
**Pattern 2: Multi-source aggregation**
```python
dp.create_streaming_table(name="all_events")

@dp.append_flow(target="all_events", name="mobile")
def mobile():
    return spark.readStream.table("mobile.events")

@dp.append_flow(target="all_events", name="web")
def web():
    return spark.readStream.table("web.events")
```
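When the list of sources grows, the same fan-in can be generated from a mapping instead of writing one decorated function per source. The sketch below assumes `dp` is the pipelines module and `spark` is the active session, both passed in as arguments; `register_fan_in` and `make_flow` are hypothetical names, not part of the API.

```python
from typing import Any, Dict


def register_fan_in(dp: Any, spark: Any, target: str, sources: Dict[str, str]) -> None:
    """Create `target` and one streaming append flow per entry in `sources`.

    `sources` maps a flow name to the table that flow reads from.
    """
    dp.create_streaming_table(name=target)

    def make_flow(flow_name: str, source_table: str) -> None:
        # A factory function binds flow_name/source_table per call, which
        # avoids the late-binding closure pitfall of defining the decorated
        # function directly inside the loop body.
        @dp.append_flow(target=target, name=flow_name)
        def _flow():
            return spark.readStream.table(source_table)

    for flow_name, source_table in sources.items():
        make_flow(flow_name, source_table)


# In a pipeline source file this would be called once at top level, e.g.:
# register_fan_in(dp, spark, "all_events",
#                 {"mobile": "mobile.events", "web": "web.events"})
```

Each flow still gets a distinct `name`, which is what lets several flows write to the same target.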
**Pattern 3: One-time backfill with append flow**
```python
dp.create_streaming_table(name="transactions")

# Continuous streaming flow for new data
@dp.append_flow(target="transactions", name="live_stream")
def live_transactions():
    return spark.readStream.table("source.transactions")

# One-time backfill flow for historical data (uses spark.read for batch)
@dp.append_flow(
    target="transactions",
    name="historical_backfill",
    once=True,
    comment="Backfill historical transactions from archive"
)
def backfill_transactions():
    return spark.read.table("archive.historical_transactions")
```
**Pattern 4: Row filters for data security**
```python
# Assumes filter_by_dept is a SQL UDF defined in Unity Catalog that returns BOOLEAN
# Apply row filter to streaming table
@dp.table(
    name="employees",
    schema="emp_id INT, emp_name STRING, dept STRING, salary DECIMAL(10,2)",
    row_filter="ROW FILTER my_catalog.my_schema.filter_by_dept ON (dept)"
)
def employees():
    return spark.readStream.table("source.employees")
```
**Pattern 5: Stream-static join (enrich streaming data with dimension table)**
```python
@dp.table()
def enriched_transactions():
    # Streaming side: read incrementally
    transactions = spark.readStream.table("transactions")
    # Static side: batch read of the dimension table
    customers = spark.read.table("customers")
    return transactions.join(customers, transactions.customer_id == customers.id)
```
The dimension table (`customers`) is read as a static snapshot at stream start, while the streaming source (`transactions`) is read incrementally.
**Pattern 6: Reading from upstream ST with updates/deletes (skipChangeCommits)**
```python
@dp.table()
def downstream():
    return spark.readStream.option("skipChangeCommits", "true").table("upstream_with_deletes")
```
Use `skipChangeCommits` when reading from a streaming table that has updates/deletes (e.g., GDPR compliance, Auto CDC targets). Without this flag, change commits cause errors.
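The effect of the flag can be pictured with a toy model of a table's commit log. This is a simplification written for illustration, not how the runtime is implemented: `read_stream`, `CHANGE_OPS`, and the commit dictionaries are all hypothetical. The point it shows is that a change commit is skipped as a whole, so its updates and deletes are not propagated downstream.

```python
from typing import Dict, List

# Operations that rewrite existing rows. "append" is the only op that just adds data.
CHANGE_OPS = {"update", "delete", "merge", "overwrite"}


def read_stream(commits: List[Dict], skip_change_commits: bool = False) -> List[Dict]:
    """Toy model of a streaming read over a table's commit log."""
    rows: List[Dict] = []
    for version, commit in enumerate(commits):
        if commit["op"] in CHANGE_OPS:
            if not skip_change_commits:
                raise RuntimeError(
                    f"commit {version} ({commit['op']}) changed existing rows"
                )
            continue  # the whole commit is ignored; its changes are NOT propagated
        rows.extend(commit["rows"])
    return rows


upstream_commits = [
    {"op": "append", "rows": [{"id": 1}, {"id": 2}]},
    {"op": "delete", "rows": [{"id": 1}]},  # e.g. a GDPR erasure upstream
    {"op": "append", "rows": [{"id": 3}]},
]

print(read_stream(upstream_commits, skip_change_commits=True))
```

With the flag set, the downstream table receives ids 1, 2, and 3: the delete of id 1 is skipped rather than applied, so any erasure has to be handled separately on the downstream table.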
**KEY RULES:**
- Streaming tables use `spark.readStream` (streaming reads)
- Materialized views use `spark.read` (batch reads) - see the `materializedView` API guide
- Never use `.writeStream`, `.start()`, or checkpoint options - Databricks manages these automatically
- For streaming flows (`once=False`): Use `spark.readStream` to return a streaming DataFrame
- For one-time flows (`once=True`): Use `spark.read` to return a batch DataFrame
- Generated columns, constraints, and masks require schema definition
- Row filters force full refresh of downstream materialized views
- Use `skipChangeCommits` when reading from STs that have updates/deletes
Streaming Tables in Spark Declarative Pipelines (SQL) enable incremental processing of continuously arriving data.
**NOTE:** This guide focuses on streaming tables in SQL. For details on materialized views (batch processing), use the API guide for `materializedView` instead.
**API Reference:**
**CREATE STREAMING TABLE**
Creates a streaming table that processes data incrementally using `STREAM()` for streaming reads. For materialized views using batch reads (without `STREAM()`), see the `materializedView` API guide.
```sql
CREATE OR REFRESH [PRIVATE] STREAMING TABLE
table_name
[ table_specification ]
[ table_clauses ]
[ AS query ]
table_specification
( { column_identifier column_type [column_properties] } [, ...]
[ column_constraint ] [, ...]
[ , table_constraint ] [...] )
column_properties
{ NOT NULL | COMMENT column_comment | column_constraint | MASK clause } [ ... ]
table_clauses
{ USING DELTA
PARTITIONED BY (col [, ...]) |
CLUSTER BY clause |
LOCATION path |
COMMENT view_comment |
TBLPROPERTIES clause |
WITH { ROW FILTER clause } } [ ... ]
```
**Parameters:**
- `PRIVATE`: Restricts table to pipeline scope; prevents metastore publication
- `table_name`: Unique identifier for the table (fully qualified name including catalog and schema must be unique unless marked PRIVATE)
- `table_specification`: Optional schema definition with column names, types, and properties
  - `column_identifier`: Name of the column
  - `column_type`: Data type (STRING, BIGINT, DECIMAL, etc.)
  - `column_properties`: Column attributes:
    - `NOT NULL`: Column cannot contain null values
    - `COMMENT column_comment`: Description for the column
    - `column_constraint`: Data quality constraints, consult the `expectations` API guide for details.
    - `MASK clause`: Column masking syntax `MASK catalog.schema.mask_fn USING COLUMNS (other_column)` (Public Preview)
  - `table_constraint`: Informational table-level constraints (Unity Catalog only, **not enforced** by Databricks):
    - Look up exact documentation when using
    - Note: Constraints are informational metadata for documentation and query optimization hints; data validation must be performed independently
- `table_clauses`: Optional clauses for table configuration:
  - `USING DELTA`: Optional format specification (only DELTA supported, can be omitted)
  - `PARTITIONED BY (col [, ...])`: Columns for traditional partitioning, mutually exclusive with CLUSTER BY
  - `CLUSTER BY clause`: Columns for liquid clustering (optimized query performance, recommended over partitioning)
  - `LOCATION path`: Storage path (defaults to pipeline storage location)
  - `COMMENT view_comment`: Description for the table
  - `TBLPROPERTIES clause`: Custom table properties `(key = value [, ...])`
  - `WITH ROW FILTER clause`: Row-level security filtering
    - Syntax: `ROW FILTER func_name ON (column_name [, ...])` (Public Preview)
    - `func_name` must be a SQL UDF returning BOOLEAN (can be defined in Unity Catalog)
    - Rows are filtered out when function returns FALSE or NULL
    - Accepts table columns or constant literals (STRING, numeric, BOOLEAN, INTERVAL, NULL)
    - Filter applies when rows are fetched from the data source
    - Runs with pipeline owner's rights during refresh and invoker's rights during queries
    - Note: Using row filters on source tables forces full refresh of downstream materialized views
    - Note: It is NOT possible to call `CREATE FUNCTION` within a Spark Declarative Pipeline.
- `query`: A Spark SQL query that defines the streaming dataset. Must use `STREAM()` function for streaming semantics.
**STREAM() Function:**
Provides streaming read semantics for the source table. Required for streaming queries.
```sql
SELECT * FROM STREAM(source_catalog.schema.source_table);
```
**CREATE FLOW with INSERT INTO**
Creates a flow that appends data from a source to an existing target streaming table. Multiple flows can write to the same target table.
```sql
CREATE FLOW flow_name [COMMENT comment] AS
INSERT INTO [ONCE] target_table BY NAME query
```
**Parameters:**
- `flow_name`: Unique identifier for the flow. Use distinct names when multiple flows target the same table.
- `ONCE`: Controls whether the flow runs continuously or once:
  - **Omitted (default)**: Flow continuously processes new data as it arrives in streaming mode. **Query must use `STREAM()` for streaming reads**.
  - **ONCE**: Flow processes data only once during pipeline execution and then stops. **Query uses non-streaming reads (without `STREAM()`)** for batch processing. The flow re-executes during a full refresh of the pipeline to recreate the data.
- `target_table`: The name of the target streaming table where data will be appended. Target must exist (created with `CREATE STREAMING TABLE`). **Required.**
- `query`: The query that reads the source data, for example `SELECT ... FROM STREAM(source_table)`
  - For continuous flows (no ONCE): Use `STREAM()` to return streaming data
  - For one-time flows (with ONCE): Omit `STREAM()` to return batch data
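The pairing between `ONCE` and `STREAM()` is easy to get backwards, so the sketch below assembles a `CREATE FLOW` statement from its parts and rejects the two mismatched combinations. `render_create_flow` is a hypothetical helper written for illustration, and its `STREAM` detection is a rough keyword match rather than real SQL parsing.

```python
import re

# Rough check for a streaming read anywhere in the query text (an assumption:
# a real implementation would parse the SQL rather than match a keyword).
_STREAM = re.compile(r"\bSTREAM\b", re.IGNORECASE)


def render_create_flow(
    flow_name: str, target_table: str, query: str, once: bool = False
) -> str:
    """Render `CREATE FLOW ... AS INSERT INTO [ONCE] target BY NAME query`."""
    uses_stream = bool(_STREAM.search(query))
    if once and uses_stream:
        raise ValueError("ONCE flows take a batch query; remove STREAM()")
    if not once and not uses_stream:
        raise ValueError("continuous flows need a streaming read; use STREAM()")
    once_sql = "ONCE " if once else ""
    return (
        f"CREATE FLOW {flow_name} AS\n"
        f"INSERT INTO {once_sql}{target_table} BY NAME\n"
        f"{query};"
    )


backfill_sql = render_create_flow(
    "historical_backfill",
    "transactions",
    "SELECT * FROM archive.historical_transactions",
    once=True,
)
live_sql = render_create_flow(
    "live_stream",
    "transactions",
    "SELECT * FROM STREAM(source.transactions)",
)
print(backfill_sql)
print(live_sql)
```

Calling the helper with `once=True` and a `STREAM()` query, or with `once=False` and a plain batch query, raises `ValueError` instead of producing a statement.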
**Two Ways to Define Streaming Tables:**
1. **CREATE STREAMING TABLE with AS SELECT (MOST COMMON)**
- Defines schema and query in one statement
- Schema can be inferred from query or explicitly defined
- **This automatically creates a continuous streaming pipeline - no separate flow needed**
```sql
CREATE STREAMING TABLE events_stream
AS SELECT * FROM STREAM(source_catalog.schema.events);
```
2. **CREATE STREAMING TABLE without AS SELECT**
- Creates an empty streaming table target
- Required for multi-source append patterns
- Schema definition is optional
- **Requires separate `CREATE FLOW` statements to populate the table**
```sql
CREATE STREAMING TABLE users (
user_id INT,
name STRING,
updated_at TIMESTAMP
);
```
**CRITICAL: WHEN TO USE WHICH:**
Use **CREATE STREAMING TABLE with AS SELECT** when:
- Reading and transforming streaming data from a single source
- Creating streaming tables from Delta tables, Auto Loader sources, etc.
- This is the standard pattern for most streaming use cases
- **DO NOT add a separate `CREATE FLOW` - the AS SELECT clause already handles continuous processing**
Use **CREATE STREAMING TABLE without AS SELECT + CREATE FLOW** when:
- Creating a target table for multiple `INSERT INTO` flows from different sources
- Need to explicitly define table schema before data flows in
- Using `AUTO CDC INTO` for CDC. See 'autoCdc' API guide for details.
- **In this case, you MUST create separate flows - the table definition alone does not process data**
**NEVER:**
- Create both `CREATE STREAMING TABLE ... AS SELECT` AND `CREATE FLOW` for the same source - this is redundant and incorrect
- The AS SELECT clause already provides continuous streaming; adding a flow duplicates the work
**Common Patterns:**
**Pattern 1: Simple streaming transformation**
```sql
-- Bronze layer: ingest raw data with Auto Loader
CREATE STREAMING TABLE bronze
AS SELECT * FROM STREAM(read_files(
'/path/to/data',
format => 'json'
));
-- Silver layer: filter and clean data
CREATE STREAMING TABLE silver
AS SELECT *
FROM STREAM(bronze)
WHERE id IS NOT NULL;
```
**Pattern 2: Multi-source aggregation with flows**
```sql
-- Create target table for multiple sources. Schema is optional.
CREATE STREAMING TABLE all_events (
event_id STRING,
event_type STRING,
event_timestamp TIMESTAMP,
source STRING
);
-- Flow from mobile source
CREATE FLOW mobile_flow
AS INSERT INTO all_events BY NAME
SELECT event_id, event_type, event_timestamp, 'mobile' as source
FROM STREAM(mobile.events);
-- Flow from web source
CREATE FLOW web_flow
AS INSERT INTO all_events BY NAME
SELECT event_id, event_type, event_timestamp, 'web' as source
FROM STREAM(web.events);
```
**Pattern 3: Row filters for data security**
```sql
-- Assumes filter_by_dept is a SQL UDF defined in Unity Catalog that returns BOOLEAN
CREATE STREAMING TABLE employees (
emp_id INT,
emp_name STRING,
dept STRING,
salary DECIMAL(10,2)
)
WITH ROW FILTER my_catalog.my_schema.filter_by_dept ON (dept)
AS SELECT * FROM STREAM(source.employees);
```
**Pattern 4: Partitioning and clustering**
```sql
-- Using partitioning (traditional approach)
CREATE STREAMING TABLE orders_partitioned
PARTITIONED BY (order_date)
AS SELECT * FROM STREAM(source.orders);
-- Using liquid clustering (recommended)
CREATE STREAMING TABLE orders_clustered
CLUSTER BY (order_date, customer_id)
AS SELECT * FROM STREAM(source.orders);
```
**Pattern 5: Sensitive data masking**
```sql
CREATE STREAMING TABLE customers (
customer_id INT,
name STRING,
email STRING,
ssn STRING MASK catalog.schema.ssn_mask USING COLUMNS (customer_id)
)
AS SELECT * FROM STREAM(source.customers);
```
**Pattern 6: Private streaming table (pipeline-internal staging)**
```sql
CREATE OR REFRESH PRIVATE STREAMING TABLE staging_events
AS SELECT *
FROM STREAM(raw_events)
WHERE event_type IS NOT NULL;
```
Use `PRIVATE` for internal staging datasets that should not be published to the catalog. Private tables are only accessible within the pipeline.
**Pattern 7: One-time backfill with flow**
```sql
CREATE STREAMING TABLE transactions (
transaction_id STRING,
customer_id STRING,
amount DECIMAL(10,2),
transaction_date TIMESTAMP
);
-- Continuous streaming flow for new data
CREATE FLOW live_stream
AS INSERT INTO transactions BY NAME
SELECT * FROM STREAM(source.transactions);
-- One-time backfill flow for historical data (uses batch read without STREAM)
CREATE FLOW historical_backfill
AS INSERT INTO ONCE transactions BY NAME
SELECT * FROM archive.historical_transactions;
```
**Pattern 8: Stream-static join (enrich streaming data with dimension table)**
```sql
CREATE OR REFRESH STREAMING TABLE enriched_transactions
AS SELECT t.*, c.name, c.email
FROM STREAM(transactions) t
JOIN customers c ON t.customer_id = c.id;
```
The dimension table (`customers`) is read as a static snapshot at stream start, while the streaming source (`transactions`) is read incrementally. This is the standard pattern for enriching streaming data with lookup/dimension tables.
**Pattern 9: Reading from upstream ST with updates/deletes (skipChangeCommits)**
```sql
CREATE OR REFRESH STREAMING TABLE downstream
AS SELECT * FROM STREAM read_stream("upstream_with_deletes", skipChangeCommits => true)
```
Use `skipChangeCommits` when reading from a streaming table that has updates/deletes (e.g., GDPR compliance, Auto CDC targets). Without this flag, change commits cause errors.
**KEY RULES:**
- Streaming tables require `STREAM()` keyword for streaming reads
- Never use batch reads (`SELECT * FROM table` without `STREAM()`) in streaming table definitions
- `ALTER TABLE` commands are not supported - use `CREATE OR REFRESH` or `ALTER STREAMING TABLE` instead
- Generated columns, identity columns, and default columns are not currently supported
- Row filters force full refresh of downstream materialized views
- Only table owners can refresh streaming tables
- Table renaming and ownership changes are prohibited
- `CLUSTER BY` is recommended over `PARTITIONED BY` for most use cases
- For batch processing, use materialized views instead (see the `materializedView` API guide)
- Use `skipChangeCommits` when reading from STs that have updates/deletes
# Streaming Tables in Spark Declarative Pipelines
Streaming tables enable continuous processing of data streams with exactly-once semantics and automatic checkpointing.
## Key Concepts
Streaming tables in Spark Declarative Pipelines:
- Process data continuously as it arrives
- Provide exactly-once processing guarantees
- Support stateful operations (aggregations, joins, deduplication)
- Automatically manage checkpoints and state
## Language-Specific Implementations
For detailed implementation guides:
- **Python**: [streaming-table-python.md](streaming-table-python.md)
- **SQL**: [streaming-table-sql.md](streaming-table-sql.md)
Temporary Views in Spark Declarative Pipelines create temporary logical datasets without persisting data to storage. Use views for intermediate transformations that drive downstream workloads but don't need materialization.
**API Reference:**
**@dp.temporary_view() (preferred) / @dp.view() (alias) / @dlt.view() (deprecated)**
Decorator to define a temporary view.
```python
@dp.temporary_view(
    name="<name>",
    comment="<comment>"
)
def my_view():
    return spark.read.table("source.data")
```
Parameters:
- `name` (str): View name (defaults to function name)
- `comment` (str): Description for the view
**Common Patterns:**
**Pattern 1: Intermediate transformation layer**
```python
# View for shared filtering logic
@dp.temporary_view()
def valid_events():
    return spark.read.table("raw.events") \
        .filter("event_type IS NOT NULL") \
        .filter("timestamp IS NOT NULL")

# Multiple tables consume the view
@dp.materialized_view()
def user_events():
    return spark.read.table("valid_events") \
        .filter("event_type = 'user_action'")

@dp.materialized_view()
def system_events():
    return spark.read.table("valid_events") \
        .filter("event_type = 'system_event'")
```
**Pattern 2: Streaming views**
```python
# Views work with streaming DataFrames too
@dp.temporary_view()
def streaming_events():
    return spark.readStream.table("bronze.events") \
        .filter("event_id IS NOT NULL")

@dp.table()
def filtered_stream():
    return spark.readStream.table("streaming_events") \
        .filter("event_type = 'critical'")
```
**KEY RULES:**
- Views can return either batch (`spark.read`) or streaming (`spark.readStream`) DataFrames
- Views are not materialized - they're computed on demand when referenced
- Reference views using `spark.read.table("view_name")` or `spark.readStream.table("view_name")`
- Views prevent code duplication when multiple downstream tables need the same transformation
Temporary Views in Spark Declarative Pipelines create temporary logical datasets without persisting data to storage. Use views for intermediate transformations that drive downstream workloads but don't need materialization.
**API Reference:**
**CREATE TEMPORARY VIEW**
SQL statement to define a temporary view.
```sql
CREATE TEMPORARY VIEW view_name
[(col_name [COMMENT col_comment] [, ...])]
[COMMENT view_comment]
[TBLPROPERTIES (key = value [, ...])]
AS query
```
Parameters:
- `view_name` (identifier): Name of the temporary view
- `col_name` (identifier): Optional column name specifications
- `col_comment` (string): Optional description for individual columns
- `view_comment` (string): Optional description for the view
- `TBLPROPERTIES` (key-value pairs): Optional table properties
- `query` (SELECT statement): Query that defines the view's data
**Common Patterns:**
**Pattern 1: Intermediate transformation layer**
```sql
-- View for shared filtering logic
CREATE TEMPORARY VIEW valid_events
AS SELECT *
FROM raw.events
WHERE event_type IS NOT NULL
AND timestamp IS NOT NULL;
-- Multiple tables consume the view
CREATE MATERIALIZED VIEW user_events
AS SELECT *
FROM valid_events
WHERE event_type = 'user_action';
CREATE MATERIALIZED VIEW system_events
AS SELECT *
FROM valid_events
WHERE event_type = 'system_event';
```
**Pattern 2: Views with streaming sources**
```sql
-- Temporary views work with streaming sources too
CREATE TEMPORARY VIEW streaming_events
AS SELECT *
FROM STREAM(bronze.events)
WHERE event_id IS NOT NULL;
-- Downstream streaming table consuming the view
CREATE STREAMING TABLE filtered_stream
AS SELECT *
FROM STREAM(streaming_events)
WHERE event_type = 'critical';
```
**KEY RULES:**
- Views are not materialized - they're computed on demand when referenced
- Views exist only during the pipeline execution lifetime and are private to the pipeline
- Reference views in downstream tables using `FROM view_name` or `FROM STREAM(view_name)` for streaming
- Views prevent code duplication when multiple downstream tables need the same transformation
- Temporary views work with both batch and streaming data sources (using `STREAM()` function)
- Views can share names with catalog objects; within the pipeline, references resolve to the temporary view
**IMPORTANT - Using Expectations with Temporary Views:**
`CREATE TEMPORARY VIEW` does not support CONSTRAINT clauses for expectations. If you need to include expectations (data quality constraints) with a temporary view, use `CREATE LIVE VIEW` syntax instead:
```sql
CREATE LIVE VIEW view_name(
CONSTRAINT constraint_name EXPECT (condition) [ON VIOLATION DROP ROW | FAIL UPDATE]
)
AS query
```
`CREATE LIVE VIEW` is the older syntax for temporary views, retained specifically for this use case. Use `CREATE TEMPORARY VIEW` for views without expectations, and `CREATE LIVE VIEW` when you need to add CONSTRAINT clauses. For detailed information on using expectations with temporary views, see the "expectations" API guide.
# Temporary Views in Spark Declarative Pipelines
Temporary views are pipeline-private views that exist only within the context of the pipeline and are not published to Unity Catalog.
## Key Concepts
Temporary views in Spark Declarative Pipelines:
- Are private to the pipeline (not published to Unity Catalog)
- Can be referenced by other tables/views in the same pipeline
- Do not persist after pipeline execution
- Useful for organizing complex transformations
## Language-Specific Implementations
For detailed implementation guides:
- **Python**: [temporary-view-python.md](temporary-view-python.md)
- **SQL**: [temporary-view-sql.md](temporary-view-sql.md)
Views in Spark Declarative Pipelines create virtual tables published to the Unity Catalog metastore. Unlike temporary views (which are private to the pipeline), views created with CREATE VIEW are accessible outside the pipeline and persist in the catalog.
**API Reference:**
**CREATE VIEW**
SQL statement to define a persistent view in Unity Catalog.
```sql
CREATE VIEW view_name
[COMMENT view_comment]
[TBLPROPERTIES (key = value [, ...])]
AS query
```
Parameters:
- `view_name` (identifier): Unique identifier within the catalog and schema
- `view_comment` (string): Optional description for the view
- `TBLPROPERTIES` (key-value pairs): Optional table properties
- `query` (SELECT statement): Query that defines the view's data (must be batch, not streaming)
**Common Patterns:**
**Pattern 1: Filtered view for reusable logic**
```sql
-- View with filtering logic published to catalog
CREATE VIEW valid_orders
COMMENT 'Orders with valid data for analysis'
AS SELECT *
FROM raw.orders
WHERE order_id IS NOT NULL
AND customer_id IS NOT NULL
AND order_date IS NOT NULL;
-- Multiple downstream tables can reference this view
CREATE MATERIALIZED VIEW orders_by_region
AS SELECT
region,
COUNT(*) AS order_count,
SUM(amount) AS total_revenue
FROM valid_orders
GROUP BY region;
```
**Pattern 2: View with custom properties**
```sql
-- View with table properties for metadata
CREATE VIEW customer_summary
COMMENT 'Aggregated customer metrics'
TBLPROPERTIES (
'quality' = 'silver',
'owner' = 'analytics-team',
'refresh_frequency' = 'daily'
)
AS SELECT
customer_id,
COUNT(DISTINCT order_id) AS total_orders,
SUM(amount) AS lifetime_value,
MAX(order_date) AS last_order_date
FROM valid_orders
GROUP BY customer_id;
```
**KEY RULES:**
- Views are virtual tables - not materialized, computed on demand when referenced
- Views are published to Unity Catalog and accessible outside the pipeline
- Views require Unity Catalog pipelines with default publishing mode
- Does not support explicit column definitions with COMMENT
- Cannot use `STREAM()` function - views must use batch queries only
- Cannot define expectations (CONSTRAINT clauses) on views
- Views require appropriate permissions: SELECT on source tables, CREATE TABLE on target schema
- For pipeline-private views, use `CREATE TEMPORARY VIEW` instead
- For materialized data persistence, use `CREATE MATERIALIZED VIEW` instead
# Views in Spark Declarative Pipelines
Views provide a way to define reusable query logic and publish datasets to Unity Catalog for broader consumption.
## Key Concepts
Views in Spark Declarative Pipelines:
- Are published to Unity Catalog when the pipeline runs
- Can reference other tables and views in the pipeline
- Support both SQL and Python (with limitations)
- Are refreshed when the pipeline updates
## Language-Specific Implementations
For detailed implementation guides:
- **SQL**: [view-sql.md](view-sql.md)
**Important**: Python in Spark Declarative Pipelines only supports temporary views (private to the pipeline), not persistent views published to Unity Catalog. For Unity Catalog-published views, use SQL syntax with `CREATE VIEW`.
# Write Spark Declarative Pipelines
Core syntax and rules for writing Spark Declarative Pipelines datasets.
## Language-specific guides
- [Python basics](python-basics.md) - Python decorators, functions, and critical rules
- [SQL basics](sql-basics.md) - SQL statements and critical rules