API Reference

Complete API reference for Teal's Go packages, interfaces, and functions.

Overview

This page provides a comprehensive API reference for Teal’s core packages and interfaces. Use this reference when extending Teal with custom functionality or integrating it into your Go applications.

Template Functions

Teal uses the pongo2 template engine (v6), which is Django-compatible. This means you can use familiar Django/Jinja2 template syntax in your SQL models.

Template Engine Features

  • Django-compatible syntax: If you know Django templates or Jinja2, pongo2's syntax will be familiar
  • Control structures: {% if condition %}...{% endif %}, {% for item in items %}...{% endfor %}
  • Variables and filters: {{ variable }}, {{ variable|upper }}, {{ variable|safe }}
  • Template inheritance: {% extends %} and {% block %} (for advanced use cases)
  • Comments: {# This is a comment #}

Static and Dynamic Evaluation

Teal template functions are evaluated at different stages:

Generation-time evaluation (during teal gen):

  • {{ Ref("staging.model") }} - Replaced with actual table name and establishes DAG dependencies
  • {{ this() }} - Replaced with current model’s table name

Runtime evaluation (during DAG execution):

  • {{ TaskID }} - Current task identifier
  • {{ TaskUUID }} - Unique task UUID
  • {{ InstanceName }} - DAG instance name
  • {{ InstanceUUID }} - DAG instance UUID
  • {{ ENV("VAR_NAME", "default") }} - Environment variable value
  • {% if IsIncremental() %}...{% endif %} - Control structures

Example showing both:

-- Generation time: Ref() resolves to "staging.raw_orders"
-- Runtime: TaskID gets actual value during execution
SELECT *
FROM {{ Ref("staging.raw_orders") }}
WHERE task_id = '{{ TaskID }}'

Processing Flow:

  1. During teal gen: {{ Ref(...) }} and {{ this() }} are evaluated and replaced with actual table names. All other template syntax is preserved in the generated Go code.
  2. At runtime: {{ TaskID }}, {{ ENV(...) }}, and control structures {% %} are evaluated when SQL executes during DAG execution.
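
The two passes can be illustrated with a small, self-contained Go sketch. It is not Teal's generator: the regular expressions and the `resolveAtGeneration` helper are assumptions made for illustration. It only shows the idea that the generation pass substitutes `Ref()` and `this()`, records dependencies, and leaves every other tag in place for runtime.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// refCall matches {{ Ref("stage.model") }} with flexible whitespace.
var refCall = regexp.MustCompile(`\{\{\s*Ref\("([^"]+)"\)\s*\}\}`)

// thisCall matches {{ this() }}.
var thisCall = regexp.MustCompile(`\{\{\s*this\(\)\s*\}\}`)

// resolveAtGeneration performs a generation-time pass: it substitutes Ref()
// and this() and collects the referenced models as dependencies. Every other
// template tag is left untouched for the runtime pass.
// (Hypothetical helper, not part of Teal's API.)
func resolveAtGeneration(sql, currentModel string) (string, []string) {
	var deps []string
	out := refCall.ReplaceAllStringFunc(sql, func(m string) string {
		name := refCall.FindStringSubmatch(m)[1]
		deps = append(deps, name)
		return name
	})
	out = thisCall.ReplaceAllLiteralString(out, currentModel)
	return out, deps
}

func main() {
	src := `SELECT * FROM {{ Ref("staging.raw_orders") }} WHERE task_id = '{{ TaskID }}'`
	generated, deps := resolveAtGeneration(src, "dds.orders")
	fmt.Println(generated)               // {{ TaskID }} is still present
	fmt.Println(strings.Join(deps, ",")) // dependencies discovered via Ref()
}
```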

List of Functions

| Name | Input Parameters | Output | When Evaluated | Description | Example |
|---|---|---|---|---|---|
| Ref | "<stage>.<model>" | string | Generation-time | Main function for DAG dependencies. Replaced with actual table name during teal gen. | {{ Ref("staging.customers") }} |
| this | None | string | Generation-time | Returns the name of the current table. | {{ this() }} |
| ENV | envName, defaultValue | string | Runtime | Gets environment variable value at runtime. | {{ ENV("DB_SCHEMA", "public") }} |
| IsIncremental | None | boolean | Runtime | Returns true if model is in incremental mode. Use in control structures. | {% if IsIncremental() %}...{% endif %} |
| TaskID | (variable) | string | Runtime | The task identifier from the Push method. | {{ TaskID }} |
| TaskUUID | (variable) | string | Runtime | The unique UUID assigned for task tracking. | {{ TaskUUID }} |
| InstanceName | (variable) | string | Runtime | The DAG instance name. | {{ InstanceName }} |
| InstanceUUID | (variable) | string | Runtime | The unique UUID assigned to the DAG instance. | {{ InstanceUUID }} |

Complete Example

{{ define "profile.yaml" }}
    materialization: 'incremental'
    is_data_framed: true
{{ end }}

SELECT
    order_id,
    customer_id,
    order_date,
    total_amount,
    '{{ TaskID }}' as etl_task_id,
    '{{ TaskUUID }}' as etl_run_id,
    current_timestamp as processed_at
FROM {{ Ref("staging.raw_orders") }}  -- Resolved at generation-time
{% if IsIncremental() %}
    WHERE order_date > (SELECT COALESCE(MAX(order_date), '1900-01-01') FROM {{ this() }})
{% endif %}

Core Packages

pkg/core

The core package provides the singleton instance that manages configuration and global state.

GetInstance()

Returns the singleton core instance.

import "github.com/go-teal/teal/pkg/core"

// A distinct variable name avoids shadowing the imported core package
instance := core.GetInstance()

Init(configPath, projectPath string)

Initializes the core instance with configuration.

core.GetInstance().Init("config.yaml", ".")

Parameters:

  • configPath - Path to config.yaml file
  • projectPath - Root path of the project
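
For readers unfamiliar with the singleton pattern, the sketch below shows one common way to build such an accessor in Go with `sync.Once`. The `Core` type and its fields are local stand-ins invented for this example; Teal's actual implementation may differ.

```go
package main

import (
	"fmt"
	"sync"
)

// Core is a local stand-in for the object returned by core.GetInstance().
// Its fields are assumptions made for this sketch.
type Core struct {
	ConfigPath  string
	ProjectPath string
}

var (
	instance *Core
	once     sync.Once
)

// GetInstance lazily creates the single shared Core and returns the same
// pointer on every call, even when called from several goroutines.
func GetInstance() *Core {
	once.Do(func() { instance = &Core{} })
	return instance
}

// Init records where the configuration file and the project root live.
func (c *Core) Init(configPath, projectPath string) {
	c.ConfigPath = configPath
	c.ProjectPath = projectPath
}

func main() {
	GetInstance().Init("config.yaml", ".")
	fmt.Println(GetInstance().ConfigPath)
	fmt.Println(GetInstance() == GetInstance())
}
```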

pkg/dags

The dags package provides DAG implementations for orchestrating asset execution.

DAG Interface

type DAG interface {
    Run() *sync.WaitGroup
    Push(taskName string, inputData map[string]interface{}, done chan map[string]interface{}) chan map[string]interface{}
    Stop()
}

Methods:

Run() *sync.WaitGroup

Builds and starts the DAG execution. Returns a WaitGroup that can be used to wait for completion.

wg := dag.Run()

Push(taskName, inputData, done) chan

Triggers DAG execution with a specific task name and input data.

result := <-dag.Push("task_123", inputDataMap, make(chan map[string]interface{}))

Parameters:

  • taskName - Unique identifier for this execution
  • inputData - Map of input data to pass to the DAG
  • done - Channel to signal completion

Returns: Channel that will receive the final result

Stop()

Signals the DAG to stop execution.

dag.Stop()
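
The Run, Push, Stop lifecycle can be tried out without Teal using a toy implementation of the interface. In the sketch below, `echoDag` and its single worker goroutine are hypothetical; the real Channel DAG schedules assets, whereas this stand-in only echoes the task name so the call sequence is visible.

```go
package main

import (
	"fmt"
	"sync"
)

// DAG mirrors the interface documented above.
type DAG interface {
	Run() *sync.WaitGroup
	Push(taskName string, inputData map[string]interface{}, done chan map[string]interface{}) chan map[string]interface{}
	Stop()
}

type request struct {
	taskName string
	input    map[string]interface{}
	done     chan map[string]interface{}
}

// echoDag is a hypothetical single-worker DAG used only for illustration.
type echoDag struct {
	requests chan request
	wg       sync.WaitGroup
}

func newEchoDag() *echoDag { return &echoDag{requests: make(chan request)} }

// Run starts the worker and returns a WaitGroup that is released after Stop.
func (d *echoDag) Run() *sync.WaitGroup {
	d.wg.Add(1)
	go func() {
		defer d.wg.Done()
		for r := range d.requests {
			r.done <- map[string]interface{}{"task": r.taskName, "inputs": len(r.input)}
		}
	}()
	return &d.wg
}

// Push hands one task to the worker and returns the channel carrying its result.
func (d *echoDag) Push(taskName string, inputData map[string]interface{}, done chan map[string]interface{}) chan map[string]interface{} {
	d.requests <- request{taskName, inputData, done}
	return done
}

// Stop closes the request queue, letting the worker exit.
func (d *echoDag) Stop() { close(d.requests) }

func main() {
	var dag DAG = newEchoDag()
	wg := dag.Run()
	result := <-dag.Push("task_123", map[string]interface{}{"a": 1}, make(chan map[string]interface{}))
	fmt.Println(result["task"], result["inputs"])
	dag.Stop()
	wg.Wait() // returns once the worker has drained and exited
}
```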

InitChannelDag

Creates a production Channel DAG instance.

func InitChannelDag(
    dagConfig map[string][]string,
    projectAssets map[string]processing.Asset,
    config *configs.Config,
    instanceName string,
) DAG

Parameters:

  • dagConfig - Map of asset names to their dependencies
  • projectAssets - Map of asset names to Asset implementations
  • config - Configuration object
  • instanceName - Name for this DAG instance

Example:

import (
    "github.com/go-teal/teal/pkg/dags"
    "github.com/my_user/my_project/internal/assets"
)

dag := dags.InitChannelDag(
    assets.DAG,
    assets.ProjectAssets,
    config,
    "my-pipeline-instance",
)
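
Because dagConfig maps each asset name to its dependencies, a valid execution order can be derived from it with a topological sort. The sketch below uses Kahn's algorithm on a hand-written map; `executionOrder` is a hypothetical helper and the asset names are made up. It is not how InitChannelDag schedules work internally, only a way to reason about the shape of the map.

```go
package main

import (
	"fmt"
	"sort"
)

// executionOrder returns asset names so that every asset appears after all of
// its dependencies (Kahn's algorithm). It reports an error on a cycle.
func executionOrder(dagConfig map[string][]string) ([]string, error) {
	pending := make(map[string]int, len(dagConfig)) // unresolved dependencies per asset
	dependents := make(map[string][]string)         // dependency -> assets waiting on it
	for asset, deps := range dagConfig {
		pending[asset] = len(deps)
		for _, dep := range deps {
			if _, known := pending[dep]; !known {
				pending[dep] = 0 // dependency without its own entry
			}
			dependents[dep] = append(dependents[dep], asset)
		}
	}

	var ready []string
	for asset, n := range pending {
		if n == 0 {
			ready = append(ready, asset)
		}
	}
	sort.Strings(ready) // deterministic output despite map iteration order

	var order []string
	for len(ready) > 0 {
		next := ready[0]
		ready = ready[1:]
		order = append(order, next)
		children := dependents[next]
		sort.Strings(children)
		for _, child := range children {
			pending[child]--
			if pending[child] == 0 {
				ready = append(ready, child)
			}
		}
	}
	if len(order) != len(pending) {
		return nil, fmt.Errorf("dependency cycle detected")
	}
	return order, nil
}

func main() {
	order, err := executionOrder(map[string][]string{
		"staging.raw_orders": {},
		"staging.customers":  {},
		"dds.orders":         {"staging.raw_orders", "staging.customers"},
		"mart.revenue":       {"dds.orders"},
	})
	fmt.Println(order, err)
}
```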

InitChannelDagWithTests

Creates a Channel DAG with integrated testing.

func InitChannelDagWithTests(
    dagConfig map[string][]string,
    projectAssets map[string]processing.Asset,
    projectTests map[string]processing.Asset,
    config *configs.Config,
    instanceName string,
) DAG

Parameters:

  • Same as InitChannelDag, plus:
  • projectTests - Map of test names to test Asset implementations

InitDebugDag

Creates a Debug DAG instance with REST API and visualization support.

func InitDebugDag(
    dagConfig map[string][]string,
    projectAssets map[string]processing.Asset,
    config *configs.Config,
    instanceName string,
) *DebugDag

pkg/processing

The processing package defines interfaces and types for asset execution.

Asset Interface

type Asset interface {
    Execute(ctx *TaskContext) (interface{}, error)
    GetUpstreams() []string
    GetDownstreams() []string
    GetName() string
}

Methods:

Execute(ctx *TaskContext) (interface{}, error)

Executes the asset with the given task context.

Parameters:

  • ctx - Task context containing runtime information and input data

Returns:

  • Result of execution (can be nil, DataFrame, or custom type)
  • Error if execution failed

GetUpstreams() []string

Returns list of upstream dependency names.

upstreams := asset.GetUpstreams()
// Returns: []string{"staging.model1", "staging.model2"}

GetDownstreams() []string

Returns list of downstream dependent names.

downstreams := asset.GetDownstreams()

GetName() string

Returns the asset name.

name := asset.GetName()
// Returns: "dds.fact_transactions"
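
To make the four methods concrete, here is a minimal sketch of a type that satisfies the Asset interface. The interface and the TaskContext struct are copied locally from this page so the example compiles on its own; `rowCounter` and its behavior are hypothetical and exist only to exercise the methods.

```go
package main

import "fmt"

// TaskContext and Asset are local copies of the types documented on this
// page, so the sketch does not need to import Teal.
type TaskContext struct {
	TaskID, TaskUUID, InstanceName, InstanceUUID string
	Input                                        map[string]interface{}
}

type Asset interface {
	Execute(ctx *TaskContext) (interface{}, error)
	GetUpstreams() []string
	GetDownstreams() []string
	GetName() string
}

// rowCounter is a hypothetical asset that sums the row counts of its upstreams.
type rowCounter struct {
	name                   string
	upstreams, downstreams []string
}

// Execute reads each upstream result from ctx.Input and fails loudly when a
// result is missing or has an unexpected type.
func (a *rowCounter) Execute(ctx *TaskContext) (interface{}, error) {
	total := 0
	for _, up := range a.upstreams {
		rows, ok := ctx.Input[up].([]string)
		if !ok {
			return nil, fmt.Errorf("%s: upstream %q missing or not []string", a.name, up)
		}
		total += len(rows)
	}
	return total, nil
}

func (a *rowCounter) GetUpstreams() []string   { return a.upstreams }
func (a *rowCounter) GetDownstreams() []string { return a.downstreams }
func (a *rowCounter) GetName() string          { return a.name }

func main() {
	var asset Asset = &rowCounter{name: "dds.row_count", upstreams: []string{"staging.model1"}}
	ctx := &TaskContext{
		TaskID: "task_123",
		Input:  map[string]interface{}{"staging.model1": []string{"r1", "r2", "r3"}},
	}
	result, err := asset.Execute(ctx)
	fmt.Println(asset.GetName(), result, err)
}
```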

TaskContext

Provides runtime information for asset execution.

type TaskContext struct {
    TaskID       string
    TaskUUID     string
    InstanceName string
    InstanceUUID string
    Input        map[string]interface{}
}

Fields:

  • TaskID - Task identifier from Push method
  • TaskUUID - Unique UUID for this task execution
  • InstanceName - Name of the DAG instance
  • InstanceUUID - UUID of the DAG instance
  • Input - Map of upstream results (key: asset name, value: result data)

Example:

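func MyRawAsset(ctx *processing.TaskContext, modelProfile *configs.ModelProfile) (interface{}, error) {
    // Access task information
    log.Info().Str("taskID", ctx.TaskID).Msg("Executing")

    // result stays nil when the upstream is absent or has another type
    var result interface{}

    // Access upstream data; the checked assertion avoids a panic
    if upstream, ok := ctx.Input["dds.model1"]; ok {
        if df, ok := upstream.(*dataframe.DataFrame); ok {
            // Process dataframe, then hand it downstream
            result = df
        }
    }

    return result, nil
}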

ExecutorFunc

Type definition for raw asset functions.

type ExecutorFunc func(ctx *TaskContext, modelProfile *configs.ModelProfile) (interface{}, error)

GetExecutors()

Returns the global executors registry for raw assets.

import "github.com/go-teal/teal/pkg/processing"

processing.GetExecutors().Executors["staging.my_raw_asset"] = MyRawAssetFunc
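
The registry is essentially a map from asset name to function. The following sketch reproduces that pattern with local stand-in types so it can run without Teal; the `registry` type, the `run` helper, and the trimmed-down TaskContext and ModelProfile are assumptions for illustration only.

```go
package main

import "fmt"

// Local stand-ins for the documented types, reduced to what the sketch needs.
type TaskContext struct {
	TaskID string
	Input  map[string]interface{}
}

type ModelProfile struct{ Name string }

type ExecutorFunc func(ctx *TaskContext, modelProfile *ModelProfile) (interface{}, error)

// registry is a stand-in for the value returned by processing.GetExecutors().
type registry struct {
	Executors map[string]ExecutorFunc
}

var executors = &registry{Executors: map[string]ExecutorFunc{}}

// GetExecutors returns the process-wide registry.
func GetExecutors() *registry { return executors }

// run looks up the executor registered under name and invokes it
// (hypothetical helper showing how a registry entry might be consumed).
func run(name string, ctx *TaskContext) (interface{}, error) {
	fn, ok := GetExecutors().Executors[name]
	if !ok {
		return nil, fmt.Errorf("no executor registered for %q", name)
	}
	return fn(ctx, &ModelProfile{Name: name})
}

func main() {
	GetExecutors().Executors["staging.my_raw_asset"] = func(ctx *TaskContext, p *ModelProfile) (interface{}, error) {
		return p.Name + " ran for " + ctx.TaskID, nil
	}
	fmt.Println(run("staging.my_raw_asset", &TaskContext{TaskID: "task_123"}))
	fmt.Println(run("staging.unknown", &TaskContext{TaskID: "task_123"}))
}
```

Registering before the DAG starts matters in this pattern: a lookup for a name that was never registered can only fail at execution time.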

pkg/configs

The configs package defines configuration structures.

Config

Main configuration structure.

type Config struct {
    Version     string
    Module      string
    Connections []ConnectionConfig
}

ConnectionConfig

Database connection configuration.

type ConnectionConfig struct {
    Name   string
    Type   string
    Config map[string]interface{}
}

ModelProfile

Model-specific configuration.

type ModelProfile struct {
    Name              string
    Description       string
    Connection        string
    Materialization   string
    IsDataFramed      bool
    PersistInputs     bool
    PrimaryKeyFields  []string
    Indexes           []Index
}

Index

Index configuration for models.

type Index struct {
    Name   string
    Unique bool
    Fields []string
}

pkg/drivers

The drivers package provides database driver interfaces and implementations.

DBDriver Interface

type DBDriver interface {
    Connect() error
    Begin() (interface{}, error)
    Commit(tx interface{}) error
    Rollback(tx interface{}) error
    Close() error
    Exec(tx interface{}, sql string) error
    GetListOfFields(tx interface{}, tableName string) ([]string, error)
    CheckTableExists(tx interface{}, tableName string) bool
    CheckSchemaExists(tx interface{}, schemaName string) bool
    ToDataFrame(sql string) (*dataframe.DataFrame, error)
    PersistDataFrame(tx interface{}, name string, df *dataframe.DataFrame) error
    SimpleTest(sql string) (string, error)
    GetRawConnection() interface{}
    ConcurrencyLock()
    ConcurrencyUnlock()
}
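
The transaction methods of the interface are typically used together. The sketch below shows one way to combine Begin, Exec, Commit, and Rollback; `txDriver` is a locally declared subset of DBDriver, and `runInTx` and `fakeDriver` are hypothetical helpers that record calls in memory instead of talking to a database.

```go
package main

import (
	"errors"
	"fmt"
)

// txDriver is the subset of the documented DBDriver interface this sketch needs.
type txDriver interface {
	Begin() (interface{}, error)
	Commit(tx interface{}) error
	Rollback(tx interface{}) error
	Exec(tx interface{}, sql string) error
}

// runInTx executes all statements in one transaction and rolls back on the
// first failure.
func runInTx(d txDriver, statements ...string) error {
	tx, err := d.Begin()
	if err != nil {
		return fmt.Errorf("begin: %w", err)
	}
	for _, stmt := range statements {
		if err := d.Exec(tx, stmt); err != nil {
			if rbErr := d.Rollback(tx); rbErr != nil {
				return fmt.Errorf("exec %q: %v; rollback also failed: %w", stmt, err, rbErr)
			}
			return fmt.Errorf("exec %q: %w", stmt, err)
		}
	}
	return d.Commit(tx)
}

// fakeDriver is a hypothetical in-memory driver that records the calls it receives.
type fakeDriver struct {
	calls  []string
	failOn string // statement that should fail, if any
}

func (f *fakeDriver) Begin() (interface{}, error) {
	f.calls = append(f.calls, "begin")
	return struct{}{}, nil
}
func (f *fakeDriver) Commit(tx interface{}) error   { f.calls = append(f.calls, "commit"); return nil }
func (f *fakeDriver) Rollback(tx interface{}) error { f.calls = append(f.calls, "rollback"); return nil }
func (f *fakeDriver) Exec(tx interface{}, sql string) error {
	if sql == f.failOn {
		return errors.New("statement failed")
	}
	f.calls = append(f.calls, "exec")
	return nil
}

func main() {
	ok := &fakeDriver{}
	fmt.Println(runInTx(ok, "CREATE TABLE t (id INT)", "INSERT INTO t VALUES (1)"), ok.calls)

	bad := &fakeDriver{failOn: "BROKEN"}
	fmt.Println(runInTx(bad, "BROKEN"), bad.calls)
}
```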

Custom Asset Development

Creating a SQL Model Asset

SQL model assets are automatically generated from .sql files in assets/models/<stage>/.

Example: assets/models/staging/customers.sql

{{ define "profile.yaml" }}
    connection: 'default'
    materialization: 'table'
    description: 'Staging table for customer data'
{{ end }}

SELECT
    id,
    name,
    email,
    created_at
FROM read_csv('data/customers.csv')

Creating a Raw Asset

Raw assets are custom Go functions for complex transformations.

Step 1: Create the function

package custom

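import (
    "fmt"

    "github.com/go-gota/gota/dataframe"
    "github.com/go-teal/teal/pkg/configs"
    "github.com/go-teal/teal/pkg/processing"
)

func ProcessCustomers(ctx *processing.TaskContext, profile *configs.ModelProfile) (interface{}, error) {
    // Get upstream data; the checked assertion returns an error instead of panicking
    rawData, ok := ctx.Input["staging.raw_customers"].(*dataframe.DataFrame)
    if !ok {
        return nil, fmt.Errorf("staging.raw_customers: expected *dataframe.DataFrame, got %T", ctx.Input["staging.raw_customers"])
    }

    // Keep only active customers
    processed := rawData.Filter(
        dataframe.F{Colname: "active", Comparator: "==", Comparando: true},
    )
    if processed.Err != nil {
        return nil, fmt.Errorf("filtering active customers: %w", processed.Err)
    }

    // Return dataframe
    return processed, nil
}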

Step 2: Register in main.go

import "github.com/my_user/my_project/pkg/custom"

processing.GetExecutors().Executors["dds.customers_processed"] = custom.ProcessCustomers

Step 3: Configure in profile.yaml

models:
  stages:
    - name: dds
      models:
        - name: customers_processed
          materialization: 'raw'
          raw_upstreams:
            - "staging.raw_customers"

Error Handling

Asset Execution Errors

When an asset execution fails, the error is propagated through the DAG and logged.

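func MyAsset(ctx *processing.TaskContext, profile *configs.ModelProfile) (interface{}, error) {
    // validateInput is a helper you write yourself; wrapping with %w keeps the cause inspectable
    if err := validateInput(ctx.Input); err != nil {
        return nil, fmt.Errorf("input validation failed: %w", err)
    }

    // Process the validated input and assign the outcome to result
    var result interface{}

    return result, nil
}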

Test Failures

A test fails when its query returns one or more rows. If any test fails, the DAG execution status is set to TESTS_FAILED.
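
The rule can be summarized as "zero rows means pass". A minimal sketch, assuming test results arrive as a map from test name to returned row count: the `dagStatus` helper and the "SUCCESS" status name are placeholders invented here, and only TESTS_FAILED comes from this page.

```go
package main

import "fmt"

// dagStatus returns "TESTS_FAILED" when any test query returned rows.
// "SUCCESS" is a placeholder status name chosen for this sketch.
func dagStatus(rowCounts map[string]int) string {
	for _, n := range rowCounts {
		if n > 0 {
			return "TESTS_FAILED"
		}
	}
	return "SUCCESS"
}

func main() {
	fmt.Println(dagStatus(map[string]int{"no_null_ids": 0, "unique_emails": 0}))
	fmt.Println(dagStatus(map[string]int{"no_null_ids": 0, "unique_emails": 3}))
}
```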

Performance Optimization

Concurrency Control

Database drivers implement concurrency locks for thread-safe operations:

driver.ConcurrencyLock()
defer driver.ConcurrencyUnlock()

// Perform thread-safe operation
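
One plausible way for a driver to provide these two methods is to wrap a `sync.Mutex`. The sketch below assumes exactly that; `lockedDriver` and `writeConcurrently` are hypothetical, and real drivers may use a different locking strategy.

```go
package main

import (
	"fmt"
	"sync"
)

// lockedDriver is a hypothetical driver whose lock methods wrap a sync.Mutex.
type lockedDriver struct {
	mu     sync.Mutex
	writes int
}

func (d *lockedDriver) ConcurrencyLock()   { d.mu.Lock() }
func (d *lockedDriver) ConcurrencyUnlock() { d.mu.Unlock() }

// write performs one guarded update using the lock/defer-unlock idiom.
func (d *lockedDriver) write() {
	d.ConcurrencyLock()
	defer d.ConcurrencyUnlock()
	d.writes++
}

// writeConcurrently runs n writers in parallel and returns the final count.
func writeConcurrently(d *lockedDriver, n int) int {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			d.write()
		}()
	}
	wg.Wait()
	return d.writes
}

func main() {
	// Without the lock, concurrent increments could be lost.
	fmt.Println(writeConcurrently(&lockedDriver{}, 100))
}
```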

DataFrame Caching

Enable is_data_framed to let downstream models query the asset’s data using SQL. When enabled, the asset’s output is cached as a DataFrame and passed to downstream assets, so they can read it without re-running the query.

{{ define "profile.yaml" }}
    is_data_framed: true  # Enable SQL queries on this asset's data
{{ end }}

Use persist_inputs: true to create temporary tables in the destination database for upstream DataFrames, enabling cross-database queries:

{{ define "profile.yaml" }}
    persist_inputs: true  # Creates temporary tables for upstream DataFrames
{{ end }}

Incremental Processing

Use incremental materialization for large datasets:

{{ define "profile.yaml" }}
    materialization: 'incremental'
{{ end }}

SELECT *
FROM source_table
{% if IsIncremental() %}
WHERE updated_at > (SELECT COALESCE(MAX(updated_at), '1900-01-01') FROM {{ this() }})
{% endif %}

Best Practices

  1. Use Stages Wisely: Organize models into logical stages (staging, dds, mart)
  2. Test Everything: Write tests for data quality checks
  3. Document Models: Add descriptions to model profiles
  4. Environment Variables: Use _env suffixes for sensitive configuration
  5. Incremental When Possible: Use incremental materialization for large, append-only datasets
  6. Monitor Performance: Use Debug UI to visualize and monitor DAG execution
  7. Version Control: Commit config.yaml, profile.yaml, and SQL models to git
  8. CI/CD Integration: Run teal gen in CI pipeline to validate models

Further Reading