Process Enrichment Queue

Background worker function invoked on a schedule (cron) that processes two types of enrichment tasks from a shared queue: transcript fetching (from Ultravox) and AI call analysis (via OpenRouter/GPT-4o-mini). Each invocation processes up to 10 transcript items and 5 analysis items.

Endpoint

| Property | Value |
| --- | --- |
| Function Name | process-enrichment-queue |
| HTTP Method | Any (no method restriction) |
| Authentication | None (cron/internal invocation). The source code explicitly notes "Auth check removed for cron compatibility." |
| Required Role | None (cron) |

Request

Headers

No headers are required. This function is designed to be invoked by a Supabase cron scheduler or internal trigger without user authentication.

Body

No request body is expected. The function discovers work by claiming items from the enrichment queue.

Response

Success (200)

{
  "transcripts": {
    "processed": 8,
    "failed": 1
  },
  "analysis": {
    "processed": 4,
    "failed": 0
  }
}
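
A caller that monitors the cron run may want a typed view of this body. The following is a minimal sketch based only on the example above; the helper name `totalFailed` is hypothetical and not part of the function itself.

```typescript
// Shape of the 200 response body, taken from the example above.
interface PhaseCounts {
  processed: number;
  failed: number;
}

interface EnrichmentResult {
  transcripts: PhaseCounts;
  analysis: PhaseCounts;
}

// Hypothetical helper: number of failed items across both phases.
function totalFailed(result: EnrichmentResult): number {
  return result.transcripts.failed + result.analysis.failed;
}

const example: EnrichmentResult = {
  transcripts: { processed: 8, failed: 1 },
  analysis: { processed: 4, failed: 0 },
};

console.log(totalFailed(example)); // 1
```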

Error Responses

| Status | Condition |
| --- | --- |
| 500 | Unhandled top-level exception (e.g., missing environment variables) |

Behavior

This function runs two processing pipelines sequentially: transcript fetching followed by call analysis.
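
The sequential structure can be sketched as follows. This is an illustration under assumptions, not the function's actual source: the two phase functions are injected as parameters, and the shape of the 500 body (`{ error }`) is a guess, since this page documents only the status code.

```typescript
interface PhaseCounts {
  processed: number;
  failed: number;
}

interface HandlerResponse {
  status: number;
  body: unknown;
}

// Runs the transcript phase to completion before starting the analysis phase.
// Any exception that escapes a phase is mapped to a 500 response.
async function handle(
  runTranscripts: () => Promise<PhaseCounts>,
  runAnalysis: () => Promise<PhaseCounts>,
): Promise<HandlerResponse> {
  try {
    const transcripts = await runTranscripts();
    const analysis = await runAnalysis();
    return { status: 200, body: { transcripts, analysis } };
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err);
    // Assumed error body; only the 500 status is documented.
    return { status: 500, body: { error: message } };
  }
}
```

Per-item failures are expected to be counted inside each phase (the `failed` field), so only errors that escape a phase reach the `catch` branch.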

Phase 1: Transcript Fetching

  1. Claim batch: Calls claim_enrichment_queue_batch RPC with p_job_type: 'transcript' and p_batch_size: 10.
  2. Group by agency: Fetches each claimed item's associated call from the calls table to determine the agency_id, then groups items by agency to minimize API key lookups.
  3. Per-agency processing:
    • Retrieves the encrypted Ultravox API key from the agencies table.
    • Decrypts it via decrypt_api_key RPC.
    • For each queued transcript item:
      • Updates the call's transcript_status to fetching.
      • Calls Ultravox GET /api/calls/{call_id}/messages to retrieve the transcript.
      • Checks whether a recording exists via GET /api/calls/{call_id}/recording (manual redirect check).
      • Stores the transcript via update_call_transcript RPC.
      • Updates the has_recording flag on the calls table.
      • Marks the queue item as complete via complete_enrichment_queue_item RPC.
    • On failure, calls fail_enrichment_queue_item and fail_call_transcript RPCs.
    • Adds a 200ms delay between items to avoid rate limiting.
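
The grouping in step 2 can be sketched as a pure function. The row shape (`id`, `call_id`) and the lookup map are assumptions for illustration; the real queue and `calls` columns may differ.

```typescript
// Hypothetical shape of a claimed queue item.
interface QueueItem {
  id: string;
  call_id: string;
}

// Groups queue items by the agency that owns each call, so the API key
// is looked up and decrypted once per agency rather than once per item.
function groupByAgency(
  items: QueueItem[],
  agencyByCall: Map<string, string>, // call_id -> agency_id
): Map<string, QueueItem[]> {
  const groups = new Map<string, QueueItem[]>();
  for (const item of items) {
    const agencyId = agencyByCall.get(item.call_id);
    // Assumption: items whose call cannot be found are skipped here;
    // the real function may mark them as failed instead.
    if (agencyId === undefined) continue;
    const bucket = groups.get(agencyId);
    if (bucket) {
      bucket.push(item);
    } else {
      groups.set(agencyId, [item]);
    }
  }
  return groups;
}
```

With a batch of 10 items spread across 3 agencies, this reduces key retrieval and decryption from 10 round trips to 3.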

Phase 2: AI Call Analysis

  1. Claim batch: Calls claim_enrichment_queue_batch RPC with p_job_type: 'analysis' and p_batch_size: 5.
  2. Per-item processing:
    • Fetches the call's transcript, short_summary, campaign_id, and associated campaign type from the database.
    • Updates the call's analysis_status to processing.
    • Sends the transcript to OpenRouter (GPT-4o-mini) for AI analysis, with a system prompt tailored to the campaign type (sdr or cs).
    • Stores the analysis result via update_call_analysis RPC.
    • Marks the queue item as complete.
    • On failure, calls fail_enrichment_queue_item and fail_call_analysis RPCs.
    • Adds a 500ms delay between items.
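
Building the OpenRouter request might look like the sketch below. The model name comes from this page; the prompt texts are placeholders (the real system prompts are not shown here), and passing the transcript as a single user message is an assumption.

```typescript
type CampaignType = "sdr" | "cs";

interface ChatMessage {
  role: "system" | "user";
  content: string;
}

interface ChatCompletionRequest {
  model: string;
  messages: ChatMessage[];
}

// Placeholder prompts; the actual wording used by the function is unknown.
const SYSTEM_PROMPTS: Record<CampaignType, string> = {
  sdr: "Analyze this sales development call and reply with JSON.",
  cs: "Analyze this customer support call and reply with JSON.",
};

// Builds the JSON body for POST /api/v1/chat/completions.
function buildAnalysisRequest(
  transcript: string,
  campaignType: CampaignType,
): ChatCompletionRequest {
  return {
    model: "openai/gpt-4o-mini",
    messages: [
      { role: "system", content: SYSTEM_PROMPTS[campaignType] },
      { role: "user", content: transcript },
    ],
  };
}
```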

AI Analysis Output Shape

The AI analysis produces a structured JSON object with the following fields:

{
  "summary": "Brief 2-3 sentence summary",
  "sentiment": {
    "overall": "positive | neutral | negative",
    "start": "positive | neutral | negative",
    "end": "positive | neutral | negative",
    "progression": "improving | declining | stable"
  },
  "metrics": {
    "agent_talk_ratio": 0.65,
    "customer_talk_ratio": 0.35,
    "interruptions": 2,
    "questions_asked": 5
  },
  "action_items": ["Follow up on pricing", "Send case study"],
  "coaching_notes": ["Improve objection handling"],
  "sdr": { ... },
  "cs": { ... }
}

For SDR campaigns, the sdr object includes: objections, buying_signals, meeting_booked, qualified, qualification_reason.

For CS campaigns, the cs object includes: issue_category, resolved, resolution_method, escalated, escalation_reason.
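
Because the analysis comes back from a language model, a consumer may want to validate it before use. The sketch below types the common fields and checks them at runtime. The `sdr` and `cs` objects are left loosely typed because this page lists their field names but not their value types, and `isCallAnalysis` is a hypothetical helper.

```typescript
type Sentiment = "positive" | "neutral" | "negative";
type Progression = "improving" | "declining" | "stable";

interface CallAnalysis {
  summary: string;
  sentiment: { overall: Sentiment; start: Sentiment; end: Sentiment; progression: Progression };
  metrics: {
    agent_talk_ratio: number;
    customer_talk_ratio: number;
    interruptions: number;
    questions_asked: number;
  };
  action_items: string[];
  coaching_notes: string[];
  sdr?: Record<string, unknown>; // value types not documented
  cs?: Record<string, unknown>; // value types not documented
}

const SENTIMENTS: readonly string[] = ["positive", "neutral", "negative"];
const PROGRESSIONS: readonly string[] = ["improving", "declining", "stable"];

function isRecord(v: unknown): v is Record<string, unknown> {
  return typeof v === "object" && v !== null && !Array.isArray(v);
}

function isOneOf(v: unknown, allowed: readonly string[]): boolean {
  return typeof v === "string" && allowed.indexOf(v) !== -1;
}

function isStringArray(v: unknown): v is string[] {
  return Array.isArray(v) && v.every((x) => typeof x === "string");
}

// Checks the fields shared by both campaign types; sdr/cs are not inspected.
function isCallAnalysis(v: unknown): v is CallAnalysis {
  if (!isRecord(v)) return false;
  const sentiment = v["sentiment"];
  const metrics = v["metrics"];
  if (!isRecord(sentiment) || !isRecord(metrics)) return false;
  const sentimentOk =
    ["overall", "start", "end"].every((k) => isOneOf(sentiment[k], SENTIMENTS)) &&
    isOneOf(sentiment["progression"], PROGRESSIONS);
  const metricsOk = ["agent_talk_ratio", "customer_talk_ratio", "interruptions", "questions_asked"]
    .every((k) => typeof metrics[k] === "number");
  return (
    typeof v["summary"] === "string" &&
    sentimentOk &&
    metricsOk &&
    isStringArray(v["action_items"]) &&
    isStringArray(v["coaching_notes"])
  );
}
```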

Database Tables / RPCs

| Table / RPC | Operation | Description |
| --- | --- | --- |
| claim_enrichment_queue_batch (RPC) | Read/Write | Atomically claims N queue items by type |
| calls | Read/Write | Reads call data, updates transcript_status, analysis_status, has_recording |
| agencies | Read | Fetches encrypted Ultravox API key |
| decrypt_api_key (RPC) | Read | Decrypts the agency's stored API key |
| campaigns | Read | Fetches campaign type (sdr or cs) for analysis prompt selection |
| update_call_transcript (RPC) | Write | Stores the fetched transcript |
| update_call_analysis (RPC) | Write | Stores the AI analysis results |
| complete_enrichment_queue_item (RPC) | Write | Marks a queue item as successfully processed |
| fail_enrichment_queue_item (RPC) | Write | Marks a queue item as failed with error message |
| fail_call_transcript (RPC) | Write | Marks a call's transcript fetch as failed |
| fail_call_analysis (RPC) | Write | Marks a call's analysis as failed |
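
The claim, complete, and fail RPCs together form a simple queue-item lifecycle, sketched below against a minimal client interface modeled loosely on the `rpc(name, args)` call of supabase-js. Only `p_job_type` and `p_batch_size` are documented on this page; the parameter names `p_item_id` and `p_error`, and the `QueueItem` shape, are hypothetical. The call-level RPCs (`fail_call_transcript`, `fail_call_analysis`) are omitted for brevity.

```typescript
interface RpcResult {
  data: unknown;
  error: { message: string } | null;
}

// Minimal stand-in for a database client exposing rpc(name, args).
interface RpcClient {
  rpc(fn: string, args: Record<string, unknown>): PromiseLike<RpcResult>;
}

// Hypothetical shape of a claimed queue item.
interface QueueItem {
  id: string;
  call_id: string;
}

async function processBatch(
  db: RpcClient,
  jobType: "transcript" | "analysis",
  batchSize: number,
  work: (item: QueueItem) => Promise<void>,
): Promise<{ processed: number; failed: number }> {
  const { data, error } = await db.rpc("claim_enrichment_queue_batch", {
    p_job_type: jobType,
    p_batch_size: batchSize,
  });
  if (error) throw new Error(error.message);

  const items = (data ?? []) as QueueItem[];
  let processed = 0;
  let failed = 0;
  for (const item of items) {
    try {
      await work(item);
      // Parameter name is an assumption.
      await db.rpc("complete_enrichment_queue_item", { p_item_id: item.id });
      processed++;
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err);
      // Parameter names are assumptions.
      await db.rpc("fail_enrichment_queue_item", { p_item_id: item.id, p_error: message });
      failed++;
    }
  }
  return { processed, failed };
}
```

A failure in one item is recorded and counted, and the loop continues with the next item, which matches the per-phase `processed` and `failed` counts in the response.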

External APIs

| API | Method | Endpoint | Description |
| --- | --- | --- | --- |
| Ultravox | GET | /api/calls/{call_id}/messages | Retrieves the call transcript |
| Ultravox | GET | /api/calls/{call_id}/recording | Checks whether a recording exists (manual redirect) |
| OpenRouter | POST | /api/v1/chat/completions | AI call analysis using openai/gpt-4o-mini model |

Environment Variables

| Variable | Description |
| --- | --- |
| SUPABASE_URL | Supabase project URL |
| SUPABASE_SERVICE_ROLE_KEY | Service role key for database access |
| OPENROUTER_API_KEY | API key for OpenRouter (used for AI analysis) |
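
Since a missing variable surfaces as a 500, it can help to validate all three up front. A minimal sketch follows; `loadConfig` is a hypothetical helper, and the environment accessor is passed in as a parameter so the code is not tied to one runtime.

```typescript
const REQUIRED_VARS = [
  "SUPABASE_URL",
  "SUPABASE_SERVICE_ROLE_KEY",
  "OPENROUTER_API_KEY",
] as const;

type RequiredVar = (typeof REQUIRED_VARS)[number];

// Reads every required variable and throws one error naming all that are missing.
function loadConfig(
  getEnv: (name: string) => string | undefined,
): Record<RequiredVar, string> {
  const missing = REQUIRED_VARS.filter((name) => !getEnv(name));
  if (missing.length > 0) {
    throw new Error(`Missing environment variables: ${missing.join(", ")}`);
  }
  const config = {} as Record<RequiredVar, string>;
  for (const name of REQUIRED_VARS) {
    config[name] = getEnv(name) as string; // presence verified above
  }
  return config;
}
```

In a Supabase edge function, which runs on Deno, the accessor would typically be `(name) => Deno.env.get(name)`.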

Related Functions

  • get-call-recording -- Retrieves signed recording URLs for calls whose has_recording flag was set by this function
  • process-webhook-queue -- Processes incoming webhook events that create the call records enriched by this function
  • webhook-ingest -- Ingests raw webhook events into the processing queue