AI for Data Engineers 2026: dbt, SQL, Airflow
dbt model generation, SQL refactoring, Airflow DAG scaffolding, data lineage extraction
Last updated: April 19, 2026
Data engineering is configuration-heavy, repetitive, and full of context that AI can hold better than any human — table schemas, naming conventions, query patterns. In 2026, Indian data teams use AI across five recurring tasks: dbt model generation, SQL refactoring, Airflow DAG scaffolding, lineage extraction, and schema evolution. This guide covers each with runnable prompts, real code, and the guardrails that prevent disasters.
Key Takeaways
- dbt model generation with Opus 4.7 produces 70-80% correct staging models from source tables.
- SQL refactoring collapses gnarly 400-line queries into readable CTEs with measurable perf gains.
- Airflow DAGs scaffold cleanly; set retries, pools, and SLA callbacks to your team's conventions by hand.
- Lineage extraction from raw SQL finally replaces stale hand-drawn diagrams.
- Schema evolution work drops from hours to minutes when AI traces the downstream impact for you.
The 2026 Data Engineering Stack
+---------------------+     +---------------------+     +---------------------+
| Source systems      |---->| Raw (Bronze)        |---->| Staged (Silver)     |
| Postgres, APIs      |     | Snowflake/BQ        |     | dbt models          |
+---------------------+     +---------------------+     +---------------------+
                                                                   |
                                                                   v
+---------------------+     +---------------------+     +---------------------+
| BI / ML consumers   |<----| Marts (Gold)        |<----| Intermediate        |
| Tableau, dashboards |     | dbt models          |     | dbt models          |
+---------------------+     +---------------------+     +---------------------+
           ^                           ^                           ^
           |                           |                           |
           +-------- Airflow DAGs orchestrate all of this ---------+
                                       ^
                                       |
                            +----------+----------+
                            | AI assists at every |
                            | layer above this    |
                            +---------------------+
Part 1: Generating dbt Models
Give the agent three things: the source schema, your conventions, and the target model's purpose.
Example: Generating a staging model
# Claude Code CLI prompt (the quoted text also works as-is in Cursor Agent Mode)
claude --effort default "Generate a dbt staging model
stg_orders from the source table raw.shopify_orders. Follow
our conventions in dbt_project.yml. The model should:
- Rename columns to snake_case
- Cast timestamps to TIMESTAMPTZ in Asia/Kolkata
- Filter out rows where is_test_order = true
- Add a surrogate key stg_order_id as MD5 of source order_id
- Handle NULL customer_id with COALESCE to -1
- Include generated_at timestamp
- Emit tests for not_null on order_id and customer_id,
unique on stg_order_id"
Output:
-- models/staging/shopify/stg_orders.sql
{{ config(materialized='view') }}
with source as (
    select * from {{ source('shopify', 'orders') }}
    -- coalesce so rows with a NULL flag are kept, not silently dropped
    where coalesce(is_test_order, false) = false
),
renamed as (
    select
        {{ dbt_utils.generate_surrogate_key(['order_id']) }} as stg_order_id,
        order_id as source_order_id,
        coalesce(customer_id, -1) as customer_id,
        order_number,
        order_status,
        -- Snowflake: 2-arg CONVERT_TIMEZONE returns TIMESTAMP_TZ in IST.
        -- (Postgres "AT TIME ZONE" would return a naive timestamp instead.)
        convert_timezone('Asia/Kolkata', cast(created_at as timestamptz)) as order_created_at,
        convert_timezone('Asia/Kolkata', cast(updated_at as timestamptz)) as order_updated_at,
        total_price_inr,
        subtotal_price_inr,
        tax_inr,
        current_timestamp as generated_at
    from source
)
select * from renamed
And the accompanying schema YAML:
# models/staging/shopify/_shopify__models.yml
version: 2
models:
  - name: stg_orders
    description: "Shopify orders, cleaned and cast to IST."
    columns:
      - name: stg_order_id
        description: "Surrogate key hashed from order_id."
        tests:
          - not_null
          - unique
      - name: source_order_id
        tests:
          - not_null
      - name: customer_id
        tests:
          - not_null
Run:
dbt compile && dbt run --select stg_orders && dbt test --select stg_orders
If tests pass, commit. If not, feed the error back to the agent.
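The tests above prove that stg_order_id is unique and non-null. They do not prove it was built from the right column. One way to spot-check a few rows is to recompute the key in Python. This sketch follows our reading of dbt_utils 1.x generate_surrogate_key: cast each column to text, substitute the placeholder '_dbt_utils_surrogate_key_null_' for NULLs, join the parts with '-', then take the MD5 hex digest. `expected_surrogate_key` is a hypothetical helper. Python's str() only matches the warehouse cast for plain integer and string keys.

```python
import hashlib

# Placeholder dbt_utils substitutes for NULL inputs (assumed default, i.e. the
# surrogate_key_treat_nulls_as_empty_strings var is not set).
NULL_PLACEHOLDER = "_dbt_utils_surrogate_key_null_"


def expected_surrogate_key(*values) -> str:
    """Recompute a dbt_utils-style surrogate key (hypothetical helper).

    Each value is cast to a string, NULL (None) is replaced with the
    placeholder, the parts are joined with '-', and the MD5 hex digest
    of the result is returned.
    """
    parts = [NULL_PLACEHOLDER if v is None else str(v) for v in values]
    return hashlib.md5("-".join(parts).encode("utf-8")).hexdigest()


# Compare against rows pulled from the warehouse, e.g.
#   select source_order_id, stg_order_id from stg_orders limit 5
print(expected_surrogate_key(1001))
```

Compare the output with stg_order_id for the same source_order_id. A mismatch usually means the model hashed a different column or a differently cast value.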
Generating intermediate and mart models
The pattern scales. For an intermediate model, give the agent your staging model schemas; for marts, give it both the staging and intermediate schemas. Opus 4.7's 1M-token context window (see the model overview) means you can fit all your source schemas in a single prompt.
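Assembling that schema context by hand gets tedious. The sketch below rests on two assumptions. First, you have run `dbt docs generate`, so target/catalog.json exists. Second, the catalog has this layout: nodes keyed by unique_id, each with metadata.name and a columns map whose entries carry name, type and index. `schema_context` and `load_catalog` are hypothetical helpers. They render each matching model as one compact line that you can paste into a prompt.

```python
import json
from pathlib import Path


def load_catalog(path: str = "target/catalog.json") -> dict:
    """Read the catalog written by `dbt docs generate` (hypothetical helper)."""
    return json.loads(Path(path).read_text())


def schema_context(catalog: dict, prefixes: tuple[str, ...]) -> str:
    """Render models whose names start with any prefix as 'name(col TYPE, ...)'.

    Assumes catalog["nodes"][unique_id] has metadata.name and a columns
    mapping whose values carry "name", "type" and "index".
    """
    lines = []
    for node in catalog.get("nodes", {}).values():
        name = node["metadata"]["name"]
        if not name.startswith(prefixes):
            continue
        # Keep the warehouse column order so the agent sees the real layout.
        cols = sorted(node["columns"].values(), key=lambda c: c["index"])
        col_text = ", ".join(f'{c["name"]} {c["type"]}' for c in cols)
        lines.append(f"{name}({col_text})")
    return "\n".join(sorted(lines))
```

For an intermediate model you might call schema_context(load_catalog(), ("stg_",)); for a mart, pass ("stg_", "int_"). Prefix filtering assumes your project follows the stg_/int_ naming used in this guide.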
Part 2: SQL Refactoring
The worst SQL in every company looks like this:
-- 380-line query with subqueries in select, correlated subqueries,
-- inconsistent aliases, no CTEs, mixed DATE/TIMESTAMP arithmetic
SELECT a.*, (SELECT COUNT(*) FROM orders WHERE user = a.id) AS c,
(SELECT MAX(created) FROM orders WHERE user = a.id
AND status = 'completed') AS last_order,
...
Refactor prompt:
Refactor this 380-line SQL query. Requirements:
- Use CTEs with descriptive names
- Replace correlated subqueries with window functions where possible
- Standardise on TIMESTAMPTZ
- Fix N+1 patterns (one JOIN instead of one subquery per column)
- Add inline comments explaining each CTE's purpose
- Preserve the output schema exactly
- Target engine: Snowflake (use QUALIFY, LATERAL FLATTEN as needed)
Typical result: the 380 lines shrink to 90-120 readable lines, and the query often runs 5-10x faster. Confirm the speed-up in the query profile rather than taking it on faith.
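To see what "one JOIN instead of one subquery per column" means concretely, here is a toy version of the pattern run against an in-memory SQLite database from Python. SQLite stands in for Snowflake only to keep the example self-contained. The users/orders tables and their rows are made up.

```python
import sqlite3

# Before: one correlated subquery per output column (the N+1 shape).
OLD_SQL = """
SELECT u.id,
       (SELECT COUNT(*) FROM orders o WHERE o.user_id = u.id) AS order_count,
       (SELECT MAX(o.created) FROM orders o
         WHERE o.user_id = u.id AND o.status = 'completed') AS last_order
FROM users u
ORDER BY u.id
"""

# After: aggregate orders once in a CTE, then a single LEFT JOIN.
NEW_SQL = """
WITH order_stats AS (
    -- one row per user: total orders and latest completed order
    SELECT user_id,
           COUNT(*) AS order_count,
           MAX(CASE WHEN status = 'completed' THEN created END) AS last_order
    FROM orders
    GROUP BY user_id
)
SELECT u.id,
       COALESCE(s.order_count, 0) AS order_count,  -- users with no orders
       s.last_order
FROM users u
LEFT JOIN order_stats s ON s.user_id = u.id
ORDER BY u.id
"""


def build_demo_db() -> sqlite3.Connection:
    """Create a tiny in-memory dataset (made-up rows for illustration)."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE users (id INTEGER PRIMARY KEY);
        CREATE TABLE orders (user_id INTEGER, status TEXT, created TEXT);
        INSERT INTO users VALUES (1), (2), (3);
        INSERT INTO orders VALUES
            (1, 'completed', '2026-04-01'),
            (1, 'cancelled', '2026-04-05'),
            (2, 'completed', '2026-03-20'),
            (2, 'completed', '2026-04-10');
    """)
    return conn


conn = build_demo_db()
old_rows = conn.execute(OLD_SQL).fetchall()
new_rows = conn.execute(NEW_SQL).fetchall()
print(old_rows == new_rows)
```

Note the COALESCE on order_count. For a user with no orders, the correlated COUNT(*) returns 0, but the LEFT JOIN returns NULL unless you add COALESCE back. That is exactly the kind of subtle difference the verification step in the next subsection exists to catch.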
Verifying the refactor
Never trust AI to tell you the refactor is equivalent. Verify:
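-- Run both queries, write to temp tables, then diff in both directions.
CREATE TEMP TABLE old_output AS SELECT * FROM (<old query>);
CREATE TEMP TABLE new_output AS SELECT * FROM (<new query>);
SELECT 'only_in_old' AS side, COUNT(*) AS row_count FROM (
    SELECT * FROM old_output
    EXCEPT
    SELECT * FROM new_output
) AS only_in_old
UNION ALL
SELECT 'only_in_new' AS side, COUNT(*) AS row_count FROM (
    SELECT * FROM new_output
    EXCEPT
    SELECT * FROM old_output
) AS only_in_new;
-- Both counts should be 0. EXCEPT removes duplicates, so also
-- confirm the total row counts match:
SELECT (SELECT COUNT(*) FROM old_output) AS old_rows,
       (SELECT COUNT(*) FROM new_output) AS new_rows;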
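One caveat applies to any EXCEPT-based check: EXCEPT is a set operation, so it ignores duplicate rows. A refactor that accidentally doubles rows through a bad JOIN can still pass. Below is a sketch of a duplicate-aware comparison in Python. It assumes a DB-API connection; the demo uses sqlite3, and while the Snowflake and BigQuery DB-API connectors expose the same cursor interface, confirm that for your driver. `diff_query_outputs` is a hypothetical helper that compares both outputs as multisets with collections.Counter.

```python
import sqlite3
from collections import Counter


def _fetch(conn, sql: str) -> Counter:
    """Run a query through a DB-API cursor and count each row as a tuple."""
    cur = conn.cursor()
    try:
        cur.execute(sql)
        return Counter(tuple(row) for row in cur.fetchall())
    finally:
        cur.close()


def diff_query_outputs(conn, old_sql: str, new_sql: str) -> tuple[Counter, Counter]:
    """Return (rows only in old, rows only in new), keeping duplicates."""
    old, new = _fetch(conn, old_sql), _fetch(conn, new_sql)
    # Counter subtraction keeps positive counts, so an extra copy of a row shows up.
    return old - new, new - old


# Demo: the DISTINCT version silently drops a duplicate row.
conn = sqlite3.connect(":memory:")
conn.executescript("CREATE TABLE t (x INTEGER); INSERT INTO t VALUES (1), (1), (2);")
only_in_old, only_in_new = diff_query_outputs(
    conn, "SELECT x FROM t", "SELECT DISTINCT x FROM t"
)
print(only_in_old, only_in_new)
```

In the demo, the DISTINCT query loses one of the two rows with x = 1. SQL EXCEPT would report zero differences in both directions; the Counter diff reports the lost row. Pulling both outputs into memory is fine for sampled or aggregated results, not for billion-row tables.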
Part 3: Airflow DAG Scaffolding
# Prompt: "Write an Airflow 2.10 DAG that runs at 02:00 IST daily,
# pulls yesterday's Shopify orders via Airbyte, runs dbt build
# for stg_orders -> int_orders -> fct_daily_revenue, and posts
# a Slack message with the row count. Use TaskFlow API."
from datetime import timedelta
import pendulum
from airflow.decorators import dag, task
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
@dag(
    dag_id="daily_revenue_pipeline",
    schedule="0 2 * * *",
    # A tz-aware start_date makes the cron above run in IST.
    start_date=pendulum.datetime(2026, 4, 1, tz="Asia/Kolkata"),
    catchup=False,
    default_args={
        "owner": "data-platform",
        "retries": 2,
        "retry_delay": timedelta(minutes=10),
    },
    tags=["revenue", "shopify", "dbt"],
)
def daily_revenue_pipeline():
    airbyte_sync = AirbyteTriggerSyncOperator(
        task_id="airbyte_shopify_sync",
        airbyte_conn_id="airbyte_default",
        connection_id="{{ var.value.shopify_airbyte_conn_id }}",
        asynchronous=False,
        timeout=3600,
    )
    @task
    def dbt_build():
        import subprocess
        # Select exactly the three models; "stg_orders+" would also
        # build every other model downstream of stg_orders.
        subprocess.run(
            ["dbt", "build", "--select",
             "stg_orders", "int_orders", "fct_daily_revenue"],
            check=True,
        )
    @task
    def row_count() -> int:
        from google.cloud import bigquery
        client = bigquery.Client()
        rows = client.query(
            "SELECT COUNT(*) AS c FROM `analytics.fct_daily_revenue` "
            "WHERE date = DATE_SUB(CURRENT_DATE('Asia/Kolkata'), INTERVAL 1 DAY)"
        ).result()
        # RowIterator is iterable but not an iterator, so wrap it in iter().
        return next(iter(rows))["c"]
    slack_notify = SlackWebhookOperator(
        task_id="slack_notify",
        slack_webhook_conn_id="slack_data_platform",
        message="Daily revenue pipeline complete. "
                "Yesterday rows: {{ ti.xcom_pull(task_ids='row_count') }}",
    )
    airbyte_sync >> dbt_build() >> row_count() >> slack_notify
daily_revenue_pipeline()
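The tz-aware start_date is what makes "0 2 * * *" fire at 02:00 IST. Airflow, however, stores run times in UTC and shows UTC in the UI by default, so it helps to know what to expect. The check below uses only the standard library; zoneinfo needs the system tz database or the tzdata package. `ist_to_utc` is a hypothetical helper.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

IST = ZoneInfo("Asia/Kolkata")  # UTC+05:30, no daylight saving


def ist_to_utc(year: int, month: int, day: int, hour: int, minute: int = 0) -> datetime:
    """Convert an IST wall-clock time to an aware UTC datetime."""
    return datetime(year, month, day, hour, minute, tzinfo=IST).astimezone(timezone.utc)


# The 02:00 IST run on 19 April appears as the previous evening in UTC.
run_utc = ist_to_utc(2026, 4, 19, 2)
print(run_utc.isoformat())  # 2026-04-18T20:30:00+00:00
```

If the start_date were naive, Airflow would read it in the default_timezone from airflow.cfg (UTC unless you changed it). The same cron would then fire at 07:30 IST instead.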
Review for:
- Retry count and delay — AI defaults may not match your SRE conventions.
- Pool assignment — AI rarely sets pools; assign manually.
- SLA / miss callbacks — add your team's convention.
- XCom size — row counts are fine; large DataFrames pushed to XCom bloat the metadata database and can fail tasks, so pass a table name or file path instead.
Part 4: Data Lineage Extraction
Given a folder of SQL files, extract a lineage graph.
import glob
import json
from pathlib import Path
from anthropic import Anthropic
client = Anthropic()  # reads ANTHROPIC_API_KEY from the environment
sql_files = sorted(glob.glob("models/**/*.sql", recursive=True))
corpus = "\n\n".join(
    f"-- FILE: {f}\n{Path(f).read_text()}" for f in sql_files
)
response = client.messages.create(
    model="claude-opus-4-7",
    max_tokens=8192,
    messages=[{
        "role": "user",
        "content": f"""Extract data lineage from these dbt SQL
files. For each model, identify:
1. All upstream dependencies (source tables, {{{{ ref() }}}}
calls, {{{{ source() }}}} calls)
2. All downstream consumers (models that ref this one)
3. Column-level lineage where possible
Return only the JSON object, with no prose or code fences:
{{
"nodes": [
{{"id": "model_name", "type": "staging|intermediate|mart|source"}}
],
"edges": [
{{"from": "upstream", "to": "downstream", "columns": ["col1"]}}
]
}}
{corpus}""",
    }],
)
text = response.content[0].text.strip()
# Models sometimes wrap JSON in a Markdown fence anyway; strip it if present.
fence = "`" * 3
if text.startswith(fence):
    text = text.split("\n", 1)[1].rsplit(fence, 1)[0]
lineage = json.loads(text)
# Feed to dbt-docs, Graphviz, or your data catalog.
This replaces the spreadsheet-based lineage tracking that many teams still maintain by hand.
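Treat the model's JSON as a draft and cross-check it. Model-level edges are declared explicitly through ref() and source(), so a regex pass can confirm none were dropped. dbt's own target/manifest.json also records these edges under each node's depends_on. The sketch makes two assumptions. First, the regexes cover only the common `ref('model')`, `ref('package', 'model')` and `source('src', 'table')` forms. Second, naming source nodes "src.table" is a hypothetical convention; align it with whatever IDs your prompt asks the model to use.

```python
import re

# {{ ref('model') }}, {{ ref("model") }}, or {{ ref('package', 'model') }};
# the capture group is always the model name (the last argument).
REF_RE = re.compile(r"\bref\(\s*(?:['\"][\w.]+['\"]\s*,\s*)?['\"]([\w.]+)['\"]\s*\)")
# {{ source('source_name', 'table_name') }}
SOURCE_RE = re.compile(r"\bsource\(\s*['\"](\w+)['\"]\s*,\s*['\"](\w+)['\"]\s*\)")


def extract_edges(model_name: str, sql: str) -> set[tuple[str, str]]:
    """Return (upstream, downstream) edges declared via ref()/source()."""
    edges = {(ref, model_name) for ref in REF_RE.findall(sql)}
    # Hypothetical naming convention for source nodes: "source_name.table_name".
    edges |= {(f"{src}.{tbl}", model_name) for src, tbl in SOURCE_RE.findall(sql)}
    return edges


def missing_from_llm(expected: set[tuple[str, str]], lineage: dict) -> set[tuple[str, str]]:
    """Edges found by the regex pass that the model's JSON did not include."""
    llm_edges = {(e["from"], e["to"]) for e in lineage.get("edges", [])}
    return expected - llm_edges
```

Run extract_edges over every file you sent to the model, union the results, and pass them with the parsed lineage to missing_from_llm. Anything it returns is an edge the model dropped.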
Part 5: Schema Evolution
Source systems change. Upstream adds a column, renames one, drops another. Finding all downstream impact is tedious.
# Prompt
claude "The source table raw.shopify_orders had these
changes this week:
- Added column shipping_country_code (string)
- Renamed total_price to total_price_with_tax
- Dropped column deprecated_order_tag
Find all dbt models under models/ that reference any of the
affected columns. For each, tell me:
1. Which column is affected
2. What change is needed (rename, remove, or add handling)
3. Whether downstream tests will still pass
Produce a migration PR plan."
Output is a structured diff plan. Apply it as a single PR with the migration and test updates together.
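Before trusting the plan, check the AI's list of affected models against a plain text search; it costs almost nothing. The sketch below uses only the standard library, and `find_column_references` is a hypothetical helper. Because it searches text rather than parsing SQL, it will miss columns that flow through select * and may flag mentions in comments.

```python
import re
from pathlib import Path


def find_column_references(models_dir: str, columns: list[str]) -> dict[str, list[str]]:
    """Map each column name to the .sql files under models_dir that mention it.

    Whole-word, case-insensitive match, so "total_price" does not also
    match "total_price_with_tax".
    """
    patterns = {c: re.compile(rf"\b{re.escape(c)}\b", re.IGNORECASE) for c in columns}
    hits: dict[str, list[str]] = {c: [] for c in columns}
    for path in sorted(Path(models_dir).rglob("*.sql")):
        text = path.read_text()
        for col, pattern in patterns.items():
            if pattern.search(text):
                hits[col].append(str(path))
    return hits


# Usage for this week's changes:
#   find_column_references("models", ["total_price", "deprecated_order_tag"])
```

Any file the search finds but the plan omits deserves a manual look. The reverse case, a model in the plan that the search misses, usually means the agent correctly traced a select *.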
Tool Comparison
| Task | Cursor + Opus 4.7 | Claude Code CLI | Copilot | dbt Cloud AI |
|------|-------------------|-----------------|---------|--------------|
| dbt model generation | Best (iterative) | Best (batch) | Good (inline) | Good |
| SQL refactoring | Best | Good | OK | Good |
| Airflow DAG scaffolding | Good | Best | Good | N/A |
| Lineage extraction | Via API | Best (scripted) | Limited | Native |
| Schema evolution | Good | Best (cross-file) | Limited | Good |
| Explaining legacy SQL | Best | Best | Good | Good |
India Data-Engineering Gotchas
- Currency — store in paise (integer), display as INR. AI often generates float columns; fix manually.
- Timezone — always TIMESTAMPTZ, default storage in UTC, display in Asia/Kolkata.
- PAN, Aadhaar, GSTIN columns — these are PII. Mask in non-prod. AI rarely applies PII rules unless you tell it.
- Date formats — DD/MM/YYYY from source systems must convert cleanly, e.g. TO_DATE(col, 'DD/MM/YYYY').
- Number formatting — lakh/crore in labels, standard digits in storage. Don't store formatted strings.
Tell your AI tool these patterns explicitly in CLAUDE.md or your .cursorrules; default behaviour is US-centric.
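It also helps to have reference implementations that your tests can compare AI-generated code against. Below is a Python sketch of the conventions above: amounts parsed with Decimal into integer paise, DD/MM/YYYY parsed strictly, lakh/crore grouping applied only at display time, and PAN masked for non-prod. The half-up rounding and the keep-last-four PAN mask are assumptions for illustration, not regulatory rules.

```python
from datetime import date, datetime
from decimal import ROUND_HALF_UP, Decimal


def inr_to_paise(amount: str) -> int:
    """Parse an INR amount string like '1499.50' into integer paise.

    Decimal avoids float error; rounding half-up to the nearest paisa
    is an assumed rule.
    """
    return int((Decimal(amount) * 100).quantize(Decimal("1"), rounding=ROUND_HALF_UP))


def parse_ddmmyyyy(value: str) -> date:
    """Parse a DD/MM/YYYY source string; raises ValueError on bad input."""
    return datetime.strptime(value, "%d/%m/%Y").date()


def format_inr(paise: int) -> str:
    """Format stored paise for display with lakh/crore digit grouping."""
    sign = "-" if paise < 0 else ""
    rupees, p = divmod(abs(paise), 100)
    digits = str(rupees)
    # The last three digits form one group; everything before groups in twos.
    head, tail = digits[:-3], digits[-3:]
    groups = []
    while len(head) > 2:
        groups.insert(0, head[-2:])
        head = head[:-2]
    if head:
        groups.insert(0, head)
    return f"{sign}₹{','.join(groups + [tail])}.{p:02d}"


def mask_pan(pan: str) -> str:
    """Mask a PAN for non-prod copies, keeping only the last 4 characters."""
    if len(pan) != 10:
        raise ValueError("PAN must be 10 characters")
    return "X" * 6 + pan[-4:]


print(format_inr(1234567850))  # ₹1,23,45,678.50
```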
Where to Go Next
- Claude Code Skills & Superpowers — write a custom /dbt-generate skill
- Cursor IDE Tutorial India — Cursor basics for data work
- MCP Servers Tutorial — expose your Snowflake/BigQuery as an MCP tool
- GitHub Copilot Free Setup — inline SQL in VS Code
- AI-first workflow 2026 — the autocomplete-to-review loop for SQL/dbt work
- Agentic dev: multi-step agents — build a data-platform agent for pipeline ops