Semantic Index (#10329)

This introduces semantic indexing in Zed based on chunking text from
files in the developer's workspace and creating vector embeddings using
an embedding model. As part of this, we've created an embeddings
provider trait that allows us to work with OpenAI, a local Ollama model,
or a Zed hosted embedding.

The semantic index is built by breaking down text for known
(programming) languages into manageable chunks that are smaller than the
max token size. Each chunk is then fed to a language model to create a
high dimensional vector which is then normalized to a unit vector to
allow fast comparison with other vectors with a simple dot product.
Alongside the vector, we store the path of the file and the range within
the document where the vector was sourced from.

Zed will soon grok contextual similarity across different text snippets,
allowing for natural language search beyond keyword matching. This is
being put together both for human-based search as well as providing
results to Large Language Models to allow them to refine how they help
developers.

Remaining todo:

* [x] Change `provider` to `model` within the zed hosted embeddings
database (as its currently a combo of the provider and the model in one
name)


Release Notes:

- N/A

---------

Co-authored-by: Nathan Sobo <nathan@zed.dev>
Co-authored-by: Antonio Scandurra <me@as-cii.com>
Co-authored-by: Conrad Irwin <conrad@zed.dev>
Co-authored-by: Marshall Bowers <elliott.codes@gmail.com>
Co-authored-by: Antonio <antonio@zed.dev>
This commit is contained in:
Kyle Kelley 2024-04-12 10:40:59 -07:00 committed by GitHub
parent 4b40e83b8b
commit 49371b44cb
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
33 changed files with 2649 additions and 41 deletions

View file

@ -0,0 +1,9 @@
CREATE TABLE IF NOT EXISTS "embeddings" (
"model" TEXT,
"digest" BYTEA,
"dimensions" FLOAT4[1536],
"retrieved_at" TIMESTAMP NOT NULL DEFAULT now(),
PRIMARY KEY ("model", "digest")
);
CREATE INDEX IF NOT EXISTS "idx_retrieved_at_on_embeddings" ON "embeddings" ("retrieved_at");

View file

@ -6,6 +6,7 @@ pub mod channels;
pub mod contacts;
pub mod contributors;
pub mod dev_servers;
pub mod embeddings;
pub mod extensions;
pub mod hosted_projects;
pub mod messages;

View file

@ -0,0 +1,94 @@
use super::*;
use time::Duration;
use time::OffsetDateTime;
impl Database {
pub async fn get_embeddings(
&self,
model: &str,
digests: &[Vec<u8>],
) -> Result<HashMap<Vec<u8>, Vec<f32>>> {
self.weak_transaction(|tx| async move {
let embeddings = {
let mut db_embeddings = embedding::Entity::find()
.filter(
embedding::Column::Model.eq(model).and(
embedding::Column::Digest
.is_in(digests.iter().map(|digest| digest.as_slice())),
),
)
.stream(&*tx)
.await?;
let mut embeddings = HashMap::default();
while let Some(db_embedding) = db_embeddings.next().await {
let db_embedding = db_embedding?;
embeddings.insert(db_embedding.digest, db_embedding.dimensions);
}
embeddings
};
if !embeddings.is_empty() {
let now = OffsetDateTime::now_utc();
let retrieved_at = PrimitiveDateTime::new(now.date(), now.time());
embedding::Entity::update_many()
.filter(
embedding::Column::Digest
.is_in(embeddings.keys().map(|digest| digest.as_slice())),
)
.col_expr(embedding::Column::RetrievedAt, Expr::value(retrieved_at))
.exec(&*tx)
.await?;
}
Ok(embeddings)
})
.await
}
pub async fn save_embeddings(
&self,
model: &str,
embeddings: &HashMap<Vec<u8>, Vec<f32>>,
) -> Result<()> {
self.weak_transaction(|tx| async move {
embedding::Entity::insert_many(embeddings.iter().map(|(digest, dimensions)| {
let now_offset_datetime = OffsetDateTime::now_utc();
let retrieved_at =
PrimitiveDateTime::new(now_offset_datetime.date(), now_offset_datetime.time());
embedding::ActiveModel {
model: ActiveValue::set(model.to_string()),
digest: ActiveValue::set(digest.clone()),
dimensions: ActiveValue::set(dimensions.clone()),
retrieved_at: ActiveValue::set(retrieved_at),
}
}))
.on_conflict(
OnConflict::columns([embedding::Column::Model, embedding::Column::Digest])
.do_nothing()
.to_owned(),
)
.exec_without_returning(&*tx)
.await?;
Ok(())
})
.await
}
pub async fn purge_old_embeddings(&self) -> Result<()> {
self.weak_transaction(|tx| async move {
embedding::Entity::delete_many()
.filter(
embedding::Column::RetrievedAt
.lte(OffsetDateTime::now_utc() - Duration::days(60)),
)
.exec(&*tx)
.await?;
Ok(())
})
.await
}
}

View file

@ -11,6 +11,7 @@ pub mod channel_message_mention;
pub mod contact;
pub mod contributor;
pub mod dev_server;
pub mod embedding;
pub mod extension;
pub mod extension_version;
pub mod feature_flag;

View file

@ -0,0 +1,18 @@
use sea_orm::entity::prelude::*;
use time::PrimitiveDateTime;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
#[sea_orm(table_name = "embeddings")]
pub struct Model {
#[sea_orm(primary_key)]
pub model: String,
#[sea_orm(primary_key)]
pub digest: Vec<u8>,
pub dimensions: Vec<f32>,
pub retrieved_at: PrimitiveDateTime,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}

View file

@ -2,6 +2,7 @@ mod buffer_tests;
mod channel_tests;
mod contributor_tests;
mod db_tests;
mod embedding_tests;
mod extension_tests;
mod feature_flag_tests;
mod message_tests;

View file

@ -0,0 +1,84 @@
use super::TestDb;
use crate::db::embedding;
use collections::HashMap;
use sea_orm::{sea_query::Expr, ColumnTrait, EntityTrait, QueryFilter};
use std::ops::Sub;
use time::{Duration, OffsetDateTime, PrimitiveDateTime};
// SQLite does not support array arguments, so we only test this against a real postgres instance
#[gpui::test]
async fn test_get_embeddings_postgres(cx: &mut gpui::TestAppContext) {
let test_db = TestDb::postgres(cx.executor().clone());
let db = test_db.db();
let provider = "test_model";
let digest1 = vec![1, 2, 3];
let digest2 = vec![4, 5, 6];
let embeddings = HashMap::from_iter([
(digest1.clone(), vec![0.1, 0.2, 0.3]),
(digest2.clone(), vec![0.4, 0.5, 0.6]),
]);
// Save embeddings
db.save_embeddings(provider, &embeddings).await.unwrap();
// Retrieve embeddings
let retrieved_embeddings = db
.get_embeddings(provider, &[digest1.clone(), digest2.clone()])
.await
.unwrap();
assert_eq!(retrieved_embeddings.len(), 2);
assert!(retrieved_embeddings.contains_key(&digest1));
assert!(retrieved_embeddings.contains_key(&digest2));
// Check if the retrieved embeddings are correct
assert_eq!(retrieved_embeddings[&digest1], vec![0.1, 0.2, 0.3]);
assert_eq!(retrieved_embeddings[&digest2], vec![0.4, 0.5, 0.6]);
}
#[gpui::test]
async fn test_purge_old_embeddings(cx: &mut gpui::TestAppContext) {
let test_db = TestDb::postgres(cx.executor().clone());
let db = test_db.db();
let model = "test_model";
let digest = vec![7, 8, 9];
let embeddings = HashMap::from_iter([(digest.clone(), vec![0.7, 0.8, 0.9])]);
// Save old embeddings
db.save_embeddings(model, &embeddings).await.unwrap();
// Reach into the DB and change the retrieved at to be > 60 days
db.weak_transaction(|tx| {
let digest = digest.clone();
async move {
let sixty_days_ago = OffsetDateTime::now_utc().sub(Duration::days(61));
let retrieved_at = PrimitiveDateTime::new(sixty_days_ago.date(), sixty_days_ago.time());
embedding::Entity::update_many()
.filter(
embedding::Column::Model
.eq(model)
.and(embedding::Column::Digest.eq(digest)),
)
.col_expr(embedding::Column::RetrievedAt, Expr::value(retrieved_at))
.exec(&*tx)
.await
.unwrap();
Ok(())
}
})
.await
.unwrap();
// Purge old embeddings
db.purge_old_embeddings().await.unwrap();
// Try to retrieve the purged embeddings
let retrieved_embeddings = db.get_embeddings(model, &[digest.clone()]).await.unwrap();
assert!(
retrieved_embeddings.is_empty(),
"Old embeddings should have been purged"
);
}

View file

@ -6,8 +6,8 @@ use axum::{
Extension, Router,
};
use collab::{
api::fetch_extensions_from_blob_store_periodically, db, env, executor::Executor, AppState,
Config, RateLimiter, Result,
api::fetch_extensions_from_blob_store_periodically, db, env, executor::Executor,
rpc::ResultExt, AppState, Config, RateLimiter, Result,
};
use db::Database;
use std::{
@ -23,7 +23,7 @@ use tower_http::trace::TraceLayer;
use tracing_subscriber::{
filter::EnvFilter, fmt::format::JsonFields, util::SubscriberInitExt, Layer,
};
use util::ResultExt;
use util::ResultExt as _;
const VERSION: &str = env!("CARGO_PKG_VERSION");
const REVISION: Option<&'static str> = option_env!("GITHUB_SHA");
@ -90,6 +90,7 @@ async fn main() -> Result<()> {
};
if is_collab {
state.db.purge_old_embeddings().await.trace_err();
RateLimiter::save_periodically(state.rate_limiter.clone(), state.executor.clone());
}

View file

@ -32,6 +32,8 @@ use axum::{
use collections::{HashMap, HashSet};
pub use connection_pool::{ConnectionPool, ZedVersion};
use core::fmt::{self, Debug, Formatter};
use open_ai::{OpenAiEmbeddingModel, OPEN_AI_API_URL};
use sha2::Digest;
use futures::{
channel::oneshot,
@ -568,6 +570,22 @@ impl Server {
app_state.config.google_ai_api_key.clone(),
)
})
})
.add_request_handler({
user_handler(move |request, response, session| {
get_cached_embeddings(request, response, session)
})
})
.add_request_handler({
let app_state = app_state.clone();
user_handler(move |request, response, session| {
compute_embeddings(
request,
response,
session,
app_state.config.openai_api_key.clone(),
)
})
});
Arc::new(server)
@ -4021,8 +4039,6 @@ async fn complete_with_open_ai(
session: UserSession,
api_key: Arc<str>,
) -> Result<()> {
const OPEN_AI_API_URL: &str = "https://api.openai.com/v1";
let mut completion_stream = open_ai::stream_completion(
&session.http_client,
OPEN_AI_API_URL,
@ -4276,6 +4292,128 @@ async fn count_tokens_with_language_model(
Ok(())
}
struct ComputeEmbeddingsRateLimit;
impl RateLimit for ComputeEmbeddingsRateLimit {
fn capacity() -> usize {
std::env::var("EMBED_TEXTS_RATE_LIMIT_PER_HOUR")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(120) // Picked arbitrarily
}
fn refill_duration() -> chrono::Duration {
chrono::Duration::hours(1)
}
fn db_name() -> &'static str {
"compute-embeddings"
}
}
async fn compute_embeddings(
request: proto::ComputeEmbeddings,
response: Response<proto::ComputeEmbeddings>,
session: UserSession,
api_key: Option<Arc<str>>,
) -> Result<()> {
let api_key = api_key.context("no OpenAI API key configured on the server")?;
authorize_access_to_language_models(&session).await?;
session
.rate_limiter
.check::<ComputeEmbeddingsRateLimit>(session.user_id())
.await?;
let embeddings = match request.model.as_str() {
"openai/text-embedding-3-small" => {
open_ai::embed(
&session.http_client,
OPEN_AI_API_URL,
&api_key,
OpenAiEmbeddingModel::TextEmbedding3Small,
request.texts.iter().map(|text| text.as_str()),
)
.await?
}
provider => return Err(anyhow!("unsupported embedding provider {:?}", provider))?,
};
let embeddings = request
.texts
.iter()
.map(|text| {
let mut hasher = sha2::Sha256::new();
hasher.update(text.as_bytes());
let result = hasher.finalize();
result.to_vec()
})
.zip(
embeddings
.data
.into_iter()
.map(|embedding| embedding.embedding),
)
.collect::<HashMap<_, _>>();
let db = session.db().await;
db.save_embeddings(&request.model, &embeddings)
.await
.context("failed to save embeddings")
.trace_err();
response.send(proto::ComputeEmbeddingsResponse {
embeddings: embeddings
.into_iter()
.map(|(digest, dimensions)| proto::Embedding { digest, dimensions })
.collect(),
})?;
Ok(())
}
struct GetCachedEmbeddingsRateLimit;
impl RateLimit for GetCachedEmbeddingsRateLimit {
fn capacity() -> usize {
std::env::var("EMBED_TEXTS_RATE_LIMIT_PER_HOUR")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(120) // Picked arbitrarily
}
fn refill_duration() -> chrono::Duration {
chrono::Duration::hours(1)
}
fn db_name() -> &'static str {
"get-cached-embeddings"
}
}
async fn get_cached_embeddings(
request: proto::GetCachedEmbeddings,
response: Response<proto::GetCachedEmbeddings>,
session: UserSession,
) -> Result<()> {
authorize_access_to_language_models(&session).await?;
session
.rate_limiter
.check::<GetCachedEmbeddingsRateLimit>(session.user_id())
.await?;
let db = session.db().await;
let embeddings = db.get_embeddings(&request.model, &request.digests).await?;
response.send(proto::GetCachedEmbeddingsResponse {
embeddings: embeddings
.into_iter()
.map(|(digest, dimensions)| proto::Embedding { digest, dimensions })
.collect(),
})?;
Ok(())
}
async fn authorize_access_to_language_models(session: &UserSession) -> Result<(), Error> {
let db = session.db().await;
let flags = db.get_user_flags(session.user_id()).await?;