Data Flow
Data flow is the mechanism by which information moves through Probe workflows. Understanding data flow patterns enables you to build sophisticated workflows that pass information between steps, jobs, and even external systems. This guide explores the complete data flow system in Probe.
Data Flow Overview
Probe uses a structured approach to data flow:
- Input Sources: Environment variables, configuration files, user inputs
- Processing: Actions generate responses and outputs
- Storage: Outputs are stored for later use
- Propagation: Data flows between steps and jobs
- Consumption: Other steps use the data for dynamic configuration
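Taken together, the stages above can be sketched in one small workflow. This is a minimal illustration, assuming a hypothetical /status endpoint; the variable and field names are placeholders, not part of any real API:

```yaml
steps:
  # Input: the URL is assembled from environment variables
  - name: Fetch Service Status
    id: fetch-status
    action: http
    with:
      url: "{{env.API_BASE_URL}}/status"
    # Processing: the action runs and the response is tested
    test: res.status == 200
    # Storage: selected values are kept as named outputs
    outputs:
      service_version: res.json.version
  # Propagation and consumption: a later step reads the stored output
  - name: Report Status
    echo: "Service is running version {{outputs.fetch-status.service_version}}"
```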
Data Sources
Environment Variables
Environment variables provide external configuration and runtime context.
# Access environment variables
steps:
- name: Environment-based Configuration
action: http
with:
url: "{{env.API_BASE_URL}}/{{env.API_VERSION}}/users"
headers:
Authorization: "Bearer {{env.API_TOKEN}}"
X-Environment: "{{env.DEPLOYMENT_ENV}}"
test: res.status == 200
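The variables referenced above come from the shell that launches Probe. One way to supply them (the values here are placeholders):

```sh
# Export the variables the workflow reads, then run it
export API_BASE_URL="https://api.example.com"
export API_VERSION="v1"
export API_TOKEN="example-token"
export DEPLOYMENT_ENV="staging"
probe workflow.yml
```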
Configuration Merging
Data can come from merged configuration files:
base-config.yml:
defaults:
api:
timeout: 30s
retry_count: 3
env:
API_BASE_URL: https://api.example.com
production.yml:
env:
API_BASE_URL: https://api.production.example.com
API_TOKEN: ${PROD_API_TOKEN}
defaults:
api:
timeout: 10s # Override for production
Usage:
probe workflow.yml,base-config.yml,production.yml
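Because files later in the list take precedence, the effective configuration after the merge would look roughly like this (assuming a key-by-key deep merge in which later files override earlier ones):

```yaml
defaults:
  api:
    timeout: 10s       # overridden by production.yml
    retry_count: 3     # inherited from base-config.yml
env:
  API_BASE_URL: https://api.production.example.com  # overridden by production.yml
  API_TOKEN: ${PROD_API_TOKEN}                      # added by production.yml
```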
Step Outputs
Steps generate outputs that can be consumed by subsequent steps and jobs.
Basic Output Definition
steps:
- name: User Authentication
id: auth
action: http
with:
url: "{{env.API_URL}}/auth/login"
method: POST
body: |
{
"username": "{{env.USERNAME}}",
"password": "{{env.PASSWORD}}"
}
test: res.status == 200
outputs:
access_token: res.json.access_token
refresh_token: res.json.refresh_token
user_id: res.json.user.id
expires_at: res.json.expires_at
user_roles: res.json.user.roles
Output Data Types
Outputs can contain various data types:
- name: Comprehensive Data Collection
id: data-collection
action: http
with:
url: "{{env.API_URL}}/comprehensive-data"
test: res.status == 200
outputs:
# Simple values
user_count: res.json.stats.user_count
server_version: res.json.version
is_healthy: res.json.health.status == "healthy"
# Complex objects
user_profile: res.json.user
configuration: res.json.config
metrics: res.json.metrics
# Arrays
active_users: res.json.users.filter(u -> u.active == true)
error_codes: res.json.errors.map(e -> e.code)
# Computed values
success_rate: (res.json.successful_requests / res.json.total_requests) * 100
avg_response_time: res.json.response_times.sum() / res.json.response_times.length
# Response metadata
response_time: res.time
response_size: res.body_size
content_type: res.headers["content-type"]
Output Scoping
Outputs are scoped to their containing step and can be referenced by ID:
steps:
- name: Database Setup
id: db-setup
action: http
with:
url: "{{env.DB_API}}/initialize"
outputs:
db_session_id: res.json.session_id
db_host: res.json.host
db_port: res.json.port
- name: Application Test
id: app-test
action: http
with:
url: "{{env.APP_URL}}/test"
headers:
X-DB-Session: "{{outputs.db-setup.db_session_id}}"
X-DB-Host: "{{outputs.db-setup.db_host}}"
outputs:
test_result: res.json.result
test_duration: res.time
- name: Performance Analysis
echo: |
Performance Analysis:
Database: {{outputs.db-setup.db_host}}:{{outputs.db-setup.db_port}}
Test Result: {{outputs.app-test.test_result}}
Test Duration: {{outputs.app-test.test_duration}}ms
Cross-Job Data Flow
Data can flow between jobs through job-level outputs and dependencies.
Job Dependencies and Data Sharing
jobs:
initialization:
name: System Initialization
steps:
- name: Create Test Environment
id: env-setup
action: http
with:
url: "{{env.SETUP_API}}/create-environment"
method: POST
body: |
{
"environment_name": "test_{{random_str(8)}}",
"configuration": "standard"
}
test: res.status == 201
outputs:
environment_id: res.json.environment.id
environment_name: res.json.environment.name
database_url: res.json.environment.database_url
api_endpoint: res.json.environment.api_endpoint
api-tests:
name: API Testing Suite
needs: [initialization] # Wait for initialization to complete
steps:
- name: Test User API
action: http
with:
url: "{{outputs.initialization.api_endpoint}}/users"
headers:
X-Environment: "{{outputs.initialization.environment_id}}"
test: res.status == 200
outputs:
user_count: res.json.total_users
api_response_time: res.time
- name: Test Database Connectivity
action: http
with:
url: "{{outputs.initialization.database_url}}/ping"
test: res.status == 200
outputs:
db_response_time: res.time
reporting:
name: Test Reporting
needs: [initialization, api-tests] # Wait for both jobs
steps:
- name: Generate Test Report
echo: |
Test Execution Report
=====================
Environment: {{outputs.initialization.environment_name}}
Environment ID: {{outputs.initialization.environment_id}}
API Tests:
- User Count: {{outputs.api-tests.user_count}}
- API Response Time: {{outputs.api-tests.api_response_time}}ms
Database Tests:
- DB Response Time: {{outputs.api-tests.db_response_time}}ms
Overall Status: All tests completed successfully
cleanup:
name: Environment Cleanup
needs: [reporting] # Run after reporting completes
steps:
- name: Destroy Test Environment
action: http
with:
url: "{{env.SETUP_API}}/environments/{{outputs.initialization.environment_id}}"
method: DELETE
test: res.status == 204
Cross-Job Output References
Access outputs from other jobs using the outputs.job-name.output-name syntax:
jobs:
data-collection:
steps:
- name: Collect User Data
outputs:
total_users: res.json.count
active_users: res.json.active_count
analysis:
needs: [data-collection]
steps:
- name: Analyze User Metrics
echo: |
User Analysis:
Total Users: {{outputs.data-collection.total_users}}
Active Users: {{outputs.data-collection.active_users}}
Activity Rate: {{(outputs.data-collection.active_users / outputs.data-collection.total_users) * 100}}%
Advanced Data Flow Patterns
Data Transformation Chains
Transform data through multiple steps:
jobs:
data-processing-pipeline:
name: Data Processing Pipeline
steps:
- name: Fetch Raw Data
id: raw-data
action: http
with:
url: "{{env.DATA_API}}/raw-data"
outputs:
raw_records: res.json.records
total_count: res.json.total
fetch_time: res.time
- name: Filter Data
id: filtered-data
echo: "Filtering data..."
outputs:
# Filter active records
active_records: "{{outputs.raw-data.raw_records.filter(r -> r.status == 'active')}}"
active_count: "{{outputs.raw-data.raw_records.filter(r -> r.status == 'active').length}}"
- name: Aggregate Data
id: aggregated-data
echo: "Aggregating data..."
outputs:
# Group by category and calculate metrics
categories: "{{outputs.filtered-data.active_records.groupBy(r -> r.category)}}"
avg_score: "{{outputs.filtered-data.active_records.map(r -> r.score).sum() / outputs.filtered-data.active_count}}"
- name: Generate Summary
echo: |
Data Processing Summary:
Raw Records: {{outputs.raw-data.total_count}}
Active Records: {{outputs.filtered-data.active_count}}
Processing Rate: {{(outputs.filtered-data.active_count / outputs.raw-data.total_count) * 100}}%
Average Score: {{outputs.aggregated-data.avg_score}}
Fetch Time: {{outputs.raw-data.fetch_time}}ms
Conditional Data Flow
Control data flow based on conditions:
jobs:
adaptive-processing:
steps:
- name: Assess Data Quality
id: quality-check
action: http
with:
url: "{{env.API_URL}}/data-quality"
outputs:
quality_score: res.json.quality_score
has_errors: res.json.error_count > 0
record_count: res.json.record_count
- name: Standard Processing
if: outputs.quality-check.quality_score >= 0.8 && !outputs.quality-check.has_errors
id: standard-processing
action: http
with:
url: "{{env.PROCESSING_API}}/standard"
method: POST
body: |
{
"record_count": {{outputs.quality-check.record_count}},
"quality_mode": "standard"
}
outputs:
processing_result: res.json.result
processing_time: res.time
- name: Enhanced Processing
if: outputs.quality-check.quality_score < 0.8 || outputs.quality-check.has_errors
id: enhanced-processing
action: http
with:
url: "{{env.PROCESSING_API}}/enhanced"
method: POST
body: |
{
"record_count": {{outputs.quality-check.record_count}},
"quality_mode": "enhanced",
"error_correction": true
}
outputs:
processing_result: res.json.result
processing_time: res.time
corrections_applied: res.json.corrections
- name: Processing Summary
echo: |
Data Processing Complete:
Quality Score: {{outputs.quality-check.quality_score}}
Processing Mode: {{outputs.quality-check.quality_score >= 0.8 ? "Standard" : "Enhanced"}}
{{outputs.standard-processing ? "Standard Processing Time: " + outputs.standard-processing.processing_time + "ms" : ""}}
{{outputs.enhanced-processing ? "Enhanced Processing Time: " + outputs.enhanced-processing.processing_time + "ms" : ""}}
{{outputs.enhanced-processing ? "Corrections Applied: " + outputs.enhanced-processing.corrections_applied : ""}}
Data Accumulation Patterns
Collect data from multiple sources:
jobs:
multi-source-data-collection:
steps:
- name: Source A Data
id: source-a
action: http
with:
url: "{{env.SOURCE_A_URL}}/data"
outputs:
source_a_count: res.json.count
source_a_data: res.json.data
source_a_time: res.time
- name: Source B Data
id: source-b
action: http
with:
url: "{{env.SOURCE_B_URL}}/data"
outputs:
source_b_count: res.json.count
source_b_data: res.json.data
source_b_time: res.time
- name: Source C Data
id: source-c
action: http
with:
url: "{{env.SOURCE_C_URL}}/data"
outputs:
source_c_count: res.json.count
source_c_data: res.json.data
source_c_time: res.time
- name: Aggregate All Sources
echo: |
Multi-Source Data Summary:
Source A: {{outputs.source-a.source_a_count}} records ({{outputs.source-a.source_a_time}}ms)
Source B: {{outputs.source-b.source_b_count}} records ({{outputs.source-b.source_b_time}}ms)
Source C: {{outputs.source-c.source_c_count}} records ({{outputs.source-c.source_c_time}}ms)
Total Records: {{outputs.source-a.source_a_count + outputs.source-b.source_b_count + outputs.source-c.source_c_count}}
Average Response Time: {{(outputs.source-a.source_a_time + outputs.source-b.source_b_time + outputs.source-c.source_c_time) / 3}}ms
Fastest Source: {{
outputs.source-a.source_a_time <= outputs.source-b.source_b_time && outputs.source-a.source_a_time <= outputs.source-c.source_c_time ? "Source A" :
outputs.source-b.source_b_time <= outputs.source-c.source_c_time ? "Source B" : "Source C"
}}
Data Validation and Quality
Output Validation
Ensure data quality in outputs:
- name: Data Collection with Validation
id: validated-data
action: http
with:
url: "{{env.API_URL}}/user-data"
test: |
res.status == 200 &&
res.json.users != null &&
res.json.users.length > 0 &&
res.json.users.all(u -> u.id != null && u.email != null)
outputs:
# Validated outputs
user_count: res.json.users.length
valid_users: res.json.users.filter(u -> u.id != null && u.email != null)
admin_users: res.json.users.filter(u -> u.role == "admin")
# Data quality metrics
data_completeness: res.json.users.filter(u -> u.id != null && u.email != null).length / res.json.users.length
has_admin_users: res.json.users.any(u -> u.role == "admin")
# Response metadata
data_freshness: res.headers["last-modified"]
cache_status: res.headers["x-cache-status"]
Data Sanitization
Clean and sanitize data before use:
- name: Sanitize User Input
id: sanitized-input
action: http
with:
url: "{{env.API_URL}}/user-input"
outputs:
# Raw data
raw_input: res.json.input
# Sanitized data
clean_email: res.json.input.email.lower().trim()
clean_name: res.json.input.name.trim()
safe_description: res.json.input.description.substring(0, 500) # Limit length
# Validation flags
    email_valid: res.json.input.email.matches("^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$")
name_valid: res.json.input.name.length >= 2 && res.json.input.name.length <= 50
Performance Considerations
Efficient Data Access
Optimize data access patterns:
# Good: Direct property access
outputs:
user_id: res.json.user.id
user_name: res.json.user.name
# Good: Extract the filtered list and its derived count together
outputs:
active_users: res.json.users.filter(u -> u.active == true)
active_user_count: res.json.users.filter(u -> u.active == true).length
# Avoid: Repeated expensive computations
# outputs:
# user_count: res.json.users.filter(u -> expensive_validation(u)).length
# user_list: res.json.users.filter(u -> expensive_validation(u))
Memory Management
Be mindful of large data sets:
# Good: Extract essential data only
outputs:
user_ids: res.json.users.map(u -> u.id)
user_count: res.json.users.length
first_user: res.json.users[0]
# Avoid: Storing large objects unnecessarily
# outputs:
# all_user_data: res.json.users # Could be very large
# complete_response: res.json # Entire response body
Selective Data Extraction
Extract only needed data:
- name: Efficient Data Extraction
action: http
with:
url: "{{env.API_URL}}/large-dataset"
outputs:
# Extract summary information only
record_count: res.json.metadata.total_records
last_updated: res.json.metadata.last_updated
status: res.json.metadata.status
# Extract specific records by criteria
critical_items: res.json.data.filter(item -> item.priority == "critical")
error_items: res.json.data.filter(item -> item.status == "error")
# Compute aggregates
avg_score: res.json.data.map(item -> item.score).sum() / res.json.data.length
max_score: res.json.data.map(item -> item.score).max()
# Don't store the entire dataset
# full_dataset: res.json.data # Avoid this for large datasets
Best Practices
1. Clear Output Naming
Use descriptive names for outputs:
# Good: Descriptive names
outputs:
user_authentication_token: res.json.access_token
session_expiry_timestamp: res.json.expires_at
user_permission_level: res.json.user.role
# Avoid: Generic names
outputs:
token: res.json.access_token
time: res.json.expires_at
level: res.json.user.role
2. Type-Consistent Outputs
Maintain consistent data types:
# Good: Consistent types
outputs:
user_count: res.json.users.length # Always number
is_admin: res.json.user.role == "admin" # Always boolean
user_email: res.json.user.email || "" # Always string (with default)
# Avoid: Inconsistent types
outputs:
user_count: res.json.users.length || "unknown" # Number or string
3. Error-Safe Data Access
Handle potential null/undefined values:
# Good: Safe data access
outputs:
user_id: res.json.user && res.json.user.id ? res.json.user.id : null
email_verified: res.json.user && res.json.user.email_verified == true
profile_complete: res.json.user && res.json.user.profile && res.json.user.profile.complete == true
# Good: Using safe navigation
test: res.json.user?.id != null && res.json.user?.email != null
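These techniques can be combined in a single step. A sketch, assuming a hypothetical /user endpoint and the safe-navigation operator shown above:

```yaml
- name: Fetch User Safely
  id: safe-user
  action: http
  with:
    url: "{{env.API_URL}}/user"
  # Validate the shape of the response before trusting it
  test: res.status == 200 && res.json.user?.id != null
  outputs:
    user_id: res.json.user?.id
    # Fall back to a sentinel value when the field is missing
    display_name: res.json.user && res.json.user.name ? res.json.user.name : "unknown"
```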
4. Document Data Dependencies
Document what data flows where:
jobs:
user-setup:
name: User Account Setup
steps:
- name: Create User Account
# Produces: user_id, username, email
outputs:
user_id: res.json.user.id
username: res.json.user.username
email: res.json.user.email
user-verification:
name: User Account Verification
needs: [user-setup]
steps:
- name: Send Verification Email
# Consumes: user_id, email from user-setup
action: smtp
with:
to: ["{{outputs.user-setup.email}}"]
subject: "Verify your account"
body: "Click here to verify user {{outputs.user-setup.user_id}}"
What's Next?
Now that you understand data flow, explore:
- Testing and Assertions - Learn validation techniques
- Error Handling - Handle data flow failures gracefully
- How-tos - See practical data flow patterns
Data flow is the circulatory system of your Probe workflows. Master these patterns to build sophisticated, data-driven automation processes.