
Build AI Background Services in .NET for Document Processing

Intermediate · .NET 9 · Azure.AI.OpenAI 2.1.0 · Azure.Messaging.ServiceBus 7.18.0 · Microsoft.Extensions.Hosting 9.0.0
By Rajesh Mishra · Mar 21, 2026 · 16 min read
Verified Mar 2026 · .NET 9 · Azure.AI.OpenAI 2.1.0
In 30 Seconds

This workshop builds a production-grade .NET BackgroundService that dequeues document ingestion messages from Azure Service Bus, generates embeddings via Azure OpenAI, and indexes vectors into Azure AI Search. Rate control uses SemaphoreSlim to stay within token-per-minute quotas. Each document is tracked with OpenTelemetry ActivitySource spans and System.Diagnostics.Metrics counters. Scaling is handled by Azure Container Apps with KEDA queue-length autoscaling.

What You'll Build

Build a .NET BackgroundService for AI document ingestion and embedding generation, with queue processing via Azure Service Bus and OpenTelemetry monitoring.


Synchronous document processing works until it does not. When document volumes rise — a customer uploading a hundred files, a nightly re-indexing job, a bulk migration — blocking an API thread per document becomes a bottleneck and a reliability risk. A queue-backed BackgroundService decouples ingestion from processing, absorbs traffic spikes, and lets you control embedding throughput independently of your API layer.

This workshop builds a complete EmbeddingWorkerService that dequeues document messages from Azure Service Bus, generates embeddings via Azure OpenAI, and indexes them into Azure AI Search. You will implement rate-controlled parallel processing, dead-lettering for permanent failures, and OpenTelemetry instrumentation per document.

Prerequisites

  • .NET 9 SDK installed
  • An Azure Service Bus namespace with a queue named document-ingestion
  • An Azure OpenAI resource with a deployed text-embedding-3-small model
  • An Azure AI Search resource (the Semantic Search API workshop covers index setup)
  • Basic familiarity with IHostedService / BackgroundService lifecycle

Why Background Services for AI Workloads

Three scenarios justify moving AI processing off the synchronous request path:

Batch embedding and document ingestion. Embedding a single document takes 100–300ms. Embedding a hundred documents sequentially takes 10–30 seconds — far beyond acceptable API response time. A queue lets the API return immediately (HTTP 202 Accepted) while the worker processes at its own pace.

Scheduled re-indexing. Search indexes drift when documents are updated but embeddings are not regenerated. A background service triggered on a timer or queue message handles re-indexing without touching the API surface.

Rate quota smoothing. Azure OpenAI token-per-minute (TPM) quotas are enforced over rolling windows. Synchronous processing in response to user uploads can burst the quota unpredictably. A background worker with explicit concurrency limits spreads token consumption evenly, reducing 429 errors.

The rule of thumb: if the AI call is part of a user-interactive response, keep it synchronous. If it is part of a data pipeline, move it to a background service.
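To make the "return 202 and let the worker catch up" pattern concrete, the publishing side can be as small as the following minimal-API sketch. The route, and the assumption that a `ServiceBusSender` for the `document-ingestion` queue is registered in DI, are illustrative rather than prescribed by this workshop:

```csharp
// Minimal ASP.NET Core endpoint: enqueue the document, return 202 Accepted.
// Assumes a ServiceBusSender is registered in DI for the ingestion queue;
// minimal APIs bind registered services automatically, and the record
// body binds from JSON.
app.MapPost("/documents", async (
    DocumentIngestionMessage doc,
    ServiceBusSender sender,
    CancellationToken ct) =>
{
    var body = System.Text.Json.JsonSerializer.Serialize(doc);
    await sender.SendMessageAsync(new ServiceBusMessage(body), ct);

    // The API returns immediately; the background worker does the
    // embedding and indexing at its own pace.
    return Results.Accepted($"/documents/{doc.DocumentId}");
});
```

The endpoint never touches Azure OpenAI, so its latency is just the Service Bus send, regardless of how many documents are queued.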

Pipeline overview: Document Upload (API) enqueues to the Azure Service Bus queue; the BackgroundService (EmbeddingWorker) dequeues each message, calls Azure OpenAI Embeddings with the document text, indexes the resulting vectors into Azure AI Search, and emits spans and metrics to OpenTelemetry.

Step 1 — Scaffold the Project

dotnet new worker -n EmbeddingWorker
cd EmbeddingWorker
dotnet add package Azure.Messaging.ServiceBus --version 7.18.0
dotnet add package Azure.AI.OpenAI --version 2.1.0
dotnet add package Azure.Search.Documents --version 11.7.0
dotnet add package OpenTelemetry.Extensions.Hosting --version 1.9.0
dotnet add package OpenTelemetry.Exporter.OpenTelemetryProtocol --version 1.9.0

The worker template creates a project with a Program.cs host bootstrap and a starter Worker.cs extending BackgroundService. (The .NET 9 template generates Host.CreateApplicationBuilder(); this workshop uses the older but equivalent Host.CreateDefaultBuilder() pattern in Step 4.) You will replace the starter worker with the full EmbeddingWorkerService.

Step 2 — Configuration and Models

appsettings.json

{
  "AzureServiceBus": {
    "ConnectionString": "<your-service-bus-connection-string>",
    "QueueName": "document-ingestion"
  },
  "AzureOpenAI": {
    "Endpoint": "https://<your-resource>.openai.azure.com/",
    "ApiKey": "<your-api-key>",
    "EmbeddingDeployment": "text-embedding-3-small"
  },
  "AzureSearch": {
    "Endpoint": "https://<your-search-service>.search.windows.net",
    "AdminKey": "<your-admin-key>",
    "IndexName": "documents"
  },
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "EmbeddingWorker": "Debug"
    }
  }
}

Connection strings and keys appear inline here for clarity. In a real deployment, keep them out of source control: use dotnet user-secrets locally and environment variables or Azure Key Vault in production.

Models/DocumentIngestionMessage.cs

namespace EmbeddingWorker.Models;

/// <summary>
/// Payload deserialized from each Azure Service Bus message.
/// The publishing API serializes this to JSON before enqueuing.
/// </summary>
public sealed record DocumentIngestionMessage(
    string DocumentId,
    string Title,
    string Content,
    string Category,
    DateTimeOffset UploadedAt);

Models/WorkerSettings.cs

namespace EmbeddingWorker.Models;

public sealed class ServiceBusSettings
{
    public const string SectionName = "AzureServiceBus";
    public required string ConnectionString { get; init; }
    public required string QueueName { get; init; }
}

public sealed class AzureOpenAISettings
{
    public const string SectionName = "AzureOpenAI";
    public required string Endpoint { get; init; }
    public required string ApiKey { get; init; }
    public required string EmbeddingDeployment { get; init; }
}

public sealed class AzureSearchSettings
{
    public const string SectionName = "AzureSearch";
    public required string Endpoint { get; init; }
    public required string AdminKey { get; init; }
    public string IndexName { get; init; } = "documents";
}
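If you prefer fail-fast configuration, the settings classes above can be bound with startup validation instead of the plain `Configure` calls used in Step 4. A sketch, using the standard options builder inside `ConfigureServices` (the validation predicate is an example; extend it to the other sections the same way):

```csharp
// Bind and validate at startup so a missing connection string fails
// the host immediately rather than at the first dequeued message.
services.AddOptions<ServiceBusSettings>()
    .BindConfiguration(ServiceBusSettings.SectionName)
    .Validate(s => !string.IsNullOrWhiteSpace(s.ConnectionString),
        "AzureServiceBus:ConnectionString is required")
    .Validate(s => !string.IsNullOrWhiteSpace(s.QueueName),
        "AzureServiceBus:QueueName is required")
    .ValidateOnStart();
```

With `ValidateOnStart()`, the host throws during `RunAsync()` on bad configuration, which is far easier to diagnose than a processor that silently fails to connect.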

Step 3 — The EmbeddingWorkerService Class

The core of the workshop. This class extends BackgroundService from Microsoft.Extensions.Hosting and orchestrates the entire message processing pipeline.

Workers/EmbeddingWorkerService.cs

using System.ClientModel;  // ClientResultException
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Text.Json;
using Azure;
using Azure.AI.OpenAI;
using Azure.Messaging.ServiceBus;
using Azure.Search.Documents;
using Azure.Search.Documents.Models;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using OpenAI.Embeddings;
using EmbeddingWorker.Models;

namespace EmbeddingWorker.Workers;

public sealed class EmbeddingWorkerService : BackgroundService
{
    // OpenTelemetry instrumentation
    private static readonly ActivitySource _activitySource =
        new("EmbeddingWorker", "1.0.0");
    private static readonly Meter _meter =
        new("EmbeddingWorker", "1.0.0");

    private readonly Counter<long> _processedCounter;
    private readonly Counter<long> _failedCounter;
    private readonly Histogram<double> _embeddingLatency;

    // Rate control: max 3 concurrent embedding calls
    private readonly SemaphoreSlim _semaphore = new(3, 3);

    private readonly ServiceBusClient _busClient;
    private readonly AzureOpenAIClient _aiClient;
    private readonly SearchClient _searchClient;
    private readonly ServiceBusSettings _busSettings;
    private readonly AzureOpenAISettings _openAISettings;
    private readonly ILogger<EmbeddingWorkerService> _logger;

    public EmbeddingWorkerService(
        ServiceBusClient busClient,
        AzureOpenAIClient aiClient,
        SearchClient searchClient,
        IOptions<ServiceBusSettings> busSettings,
        IOptions<AzureOpenAISettings> openAISettings,
        ILogger<EmbeddingWorkerService> logger)
    {
        _busClient = busClient;
        _aiClient = aiClient;
        _searchClient = searchClient;
        _busSettings = busSettings.Value;
        _openAISettings = openAISettings.Value;
        _logger = logger;

        _processedCounter = _meter.CreateCounter<long>(
            "documents.processed", "documents", "Total documents successfully embedded and indexed");
        _failedCounter = _meter.CreateCounter<long>(
            "documents.failed", "documents", "Total documents that failed processing");
        _embeddingLatency = _meter.CreateHistogram<double>(
            "embedding.duration", "s", "Time taken to generate embeddings per document");
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("EmbeddingWorkerService starting");

        var processor = _busClient.CreateProcessor(
            _busSettings.QueueName,
            new ServiceBusProcessorOptions
            {
                // Let the SemaphoreSlim control actual AI call concurrency;
                // keep Service Bus dequeue concurrency at 1 for simplicity
                MaxConcurrentCalls = 1,
                AutoCompleteMessages = false  // We complete or dead-letter manually
            });

        processor.ProcessMessageAsync += HandleMessageAsync;
        processor.ProcessErrorAsync += HandleErrorAsync;

        await processor.StartProcessingAsync(stoppingToken);

        _logger.LogInformation(
            "Processor started. Listening on queue '{Queue}'", _busSettings.QueueName);

        // Block until cancellation is requested (host shutdown)
        try
        {
            await Task.Delay(Timeout.Infinite, stoppingToken);
        }
        catch (OperationCanceledException)
        {
            _logger.LogInformation("Cancellation requested — stopping processor");
        }
        finally
        {
            await processor.StopProcessingAsync();
            await processor.DisposeAsync();
            _logger.LogInformation("EmbeddingWorkerService stopped");
        }
    }

    private async Task HandleMessageAsync(ProcessMessageEventArgs args)
    {
        var messageBody = args.Message.Body.ToString();
        DocumentIngestionMessage? message = null;

        try
        {
            message = JsonSerializer.Deserialize<DocumentIngestionMessage>(messageBody);
        }
        catch (JsonException ex)
        {
            _logger.LogError(ex,
                "Malformed message body. Dead-lettering. MessageId={MessageId}",
                args.Message.MessageId);

            await args.DeadLetterMessageAsync(
                args.Message,
                deadLetterReason: "MalformedPayload",
                deadLetterErrorDescription: ex.Message);

            _failedCounter.Add(1, new TagList { { "reason", "malformed_payload" } });
            return;
        }

        if (message is null)
        {
            await args.DeadLetterMessageAsync(
                args.Message,
                deadLetterReason: "NullPayload",
                deadLetterErrorDescription: "Deserialized message was null");
            _failedCounter.Add(1, new TagList { { "reason", "null_payload" } });
            return;
        }

        await ProcessDocumentAsync(args, message);
    }

    private async Task ProcessDocumentAsync(
        ProcessMessageEventArgs args,
        DocumentIngestionMessage message)
    {
        using var activity = _activitySource.StartActivity(
            "EmbedDocument", ActivityKind.Internal);
        activity?.SetTag("document.id", message.DocumentId);
        activity?.SetTag("document.title", message.Title);
        activity?.SetTag("document.category", message.Category);

        _logger.LogDebug(
            "Processing document {DocumentId}: '{Title}'",
            message.DocumentId, message.Title);

        // Acquire semaphore slot before calling OpenAI
        await _semaphore.WaitAsync(args.CancellationToken);

        var sw = Stopwatch.StartNew();
        try
        {
            var vector = await GenerateEmbeddingAsync(
                message.Content, args.CancellationToken);

            sw.Stop();
            _embeddingLatency.Record(
                sw.Elapsed.TotalSeconds,
                new TagList { { "document.category", message.Category } });

            await IndexDocumentAsync(message, vector, args.CancellationToken);

            await args.CompleteMessageAsync(args.Message);

            _processedCounter.Add(1,
                new TagList { { "document.category", message.Category } });

            activity?.SetStatus(ActivityStatusCode.Ok);

            _logger.LogInformation(
                "Document {DocumentId} embedded and indexed in {ElapsedMs}ms",
                message.DocumentId, sw.ElapsedMilliseconds);
        }
        catch (ClientResultException ex) when ((int)ex.Status == 429)
        {
            // Rate limited — do not complete or dead-letter; let the lock expire
            // so Service Bus redelivers after the lock duration.
            // For guidance on handling 429s, see /hospital/fix-429-rate-limit-exceeded-azure-openai-dotnet/
            _logger.LogWarning(
                "Rate limited (429) processing document {DocumentId}. " +
                "Message will be redelivered when lock expires.",
                message.DocumentId);
            activity?.SetStatus(ActivityStatusCode.Error, "RateLimited");
        }
        catch (ClientResultException ex)
        {
            _logger.LogError(ex,
                "Permanent API error ({Status}) for document {DocumentId}. Dead-lettering.",
                (int)ex.Status, message.DocumentId);

            await args.DeadLetterMessageAsync(
                args.Message,
                deadLetterReason: $"ApiError_{ex.Status}",
                deadLetterErrorDescription: ex.Message);

            _failedCounter.Add(1,
                new TagList { { "reason", "api_error" }, { "status", (int)ex.Status } });
            activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex,
                "Unexpected error processing document {DocumentId}. Dead-lettering.",
                message.DocumentId);

            await args.DeadLetterMessageAsync(
                args.Message,
                deadLetterReason: "UnexpectedException",
                deadLetterErrorDescription: ex.Message);

            _failedCounter.Add(1, new TagList { { "reason", "unexpected" } });
            activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
        }
        finally
        {
            // Brief delay before releasing the slot to smooth token consumption
            await Task.Delay(200, CancellationToken.None);
            _semaphore.Release();
        }
    }

    private async Task<ReadOnlyMemory<float>> GenerateEmbeddingAsync(
        string text, CancellationToken ct)
    {
        var embeddingClient = _aiClient.GetEmbeddingClient(
            _openAISettings.EmbeddingDeployment);

        // GenerateEmbeddingsAsync returns a ClientResult<OpenAIEmbeddingCollection>;
        // the implicit conversion yields the collection directly.
        OpenAIEmbeddingCollection embeddings =
            await embeddingClient.GenerateEmbeddingsAsync(
                new List<string> { text }, cancellationToken: ct);

        return embeddings[0].ToFloats();
    }

    private async Task IndexDocumentAsync(
        DocumentIngestionMessage message,
        ReadOnlyMemory<float> vector,
        CancellationToken ct)
    {
        var doc = new
        {
            id = message.DocumentId,
            title = message.Title,
            content = message.Content,
            category = message.Category,
            uploadedAt = message.UploadedAt,
            contentVector = vector.ToArray()
        };

        var batch = IndexDocumentsBatch.Upload(new[] { doc });
        await _searchClient.IndexDocumentsAsync(batch, cancellationToken: ct);
    }

    private Task HandleErrorAsync(ProcessErrorEventArgs args)
    {
        _logger.LogError(args.Exception,
            "Service Bus processor error. Source={ErrorSource} EntityPath={EntityPath}",
            args.ErrorSource, args.EntityPath);
        return Task.CompletedTask;
    }

    public override void Dispose()
    {
        _semaphore.Dispose();
        base.Dispose();
    }
}

Key Design Decisions

AutoCompleteMessages = false. You take explicit control of message completion. A message is only completed after the document is successfully embedded and indexed. Any unhandled exception leaves the message on the queue for redelivery.

SemaphoreSlim(3, 3). This limits concurrent embedding calls to three regardless of MaxConcurrentCalls. If you later increase MaxConcurrentCalls to 4 or 5 for higher throughput, the semaphore ensures you never fire more than three simultaneous OpenAI API calls. This is the primary defence against token-per-minute quota exhaustion — see the Azure OpenAI 429 rate limit fix guide for the full retry strategy.

200ms release delay. After each document, the handler pauses 200ms before releasing the semaphore slot, which smooths token bursts. With MaxConcurrentCalls = 1, messages process sequentially, so throughput tops out near 60 / (embedding latency + 0.2) calls per minute: roughly 150 calls per minute at a typical 200ms embedding latency. That is a predictable ceiling you can align with your TPM quota, and it scales with the number of active slots if you later raise MaxConcurrentCalls.

Step 4 — Wire Up Program.cs

using Azure;
using Azure.AI.OpenAI;
using Azure.Messaging.ServiceBus;
using Azure.Search.Documents;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using OpenTelemetry.Metrics;
using OpenTelemetry.Trace;
using EmbeddingWorker.Models;
using EmbeddingWorker.Workers;

var host = Host.CreateDefaultBuilder(args)
    .ConfigureServices((ctx, services) =>
    {
        var cfg = ctx.Configuration;

        // Bind settings
        services.Configure<ServiceBusSettings>(
            cfg.GetSection(ServiceBusSettings.SectionName));
        services.Configure<AzureOpenAISettings>(
            cfg.GetSection(AzureOpenAISettings.SectionName));
        services.Configure<AzureSearchSettings>(
            cfg.GetSection(AzureSearchSettings.SectionName));

        // Azure Service Bus client (singleton — thread-safe)
        services.AddSingleton(sp =>
        {
            var s = cfg.GetSection(ServiceBusSettings.SectionName)
                       .Get<ServiceBusSettings>()!;
            return new ServiceBusClient(s.ConnectionString);
        });

        // Azure OpenAI client
        services.AddSingleton(sp =>
        {
            var s = cfg.GetSection(AzureOpenAISettings.SectionName)
                       .Get<AzureOpenAISettings>()!;
            return new AzureOpenAIClient(
                new Uri(s.Endpoint),
                new AzureKeyCredential(s.ApiKey));
        });

        // Azure AI Search client
        services.AddSingleton(sp =>
        {
            var s = cfg.GetSection(AzureSearchSettings.SectionName)
                       .Get<AzureSearchSettings>()!;
            return new SearchClient(
                new Uri(s.Endpoint),
                s.IndexName,
                new AzureKeyCredential(s.AdminKey));
        });

        // Register the hosted service
        services.AddHostedService<EmbeddingWorkerService>();

        // OpenTelemetry — see /university/opentelemetry-ai-dotnet-observability-guide/
        // for the full guide on tracing and metrics configuration
        services.AddOpenTelemetry()
            .WithTracing(tracing => tracing
                .AddSource("EmbeddingWorker")
                .AddOtlpExporter())
            .WithMetrics(metrics => metrics
                .AddMeter("EmbeddingWorker")
                .AddOtlpExporter());
    })
    .Build();

await host.RunAsync();

AddHostedService<EmbeddingWorkerService>() registers the service with the .NET Generic Host. The host calls StartAsync (which invokes ExecuteAsync) on startup and StopAsync (which signals the CancellationToken) on graceful shutdown — giving the processor time to finish in-flight messages before the process exits.

Step 5 — Progress Tracking with Logging and SignalR

The ILogger calls in ProcessDocumentAsync provide structured logging with document ID, category, and elapsed milliseconds. In Seq, Application Insights, or any structured log sink, you can filter by document.id to trace the full lifecycle of a single document.

For real-time UI progress bars, push notifications through SignalR. Add the optional dependency:

// In Program.cs (for web-hosted scenarios)
builder.Services.AddSignalR();
// AddSignalR() registers IHubContext<T> itself; a separate
// AddSingleton<IHubContext<ProgressHub>>() call is unnecessary (and,
// without an implementation, would fail at resolution time).

Inject IHubContext<ProgressHub> into EmbeddingWorkerService and broadcast after each completion:

// After CompleteMessageAsync, optionally notify connected clients
await _hubContext.Clients.All.SendAsync(
    "DocumentProcessed",
    new { message.DocumentId, ElapsedMs = sw.ElapsedMilliseconds },
    args.CancellationToken);

For background services running in a pure worker process (without ASP.NET Core), skip SignalR and write progress to a Redis key or Azure Table Storage that the API polls.
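A minimal sketch of that polling approach using StackExchange.Redis. The `_redis` connection field and the `progress:{documentId}` key convention are assumptions for illustration, not part of the workshop code:

```csharp
// After CompleteMessageAsync in the worker: record per-document status
// in Redis under a key the API can poll. _redis is an injected
// IConnectionMultiplexer (assumed registered as a singleton).
var db = _redis.GetDatabase();
await db.StringSetAsync(
    $"progress:{message.DocumentId}",   // hypothetical key convention
    "completed",
    expiry: TimeSpan.FromHours(1));     // auto-expire stale entries
```

The API then answers a status request by reading the same key and returning "pending" when it is absent.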

Step 6 — Error Handling Strategy

The handler implements a three-tier error strategy:

| Error Type                   | Action                    | Reason                                             |
|------------------------------|---------------------------|----------------------------------------------------|
| JSON deserialization failure | Dead-letter immediately   | Message is malformed — redelivery will always fail |
| HTTP 429 from OpenAI         | Abandon (let lock expire) | Transient; will succeed after quota refreshes      |
| Other ClientResultException  | Dead-letter               | Permanent API error (auth failure, bad request)    |
| Unexpected exception         | Dead-letter               | Unknown — prevent infinite redelivery loop         |

The 429 path deliberately avoids calling CompleteMessageAsync or DeadLetterMessageAsync. When the message lock expires (set your queue’s LockDuration to 5 minutes to give the quota window time to refresh), Service Bus automatically redelivers the message. After MaxDeliveryCount attempts (default 10), Service Bus dead-letters it automatically.
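Both queue settings mentioned above can be applied with the Azure CLI. A sketch (resource-group and namespace placeholders are yours to fill in; verify flag names against your az version):

```shell
# Set a 5-minute message lock and keep the default 10-attempt
# dead-letter threshold on the ingestion queue
az servicebus queue update \
  --resource-group <your-resource-group> \
  --namespace-name <your-namespace> \
  --name document-ingestion \
  --lock-duration PT5M \
  --max-delivery-count 10
```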

For more sophisticated 429 handling including exponential backoff and proactive token counting, see the Azure OpenAI 429 rate limit fix guide.

Step 7 — Scaling with Azure Container Apps and KEDA

A single worker instance dequeues one message at a time (MaxConcurrentCalls = 1); the three-slot semaphore caps embedding concurrency only if you raise that setting. For high-volume ingestion, scale horizontally with competing consumers.

Azure Container Apps KEDA scaler (containerapp.yaml fragment):

scale:
  minReplicas: 0
  maxReplicas: 10
  rules:
    - name: servicebus-queue-length
      custom:
        type: azure-servicebus
        metadata:
          queueName: document-ingestion
          messageCount: "50"   # scale up when queue depth > 50 messages per replica
        auth:
          - secretRef: servicebus-connection
            triggerParameter: connection

This KEDA scaler adds one replica per 50 queued messages, up to ten replicas. When the queue drains, replicas scale back to zero — paying only for active processing time.

Concurrency considerations with multiple replicas. Each replica runs its own EmbeddingWorkerService with its own SemaphoreSlim. Ten replicas with SemaphoreSlim(3, 3) yield up to 30 concurrent embedding calls across the fleet. Ensure your Azure OpenAI deployment’s TPM quota supports that load. If not, reduce SemaphoreSlim initial count or cap maxReplicas.

Azure Service Bus standard queues support competing consumers natively — no additional coordination is needed. Each message is delivered to exactly one consumer.

Step 8 — Monitoring with OpenTelemetry

The EmbeddingWorkerService creates three instruments from System.Diagnostics.Metrics:

  • documents.processed (Counter) — increments on each successful index write, tagged by category
  • documents.failed (Counter) — increments on dead-letter, tagged by failure reason
  • embedding.duration (Histogram) — records seconds per embedding call, tagged by category

And one ActivitySource:

  • EmbeddingWorker — a span per document with document.id, document.title, and document.category tags

Wire these into your OpenTelemetry pipeline as shown in Program.cs above. For the complete configuration including Application Insights export and PII masking, follow the OpenTelemetry AI observability guide for .NET.

A Grafana dashboard using the Prometheus remote write exporter can visualize documents.processed as a rate panel and embedding.duration as a heatmap — giving you real-time throughput visibility as queue depth fluctuates.

Sample KQL query for Application Insights:

customMetrics
| where name == "documents.processed"
| summarize TotalProcessed = sum(valueSum) by bin(timestamp, 5m)
| render timechart

This query shows five-minute throughput — useful for correlating queue depth spikes with processing rate during load tests.

What You Built

A production-grade BackgroundService that:

  • Dequeues DocumentIngestionMessage JSON from Azure Service Bus
  • Generates embeddings via AzureOpenAIClient.GetEmbeddingClient() with three-slot SemaphoreSlim rate control
  • Indexes vectors into Azure AI Search (see Build a Semantic Search API for full index schema)
  • Dead-letters malformed and permanently-failed messages; abandons 429s for redelivery
  • Tracks per-document latency, throughput, and failure reason with ActivitySource spans and Meter instruments
  • Scales horizontally on Azure Container Apps via KEDA queue-length autoscaling

The pattern extends naturally to other AI background workloads: image captioning, structured data extraction, content classification, or any pipeline where processing time exceeds interactive response budgets.

⚠ Production Considerations

  • Lock expiry during slow embedding calls. If the embedding API is slow and your Service Bus message lock expires before you call CompleteMessageAsync, Service Bus redelivers the message — causing duplicate processing and duplicate index entries. Set LockDuration on the queue to at least 5 minutes and renew locks proactively for documents that take longer to process.
  • Unbounded memory growth on large document batches. Loading an entire document into memory for embedding is fine for most files, but PDF or HTML documents with images can be several MB each. If MaxConcurrentCalls is 10 and each message loads a 5 MB document, you hold 50 MB in working set simultaneously. Chunk documents before enqueuing and keep message payload sizes under 256 KB (the Service Bus standard tier limit).
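For the lock-expiry risk in particular, the Service Bus SDK can renew locks for you instead of requiring manual RenewMessageLockAsync loops. A sketch of the processor options from Step 3 with auto-renewal enabled (the 10-minute value is illustrative; size it to your slowest expected document):

```csharp
// The SDK renews each in-flight message's lock automatically until
// this duration elapses, covering slow embedding calls.
var options = new ServiceBusProcessorOptions
{
    MaxConcurrentCalls = 1,
    AutoCompleteMessages = false,
    MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(10)
};
```

Note that auto-renewal extends processing time for genuinely slow documents; it does not change the redelivery behaviour the 429 path relies on, because abandoned messages still return to the queue once the handler exits.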


🧠 Architect’s Note

Background workers are the right tool for AI workloads that are volume-driven rather than latency-driven. The key architectural decision is where to put backpressure: Service Bus queue depth absorbs upload spikes, MaxConcurrentCalls limits worker-level parallelism, and SemaphoreSlim limits OpenAI API concurrency. These three knobs operate at different layers and should be tuned independently. Start with MaxConcurrentCalls = 1 and SemaphoreSlim(3, 3) in production — scale up only after measuring actual throughput against your embedding quota.

AI-Friendly Summary

Summary

This workshop builds a production-grade .NET BackgroundService that dequeues document ingestion messages from Azure Service Bus, generates embeddings via Azure OpenAI, and indexes vectors into Azure AI Search. Rate control uses SemaphoreSlim to stay within token-per-minute quotas. Each document is tracked with OpenTelemetry ActivitySource spans and System.Diagnostics.Metrics counters. Scaling is handled by Azure Container Apps with KEDA queue-length autoscaling.

Key Takeaways

  • BackgroundService.ExecuteAsync with CancellationToken is the correct pattern for long-running AI workers in .NET
  • ServiceBusProcessor with MaxConcurrentCalls = 1 plus an internal SemaphoreSlim gives layered backpressure against embedding rate limits
  • Dead-letter messages immediately on permanent failures; abandon on transient failures to allow Service Bus redelivery
  • ActivitySource spans and Meter counters per document enable per-request latency and throughput monitoring
  • KEDA ServiceBus scaler on Azure Container Apps enables queue-depth-driven horizontal autoscaling

Implementation Checklist

  • Install Azure.Messaging.ServiceBus, Azure.AI.OpenAI, Azure.Search.Documents, and OpenTelemetry NuGet packages
  • Define DocumentIngestionMessage record to deserialize from Service Bus message body
  • Create EmbeddingWorkerService extending BackgroundService with ServiceBusProcessor injection
  • Wire ProcessMessageAsync and ProcessErrorAsync handlers on the processor
  • Add SemaphoreSlim(3, 3) gate around each embedding call with 200ms release delay
  • Implement dead-letter logic for ClientResultException with non-429 status codes
  • Add ActivitySource span and Meter counter per processed document
  • Register the hosted service with AddHostedService<EmbeddingWorkerService>()

Frequently Asked Questions

When should I use a BackgroundService for AI document processing instead of inline processing?

Use BackgroundService when document volumes are high, processing time exceeds acceptable API response latency, or you need to survive upstream failures without losing work. Inline processing is fine for single-document, interactive scenarios. Background services shine for bulk ingestion, nightly re-indexing runs, or queued uploads where the user does not wait for the result.

How does SemaphoreSlim help with Azure OpenAI rate limits?

SemaphoreSlim(3, 3) limits concurrent embedding calls to three in-flight requests at a time. Combined with a short delay between semaphore releases, it smooths token consumption and avoids bursting the Azure OpenAI token-per-minute quota. This is a client-side throttle that complements server-side retry logic for 429 responses.

What is the difference between MaxConcurrentCalls and the SemaphoreSlim concurrency limit?

MaxConcurrentCalls on ServiceBusProcessorOptions controls how many messages are dequeued and dispatched concurrently by the Service Bus SDK. The SemaphoreSlim limit inside your handler controls how many of those messages are making active OpenAI API calls at the same moment. Setting MaxConcurrentCalls = 1 and controlling parallelism in code gives finer-grained backpressure.

How do I dead-letter a poison message in Azure Service Bus from .NET?

Call await args.DeadLetterMessageAsync(args.Message, deadLetterReason: "ProcessingFailed", deadLetterErrorDescription: exceptionMessage) inside your ProcessMessageAsync handler. The message moves to the $DeadLetterQueue sub-queue where it can be inspected and replayed. Do not call CompleteMessageAsync on a message you are dead-lettering.

Can I run multiple instances of the EmbeddingWorkerService for horizontal scaling?

Yes. Azure Service Bus queues support competing consumers — multiple worker instances dequeue messages independently, and each message is delivered to exactly one consumer. Deploy on Azure Container Apps with KEDA's ServiceBus scaler to automatically add and remove replicas based on queue depth.

How do I monitor embedding latency and document throughput with OpenTelemetry?

Create a Meter with CreateHistogram<double>("embedding.duration", "s") for per-document latency and a Counter<long> named "documents.processed" for throughput. Record them after each successful embedding call. Register the Meter name with AddMeter() in your OpenTelemetry setup so the metrics are exported to your backend. See the OpenTelemetry AI observability guide for complete wiring.

How should I handle transient vs permanent errors differently in the message handler?

For transient errors (HTTP 429, 503, network timeouts), abandon the message by not calling Complete or DeadLetter — Service Bus will redeliver it after the lock expires. For permanent errors (malformed message, missing document ID, schema mismatch), call DeadLetterAsync immediately to prevent infinite redelivery. Use the delivery count property to dead-letter after N retries for errors that are ambiguously transient.
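The delivery-count cutoff described above can be sketched inside the 429 catch block from Step 3. The threshold of 5 is an example value, not a recommendation from the workshop:

```csharp
// In the 429 catch block: give up early on messages that keep getting
// throttled, rather than waiting for the queue's MaxDeliveryCount.
const int maxAttempts = 5;  // example threshold; tune to your quota window
if (args.Message.DeliveryCount >= maxAttempts)
{
    await args.DeadLetterMessageAsync(
        args.Message,
        deadLetterReason: "RateLimitRetriesExhausted",
        deadLetterErrorDescription:
            $"Still throttled after {args.Message.DeliveryCount} attempts");
}
// Otherwise: do nothing; the lock expires and Service Bus redelivers.
```

DeliveryCount is incremented by Service Bus on each delivery, so the check needs no state of its own in the worker.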


#BackgroundService #Hosted Service #Embeddings #Document Processing #.NET AI