Skip to main content

Real Usage Examples

This section showcases complete, real-world Paral workflows that demonstrate how to combine tasks, directives, memory management, and control flow to solve common automation challenges.

Complete CI/CD Pipeline

# Configuration
# Deployment targets, listed in rollout order (staging before production).
environments = ["staging", "production"]
# Services built and deployed by this pipeline; each name must match a
# ./services/<name> directory and the corresponding k8s deployment name.
services = ["frontend", "backend", "worker"]
# Slack channels for pipeline notifications.
# NOTE(review): only "#deployments" is referenced below (cleanup_and_notify);
# "#alerts" appears unused in this file — confirm.
notification_channels = ["#deployments", "#alerts"]

# Initialize build context
# Captures git/build metadata into shared @buf memory so every downstream
# task tags and reports against the same build id.
task setup_build {
# Short git SHA — used as the docker image tag throughout the pipeline.
-> @buf["build_id"] << git rev-parse --short HEAD
-> @buf["build_timestamp"] << date +%s
# Branch name — checked later to gate production deploys to "main".
-> @buf["branch"] << git rev-parse --abbrev-ref HEAD
-> @printf("Starting build %s from branch %s", @buf("build_id"), @buf("branch"))
}

# Parallel testing phase
# Runs once setup_build completes; writes "passed" so the @wait gate on
# build_service can proceed.
# NOTE(review): assumes a failing command aborts the task before the
# "passed" marker is written — confirm Paral's failure semantics.
@depend setup_build
task run_unit_tests {
-> @printf("Running unit tests for build %s", @buf("build_id"))
-> npm test -- --coverage
-> @buf["unit_tests"] << echo "passed"
}

# Integration tests in an isolated docker-compose stack; the container
# exit status propagates via --abort-on-container-exit.
@depend setup_build
task run_integration_tests {
-> @printf("Running integration tests")
-> docker-compose -f docker-compose.test.yml up --abort-on-container-exit
# Gate marker consumed by build_service's @wait.
-> @buf["integration_tests"] << echo "passed"
}

# Dependency audit plus filesystem vulnerability scan; trivy's
# --exit-code 1 makes findings fail the step.
@depend setup_build
task security_scan {
-> @printf("Running security scan")
-> npm audit --audit-level=moderate
-> trivy fs . --exit-code 1
# Gate marker consumed by build_service's @wait.
-> @buf["security_scan"] << echo "passed"
}

# Static checks: lint rules and formatting (check-only, no rewrite).
@depend setup_build
task lint_and_format {
-> @printf("Running linting and format checks")
-> npm run lint
-> npm run format:check
# Gate marker consumed by build_service's @wait.
-> @buf["lint_check"] << echo "passed"
}

# Build phase - wait for all tests to pass
# Gate: all four verification buffers must read "passed" before any image
# is built. One task instance per entry in `services`
# (@key = index, @value = service name).
@wait @is(@buf("unit_tests"), "passed") @is(@buf("integration_tests"), "passed") @is(@buf("security_scan"), "passed") @is(@buf("lint_check"), "passed")
@for services
task build_service {
-> service=@value
-> @printf("Building service %d: %s", @key, $service)
# Image is tagged with the short git SHA captured by setup_build.
-> docker build -t @sprintf("%s:%s", $service, @buf("build_id")) ./services/$service
-> docker push @sprintf("myregistry/%s:%s", $service, @buf("build_id"))
# Signals deploy_to_environment that this service's image is pushed.
-> @buf[@sprintf("%s_built", $service)] << echo "true"
}

# Deployment phase
# One instance per environment; each waits until every service image is
# pushed.
# NOTE(review): the built-flag names below are hard-coded per service and
# must be kept in sync with the `services` list — confirm there is no
# list-driven form of this gate.
@for environments
@wait @is(@buf("frontend_built"), "true") @is(@buf("backend_built"), "true") @is(@buf("worker_built"), "true")
task deploy_to_environment {
-> env=@value
-> @printf("Deploying to environment: %s", $env)

# Load environment-specific configuration
-> @envfile @sprintf("./config/%s.env", $env)

# Deploy each service to this environment
-> @for services {
service=@value
@printf("Deploying %s to %s", $service, $env)

# Production requires manual approval
# (only builds from "main" are marked ready; everything else deploys
# straight through the else branch)
@if @and(@is($env, "production"), @is(@buf("branch"), "main")) {
@printf("Production deployment ready for %s", $service)
# Consumed by the manual deploy_production task's @wait gate.
@buf[@sprintf("prod_ready_%s", $service)] << echo "true"
} else {
kubectl set image deployment/$service $service=myregistry/@sprintf("%s:%s", $service, @buf("build_id")) -n $env
# Checked later by verify_deployments.
@buf[@sprintf("%s_deployed_%s", $service, $env)] << echo "true"
}
}
}

# Manual production deployment
# Operator-triggered with a service name; blocks until
# deploy_to_environment has marked that service prod-ready
# (main-branch builds only).
@manual
@args service
@description "Deploy specific service to production"
@wait @is(@buf(@sprintf("prod_ready_%s", @arg("service"))), "true")
task deploy_production {
-> service=@arg("service")
-> @printf("Manually deploying %s to production", $service)
-> kubectl set image deployment/$service $service=myregistry/@sprintf("%s:%s", $service, @buf("build_id")) -n production
# Checked later by verify_deployments.
-> @buf[@sprintf("%s_deployed_production", $service)] << echo "true"
}

# Post-deployment verification
# @defer presumably runs this at the end of the workflow — confirm.
# Checks rollout status only for service/environment pairs whose
# "_deployed_" flag was set by the deploy tasks above.
@defer
task verify_deployments {
-> @printf("Verifying all deployments for build %s", @buf("build_id"))
-> @for environments {
env=@value
@for services {
service=@value
@if @is(@buf(@sprintf("%s_deployed_%s", $service, $env)), "true") {
@printf("Verifying %s in %s", $service, $env)
kubectl rollout status deployment/$service -n $env --timeout=300s
}
}
}
}

# Cleanup and notifications
# Deferred housekeeping: prune docker artifacts and announce completion.
# Note: $(date) is shell command substitution evaluated at run time.
@defer
task cleanup_and_notify {
-> docker system prune -f
-> @printf("Build %s completed at %s", @buf("build_id"), $(date))
-> slack-cli send @sprintf("🚀 Build %s deployed successfully", @buf("build_id")) --channel=#deployments
}

Database Management System

# Configuration
# Databases covered by the backup/monitoring routines below.
databases = ["users", "orders", "products", "analytics", "audit"]
# Backups older than this many days are deleted by cleanup_old_backups.
backup_retention_days = 7
# Replication lag (seconds) above which monitor_replication raises an alert.
replication_lag_threshold = 10

# Daily backup routine
# One instance per database. Records per-database status and the backup
# file name in @buf for downstream tasks.
@schedule "0 2 * * *" # Daily at 2 AM
@for databases
task backup_database {
-> db=@value
-> @printf("Starting backup for database: %s", $db)
# File name embeds the date, e.g. backup-users-20240101.sql.gz.
-> backup_file=@sprintf("backup-%s-%s.sql.gz", $db, $(date +%Y%m%d))

-> pg_dump $db | gzip > $backup_file
# $? is the exit status of the preceding pipeline — shell semantics assumed.
-> @if @is($?, "0") {
@printf("✓ Backup successful: %s", $backup_file)
@buf[@sprintf("%s_backup_status", $db)] << echo "success"
@buf[@sprintf("%s_backup_file", $db)] << echo $backup_file
} else {
@printf("✗ Backup failed for %s", $db)
@buf[@sprintf("%s_backup_status", $db)] << echo "failed"
}
}

# Cleanup old backups
# Runs an hour after the backup window; deletes archives older than
# backup_retention_days (config variables are referenced bare, without $).
@schedule "0 3 * * *" # Daily at 3 AM
@depend backup_database
task cleanup_old_backups {
-> @printf("Cleaning up backups older than %d days", backup_retention_days)
-> find ./backups -name "*.sql.gz" -mtime @sprintf("+%d", backup_retention_days) -delete
-> @printf("Old backup cleanup completed")
}

# Monitor replication lag
# Polls each replica's apply lag every 5 minutes and flags databases
# exceeding replication_lag_threshold.
@schedule "*/5 * * * *" # Every 5 minutes
@for databases
task monitor_replication {
-> db=@value
# EXTRACT(EPOCH ...) yields fractional seconds.
-> lag_seconds=$(psql $db -t -c "SELECT EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp()))")
-> @buf[@sprintf("%s_replication_lag", $db)] << echo $lag_seconds
9
-> @if @is($lag_seconds, "gt", replication_lag_threshold) {
# NOTE(review): %d on a fractional lag value — confirm @printf truncates
# rather than errors.
@printf("⚠️ High replication lag for %s: %d seconds", $db, $lag_seconds)
# Consumed by send_replication_alerts' @wait/@contains gate.
@buf[@sprintf("%s_lag_alert", $db)] << echo "true"
}
}

# Send replication alerts
# @contains(@buf, "_lag_alert") presumably matches any buffer key
# containing that substring — confirm. Fires once some database has been
# flagged, then notifies per flagged database.
@wait @contains(@buf, "_lag_alert")
task send_replication_alerts {
-> @for databases {
db=@value
@if @is(@buf(@sprintf("%s_lag_alert", $db)), "true") {
slack-cli send @sprintf("🔥 Database %s has high replication lag: %s seconds", $db, @buf(@sprintf("%s_replication_lag", $db))) --channel=#database-alerts
}
}
}

# Manual database operations
# Operator-triggered maintenance; `operation` selects one of
# vacuum | reindex | stats. Unrecognized operations fall through
# silently — no branch matches.
@manual
@args database
@args operation
@description "Perform maintenance operation on specific database"
task database_maintenance {
-> db=@arg("database")
-> operation=@arg("operation")
-> @printf("Performing %s on database %s", $operation, $db)

-> @if @is($operation, "vacuum") {
vacuumdb $db --analyze --verbose
}

-> @if @is($operation, "reindex") {
reindexdb $db --verbose
}

-> @if @is($operation, "stats") {
psql $db -c "SELECT schemaname,tablename,n_tup_ins,n_tup_upd,n_tup_del FROM pg_stat_user_tables;"
}
}

# Emergency restore procedure
# DESTRUCTIVE: drops and recreates the target database from a dated
# backup archive. Requires an interactive terminal (`read`) for the
# confirmation prompt.
@manual
@args target_database
@args backup_date
@description "Restore database from specific backup date"
task emergency_restore {
-> db=@arg("target_database")
-> backup_date=@arg("backup_date")
# Must match the naming scheme produced by backup_database.
-> backup_file=@sprintf("backup-%s-%s.sql.gz", $db, $backup_date)

-> @if @not(@file_exists($backup_file)) {
@printf("❌ Backup file not found: %s", $backup_file)
exit 1
}

-> @printf("🚨 EMERGENCY RESTORE: %s from %s", $db, $backup_file)
-> @printf("This will overwrite the current database. Confirm? (y/N)")
-> read confirmation

# Anything other than exactly "y" (including "Y") cancels the restore.
-> @if @is($confirmation, "y") {
dropdb $db
createdb $db
gunzip -c $backup_file | psql $db
@printf("✅ Restore completed for %s", $db)
} else {
@printf("Restore cancelled")
}
}

Multi-Cloud Infrastructure Management

# Configuration
# Providers polled by the health and scaling tasks below.
cloud_providers = ["aws", "gcp", "azure"]
# NOTE(review): `regions` and `instance_types` are declared but not
# referenced by any task in this section — confirm they are used elsewhere.
regions = ["us-east-1", "us-west-2", "eu-central-1"]
instance_types = ["small", "medium", "large"]
# CPU-load percentages that trigger progressively larger scale actions.
scaling_thresholds = [70, 85, 95]

# Health check across all providers
# Counts running instances/VMs per provider and publishes the count to
# @buf under "<provider>_healthy_instances".
@schedule "*/2 * * * *" # Every 2 minutes
@for cloud_providers
task monitor_cloud_health {
-> provider=@value
-> @printf("Monitoring %s infrastructure", $provider)

-> @if @is($provider, "aws") {
aws_status=$(aws ec2 describe-instance-status --query 'InstanceStatuses[?InstanceState.Name==`running`]' --output text | wc -l)
@buf["aws_healthy_instances"] << echo $aws_status
}

-> @if @is($provider, "gcp") {
gcp_status=$(gcloud compute instances list --filter="status:RUNNING" --format="value(name)" | wc -l)
@buf["gcp_healthy_instances"] << echo $gcp_status
}

-> @if @is($provider, "azure") {
azure_status=$(az vm list --show-details --query "[?powerState=='VM running']" --output tsv | wc -l)
@buf["azure_healthy_instances"] << echo $azure_status
}
}

# Dynamic scaling based on load
# Samples average node CPU and maps the highest exceeded threshold to a
# scale action. Thresholds iterate low-to-high and each match overwrites
# the same buffer key, so — assuming sequential iteration — the largest
# matched threshold's action wins.
@for cloud_providers
@schedule "*/10 * * * *" # Every 10 minutes
task evaluate_scaling {
-> provider=@value
# Mean CPU% across nodes, with the trailing % stripped.
-> current_load=$(kubectl top nodes --no-headers | awk '{sum+=$3} END {print sum/NR}' | sed 's/%//')
-> @buf[@sprintf("%s_current_load", $provider)] << echo $current_load

-> @for scaling_thresholds {
threshold=@value
@if @is($current_load, "gt", $threshold) {
@printf("Load %d%% exceeds threshold %d%% for %s", $current_load, $threshold, $provider)

@if @is($threshold, "70") {
@buf[@sprintf("%s_scale_action", $provider)] << echo "scale_up_small"
}
@if @is($threshold, "85") {
@buf[@sprintf("%s_scale_action", $provider)] << echo "scale_up_medium"
}
@if @is($threshold, "95") {
@buf[@sprintf("%s_scale_action", $provider)] << echo "emergency_scale"
}
}
}
}

# Execute scaling actions
# Fires once a non-empty scale action exists for the provider.
# NOTE(review): "scale_up_medium" is written by evaluate_scaling but has
# no handler here, and "scale_up_small" has no azure branch — confirm
# these gaps are intentional.
@for cloud_providers
@wait @isnot(@buf(@sprintf("%s_scale_action", @value)), "")
task execute_scaling {
-> provider=@value
-> action=@buf(@sprintf("%s_scale_action", $provider))
-> @printf("Executing scaling action %s for %s", $action, $provider)

-> @if @is($action, "scale_up_small") {
@if @is($provider, "aws") {
aws ec2 run-instances --image-id ami-12345 --instance-type t3.small --count 2
}
@if @is($provider, "gcp") {
gcloud compute instances create instance-$(date +%s) --machine-type=e2-small --zone=us-central1-a
}
}

-> @if @is($action, "emergency_scale") {
slack-cli send @sprintf("🚨 Emergency scaling triggered for %s", $provider) --channel=#ops
# Implement emergency scaling logic
}
}

E-commerce Order Processing

# Configuration
# NOTE(review): `order_statuses` is declared but never iterated below;
# statuses are written as literals in SQL — confirm this list is used
# elsewhere.
order_statuses = ["pending", "processing", "shipped", "delivered"]
# Providers a stored order may reference; see validate_payment.
payment_providers = ["stripe", "paypal", "square"]
# Carriers assigned by weight/region in assign_shipping.
shipping_carriers = ["ups", "fedex", "dhl"]

# Process pending orders
# Pulls up to 50 pending order ids per minute and publishes them as a
# whitespace-separated list for the downstream @for @split pipelines.
@schedule "*/1 * * * *" # Every minute
task process_pending_orders {
-> pending_orders=$(psql orders -t -c "SELECT id FROM orders WHERE status='pending' LIMIT 50")
# Word count of the id list = number of pending orders.
-> @buf["pending_count"] << echo $pending_orders | wc -w
0
-> @if @is(@buf("pending_count"), "gt", 0) {
@printf("Processing %d pending orders", @buf("pending_count"))
@buf["orders_to_process"] << echo $pending_orders
}
}

# Validate payment for each order
# One instance per order id in the pending batch; looks up the order's
# payment provider and records the provider-reported payment status in
# @stash for process_validated_order.
# NOTE(review): "square" is in payment_providers but has no branch here,
# so square orders get no payment_status — confirm.
@wait @isnot(@buf("orders_to_process"), "")
@for @split(@buf("orders_to_process"), " ")
task validate_payment {
-> order_id=@value
-> @printf("Validating payment for order %s", $order_id)
-> payment_provider=$(psql orders -t -c @sprintf("SELECT payment_provider FROM orders WHERE id=%s", $order_id))
-> @stash[@sprintf("order_%s_provider", $order_id)] << echo $payment_provider

-> @if @is($payment_provider, "stripe") {
stripe_status=$(curl -s "https://api.stripe.com/v1/payment_intents/$order_id" -H "Authorization: Bearer $STRIPE_KEY" | jq -r '.status')
@stash[@sprintf("order_%s_payment_status", $order_id)] << echo $stripe_status
}

-> @if @is($payment_provider, "paypal") {
paypal_status=$(curl -s @sprintf("https://api.paypal.com/v2/payments/payment/%s", $order_id) -H "Authorization: Bearer $PAYPAL_TOKEN" | jq -r '.state')
@stash[@sprintf("order_%s_payment_status", $order_id)] << echo $paypal_status
}
}

# Process validated orders
@depend validate_payment
@for @split(@buf("orders_to_process"), " ")
task process_validated_order {
-> order_id=@value
-> payment_status=@stash(@sprintf("order_%s_payment_status", $order_id))

-> @if @is($payment_status, "succeeded") {
@printf("Payment confirmed for order %s", $order_id)
psql orders -c @sprintf("UPDATE orders SET status='processing' WHERE id=%s", $order_id)
@stash[@sprintf("order_%s_ready", $order_id)] << echo "true"
} else {
@printf("Payment failed for order %s: %s", $order_id, $payment_status)
psql orders -c @sprintf("UPDATE orders SET status='payment_failed' WHERE id=%s", $order_id)
}
}

# Assign shipping carriers
# Carrier matrix: domestic <5 -> ups, domestic >=5 -> fedex,
# international (any weight) -> dhl.
# NOTE(review): an order that is neither "domestic" nor "international"
# leaves $carrier unset — confirm region values are constrained.
@depend process_validated_order
@for @split(@buf("orders_to_process"), " ")
task assign_shipping {
-> order_id=@value

-> @if @is(@stash(@sprintf("order_%s_ready", $order_id)), "true") {
# Get order details
order_weight=$(psql orders -t -c @sprintf("SELECT weight FROM orders WHERE id=%s", $order_id))
order_region=$(psql orders -t -c @sprintf("SELECT shipping_region FROM orders WHERE id=%s", $order_id))

# Assign carrier based on weight and region
@if @and(@is($order_weight, "lt", 5), @is($order_region, "domestic")) {
carrier="ups"
}
@if @and(@is($order_weight, "gte", 5), @is($order_region, "domestic")) {
carrier="fedex"
}
@if @is($order_region, "international") {
carrier="dhl"
}

@printf("Assigned carrier %s for order %s", $carrier, $order_id)
psql orders -c @sprintf("UPDATE orders SET carrier='%s', status='ready_to_ship' WHERE id=%s", $carrier, $order_id)
}
}

# Generate shipping labels
# Batch job: for every ready_to_ship order, invoke the carrier-specific
# label script and advance the order to 'shipped'.
@schedule "0 */2 * * *" # Every 2 hours
task generate_shipping_labels {
-> ready_orders=$(psql orders -t -c "SELECT id FROM orders WHERE status='ready_to_ship'")

-> @for @split($ready_orders, " ") {
order_id=@value
carrier=$(psql orders -t -c @sprintf("SELECT carrier FROM orders WHERE id=%s", $order_id))

@printf("Generating label for order %s via %s", $order_id, $carrier)

@if @is($carrier, "ups") {
python3 scripts/generate_ups_label.py --order-id=$order_id
}
@if @is($carrier, "fedex") {
python3 scripts/generate_fedex_label.py --order-id=$order_id
}
@if @is($carrier, "dhl") {
python3 scripts/generate_dhl_label.py --order-id=$order_id
}

# Status advances even if no carrier branch matched — see assign_shipping.
psql orders -c @sprintf("UPDATE orders SET status='shipped' WHERE id=%s", $order_id)
}
}

Log Analysis and Monitoring

# Configuration
# Log files monitored by analyze_logs; index-aligned with
# alert_thresholds (nginx:100, app:50, db:10 errors/minute).
log_sources = ["/var/log/nginx/access.log", "/var/log/app/application.log", "/var/log/db/postgresql.log"]
alert_thresholds = [100, 50, 10] # errors per minute
# Report windows consumed by generate_log_report.
time_windows = ["1m", "5m", "15m"]

# Real-time log monitoring
# One instance per log source. Approximates "recent" with the last 1000
# lines; compares against the threshold at the same index in
# alert_thresholds.
@schedule "*/1 * * * *" # Every minute
@for log_sources
task analyze_logs {
-> log_file=@value
-> @printf("Analyzing log file %d: %s", @key, $log_file)

# Extract recent errors
-> recent_errors=$(tail -n 1000 $log_file | grep -i error | wc -l)
# Consumed by detect_anomalies for the cross-source total.
-> @buf[@sprintf("errors_%d", @key)] << echo $recent_errors

# Check against thresholds
-> threshold=@get(alert_thresholds, @key)
-> @if @is($recent_errors, "gt", $threshold) {
@printf("🚨 High error rate in %s: %d errors", $log_file, $recent_errors)
@buf[@sprintf("alert_%d", @key)] << echo "true"

# Extract error patterns
# Top 5 most frequent last-field values among recent error lines.
error_patterns=$(tail -n 1000 $log_file | grep -i error | awk '{print $NF}' | sort | uniq -c | sort -nr | head -5)
@buf[@sprintf("error_patterns_%d", @key)] << echo "$error_patterns"
}
}

# Generate detailed reports
# One report task per window in `time_windows` ("1m", "5m", "15m").
# Strip the trailing "m" so the cron minute field is numeric: "5m" ->
# "*/5 * * * *". The original formatted the raw value ("*/5m * * * *"),
# which is not a valid cron minute field.
# NOTE(review): relies on @split/@get semantics as used elsewhere in this
# file (@split at validate_payment, @get at analyze_logs) — confirm.
@for time_windows
@schedule @sprintf("*/%s * * * *", @get(@split(@value, "m"), 0))
task generate_log_report {
-> window=@value
-> @printf("Generating %s log analysis report", $window)

-> @for log_sources {
log_file=@value
log_index=@key

# Time-based analysis: snapshot the most recent lines for this window.
# Snapshots are named "<window>_<index>.log" so they match the --input
# pattern passed to analyze_logs.py below. (The original wrote
# recent_/medium_/long_ names that the analyzer never read.)
@if @is($window, "1m") {
tail -n 100 $log_file > @sprintf("%s_%d.log", $window, $log_index)
}
@if @is($window, "5m") {
tail -n 500 $log_file > @sprintf("%s_%d.log", $window, $log_index)
}
@if @is($window, "15m") {
tail -n 1500 $log_file > @sprintf("%s_%d.log", $window, $log_index)
}

# Generate statistics
python3 scripts/analyze_logs.py @sprintf("--input=%s_%d.log", $window, $log_index) @sprintf("--output=stats_%s_%d.json", $window, $log_index)
}

# Combine all statistics
-> python3 scripts/combine_stats.py @sprintf("--pattern=stats_%s_*.json", $window) @sprintf("--output=report_%s.json", $window)
}

# Anomaly detection
# Sums the per-source error counts published by analyze_logs and alerts
# when the total exceeds a time-of-day-dependent threshold.
# NOTE(review): assumes shell variables ($error_threshold, $total_errors)
# persist across -> steps and @for iterations — confirm Paral's scoping.
@schedule "*/5 * * * *" # Every 5 minutes
task detect_anomalies {
-> @printf("Running anomaly detection")
-> current_hour=$(date +%H)
# NOTE(review): $current_minute is captured but never used — confirm.
-> current_minute=$(date +%M)

# Different thresholds for different times
-> @if @and(@is($current_hour, "gte", 9), @is($current_hour, "lte", 17)) {
# Business hours - higher thresholds
error_threshold=200
response_threshold=500
} else {
# Off hours - lower thresholds
error_threshold=50
response_threshold=200
}

-> total_errors=0
-> @for log_sources {
log_errors=@buf(@sprintf("errors_%d", @key))
total_errors=$(($total_errors + $log_errors))
}

-> @if @is($total_errors, "gt", $error_threshold) {
@printf("🚨 Anomaly detected: %d total errors exceeds threshold %d", $total_errors, $error_threshold)
python3 scripts/anomaly_response.py --total-errors=$total_errors --threshold=$error_threshold
slack-cli send @sprintf("Anomaly Alert: %d errors detected", $total_errors) --channel=#ops
}
}

Microservices Health Management

# Service configuration
# Service names must match the k8s deployment names used by the healing
# and scaling tasks below.
services = ["auth-service", "user-service", "order-service", "payment-service", "notification-service"]
# Probe paths checked on every service in every environment.
health_endpoints = ["/health", "/ready", "/live"]
environments = ["dev", "staging", "prod"]

# Comprehensive health checks
# Probes service x environment x endpoint; records HTTP status and
# response time, and flags unhealthy (non-200) or slow (>2s) pairs.
# NOTE(review): this schedule has SIX fields (seconds-resolution cron)
# while the rest of the file uses five — confirm the scheduler supports it.
@schedule "*/30 * * * * *" # Every 30 seconds
@for services
task health_check_service {
-> service=@value
-> service_index=@key
-> @printf("Health checking service %d: %s", $service_index, $service)

-> @for environments {
env=@value
@for health_endpoints {
endpoint=@value
# In-cluster DNS: <service>.<namespace>.svc.cluster.local.
url=@sprintf("http://%s.%s.svc.cluster.local:8080%s", $service, $env, $endpoint)

# curl emits "<http_code>:<total_time>" which is split below.
response=$(curl -s -o /dev/null -w "%{http_code}:%{time_total}" $url)
status_code=$(echo $response | cut -d: -f1)
response_time=$(echo $response | cut -d: -f2)

@buf[@sprintf("%s_%s_%s_status", $service, $env, $endpoint)] << echo $status_code
@buf[@sprintf("%s_%s_%s_time", $service, $env, $endpoint)] << echo $response_time

@if @isnot($status_code, "200") {
@printf("❌ %s/%s%s failed: %s", $service, $env, $endpoint, $status_code)
# Consumed by auto_heal_service.
@buf[@sprintf("%s_%s_unhealthy", $service, $env)] << echo "true"
}

@if @is($response_time, "gt", 2.0) {
@printf("⚠️ %s/%s%s slow response: %.2fs", $service, $env, $endpoint, $response_time)
# Consumed by optimize_slow_service.
@buf[@sprintf("%s_%s_slow", $service, $env)] << echo "true"
}
}
}
}

# Auto-healing unhealthy services
# Fires when any "_unhealthy" buffer key exists; restarts the affected
# deployment, waits for rollout, then clears the flag by writing "".
@wait @contains(@buf, "_unhealthy")
@for services
task auto_heal_service {
-> service=@value
-> @printf("Checking if %s needs healing", $service)

-> @for environments {
env=@value
@if @is(@buf(@sprintf("%s_%s_unhealthy", $service, $env)), "true") {
@printf("🔧 Auto-healing %s in %s", $service, $env)

# Restart the service
kubectl rollout restart deployment/$service -n $env

# Wait for rollout to complete
kubectl rollout status deployment/$service -n $env --timeout=300s

# Clear the unhealthy flag
@buf[@sprintf("%s_%s_unhealthy", $service, $env)] << echo ""

# Log healing action
# Keyed by epoch timestamp so successive healings do not overwrite.
@buf[@sprintf("healing_log_%s", $(date +%s))] << @sprintf("Auto-healed %s in %s", $service, $env)
}
}
}

# Performance optimization for slow services
# Temporarily adds two replicas to any service flagged "_slow"; the
# original replica count is stashed for the scale-down task.
@wait @contains(@buf, "_slow")
@for services
task optimize_slow_service {
-> service=@value

-> @for environments {
env=@value
@if @is(@buf(@sprintf("%s_%s_slow", $service, $env)), "true") {
@printf("🚀 Optimizing slow service %s in %s", $service, $env)

# Scale up replicas temporarily
current_replicas=$(kubectl get deployment $service -n $env -o jsonpath='{.spec.replicas}')
new_replicas=$(($current_replicas + 2))

kubectl scale deployment $service --replicas=$new_replicas -n $env
@stash[@sprintf("%s_%s_original_replicas", $service, $env)] << echo $current_replicas

# Schedule scale-down after 30 minutes
# NOTE(review): "*/30 * * * *" is "every 30 minutes", recurring — not a
# one-shot 30-minute delay as the comment implies. Also unclear whether
# a task defined inside a task body captures $service/$env — confirm.
@defer
@schedule "*/30 * * * *"
task scale_down_after_optimization {
-> original_replicas=@stash(@sprintf("%s_%s_original_replicas", $service, $env))
-> kubectl scale deployment $service --replicas=$original_replicas -n $env
-> @printf("Scaled %s back to %d replicas in %s", $service, $original_replicas, $env)
}
}
}
}

# Manual service operations
# Operator-triggered: restart | scale (interactive, prompts for replica
# count) | logs (streams with --follow until interrupted). Validates the
# service and environment arguments against the config lists first.
@manual
@args service_name
@args environment
@args operation
@description "Perform operations on specific service"
task service_operation {
-> service=@arg("service_name")
-> env=@arg("environment")
-> operation=@arg("operation")

-> @if @not(@contains(services, $service)) {
@printf("❌ Invalid service: %s", $service)
exit 1
}

-> @if @not(@contains(environments, $env)) {
@printf("❌ Invalid environment: %s", $env)
exit 1
}

-> @printf("Performing %s on %s in %s", $operation, $service, $env)

-> @if @is($operation, "restart") {
kubectl rollout restart deployment/$service -n $env
}

-> @if @is($operation, "scale") {
@printf("Current replicas for %s: ", $service)
kubectl get deployment $service -n $env -o jsonpath='{.spec.replicas}'
@printf("Enter new replica count: ")
read new_replicas
kubectl scale deployment $service --replicas=$new_replicas -n $env
}

-> @if @is($operation, "logs") {
kubectl logs deployment/$service -n $env --tail=100 --follow
}
}