
Data Sources

Connect vector search backends for retrieval-augmented generation (RAG). Data sources provide semantic search over external knowledge bases.

Quick Start

builder.Services
    .AddCoreAIServices()
    .AddCoreAIOrchestration()

    // Add one or both backends:
    .AddCoreElasticsearchServices(
        builder.Configuration.GetSection("CrestApps:Elasticsearch"))
    .AddCoreAzureAISearchServices(
        builder.Configuration.GetSection("CrestApps:AzureAISearch"));

If you are new to AI-powered search, here is a brief primer on the concepts that data sources rely on.

Embeddings

An embedding is a list of numbers (a vector) that represents the meaning of a piece of text. Two texts with similar meanings produce vectors that are close together in mathematical space — even if they use completely different words. For example, "How do I reset my password?" and "I forgot my login credentials" produce similar embeddings.
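To make "close together" concrete, here is a minimal sketch that compares vectors with cosine similarity, one common closeness measure. The three-dimensional vectors are made-up stand-ins for real embeddings (which typically have hundreds or thousands of dimensions), and the Similarity class is a hypothetical helper, not part of the framework. The measure a given backend actually uses depends on how its index is configured.

```csharp
using System;

// Toy 3-dimensional vectors standing in for real embeddings (hypothetical values).
float[] resetPassword = { 0.9f, 0.1f, 0.0f };
float[] forgotLogin = { 0.8f, 0.2f, 0.1f };
float[] refundPolicy = { 0.0f, 0.2f, 0.9f };

// The two authentication questions score higher with each other than with the refund text.
Console.WriteLine(
    Similarity.Cosine(resetPassword, forgotLogin) >
    Similarity.Cosine(resetPassword, refundPolicy)); // True

static class Similarity
{
    // Cosine similarity: dot(a, b) / (|a| * |b|). Higher means closer in direction.
    // Returns NaN if either vector is all zeros.
    public static double Cosine(float[] a, float[] b)
    {
        if (a.Length != b.Length)
        {
            throw new ArgumentException("Vectors must have the same dimension.");
        }

        double dot = 0, normA = 0, normB = 0;
        for (var i = 0; i < a.Length; i++)
        {
            dot += a[i] * b[i];
            normA += a[i] * a[i];
            normB += b[i] * b[i];
        }

        return dot / (Math.Sqrt(normA) * Math.Sqrt(normB));
    }
}
```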

Embeddings are generated by specialized AI models (e.g., text-embedding-ada-002 from OpenAI or an equivalent model in Azure AI).

Given a user's question, vector search converts it into an embedding and finds the documents whose embeddings are closest. This is fundamentally different from keyword search:

| | Keyword Search | Vector Search |
|---|---|---|
| Query: "password reset" | Matches documents containing "password" and "reset" | Matches documents about authentication help, even if those words don't appear |
| Handles synonyms? | No | Yes |
| Handles typos? | Limited | Yes |
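The retrieval step can be sketched as a brute-force scan: score every stored vector against the query vector and keep the best N. This is only an illustration with made-up vectors and a hypothetical BruteForceSearch helper. It scores by inner product for brevity, and real backends typically use approximate nearest-neighbor indexes rather than scanning every document.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory "index": document id -> embedding.
var index = new Dictionary<string, float[]>
{
    ["auth-help"] = new[] { 0.9f, 0.1f, 0.0f },
    ["billing"] = new[] { 0.1f, 0.9f, 0.1f },
    ["shipping"] = new[] { 0.0f, 0.2f, 0.9f },
};
float[] query = { 0.8f, 0.2f, 0.0f };

// Prints auth-help first, then billing.
foreach (var (id, score) in BruteForceSearch.TopN(index, query, topN: 2))
{
    Console.WriteLine($"{id}: {score:F2}");
}

static class BruteForceSearch
{
    // Scores every document against the query and returns the topN highest scores.
    public static IReadOnlyList<(string Id, double Score)> TopN(
        IReadOnlyDictionary<string, float[]> index, float[] query, int topN) =>
        index
            .Select(entry => (Id: entry.Key, Score: Dot(entry.Value, query)))
            .OrderByDescending(result => result.Score)
            .Take(topN)
            .ToList();

    // Inner product of two vectors of equal length.
    private static double Dot(float[] a, float[] b)
    {
        double sum = 0;
        for (var i = 0; i < a.Length; i++)
        {
            sum += a[i] * b[i];
        }

        return sum;
    }
}
```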

Retrieval-Augmented Generation (RAG)

RAG combines vector search with generative AI. Instead of asking the AI model to answer from its training data alone, you retrieve relevant documents and inject them into the prompt so the model can ground its response in your actual data. This reduces hallucinations and lets the model answer questions about private or recent information that was never part of its training data.
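As a rough illustration of "injecting documents into the prompt", the sketch below turns retrieved chunks into a numbered system message. The RagPrompt helper, the instruction wording, and the sample chunks are all hypothetical; the framework's own handlers build the real context and may format it differently.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical retrieved chunks: (source name, chunk text).
var chunks = new List<(string Source, string Text)>
{
    ("refund-policy.md", "Refunds are available within 30 days of purchase."),
    ("faq.md", "Contact support to start a refund request."),
};

Console.WriteLine(RagPrompt.BuildSystemMessage(chunks));

static class RagPrompt
{
    // Builds a system message that lists each chunk with a citation number.
    public static string BuildSystemMessage(IReadOnlyList<(string Source, string Text)> chunks)
    {
        var builder = new StringBuilder();
        builder.AppendLine("Answer using only the context below. Cite sources by their bracketed number.");

        for (var i = 0; i < chunks.Count; i++)
        {
            builder.AppendLine($"[{i + 1}] ({chunks[i].Source}) {chunks[i].Text}");
        }

        return builder.ToString();
    }
}
```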

RAG Pipeline

Here is the end-to-end flow when a user sends a query to a profile with data sources configured:

1. User Query: "What is our refund policy?"


2. Embedding Generation
└── The query is converted to a vector using an embedding model


3. Vector Search (IVectorSearchService)
└── The vector is compared against indexed document chunks
└── Top-N most similar chunks are returned (e.g., top 3)


4. Context Enrichment
└── `DataSourceAICompletionContextBuilderHandler` selects the attached data source
└── `DataSourceOrchestrationHandler` injects availability/tool guidance
└── `DataSourcePreemptiveRagHandler` injects retrieved chunks as system context


5. AI Completion
└── The model generates a response grounded in the retrieved documents


6. Response with Citations
└── The response references the source documents
Tip: The number of document chunks retrieved (Top-N) is configurable per profile. A higher value provides more context but uses more tokens. The default is 3.

Architecture

Data sources integrate with the orchestration pipeline through three shared framework components:

  1. DataSourceAICompletionContextBuilderHandler copies the selected data source id into the completion context
  2. DataSourceOrchestrationHandler injects data-source availability instructions and keeps the search tool in scope
  3. DataSourcePreemptiveRagHandler performs preemptive retrieval and injects matching chunks into the system message

The same shared framework layer also exposes IAIDataSourceIndexingService for keeping knowledge-base indexes synchronized with their source indexes. Data source mappings are observed by the framework itself so source document writes can flow into the mapped knowledge-base index without each host re-implementing custom handlers.

User Query


DataSourceAICompletionContextBuilderHandler
│ (attaches data source id)

DataSourceOrchestrationHandler
│ (adds data-source availability + tool guidance)

DataSourcePreemptiveRagHandler
│ (queries vector store)

Completion Context (enriched with relevant documents)


AI Model (grounds response in retrieved data)

Common Services (Keyed by Provider Name)

Each data source backend registers these services, keyed by its provider name:

| Service | Purpose |
|---|---|
| IDataSourceContentManager | Manages content in data source indices |
| IDataSourceDocumentReader | Reads documents from data source indices |
| IAIDataSourceIndexingService | Rebuilds and repairs knowledge-base indexes from source indexes |
| IAIDataSourceIndexingQueue | Queues asynchronous rebuild and partial-sync work for mapped data sources |
| IODataFilterTranslator | Translates OData filters to backend-native queries |
| ISearchIndexManager | Creates, deletes, and manages search indices |
| ISearchDocumentManager | Indexes and removes documents in search indices |
| IVectorSearchService | Performs vector similarity search |

Automatic Synchronization

When you configure an AIDataSource that maps a source index profile to a knowledge-base index profile, the framework keeps the two aligned in two ways:

  1. AIDataSourceCatalogIndexingHandler queues an initial or updated full rebuild whenever the mapping itself is created, edited, or deleted.
  2. ISearchDocumentManager implementations notify registered ISearchDocumentHandler instances after successful document upserts and deletes, and AIDataSourceSearchDocumentHandler queues a targeted sync for any mapped source profile.

That means document changes such as article create, update, and delete operations automatically flow into the mapped knowledge-base index as long as the source write goes through CrestApps.Core indexing services.

If a source update happens outside the framework, or an unexpected exception interrupts the queue flow, AIDataSourceAlignmentBackgroundService performs a nightly full reconciliation at 2:00 AM UTC to repair any drift.
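Because reconciliation runs once per night, drift that bypasses the queue can persist until the next 2:00 AM UTC pass. The sketch below computes that wait for a given moment. It is only an illustration: NightlySchedule is a hypothetical helper, and how AIDataSourceAlignmentBackgroundService schedules itself internally is not described here.

```csharp
using System;

// Example: at 23:30 UTC the next 2:00 AM UTC pass is 2.5 hours away.
var now = new DateTimeOffset(2024, 5, 1, 23, 30, 0, TimeSpan.Zero);
Console.WriteLine(NightlySchedule.DelayUntilNextRun(now)); // 02:30:00

static class NightlySchedule
{
    // Returns the time remaining until the next 2:00 AM UTC.
    public static TimeSpan DelayUntilNextRun(DateTimeOffset now)
    {
        var utcNow = now.ToUniversalTime();
        var next = new DateTimeOffset(utcNow.Year, utcNow.Month, utcNow.Day, 2, 0, 0, TimeSpan.Zero);

        // If today's 2:00 AM has already passed (or is right now), use tomorrow's.
        if (next <= utcNow)
        {
            next = next.AddDays(1);
        }

        return next - utcNow;
    }
}
```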

Async flow and tracing

The default synchronization path is intentionally asynchronous:

  1. A catalog event (CreatedAsync, UpdatedAsync, DeletedAsync) or search-document event (DocumentsAddedOrUpdatedAsync, DocumentsDeletedAsync) fires first.
  2. The handler writes an AIDataSourceIndexingWorkItem into IAIDataSourceIndexingQueue.
  3. AIDataSourceIndexingBackgroundService dequeues the work item and invokes IAIDataSourceIndexingService.
  4. AIDataSourceAlignmentBackgroundService performs nightly reconciliation if anything was missed.

The default IAIDataSourceIndexingQueue implementation uses an in-memory channel. Replace it if you need durable storage across restarts or one shared queue across multiple nodes.
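To show what an in-memory channel implies, here is a minimal producer/consumer sketch built on System.Threading.Channels. The WorkItem record is a hypothetical stand-in and does not reflect the real shape of AIDataSourceIndexingWorkItem. Note that everything lives in process memory, which is why queued items do not survive a restart and are not visible to other nodes.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<WorkItem>();

// Producer side: a handler enqueues a work item and returns without doing the sync itself.
await channel.Writer.WriteAsync(
    new WorkItem("SyncSourceDocuments", "articles", new[] { "doc-1", "doc-2" }));

// Completing the writer lets the consumer loop below finish (done here only for the demo).
channel.Writer.Complete();

// Consumer side: a background worker drains the channel and performs the work.
await foreach (var item in channel.Reader.ReadAllAsync())
{
    Console.WriteLine($"{item.Kind} for {item.SourceProfile}: {string.Join(", ", item.DocumentIds)}");
}
// Prints: SyncSourceDocuments for articles: doc-1, doc-2

// Hypothetical work item shape, for illustration only.
record WorkItem(string Kind, string SourceProfile, IReadOnlyCollection<string> DocumentIds);
```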

Set logging to Trace for the CrestApps.Core.AI categories when you need tracing. The framework logs when handler notifications are received, when queue work items are written and dequeued, and when the alignment worker runs or skips a scheduled pass.
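One way to do this in code is a category filter on the host's logging builder. This is a sketch that assumes an ASP.NET Core host and that the framework's logger categories begin with the CrestApps.Core.AI prefix; the same filter can usually be expressed in the Logging section of appsettings.json instead.

```csharp
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.Logging;

var builder = WebApplication.CreateBuilder(args);

// Assumption: the framework's logger categories start with "CrestApps.Core.AI",
// so this prefix filter lowers the minimum level to Trace for all of them.
builder.Logging.AddFilter("CrestApps.Core.AI", LogLevel.Trace);

var app = builder.Build();
app.Run();
```

Trace output can be verbose, so it is usually worth scoping the filter to these categories rather than lowering the global minimum level.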

Override points

Use these services when you need to customize the default behavior:

| Service or contract | Default role | Override when you need |
|---|---|---|
| IAIDataSourceIndexingQueue | In-memory async queue | Durable storage, distributed dispatch, custom throttling |
| IAIDataSourceIndexingService | Full and partial sync orchestration | Custom chunking, filtering, or source-to-target mapping rules |
| ISearchDocumentHandler | Reacts after source-index writes/deletes | Additional downstream side effects after successful index mutations |
| ICatalogEntryHandler<AIDataSource> | Reacts after data-source mapping changes | Custom provisioning or non-default rebuild policy |
| IIndexProfileHandler | Defines and validates data-source index fields | Custom index schemas or provider-specific profile behavior |

AddAIDataSources() on the provider builders registers all of the default queue, handler, background-service, and index-profile services for you.

Available Backends

| Backend | Extension | Provider Name | Documentation |
|---|---|---|---|
| Elasticsearch | AddCoreElasticsearchServices() | "Elasticsearch" | Elasticsearch |
| Azure AI Search | AddCoreAzureAISearchServices() | "AzureAISearch" | Azure AI Search |

Key Interfaces Deep Dive

IVectorSearchService

Performs vector similarity search against an index. This is the core search operation used during RAG.

public interface IVectorSearchService
{
    Task<IEnumerable<DocumentChunkSearchResult>> SearchAsync(
        IIndexProfileInfo indexProfile,
        float[] embedding,
        string referenceId,
        string referenceType,
        int topN,
        CancellationToken cancellationToken = default);
}

The embedding parameter is the vector representation of the user's query. The topN parameter controls how many chunks to return.

ISearchDocumentManager

Manages the lifecycle of documents within a search index — adding, updating, and removing documents.

public interface ISearchDocumentManager
{
    Task<bool> AddOrUpdateAsync(
        IIndexProfileInfo profile,
        IReadOnlyCollection<IndexDocument> documents,
        CancellationToken cancellationToken = default);

    Task DeleteAsync(
        IIndexProfileInfo profile,
        IEnumerable<string> documentIds,
        CancellationToken cancellationToken = default);

    Task DeleteAllAsync(
        IIndexProfileInfo profile,
        CancellationToken cancellationToken = default);
}

ISearchIndexManager

Creates and manages search indexes themselves (not the documents within them).

public interface ISearchIndexManager
{
    Task<bool> ExistsAsync(string indexFullName, CancellationToken cancellationToken = default);

    Task CreateAsync(
        IIndexProfileInfo profile,
        IReadOnlyCollection<SearchIndexField> fields,
        CancellationToken cancellationToken = default);

    Task DeleteAsync(string indexFullName, CancellationToken cancellationToken = default);
}

IDataSourceContentManager

A higher-level service that searches for document chunks with optional OData filtering and manages data source content.

public interface IDataSourceContentManager
{
    Task<IEnumerable<DataSourceSearchResult>> SearchAsync(
        IIndexProfileInfo indexProfile,
        float[] embedding,
        string dataSourceId,
        int topN,
        string filter = null,
        CancellationToken cancellationToken = default);

    Task<long> DeleteByDataSourceIdAsync(
        IIndexProfileInfo indexProfile,
        string dataSourceId,
        CancellationToken cancellationToken = default);
}

The optional filter parameter accepts an OData expression that is translated to the backend-native query language via IODataFilterTranslator.

IAIDataSourceIndexingService

Coordinates full and partial synchronization between a source index and its AI knowledge-base index.

public interface IAIDataSourceIndexingService
{
    Task SyncAllAsync(CancellationToken cancellationToken = default);
    Task SyncDataSourceAsync(AIDataSource dataSource, CancellationToken cancellationToken = default);
    Task SyncSourceDocumentsAsync(IEnumerable<string> documentIds, CancellationToken cancellationToken = default);
    Task SyncSourceDocumentsAsync(string sourceIndexProfileName, IEnumerable<string> documentIds, CancellationToken cancellationToken = default);
    Task RemoveSourceDocumentsAsync(IEnumerable<string> documentIds, CancellationToken cancellationToken = default);
    Task RemoveSourceDocumentsAsync(string sourceIndexProfileName, IEnumerable<string> documentIds, CancellationToken cancellationToken = default);
    Task DeleteDataSourceDocumentsAsync(AIDataSource dataSource, CancellationToken cancellationToken = default);
}

SyncDataSourceAsync() performs a full rebuild for a data source by deleting that data source's existing chunk documents and re-reading the mapped source index through IDataSourceDocumentReader. The framework uses the source-profile overloads of SyncSourceDocumentsAsync() and RemoveSourceDocumentsAsync() to react to document-level changes automatically, so most hosts only need to create the mapping and route source writes through ISearchDocumentManager.

| Method | What it does | Parameters |
|---|---|---|
| SyncAllAsync(cancellationToken) | Reconciles every configured AIDataSource mapping. This is what the nightly alignment service uses when it repairs drift. | cancellationToken: stops the full reconciliation when the host is shutting down or the caller cancels. |
| SyncDataSourceAsync(dataSource, cancellationToken) | Runs a full rebuild for one mapped data source. Existing knowledge-base chunks for that mapping are removed and rebuilt from the source index. | dataSource: the mapping definition to rebuild. cancellationToken: stops the rebuild. |
| SyncSourceDocumentsAsync(documentIds, cancellationToken) | Synchronizes a set of source documents across any matching data sources without pre-filtering by source profile. | documentIds: source document ids to refresh. cancellationToken: stops the partial sync. |
| SyncSourceDocumentsAsync(sourceIndexProfileName, documentIds, cancellationToken) | Synchronizes only the changed source documents for mappings attached to one source index profile. This is the main path used by AIDataSourceSearchDocumentHandler. | sourceIndexProfileName: the source profile that produced the document mutation. documentIds: source document ids to refresh. cancellationToken: stops the partial sync. |
| RemoveSourceDocumentsAsync(documentIds, cancellationToken) | Removes a set of source documents from any matching data sources without pre-filtering by source profile. | documentIds: source document ids to remove. cancellationToken: stops the removal. |
| RemoveSourceDocumentsAsync(sourceIndexProfileName, documentIds, cancellationToken) | Removes source documents from knowledge-base indexes for mappings attached to one source index profile. | sourceIndexProfileName: the source profile that produced the delete. documentIds: source document ids to remove. cancellationToken: stops the removal. |
| DeleteDataSourceDocumentsAsync(dataSource, cancellationToken) | Deletes all indexed knowledge-base documents that belong to one mapped data source. The framework uses this when a mapping is removed. | dataSource: the mapping definition whose chunks should be deleted. cancellationToken: stops the delete operation. |

ISearchDocumentHandler

Handles post-write notifications emitted by ISearchDocumentManager implementations.

public interface ISearchDocumentHandler
{
    Task DocumentsAddedOrUpdatedAsync(
        IIndexProfileInfo profile,
        IReadOnlyCollection<string> documentIds,
        CancellationToken cancellationToken = default);

    Task DocumentsDeletedAsync(
        IIndexProfileInfo profile,
        IReadOnlyCollection<string> documentIds,
        CancellationToken cancellationToken = default);
}

This is the extension point the framework uses to keep data-source synchronization asynchronous without forcing hosts to wrap the provider-specific document managers.

| Method | What it does | Parameters |
|---|---|---|
| DocumentsAddedOrUpdatedAsync(profile, documentIds, cancellationToken) | Runs after a provider successfully writes documents into the source index. Use it for downstream reactions that must happen only after the source write succeeded. | profile: the source index profile that completed the write. documentIds: the successfully written source document ids. cancellationToken: stops follow-up work. |
| DocumentsDeletedAsync(profile, documentIds, cancellationToken) | Runs after a provider successfully deletes documents from the source index. Use it for downstream cleanup that depends on successful deletion. | profile: the source index profile that completed the delete. documentIds: the successfully deleted source document ids. cancellationToken: stops follow-up work. |

IAIDataSourceIndexingQueue

Queues the asynchronous work generated by catalog and search-document handlers.

public interface IAIDataSourceIndexingQueue
{
    ValueTask QueueSyncDataSourceAsync(AIDataSource dataSource, CancellationToken cancellationToken = default);
    ValueTask QueueDeleteDataSourceAsync(AIDataSource dataSource, CancellationToken cancellationToken = default);
    ValueTask QueueSyncSourceDocumentsAsync(string sourceIndexProfileName, IReadOnlyCollection<string> documentIds, CancellationToken cancellationToken = default);
    ValueTask QueueRemoveSourceDocumentsAsync(string sourceIndexProfileName, IReadOnlyCollection<string> documentIds, CancellationToken cancellationToken = default);
}

Replace this service if you need a durable queue or shared distributed worker infrastructure instead of the built-in in-memory channel.

| Method | What it does | Parameters |
|---|---|---|
| QueueSyncDataSourceAsync(dataSource, cancellationToken) | Enqueues a full rebuild for one mapped data source after the mapping itself changes. | dataSource: the mapping definition to rebuild. cancellationToken: stops queue submission. |
| QueueDeleteDataSourceAsync(dataSource, cancellationToken) | Enqueues cleanup for all indexed chunks that belong to one mapped data source after the mapping is deleted. | dataSource: the mapping definition whose chunks should be deleted. cancellationToken: stops queue submission. |
| QueueSyncSourceDocumentsAsync(sourceIndexProfileName, documentIds, cancellationToken) | Enqueues a targeted refresh for changed source documents under one source profile. | sourceIndexProfileName: the source profile that produced the document change. documentIds: the source document ids to refresh. cancellationToken: stops queue submission. |
| QueueRemoveSourceDocumentsAsync(sourceIndexProfileName, documentIds, cancellationToken) | Enqueues targeted cleanup for deleted source documents under one source profile. | sourceIndexProfileName: the source profile that produced the document delete. documentIds: the source document ids to remove. cancellationToken: stops queue submission. |

IDataSourceDocumentReader

Reads raw documents from a source index, typically used during re-indexing or migration.

public interface IDataSourceDocumentReader
{
    IAsyncEnumerable<KeyValuePair<string, SourceDocument>> ReadAsync(
        IIndexProfileInfo indexProfile,
        string keyFieldName,
        string titleFieldName,
        string contentFieldName,
        CancellationToken cancellationToken = default);

    IAsyncEnumerable<KeyValuePair<string, SourceDocument>> ReadByIdsAsync(
        IIndexProfileInfo indexProfile,
        IEnumerable<string> documentIds,
        string keyFieldName,
        string titleFieldName,
        string contentFieldName,
        CancellationToken cancellationToken = default);
}

IODataFilterTranslator

Translates OData $filter expressions into backend-native query syntax. Each backend (Elasticsearch, Azure AI Search) has its own implementation.

public interface IODataFilterTranslator
{
    string Translate(string odataFilter);
}

For example, the Elasticsearch translator converts category eq 'support' into an Elasticsearch query DSL filter targeting the filters.category field.
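The idea can be sketched with a toy translator that handles only a single string equality and emits an Elasticsearch term query. ToyODataTranslator is hypothetical and far narrower than a real implementation. The term query shape is an assumption for illustration, since the exact DSL the built-in translator produces is not shown here.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Text.RegularExpressions;

Console.WriteLine(ToyODataTranslator.Translate("category eq 'support'"));
// Prints: {"term":{"filters.category":"support"}}

static class ToyODataTranslator
{
    // Matches exactly one "<field> eq '<value>'" expression; nothing else is supported.
    private static readonly Regex EqualityPattern =
        new(@"^\s*(\w+)\s+eq\s+'([^']*)'\s*$", RegexOptions.Compiled);

    public static string Translate(string odataFilter)
    {
        var match = EqualityPattern.Match(odataFilter);
        if (!match.Success)
        {
            throw new NotSupportedException($"Unsupported filter: {odataFilter}");
        }

        // Filterable fields are assumed to live under the "filters." prefix.
        var field = "filters." + match.Groups[1].Value;
        var value = match.Groups[2].Value;

        var query = new Dictionary<string, object>
        {
            ["term"] = new Dictionary<string, string> { [field] = value },
        };

        return JsonSerializer.Serialize(query);
    }
}
```

A real translator also has to cover operators such as and, or, ne, and comparisons, plus escaping and type handling, which is why each backend ships its own implementation.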

Adding a Custom Backend

To add a custom vector store backend (e.g., Pinecone, Qdrant, Weaviate), implement all six keyed services:

const string providerName = "MyBackend";

builder.Services.AddKeyedScoped<IVectorSearchService, MyVectorSearchService>(providerName);
builder.Services.AddKeyedScoped<ISearchIndexManager, MySearchIndexManager>(providerName);
builder.Services.AddKeyedScoped<ISearchDocumentManager, MySearchDocumentManager>(providerName);
builder.Services.AddKeyedScoped<IDataSourceContentManager, MyDataSourceContentManager>(providerName);
builder.Services.AddKeyedScoped<IDataSourceDocumentReader, MyDataSourceDocumentReader>(providerName);
builder.Services.AddKeyedScoped<IODataFilterTranslator, MyODataFilterTranslator>(providerName);

If you implement a custom ISearchDocumentManager, make sure successful upsert and delete operations notify any registered ISearchDocumentHandler instances so automatic data-source synchronization continues to work for your provider.
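The notify-after-success pattern might look like the sketch below. To stay self-contained it uses simplified stand-ins: IDocumentHandler and SketchDocumentManager are hypothetical and take a profile name string, whereas the real ISearchDocumentHandler and ISearchDocumentManager work with IIndexProfileInfo and IndexDocument. WriteToBackendAsync is a placeholder for the provider-specific write.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

var manager = new SketchDocumentManager(new IDocumentHandler[] { new LoggingHandler() });
await manager.AddOrUpdateAsync("articles", new[] { "doc-1", "doc-2" });
// Prints: articles: 2 document(s) written

// Simplified stand-in for ISearchDocumentHandler.
interface IDocumentHandler
{
    Task DocumentsAddedOrUpdatedAsync(
        string profileName,
        IReadOnlyCollection<string> documentIds,
        CancellationToken cancellationToken = default);
}

sealed class LoggingHandler : IDocumentHandler
{
    public Task DocumentsAddedOrUpdatedAsync(
        string profileName,
        IReadOnlyCollection<string> documentIds,
        CancellationToken cancellationToken = default)
    {
        Console.WriteLine($"{profileName}: {documentIds.Count} document(s) written");
        return Task.CompletedTask;
    }
}

// Simplified stand-in for a custom ISearchDocumentManager.
sealed class SketchDocumentManager
{
    private readonly IEnumerable<IDocumentHandler> _handlers;

    public SketchDocumentManager(IEnumerable<IDocumentHandler> handlers) => _handlers = handlers;

    public async Task<bool> AddOrUpdateAsync(
        string profileName,
        IReadOnlyCollection<string> documentIds,
        CancellationToken cancellationToken = default)
    {
        var succeeded = await WriteToBackendAsync(profileName, documentIds, cancellationToken);
        if (!succeeded)
        {
            // Never notify handlers about a write that did not happen.
            return false;
        }

        foreach (var handler in _handlers)
        {
            await handler.DocumentsAddedOrUpdatedAsync(profileName, documentIds, cancellationToken);
        }

        return true;
    }

    // Placeholder for the provider-specific upsert call.
    private static Task<bool> WriteToBackendAsync(
        string profileName,
        IReadOnlyCollection<string> documentIds,
        CancellationToken cancellationToken) => Task.FromResult(true);
}
```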

Example: Custom Vector Search Implementation

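using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Illustrative only: PineconeClient, QueryRequest, and the response shape stand in
// for whichever Pinecone SDK you use; adjust the calls to that SDK's actual API.
public sealed class PineconeVectorSearchService : IVectorSearchService
{
    private readonly PineconeClient _client;

    public PineconeVectorSearchService(PineconeClient client)
    {
        _client = client;
    }

    public async Task<IEnumerable<DocumentChunkSearchResult>> SearchAsync(
        IIndexProfileInfo indexProfile,
        float[] embedding,
        string referenceId,
        string referenceType,
        int topN,
        CancellationToken cancellationToken = default)
    {
        // Note: referenceId and referenceType are not applied in this example; a complete
        // implementation would likely restrict the query to them (e.g., with a metadata filter).
        var response = await _client.QueryAsync(new QueryRequest
        {
            Vector = embedding,
            TopK = topN,
            Namespace = indexProfile.IndexFullName,
            IncludeMetadata = true,
        }, cancellationToken);

        // Map each match to the framework's result type. This assumes every match
        // carries a "content" metadata entry written at indexing time.
        return response.Matches.Select(match => new DocumentChunkSearchResult
        {
            DocumentId = match.Id,
            Score = match.Score,
            Content = match.Metadata["content"].ToString(),
        });
    }
}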
Warning: All six services must be registered with the same providerName key. The framework resolves them by key at runtime based on the data source configuration.
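Key-based resolution can be sketched with the keyed-service support in Microsoft.Extensions.DependencyInjection (version 8.0 or later). IVectorSearch and the two backend classes are hypothetical stand-ins; the framework's own resolution code is not shown here, but the mechanism of picking an implementation by provider name works along these lines.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;

var services = new ServiceCollection();
services.AddKeyedScoped<IVectorSearch, BackendASearch>("BackendA");
services.AddKeyedScoped<IVectorSearch, BackendBSearch>("BackendB");

using var provider = services.BuildServiceProvider();
using var scope = provider.CreateScope();

// In the framework, the key would come from the data source configuration.
var providerName = "BackendB";
var search = scope.ServiceProvider.GetRequiredKeyedService<IVectorSearch>(providerName);
Console.WriteLine(search.Name); // BackendB

// Hypothetical stand-ins for a keyed backend service.
interface IVectorSearch
{
    string Name { get; }
}

sealed class BackendASearch : IVectorSearch
{
    public string Name => "BackendA";
}

sealed class BackendBSearch : IVectorSearch
{
    public string Name => "BackendB";
}
```

GetRequiredKeyedService throws an InvalidOperationException when nothing is registered under the requested key, which is the kind of failure to expect if one of the services is registered under a different provider name.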

Configuration Guide

Data source backends are configured in appsettings.json under the CrestApps section. Each backend has its own configuration section, and its path must match the one passed to GetSection during registration:

{
  "CrestApps": {
    "Elasticsearch": {
      "Url": "https://localhost:9200",
      "Username": "elastic",
      "Password": "your-password"
    },
    "AzureAISearch": {
      "Endpoint": "https://my-search.search.windows.net",
      "ApiKey": "your-admin-api-key"
    }
  }
}

The configuration section is passed to the provider registration extension method:

// Bind from configuration
builder.Services.AddCoreElasticsearchServices(
    builder.Configuration.GetSection("CrestApps:Elasticsearch"));

// Or bind Azure AI Search
builder.Services.AddCoreAzureAISearchServices(
    builder.Configuration.GetSection("CrestApps:AzureAISearch"));

See the individual backend pages for detailed configuration options: Elasticsearch | Azure AI Search