Synchronous document processing works until it does not. When document volumes rise — a customer uploading a hundred files, a nightly re-indexing job, a bulk migration — blocking an API thread per document becomes a bottleneck and a reliability risk. A queue-backed BackgroundService decouples ingestion from processing, absorbs traffic spikes, and lets you control embedding throughput independently of your API layer.
This workshop builds a complete EmbeddingWorkerService that dequeues document messages from Azure Service Bus, generates embeddings via Azure OpenAI, and indexes them into Azure AI Search. You will implement rate-controlled parallel processing, dead-lettering for permanent failures, and OpenTelemetry instrumentation per document.
Prerequisites
- .NET 9 SDK installed
- An Azure Service Bus namespace with a queue named document-ingestion
- An Azure OpenAI resource with a deployed text-embedding-3-small model
- An Azure AI Search resource (the Semantic Search API workshop covers index setup)
- Basic familiarity with the IHostedService/BackgroundService lifecycle
Why Background Services for AI Workloads
Three scenarios justify moving AI processing off the synchronous request path:
Batch embedding and document ingestion. Embedding a single document takes 100–300ms. Embedding a hundred documents sequentially takes 10–30 seconds — far beyond acceptable API response time. A queue lets the API return immediately (HTTP 202 Accepted) while the worker processes at its own pace.
Scheduled re-indexing. Search indexes drift when documents are updated but embeddings are not regenerated. A background service triggered on a timer or queue message handles re-indexing without touching the API surface.
Rate quota smoothing. Azure OpenAI token-per-minute (TPM) quotas are enforced over rolling windows. Synchronous processing in response to user uploads can burst the quota unpredictably. A background worker with explicit concurrency limits spreads token consumption evenly, reducing 429 errors.
The rule of thumb: if the AI call is part of a user-interactive response, keep it synchronous. If it is part of a data pipeline, move it to a background service.
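To make the 202-Accepted hand-off concrete, here is a sketch of the publishing side: a minimal API endpoint that enqueues the DocumentIngestionMessage defined in Step 2 and returns immediately. The route, the status URL, and the injected ServiceBusSender are illustrative assumptions, not part of the worker project:

```csharp
// Publishing API (sketch): enqueue the document and return 202 Accepted.
// Assumes a ServiceBusSender for the "document-ingestion" queue is registered in DI.
app.MapPost("/documents", async (
    DocumentIngestionMessage message,
    ServiceBusSender sender,
    CancellationToken ct) =>
{
    // BinaryData.FromObjectAsJson uses System.Text.Json defaults,
    // matching the worker's JsonSerializer.Deserialize<T> call
    var busMessage = new ServiceBusMessage(BinaryData.FromObjectAsJson(message))
    {
        MessageId = message.DocumentId,  // stable id enables duplicate detection
        ContentType = "application/json"
    };
    await sender.SendMessageAsync(busMessage, ct);

    // Return immediately — the worker processes at its own pace
    return Results.Accepted($"/documents/{message.DocumentId}/status");
});
```

Registering the sender is one line in the API's composition root, e.g. `services.AddSingleton(sp => sp.GetRequiredService<ServiceBusClient>().CreateSender("document-ingestion"))`.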
Step 1 — Scaffold the Project
dotnet new worker -n EmbeddingWorker
cd EmbeddingWorker
dotnet add package Azure.Messaging.ServiceBus --version 7.18.0
dotnet add package Azure.AI.OpenAI --version 2.1.0
dotnet add package Azure.Search.Documents --version 11.7.0
dotnet add package OpenTelemetry.Extensions.Hosting --version 1.9.0
dotnet add package OpenTelemetry.Exporter.OpenTelemetryProtocol --version 1.9.0
The worker template creates a .NET project with a Program.cs and a starter Worker.cs extending BackgroundService. Newer templates generate Host.CreateApplicationBuilder(args); this workshop uses the equivalent Host.CreateDefaultBuilder() pattern shown in Step 4. You will replace the starter worker with the full EmbeddingWorkerService.
Step 2 — Configuration and Models
appsettings.json
{
"AzureServiceBus": {
"ConnectionString": "<your-service-bus-connection-string>",
"QueueName": "document-ingestion"
},
"AzureOpenAI": {
"Endpoint": "https://<your-resource>.openai.azure.com/",
"ApiKey": "<your-api-key>",
"EmbeddingDeployment": "text-embedding-3-small"
},
"AzureSearch": {
"Endpoint": "https://<your-search-service>.search.windows.net",
"AdminKey": "<your-admin-key>",
"IndexName": "documents"
},
"Logging": {
"LogLevel": {
"Default": "Information",
"EmbeddingWorker": "Debug"
}
}
}
Models/DocumentIngestionMessage.cs
namespace EmbeddingWorker.Models;
/// <summary>
/// Payload deserialized from each Azure Service Bus message.
/// The publishing API serializes this to JSON before enqueuing.
/// </summary>
public sealed record DocumentIngestionMessage(
string DocumentId,
string Title,
string Content,
string Category,
DateTimeOffset UploadedAt);
Models/WorkerSettings.cs
namespace EmbeddingWorker.Models;
public sealed class ServiceBusSettings
{
public const string SectionName = "AzureServiceBus";
public required string ConnectionString { get; init; }
public required string QueueName { get; init; }
}
public sealed class AzureOpenAISettings
{
public const string SectionName = "AzureOpenAI";
public required string Endpoint { get; init; }
public required string ApiKey { get; init; }
public required string EmbeddingDeployment { get; init; }
}
public sealed class AzureSearchSettings
{
public const string SectionName = "AzureSearch";
public required string Endpoint { get; init; }
public required string AdminKey { get; init; }
public string IndexName { get; init; } = "documents";
}
Step 3 — The EmbeddingWorkerService Class
The core of the workshop. This class extends BackgroundService from Microsoft.Extensions.Hosting and orchestrates the entire message processing pipeline.
Workers/EmbeddingWorkerService.cs
using System.ClientModel; // ClientResultException
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Text.Json;
using Azure;
using Azure.AI.OpenAI;
using Azure.Messaging.ServiceBus;
using Azure.Search.Documents;
using Azure.Search.Documents.Models;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using OpenAI.Embeddings;
using EmbeddingWorker.Models;
namespace EmbeddingWorker.Workers;
public sealed class EmbeddingWorkerService : BackgroundService
{
// OpenTelemetry instrumentation
private static readonly ActivitySource _activitySource =
new("EmbeddingWorker", "1.0.0");
private static readonly Meter _meter =
new("EmbeddingWorker", "1.0.0");
private readonly Counter<long> _processedCounter;
private readonly Counter<long> _failedCounter;
private readonly Histogram<double> _embeddingLatency;
// Rate control: max 3 concurrent embedding calls
private readonly SemaphoreSlim _semaphore = new(3, 3);
private readonly ServiceBusClient _busClient;
private readonly AzureOpenAIClient _aiClient;
private readonly SearchClient _searchClient;
private readonly ServiceBusSettings _busSettings;
private readonly AzureOpenAISettings _openAISettings;
private readonly ILogger<EmbeddingWorkerService> _logger;
public EmbeddingWorkerService(
ServiceBusClient busClient,
AzureOpenAIClient aiClient,
SearchClient searchClient,
IOptions<ServiceBusSettings> busSettings,
IOptions<AzureOpenAISettings> openAISettings,
ILogger<EmbeddingWorkerService> logger)
{
_busClient = busClient;
_aiClient = aiClient;
_searchClient = searchClient;
_busSettings = busSettings.Value;
_openAISettings = openAISettings.Value;
_logger = logger;
_processedCounter = _meter.CreateCounter<long>(
"documents.processed", "documents", "Total documents successfully embedded and indexed");
_failedCounter = _meter.CreateCounter<long>(
"documents.failed", "documents", "Total documents that failed processing");
_embeddingLatency = _meter.CreateHistogram<double>(
"embedding.duration", "s", "Time taken to generate embeddings per document");
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("EmbeddingWorkerService starting");
var processor = _busClient.CreateProcessor(
_busSettings.QueueName,
new ServiceBusProcessorOptions
{
// Let the SemaphoreSlim control actual AI call concurrency;
// keep Service Bus dequeue concurrency at 1 for simplicity
MaxConcurrentCalls = 1,
AutoCompleteMessages = false // We complete or dead-letter manually
});
processor.ProcessMessageAsync += HandleMessageAsync;
processor.ProcessErrorAsync += HandleErrorAsync;
await processor.StartProcessingAsync(stoppingToken);
_logger.LogInformation(
"Processor started. Listening on queue '{Queue}'", _busSettings.QueueName);
// Block until cancellation is requested (host shutdown)
try
{
await Task.Delay(Timeout.Infinite, stoppingToken);
}
catch (OperationCanceledException)
{
_logger.LogInformation("Cancellation requested — stopping processor");
}
finally
{
await processor.StopProcessingAsync();
await processor.DisposeAsync();
_logger.LogInformation("EmbeddingWorkerService stopped");
}
}
private async Task HandleMessageAsync(ProcessMessageEventArgs args)
{
var messageBody = args.Message.Body.ToString();
DocumentIngestionMessage? message = null;
try
{
message = JsonSerializer.Deserialize<DocumentIngestionMessage>(messageBody);
}
catch (JsonException ex)
{
_logger.LogError(ex,
"Malformed message body. Dead-lettering. MessageId={MessageId}",
args.Message.MessageId);
await args.DeadLetterMessageAsync(
args.Message,
deadLetterReason: "MalformedPayload",
deadLetterErrorDescription: ex.Message);
_failedCounter.Add(1, new TagList { { "reason", "malformed_payload" } });
return;
}
if (message is null)
{
await args.DeadLetterMessageAsync(
args.Message,
deadLetterReason: "NullPayload",
deadLetterErrorDescription: "Deserialized message was null");
_failedCounter.Add(1, new TagList { { "reason", "null_payload" } });
return;
}
await ProcessDocumentAsync(args, message);
}
private async Task ProcessDocumentAsync(
ProcessMessageEventArgs args,
DocumentIngestionMessage message)
{
using var activity = _activitySource.StartActivity(
"EmbedDocument", ActivityKind.Internal);
activity?.SetTag("document.id", message.DocumentId);
activity?.SetTag("document.title", message.Title);
activity?.SetTag("document.category", message.Category);
_logger.LogDebug(
"Processing document {DocumentId}: '{Title}'",
message.DocumentId, message.Title);
// Acquire semaphore slot before calling OpenAI
await _semaphore.WaitAsync(args.CancellationToken);
var sw = Stopwatch.StartNew();
try
{
var vector = await GenerateEmbeddingAsync(
message.Content, args.CancellationToken);
sw.Stop();
_embeddingLatency.Record(
sw.Elapsed.TotalSeconds,
new TagList { { "document.category", message.Category } });
await IndexDocumentAsync(message, vector, args.CancellationToken);
await args.CompleteMessageAsync(args.Message);
_processedCounter.Add(1,
new TagList { { "document.category", message.Category } });
activity?.SetStatus(ActivityStatusCode.Ok);
_logger.LogInformation(
"Document {DocumentId} embedded and indexed in {ElapsedMs}ms",
message.DocumentId, sw.ElapsedMilliseconds);
}
catch (ClientResultException ex) when ((int)ex.Status == 429)
{
// Rate limited — do not complete or dead-letter; let the lock expire
// so Service Bus redelivers after the lock duration.
// For guidance on handling 429s, see /hospital/fix-429-rate-limit-exceeded-azure-openai-dotnet/
_logger.LogWarning(
"Rate limited (429) processing document {DocumentId}. " +
"Message will be redelivered when lock expires.",
message.DocumentId);
activity?.SetStatus(ActivityStatusCode.Error, "RateLimited");
}
catch (ClientResultException ex)
{
_logger.LogError(ex,
"Permanent API error ({Status}) for document {DocumentId}. Dead-lettering.",
(int)ex.Status, message.DocumentId);
await args.DeadLetterMessageAsync(
args.Message,
deadLetterReason: $"ApiError_{ex.Status}",
deadLetterErrorDescription: ex.Message);
_failedCounter.Add(1,
new TagList { { "reason", "api_error" }, { "status", (int)ex.Status } });
activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
}
catch (Exception ex)
{
_logger.LogError(ex,
"Unexpected error processing document {DocumentId}. Dead-lettering.",
message.DocumentId);
await args.DeadLetterMessageAsync(
args.Message,
deadLetterReason: "UnexpectedException",
deadLetterErrorDescription: ex.Message);
_failedCounter.Add(1, new TagList { { "reason", "unexpected" } });
activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
}
finally
{
// Brief delay before releasing the slot to smooth token consumption
await Task.Delay(200, CancellationToken.None);
_semaphore.Release();
}
}
private async Task<ReadOnlyMemory<float>> GenerateEmbeddingAsync(
string text, CancellationToken ct)
{
var embeddingClient = _aiClient.GetEmbeddingClient(
_openAISettings.EmbeddingDeployment);
// GA OpenAI SDK types: OpenAIEmbeddingCollection, with ToFloats() per item
OpenAIEmbeddingCollection embeddings =
await embeddingClient.GenerateEmbeddingsAsync(
new List<string> { text }, cancellationToken: ct);
return embeddings[0].ToFloats();
}
private async Task IndexDocumentAsync(
DocumentIngestionMessage message,
ReadOnlyMemory<float> vector,
CancellationToken ct)
{
var doc = new
{
id = message.DocumentId,
title = message.Title,
content = message.Content,
category = message.Category,
uploadedAt = message.UploadedAt,
contentVector = vector.ToArray()
};
var batch = IndexDocumentsBatch.Upload(new[] { doc });
await _searchClient.IndexDocumentsAsync(batch, cancellationToken: ct);
}
private Task HandleErrorAsync(ProcessErrorEventArgs args)
{
_logger.LogError(args.Exception,
"Service Bus processor error. Source={ErrorSource} EntityPath={EntityPath}",
args.ErrorSource, args.EntityPath);
return Task.CompletedTask;
}
public override void Dispose()
{
_semaphore.Dispose();
base.Dispose();
}
}
Key Design Decisions
AutoCompleteMessages = false. You take explicit control of message completion. A message is only completed after the document is successfully embedded and indexed. Any unhandled exception leaves the message on the queue for redelivery.
SemaphoreSlim(3, 3). This limits concurrent embedding calls to three regardless of MaxConcurrentCalls. If you later increase MaxConcurrentCalls to 4 or 5 for higher throughput, the semaphore ensures you never fire more than three simultaneous OpenAI API calls. This is the primary defence against token-per-minute quota exhaustion — see the Azure OpenAI 429 rate limit fix guide for the full retry strategy.
200ms release delay. After each document, the worker pauses 200ms before releasing its semaphore slot, which smooths token bursts. Because MaxConcurrentCalls = 1 keeps processing sequential, each message occupies at least the embedding latency plus 200ms; at a typical ~200ms embedding call, that caps throughput near ~150 calls per minute — a predictable ceiling you can align with your TPM quota.
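The ceiling arithmetic as a back-of-envelope sketch — the ~200ms embedding latency is an assumption taken from the 100–300ms range quoted earlier:

```csharp
// Throughput ceiling estimate for the worker's rate control.
// With MaxConcurrentCalls = 1, processing is sequential, so each message
// occupies at least (embedding latency + release delay).
double embeddingMs = 200;    // assumed mid-range embedding latency
double releaseDelayMs = 200; // fixed Task.Delay in the finally block
double perMessageMs = embeddingMs + releaseDelayMs;

double callsPerMinute = 60_000 / perMessageMs;
Console.WriteLine($"~{callsPerMinute} embedding calls/minute"); // ~150
```

If real embedding latency runs longer than 200ms, the effective ceiling drops accordingly — measure with the embedding.duration histogram before tuning.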
Step 4 — Wire Up Program.cs
using Azure;
using Azure.AI.OpenAI;
using Azure.Messaging.ServiceBus;
using Azure.Search.Documents;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using OpenTelemetry.Metrics;
using OpenTelemetry.Trace;
using EmbeddingWorker.Models;
using EmbeddingWorker.Workers;
var host = Host.CreateDefaultBuilder(args)
.ConfigureServices((ctx, services) =>
{
var cfg = ctx.Configuration;
// Bind settings
services.Configure<ServiceBusSettings>(
cfg.GetSection(ServiceBusSettings.SectionName));
services.Configure<AzureOpenAISettings>(
cfg.GetSection(AzureOpenAISettings.SectionName));
services.Configure<AzureSearchSettings>(
cfg.GetSection(AzureSearchSettings.SectionName));
// Azure Service Bus client (singleton — thread-safe)
services.AddSingleton(sp =>
{
var s = cfg.GetSection(ServiceBusSettings.SectionName)
.Get<ServiceBusSettings>()!;
return new ServiceBusClient(s.ConnectionString);
});
// Azure OpenAI client
services.AddSingleton(sp =>
{
var s = cfg.GetSection(AzureOpenAISettings.SectionName)
.Get<AzureOpenAISettings>()!;
return new AzureOpenAIClient(
new Uri(s.Endpoint),
new AzureKeyCredential(s.ApiKey));
});
// Azure AI Search client
services.AddSingleton(sp =>
{
var s = cfg.GetSection(AzureSearchSettings.SectionName)
.Get<AzureSearchSettings>()!;
return new SearchClient(
new Uri(s.Endpoint),
s.IndexName,
new AzureKeyCredential(s.AdminKey));
});
// Register the hosted service
services.AddHostedService<EmbeddingWorkerService>();
// OpenTelemetry — see /university/opentelemetry-ai-dotnet-observability-guide/
// for the full guide on tracing and metrics configuration
services.AddOpenTelemetry()
.WithTracing(tracing => tracing
.AddSource("EmbeddingWorker")
.AddOtlpExporter())
.WithMetrics(metrics => metrics
.AddMeter("EmbeddingWorker")
.AddOtlpExporter());
})
.Build();
await host.RunAsync();
AddHostedService<EmbeddingWorkerService>() registers the service with the .NET Generic Host. The host calls StartAsync (which invokes ExecuteAsync) on startup and StopAsync (which signals the CancellationToken) on graceful shutdown — giving the processor time to finish in-flight messages before the process exits.
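One optional hardening step, not part of the wiring above (a sketch): validate settings at startup with OptionsBuilder.Validate and ValidateOnStart, so a missing connection string fails the process immediately instead of surfacing on the first message:

```csharp
// Fail-fast configuration validation (sketch) — replaces the plain
// services.Configure<ServiceBusSettings>(...) call for this section
services.AddOptions<ServiceBusSettings>()
    .Bind(ctx.Configuration.GetSection(ServiceBusSettings.SectionName))
    .Validate(s => !string.IsNullOrWhiteSpace(s.ConnectionString),
        "AzureServiceBus:ConnectionString must be set")
    .Validate(s => !string.IsNullOrWhiteSpace(s.QueueName),
        "AzureServiceBus:QueueName must be set")
    .ValidateOnStart(); // throws OptionsValidationException during host start
```

The same pattern applies to AzureOpenAISettings and AzureSearchSettings.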
Step 5 — Progress Tracking with Logging and SignalR
The ILogger calls in ProcessDocumentAsync provide structured logging with document ID, category, and elapsed milliseconds. In Seq, Application Insights, or any structured log sink, you can filter by document.id to trace the full lifecycle of a single document.
For real-time UI progress bars, push notifications through SignalR. Add the optional dependency:
// In Program.cs (for web-hosted scenarios)
builder.Services.AddSignalR();
// AddSignalR registers IHubContext<THub> automatically — no extra registration needed
Inject IHubContext<ProgressHub> into EmbeddingWorkerService and broadcast after each completion:
// After CompleteMessageAsync, optionally notify connected clients
await _hubContext.Clients.All.SendAsync(
"DocumentProcessed",
new { message.DocumentId, ElapsedMs = sw.ElapsedMilliseconds },
args.CancellationToken);
For background services running in a pure worker process (without ASP.NET Core), skip SignalR and write progress to a Redis key or Azure Table Storage that the API polls.
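For the Table Storage variant, a sketch using the Azure.Data.Tables package (an additional dependency; the class, partition key, and property names here are illustrative):

```csharp
using Azure.Data.Tables;

// Sketch: progress tracking without SignalR. The worker upserts one entity
// per document; the API polls the table to render a progress bar.
public sealed class TableProgressReporter
{
    private readonly TableClient _table;

    public TableProgressReporter(TableClient table) => _table = table;

    public Task ReportAsync(string documentId, long elapsedMs, CancellationToken ct)
    {
        var entity = new TableEntity("ingestion-progress", documentId)
        {
            ["Status"] = "Processed",
            ["ElapsedMs"] = elapsedMs,
            ["ProcessedAt"] = DateTimeOffset.UtcNow
        };
        // Replace mode overwrites any previous status for this document
        return _table.UpsertEntityAsync(entity, TableUpdateMode.Replace, ct);
    }
}
```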
Step 6 — Error Handling Strategy
The handler implements a three-tier error strategy:
| Error Type | Action | Reason |
|---|---|---|
| JSON deserialization failure | Dead-letter immediately | Message is malformed — redelivery will always fail |
| HTTP 429 from OpenAI | Abandon (let lock expire) | Transient; will succeed after quota refreshes |
| Other ClientResultException | Dead-letter | Permanent API error (auth failure, bad request) |
| Unexpected exception | Dead-letter | Unknown — prevent infinite redelivery loop |
The 429 path deliberately avoids calling CompleteMessageAsync or DeadLetterMessageAsync. When the message lock expires (set your queue’s LockDuration to 5 minutes to give the quota window time to refresh), Service Bus automatically redelivers the message. After MaxDeliveryCount attempts (default 10), Service Bus dead-letters it automatically.
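The queue settings this strategy relies on can be applied at provisioning time with ServiceBusAdministrationClient from Azure.Messaging.ServiceBus.Administration — a sketch meant to run once during setup, not inside the worker:

```csharp
using Azure.Messaging.ServiceBus.Administration;

// One-time provisioning sketch: align queue settings with the 429 strategy
var admin = new ServiceBusAdministrationClient("<connection-string>");
bool exists = await admin.QueueExistsAsync("document-ingestion");
if (!exists)
{
    await admin.CreateQueueAsync(new CreateQueueOptions("document-ingestion")
    {
        LockDuration = TimeSpan.FromMinutes(5), // give the TPM window time to refresh
        MaxDeliveryCount = 10                   // then auto-dead-letter
    });
}
```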
For more sophisticated 429 handling including exponential backoff and proactive token counting, see the Azure OpenAI 429 rate limit fix guide.
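One intermediate option between lock-expiry redelivery and a full retry library (a sketch — assumes a ServiceBusSender field `_sender` for the same queue is injected into the worker): complete the original message and enqueue a scheduled copy, which gives you an explicit, growing backoff without holding the lock:

```csharp
catch (ClientResultException ex) when ((int)ex.Status == 429)
{
    // Sketch: schedule a copy for later instead of letting the lock expire.
    // Caveat: completing the original resets MaxDeliveryCount tracking, so
    // carry an attempt counter in ApplicationProperties to bound retries.
    var attempt = args.Message.ApplicationProperties.TryGetValue("attempt", out var a)
        ? (int)a + 1 : 1;
    var retry = new ServiceBusMessage(args.Message.Body)
    {
        ContentType = args.Message.ContentType
    };
    retry.ApplicationProperties["attempt"] = attempt;
    await _sender.ScheduleMessageAsync(
        retry, DateTimeOffset.UtcNow.AddSeconds(30 * attempt), args.CancellationToken);
    await args.CompleteMessageAsync(args.Message);
}
```

Dead-letter explicitly once `attempt` exceeds a chosen ceiling, since the queue's own delivery counting no longer applies.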
Step 7 — Scaling with Azure Container Apps and KEDA
A single worker instance dequeues one message at a time (MaxConcurrentCalls = 1); the three-slot semaphore only becomes the limiter if you raise that setting. For high-volume ingestion, scale horizontally with competing consumers.
Azure Container Apps KEDA scaler (containerapp.yaml fragment):
scale:
minReplicas: 0
maxReplicas: 10
rules:
- name: servicebus-queue-length
custom:
type: azure-servicebus
metadata:
queueName: document-ingestion
messageCount: "50" # scale up when queue depth > 50 messages per replica
auth:
- secretRef: servicebus-connection
triggerParameter: connection
This KEDA scaler adds one replica per 50 queued messages, up to ten replicas. When the queue drains, replicas scale back to zero — paying only for active processing time.
Concurrency considerations with multiple replicas. Each replica runs its own EmbeddingWorkerService with its own SemaphoreSlim. Ten replicas with SemaphoreSlim(3, 3) yield up to 30 concurrent embedding calls across the fleet. Ensure your Azure OpenAI deployment’s TPM quota supports that load. If not, reduce SemaphoreSlim initial count or cap maxReplicas.
Azure Service Bus standard queues support competing consumers natively — no additional coordination is needed. Each message is delivered to exactly one consumer.
Step 8 — Monitoring with OpenTelemetry
The EmbeddingWorkerService creates three instruments from System.Diagnostics.Metrics:
- documents.processed (Counter) — increments on each successful index write, tagged by category
- documents.failed (Counter) — increments on dead-letter, tagged by failure reason
- embedding.duration (Histogram) — records seconds per embedding call, tagged by category
And one ActivitySource:
- EmbeddingWorker — a span per document with document.id, document.title, and document.category tags
Wire these into your OpenTelemetry pipeline as shown in Program.cs above. For the complete configuration including Application Insights export and PII masking, follow the OpenTelemetry AI observability guide for .NET.
A Grafana dashboard using the Prometheus remote write exporter can visualize documents.processed as a rate panel and embedding.duration as a heatmap — giving you real-time throughput visibility as queue depth fluctuates.
Sample KQL query for Application Insights:
customMetrics
| where name == "documents.processed"
| summarize TotalProcessed = sum(valueSum) by bin(timestamp, 5m)
| render timechart
This query shows five-minute throughput — useful for correlating queue depth spikes with processing rate during load tests.
What You Built
A production-grade BackgroundService that:
- Dequeues DocumentIngestionMessage JSON from Azure Service Bus
- Generates embeddings via AzureOpenAIClient.GetEmbeddingClient() with three-slot SemaphoreSlim rate control
- Indexes vectors into Azure AI Search (see Build a Semantic Search API for full index schema)
- Dead-letters malformed and permanently-failed messages; abandons 429s for redelivery
- Tracks per-document latency, throughput, and failure reason with ActivitySource spans and Meter instruments
- Scales horizontally on Azure Container Apps via KEDA queue-length autoscaling
The pattern extends naturally to other AI background workloads: image captioning, structured data extraction, content classification, or any pipeline where processing time exceeds interactive response budgets.