    AI News Today

    4 YAML Files Instead of PySpark: How We Let Analysts Build Data Pipelines Without Engineers


    It used to take us three weeks to ship a single data pipeline. Today, an analyst with zero Python experience does it in a day. Here’s how we got there.

    I’m Kiril Kazlou, a data engineer at Mindbox. Our team regularly recalculates business metrics for clients — which means we’re constantly building data marts for billing and analytics, pulling from dozens of different sources.

    For a long time, we relied on PySpark for all our data processing. The problem? You can’t really work with PySpark without Python experience. Every new pipeline required a developer. And that meant waiting — sometimes for weeks.

    In this post, I’ll walk you through how we built an internal data platform where an analyst or product manager can spin up a regularly updated pipeline by writing just four YAML files.

    Why PySpark Was Slowing Us Down

    Let me illustrate the pain with a textbook example — calculating MAU (Monthly Active Users).

    On the surface, this feels like a simple SQL job: COUNT(DISTINCT customerId) across a few tables over a time window. But because of all the infrastructure overhead — PySpark, Airflow DAG setup, Spark resource allocation, testing — we had to hand it off to developers. The result? A full week just to ship a MAU counter.
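Stripped of all that infrastructure, the metric itself really is tiny. A toy sketch in plain Python (the event tuples are illustrative, not our real schema):

```python
# Toy MAU calculation: a distinct count of active customers per month.
from collections import defaultdict

events = [
    ("2024-01-03", "c1"), ("2024-01-15", "c1"), ("2024-01-20", "c2"),
    ("2024-02-02", "c1"), ("2024-02-11", "c3"),
]

active = defaultdict(set)  # "YYYY-MM" -> set of customer ids
for event_date, customer_id in events:
    active[event_date[:7]].add(customer_id)

mau = {month: len(ids) for month, ids in active.items()}
print(mau)  # {'2024-01': 2, '2024-02': 2}
```

Five lines of logic, a week of process around it: that gap is the whole story.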

    Every new metric took one to three weeks to deliver. And every single time, the process looked the same:

    1. An analyst defined the business requirements, found an available developer, and handed over the context.
    2. The developer clarified details, wrote PySpark code, went through code review, configured the DAG, and deployed.

    What we actually wanted was for analysts and product managers — the people who understand the business logic best and are fluent in SQL and YAML — to handle this themselves. No Python. No PySpark.

    Before: how pipelines were built with PySpark

    What We Replaced PySpark With: YAML and SQL Are All You Need

    To take a declarative approach, we split our data layer into three parts and picked the right tool for each:

    • dlt (data load tool) — ingests data from external APIs and databases into object storage. Configured entirely through a YAML file. No code required.
    • dbt (data build tool) on Trino — transforms data using pure SQL. It links models via ref(), automatically builds a dependency graph, and handles incremental updates.
    • Airflow + Cosmos — orchestrates the pipelines. The Airflow DAG is auto-generated from dag.yaml and the dbt project.

    We were already using Trino as a query engine for ad-hoc queries and had it plugged into Superset for BI. It had already proven itself: for queries with standard logic, it processed massive datasets faster and with fewer resources than Spark. On top of that, Trino natively supports federated access to multiple data stores from a single SQL query. For 90% of our pipelines, Trino was a perfect fit.

    Diagram of the new pipeline workflow: an analyst writes YAML configs and SQL models directly. dbt and Trino handle execution automatically through Airflow. No developer involvement required. The full process takes one day.
    After: analyst-owned pipelines with dbt + Trino

    How We Load Data: dlt.yaml

    The first YAML file describes where and how to load data for downstream processing. Here’s a real-world example — loading billing data from an internal API:

    product: sg-team
    feature: billing
    schema: billing_tarification
    
    dag:
      dag_id: dlt_billing_tarification
      schedule: "0 4 * * *"
      description: "Daily refresh of tarification data"
      tags:
        - billing
    
    alerts:
      enabled: true
      severity: warning
    
    source:
      type: rest_api
      client:
        base_url: "https://internal-api.example.com"
        auth:
          type: bearer
          token: dlt-billing.token
      resources:
        - name: tarification_data
          endpoint:
            path: /tarificationData
            method: POST
            json:
              firstPeriod: "{{ previous_month_date }}"
              lastPeriod: "{{ previous_month_date }}"
              pricingPlanLine: CurrentPlan
          write_disposition: replace
          processing_steps:
            - map: dlt_custom.billing_tarification_data.map
    
        - name: charges_raw
          columns:
            staffUserName:
              data_type: text
              nullable: true
          endpoint:
            path: /data-feed/charges
            method: POST
            json:
              firstPeriod: "{{ previous_month_date }}"
              lastPeriod: "{{ previous_month_date }}"
          write_disposition: replace
    
        - name: discounts_raw
          endpoint:
            path: /data-feed/discounts
            method: POST
            json:
              firstPeriod: "{{ previous_month_date }}"
              lastPeriod: "{{ previous_month_date }}"
          write_disposition: replace

    This config defines three resources from a single API. For each one, we specify the endpoint, request parameters, and a write strategy: in our case, replace means “overwrite every time.” You can also add processing steps, define column types, and configure alerts.

    The entire config is about 50 lines of YAML. Without dlt, each connector would be a Python script handling requests, pagination, retries, serialization to Delta Table format, and uploads to storage.
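To make concrete what the YAML is declaring, here is the loading loop in miniature. This is a sketch of the semantics only, not dlt’s internals: `fetch` is a stand-in for the real HTTP POST, and `write_disposition: replace` just means the destination table is overwritten on every run.

```python
# Miniature of the loading semantics the config describes. fetch() is a
# stand-in for the HTTP call; real dlt also handles retries, pagination,
# and Delta serialization.
def fetch(path: str, payload: dict) -> list[dict]:
    return [{"endpoint": path, **payload}]  # pretend API response

resources = {
    "charges_raw": "/data-feed/charges",
    "discounts_raw": "/data-feed/discounts",
}

storage: dict[str, list[dict]] = {}
for name, path in resources.items():
    rows = fetch(path, {"firstPeriod": "2024-01-01", "lastPeriod": "2024-01-01"})
    storage[name] = rows  # write_disposition: replace -> overwrite every run

print(sorted(storage))  # ['charges_raw', 'discounts_raw']
```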

    How We Transform Data With SQL: dbt_project.yaml and sources.yaml

    The next step is configuring the dbt model. With Trino, that means SQL queries.

    Here’s an example of how we set up the MAU calculation. This is what event preparation from a single source looks like:

    -- int_mau_events_visits.sql (simplified)
    {{ config(materialized='table') }}
    
    WITH period AS (
        -- Rolling window: last 5 months to current
        SELECT
            YEAR(CURRENT_DATE - INTERVAL '5' MONTH) AS start_year,
            MONTH(CURRENT_DATE - INTERVAL '5' MONTH) AS start_month,
            YEAR(CURRENT_DATE) AS end_year,
            MONTH(CURRENT_DATE) AS end_month
    ),
    
    events AS (
        -- Pull visit events within the period window
        SELECT src._tenant, src.unmergedCustomerId,
               'visits' AS src_type, src.endpoint
        FROM {{ source('final', 'customerstracking_visits') }} src
        CROSS JOIN period p
        WHERE src.unmergedCustomerId IS NOT NULL
          AND /* ...timestamp filtering by year/month bounds... */
    ),
    
    events_with_customer AS (
        -- Resolve merged customer IDs
        SELECT e._tenant,
               COALESCE(mc.mergedCustomerId, e.unmergedCustomerId) AS customerId,
               e.src_type, e.endpoint
        FROM events e
        LEFT JOIN {{ ref('int_merged_customers') }} mc
          ON e._tenant = mc._tenant
          AND e.unmergedCustomerId = mc.unmergedCustomerId
    )
    
    -- Keep only actual (non-deleted) customers
    SELECT ewc._tenant, ewc.customerId, ewc.src_type, ewc.endpoint
    FROM events_with_customer ewc
    WHERE EXISTS (
        SELECT 1 FROM {{ ref('int_actual_customers') }} ac
        WHERE ewc._tenant = ac._tenant
          AND ewc.customerId = ac.customerId
    )

    All 10 event sources follow the exact same pattern. The only differences are the source table and the filters. Then the models merge into a single stream:

    -- int_mau_events.sql (union of all sources)
    SELECT * FROM {{ ref('int_mau_events_inapps_targetings') }}
    UNION ALL
    SELECT * FROM {{ ref('int_mau_events_inapps_clicks') }}
    UNION ALL
    SELECT * FROM {{ ref('int_mau_events_visits') }}
    UNION ALL
    SELECT * FROM {{ ref('int_mau_events_orders') }}
    -- ...plus 6 more sources
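Because the per-source models differ only in name, even the union file is mechanical. A toy sketch of generating it in Python (the helper is hypothetical; the model names are from the project above):

```python
# Hypothetical generator for the union model: given per-source model
# names, emit the UNION ALL SQL that int_mau_events.sql writes by hand.
SOURCES = [
    "int_mau_events_inapps_targetings",
    "int_mau_events_inapps_clicks",
    "int_mau_events_visits",
    "int_mau_events_orders",
]

def union_model(models: list[str]) -> str:
    selects = [f"SELECT * FROM {{{{ ref('{m}') }}}}" for m in models]
    return "\nUNION ALL\n".join(selects)

print(union_model(SOURCES))
```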

    And finally, the data mart where everything gets aggregated:

    -- mau_period_datamart.sql
    {{ config(
        materialized='incremental',
        incremental_strategy='merge',
        unique_key=['_tenant', 'start_year', 'start_month', 'end_year', 'end_month']
    ) }}
    
    {%- set months_back = var('months_back', 5) | int -%}
    
    WITH period AS (
        SELECT
            YEAR(CURRENT_DATE - INTERVAL '{{ months_back }}' MONTH) AS start_year,
            MONTH(CURRENT_DATE - INTERVAL '{{ months_back }}' MONTH) AS start_month,
            YEAR(CURRENT_DATE) AS end_year,
            MONTH(CURRENT_DATE) AS end_month
    ),
    events_resolved AS (
        SELECT * FROM {{ ref('int_mau_events') }}
    ),
    metrics_by_tenant AS (
        SELECT
            er._tenant,
            COUNT(DISTINCT CASE WHEN src_type = 'visits'
                  THEN customerId END) AS CustomersTracking_Visits,
            COUNT(DISTINCT CASE WHEN src_type = 'orders'
                  THEN customerId END) AS ProcessingOrders_Orders,
            COUNT(DISTINCT CASE WHEN src_type = 'mailings'
                  THEN customerId END) AS Mailings_MessageStatuses,
            -- ...other metrics
            COUNT(DISTINCT customerId) AS MAU
        FROM events_resolved er
        GROUP BY er._tenant
    )
    SELECT m.*, p.start_year, p.start_month, p.end_year, p.end_month
    FROM metrics_by_tenant m
    CROSS JOIN period p

    For the data mart configuration, we use incremental_strategy='merge'. dbt automatically generates the merge query, substituting the unique_key for upsert. No need to manually implement incremental loading.
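Conceptually, the merge strategy does exactly this: rows whose unique_key matches an existing row are updated, new keys are inserted. A Python sketch of the semantics (not the SQL dbt actually generates), with the key shortened for brevity:

```python
# What MERGE does conceptually: match on unique_key, update matches,
# insert the rest. Simplified key; illustrative only.
UNIQUE_KEY = ("_tenant", "start_year", "start_month")

def upsert(existing: list[dict], incoming: list[dict]) -> list[dict]:
    by_key = {tuple(r[k] for k in UNIQUE_KEY): r for r in existing}
    for row in incoming:
        by_key[tuple(row[k] for k in UNIQUE_KEY)] = row
    return list(by_key.values())

table = [{"_tenant": "a", "start_year": 2024, "start_month": 1, "MAU": 10}]
incoming = [
    {"_tenant": "a", "start_year": 2024, "start_month": 1, "MAU": 12},  # update
    {"_tenant": "b", "start_year": 2024, "start_month": 1, "MAU": 7},   # insert
]
result = upsert(table, incoming)
print(len(result))  # 2
```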

    To tie the models into a single project, we set up dbt_project.yaml:

    name: mau_period
    version: '1.0.0'
    
    models:
      mau_period:
        +on_table_exists: replace
        +on_schema_change: append_new_columns

    And sources.yaml, which describes the input tables:

    sources:
      - name: final
        database: data_platform
        schema: final
        tables:
          - name: inapps_targetings_v2
          - name: inapps_clicks_v2
          - name: customerstracking_visits
          - name: processingorders_orders
          - name: cdp_mergedcustomers_v2
          # ...

    The result is the same business logic we had in PySpark, but in pure SQL: sources.yaml replaces typedspark schemas, {{ ref() }} and {{ source() }} replace .get_table(), and automatic execution order via the dependency graph replaces manual Spark resource tuning.

    How We Configure Airflow: dag.yaml

    The fourth configuration file defines when and how Airflow runs the pipeline:

    product: sg-team
    feature: billing
    schema: mau
    schedule: "15 21 * * *"  # every day at 00:15 MSK
    
    params:
      - name: start_date
        description: "Start date (YYYY-MM-DD). Leave empty for auto"
        default: ""
      - name: end_date
        description: "End date (YYYY-MM-DD). Leave empty for auto"
        default: ""
      - name: months_back
        description: "Months to look back (default: 5)"
        default: 5
    
    alerts:
      enabled: true
      severity: warning

    Then our Python script parses dag.yaml and dbt_project.yaml and uses the Cosmos library to generate a fully functional Airflow DAG. This is the only piece of Python code in the entire setup. It’s written once and works for every dbt project. Here’s the key part:

    def _build_dbt_project_dags(project_path: Path, environ: dict) -> list[DbtDag]:
        # dag.yaml sits at the root of each dbt project
        dag_config_path = project_path / "dag.yaml"
        config_dict = yaml.safe_load(dag_config_path.read_text())
        config = DagConfig.model_validate(config_dict)
    
        # YAML params → Airflow Params
        params = {}
        operator_vars = {}
        for param in config.params:
            params[param.name] = Param(
                default=param.default if param.default is not None else "",
                description=param.description,
            )
            operator_vars[param.name] = f"{{{{ params.{param.name} }}}}"
    
        # Cosmos creates the DAG from the dbt project
        with DbtDag(
            dag_id=f"dbt_{project_path.name}",
            schedule=config.schedule,
            params=params,
            project_config=ProjectConfig(dbt_project_path=project_path),
            profile_config=ProfileConfig(
                profile_name="default",
                target_name=project_name,
                profile_mapping=TrinoLDAPProfileMapping(
                    conn_id="trino_default",
                    profile_args={
                        "database": profile_database,
                        "schema": profile_schema,
                    },
                ),
            ),
            operator_args={"vars": operator_vars},
        ) as dag:
            # Create schema before running models
            create_schema = SQLExecuteQueryOperator(
                task_id="create_schema",
                conn_id="trino_default",
                sql=f"CREATE SCHEMA IF NOT EXISTS {profile_database}.{profile_schema} ...",
            )
            # Attach to root tasks
            for unique_id, _ in dag.dbt_graph.filtered_nodes.items():
                task = dag.tasks_map[unique_id]
                if not task.upstream_task_ids:
                    create_schema >> task

    Cosmos reads manifest.json from the dbt project, parses the model dependency graph, and creates a separate Airflow task for each model. Task dependencies are built automatically based on ref() calls in the SQL.
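In practice Cosmos gets the dependency edges from the compiled manifest.json, but the information originates in the ref() calls themselves. A rough sketch of pulling them out of a model’s SQL (the regex is illustrative, not how dbt parses Jinja):

```python
import re

# Rough sketch: derive a model's upstream dependencies from its ref()
# calls. dbt/Cosmos use the compiled manifest, not a regex.
REF_PATTERN = re.compile(r"\{\{\s*ref\(\s*'([^']+)'\s*\)\s*\}\}")

def dependencies(sql: str) -> set[str]:
    return set(REF_PATTERN.findall(sql))

sql = """
SELECT * FROM {{ ref('int_mau_events_visits') }}
UNION ALL
SELECT * FROM {{ ref('int_mau_events_orders') }}
"""
print(dependencies(sql))
```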

    How Analysts Build Pipelines Without Developers

    Now when an analyst needs a new recurring pipeline, they can put it together in a few steps:

    Step 1. Create a folder in the repo: dbt-projects/my_new_pipeline/.

    Step 2. If external data ingestion is needed, write a YAML config for dlt.

    Step 3. Write SQL models in the models/ folder and describe the sources in sources.yaml.

    Step 4. Create dbt_project.yaml and dag.yaml.

    Step 5. Push to Git, go through review, merge.

    CI/CD builds the dbt project and ships artifacts to S3. Airflow reads the DAG files from there, Cosmos parses the dbt project and generates the task graph. On schedule, dbt runs the models on Trino in the correct order. The end result is an updated data mart in the warehouse, accessible through Superset.

    What Changed After the Migration

    Before-and-after comparison showing pipeline delivery time dropping from one to three weeks under PySpark to one day with the YAML-based stack, and pipeline ownership shifting from developers to analysts.
    What changed: from weeks to one day, from developers to analysts

    For analysts to build pipelines on their own, they need to understand ref() and source() concepts, the difference between table and incremental materialization, and the basics of Git. We ran a few internal workshops and put together step-by-step guides for each task type.

    Why the New Stack Doesn’t Fully Replace PySpark

    For about 10% of our pipelines, PySpark is still the only option — when a transformation simply doesn’t fit into SQL. dbt supports Jinja macros, but that’s no substitute for full-blown Python. And it would be dishonest to skip over the limitations of the new tools.

    dlt + Delta: experimental upsert support. We use the Delta format in our storage layer. dlt’s Delta connector is marked as experimental, so the merge strategy didn’t work out of the box. We had to find workarounds — in some cases we used replace instead of merge (sacrificing incrementality), and in others we wrote custom processing_steps.

    Trino’s limited fault tolerance. Trino does have a fault tolerance mechanism, but it works by writing intermediate results to S3. At our terabyte-scale data volumes, this is impractical — the sheer number of S3 operations makes it prohibitively expensive. Without fault tolerance enabled, if a Trino worker goes down, the entire query fails. Spark, by contrast, restarts just the failed task. We addressed this with DAG-level retries and by decomposing heavy models into chains of intermediate ones.

    UDFs and custom logic. In Spark, you can write custom logic in Python right inside the pipeline — super convenient. With the new architecture, this is much harder. dbt on top of Trino doesn’t help: Jinja only generates SQL, and dbt’s Python models only work with Snowflake, Databricks, and BigQuery. You can write UDFs in Trino, but only in Java — with all the overhead that entails: a separate repo, a build pipeline, deploying JARs across all workers. So when a transformation doesn’t fit into SQL, you either end up with an unmaintainable SQL monster or a standalone script that breaks the lineage.

    What’s Next: Tests, Model Templates, and Training

    Better testing. We had solid pipeline testing in PySpark, but the new architecture is still catching up. Recent dbt versions introduced unit testing — you can now validate SQL model logic against mock data without spinning up the full pipeline. We want to add dbt tests both at the model level and as a separate monitoring layer.

    Reusable templates for common patterns. Many of our dbt models look alike. A single config could describe a dozen models with the same pattern — only the source table and filters differ. We plan to extract the shared logic into dbt macros.
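In dbt that shared logic would live in a Jinja macro, but the templating idea itself is simple. A Python sketch with hypothetical specs, where only the source table and filter vary per model:

```python
# Sketch of the templating idea behind the planned macros: one spec per
# source, one template for the shared SQL. Specs are illustrative.
TEMPLATE = (
    "SELECT src._tenant, src.unmergedCustomerId, '{src_type}' AS src_type\n"
    "FROM {{{{ source('final', '{table}') }}}}\n"
    "WHERE {filter}"
)

SPECS = [
    {"src_type": "visits", "table": "customerstracking_visits",
     "filter": "src.unmergedCustomerId IS NOT NULL"},
    {"src_type": "orders", "table": "processingorders_orders",
     "filter": "src.unmergedCustomerId IS NOT NULL"},
]

models = {f"int_mau_events_{s['src_type']}": TEMPLATE.format(**s) for s in SPECS}
print(models["int_mau_events_visits"])
```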

    Expanding the platform’s user base. We want more engineers and analysts to work with data independently. We’re planning regular internal training sessions, documentation, and onboarding guides so new users can get up to speed quickly and start building their own models.

    If your team is stuck in the same “analysts wait for developers” loop, I’d love to hear how you’re solving it. Connect with me on LinkedIn and let’s compare notes.


    All images in this article are by the author unless otherwise noted.

    © 2026 ainewstoday.co. All rights reserved.