Databricks

Using Databricks and Pinecone to create and index vector embeddings at scale

Databricks, built on top of Apache Spark, is a powerful platform for data processing and analytics, known for its ability to efficiently handle large datasets. In this guide, we will show you how to use Spark (with Databricks) to create vector embeddings and load them into Pinecone.

First, let’s discuss why using Databricks and Pinecone is necessary in this context. When you process fewer than a million records, a single machine might be sufficient. But when you work with hundreds of millions of records, you have to start thinking about how the operation scales. We need to consider two things:

  1. How efficiently can we generate the embeddings at scale?
  2. How efficiently can we ingest and update these embeddings at scale?

Databricks is a great tool for creating embeddings at scale: it allows us to parallelize the process over multiple machines and leverage GPUs to accelerate the process.

Pinecone lets us efficiently ingest, update and query hundreds of millions or even billions of embeddings. As a managed service, Pinecone can guarantee a very high degree of reliability and performance when it comes to datasets of this size.

Pinecone provides a specialized Spark connector for Databricks that is optimized for ingesting data from Databricks into Pinecone. This allows the ingestion process to complete much faster than it would if we used Pinecone’s REST or gRPC APIs directly on a large-scale dataset.

Together, Pinecone and Databricks make a great combination for managing the entire lifecycle of vector embeddings at scale.

Why Databricks?

Databricks is a Unified Analytics Platform on top of Apache Spark. The primary advantage of using Spark is its ability to distribute the workload across a cluster of machines, allowing it to process large amounts of data quickly and efficiently. By adding more machines or increasing the number of cores on each machine, it is easy to horizontally scale the cluster as needed to handle larger workloads.

At the core of Spark is the map-reduce pattern, where data is divided into partitions and a series of transformations is applied to each partition in parallel. The results from each partition are then automatically collected and aggregated into the final result. This approach makes Spark both fast and fault-tolerant, as it can retry failed tasks without requiring the entire workload to be reprocessed.
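To make this concrete, here is a minimal PySpark sketch of that pattern (illustrative only, not part of the embedding pipeline we build below): the data is split into partitions, each partition is transformed in parallel on the workers, and the partial results are combined into a final answer.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Split one million numbers across 8 partitions, square each number in
# parallel on the workers, then combine the partial sums into one result.
numbers = sc.parallelize(range(1_000_000), numSlices=8)
total = numbers.map(lambda x: x * x).reduce(lambda a, b: a + b)
print(total)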

In addition to its parallel processing capabilities, Spark allows developers to write code in popular languages like Python and Scala, which is then optimized for parallel execution under the hood. This makes it easier for developers to focus on the data processing itself, rather than worrying about the details of distributed computing.

Generating vector embeddings is a computationally intensive task, and parallelization can save many hours of precious computation time and resources. Leveraging GPUs with Spark produces even better results: combining the fast computation of a GPU with parallelization ensures optimal performance.

Databricks makes it easier to work with Apache Spark: it provides easy set-up and tear-down of clusters, dependency management, compute allocation, storage solution integrations, and more.

Why Pinecone?

Pinecone is a vector database that makes it easy to build high-performance vector search applications. It offers a number of key benefits for dealing with vector embeddings at scale, including ultra-low query latency at any scale, live index updates when you add, edit, or delete data, and the ability to combine vector search with metadata filtering or keyword search for more relevant results. As mentioned before, Pinecone can easily handle very large scales of hundreds of millions and even billions of vector embeddings. Additionally, Pinecone is fully managed, so it's easy to use and scale.

With Pinecone, you can easily index and search through vector embeddings. It is ideal for a variety of use cases such as semantic text search, question-answering, visual search, recommendation systems, and more.

In this example, we'll create embeddings based on the sentence-transformers/all-MiniLM-L6-v2 model from Hugging Face. We'll then use a dataset with a large volume of documents to produce the embeddings and upsert them into Pinecone. Note that the actual model and dataset we'll use are immaterial for this example. This method should work on any embeddings you may want to create, with whatever dataset you may choose.

In order to create embeddings at scale, we need to do four things:

  1. Set up a Spark cluster
  2. Load the dataset into partitions
  3. Apply an embedding model on each entry to produce the embedding
  4. Save the results

Let's get started!

Setting up a Spark Cluster

Using Databricks makes it easy to speed up the creation of our embeddings even more by using GPUs instead of CPUs in our cluster. To do this, navigate to the "Compute" section in your Databricks console, and select the following options:

[Screenshot: cluster setup]

Next, we'll add the Pinecone Spark connector to our cluster. Navigate to the "Libraries" tab and click "Install new".

[Screenshot: install library]

Select "DBF"/S3” and paste the following S3 URI:

s3://pinecone-jars/spark-pinecone-uberjar.jar

[Screenshot: install from S3]

To complete the installation, click "Install". To use the new cluster, create a new notebook and attach it to the newly created cluster.

Environment Setup

We'll start by installing some dependencies:

%pip install datasets transformers pinecone-client torch

Next, we'll set up the connection to Pinecone. You'll have to retrieve the following information from your Pinecone console:

  1. API Key: navigate to your project and click the "API Keys" button on the sidebar
  2. Environment: check the browser URL to find the environment. https://app.pinecone.io/organizations/[org-id]/projects/[environment]:[project_name]/indexes

Your index name will be the one we use when creating the index below (in this case, news).

import pinecone

api_key = '<YOUR_PINECONE_API_KEY>'
environment = 'us-west1-gcp'
pinecone.init(api_key=api_key, environment=environment)

Next, we'll create a new index in Pinecone, where our vector embeddings will be saved:

index_name = 'news'

# Delete the index if it already exists, then create it from scratch.
# 384 is the embedding dimension of sentence-transformers/all-MiniLM-L6-v2.
if index_name in pinecone.list_indexes():
    pinecone.delete_index(index_name)
pinecone.create_index(name=index_name, dimension=384)
index = pinecone.Index(index_name=index_name)

Load the dataset into partitions

In this example, we'll use a collection of news articles as our dataset. We'll use Hugging Face's datasets library and load the data into our environment:

from datasets import load_dataset

dataset_name = "allenai/multinews_sparse_max"
dataset = load_dataset(dataset_name, split="train")

Next, we'll convert the dataset from the Hugging Face format and repartition it:

# Write the Hugging Face dataset to DBFS as Parquet ('/dbfs/...' is the local
# FUSE mount of DBFS), then read it back with Spark and repartition it so the
# work is spread across the cluster.
dataset.to_parquet('/dbfs/tmp/dataset_parquet.pq')
num_workers = 10
dataset_df = spark.read.parquet('/tmp/dataset_parquet.pq').repartition(num_workers)

Once the repartitioning is complete, we get back a DataFrame, which is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. As mentioned above, each partition of the dataframe holds a roughly equal share of the original data.
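As an optional sanity check, we can confirm the partition count and inspect the schema Spark inferred from the Parquet file:

# Should print 10, matching num_workers above.
print(dataset_df.rdd.getNumPartitions())

# Inspect the columns Spark inferred from the Parquet file.
dataset_df.printSchema()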

The dataset doesn't have identifiers associated with each document, so let's add them:

from pyspark.sql.types import StringType
from pyspark.sql.functions import monotonically_increasing_id

dataset_df = dataset_df.withColumn('id', monotonically_increasing_id().cast(StringType()))

As its name suggests, withColumn adds a column to the dataframe, here containing a unique, monotonically increasing identifier that we cast to a string. Great! Now we have identifiers for each document. Let's move on to creating the embeddings for each document.
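Note that monotonically_increasing_id guarantees unique, increasing values but not consecutive ones. To verify the new column, you can preview a few rows (an optional step):

# Show a handful of generated ids; values are unique but not necessarily consecutive.
dataset_df.select('id').show(5, truncate=False)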

Create a function for transforming text into embeddings

In this example, we will create a UDF (User Defined Function) to produce the embeddings, using the AutoTokenizer and AutoModel classes from the Hugging Face transformers library. The UDF will be applied to each partition of the dataframe and executed on each row within that partition. It will tokenize the document using AutoTokenizer and then pass the result to the model (in this case, sentence-transformers/all-MiniLM-L6-v2). Finally, we'll produce the embedding itself by extracting the first token's vector from the model's last hidden state.

Once the UDF is created, it can be applied to the dataframe to transform the data in the specified column. The Python UDF is shipped to the Spark workers, where it transforms the data; when the transformation is complete, the results form a new distributed dataset that we'll later convert back into a dataframe.

from transformers import AutoTokenizer, AutoModel

def create_embeddings(partitionData):
    # Load the tokenizer and model once per partition, not once per row.
    tokenizer = AutoTokenizer.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")
    model = AutoModel.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")

    for row in partitionData:
        document = str(row.document)
        # Tokenize the document and run it through the model.
        inputs = tokenizer(document, padding=True, truncation=True, return_tensors="pt", max_length=512)
        result = model(**inputs)
        # Use the first token of the last hidden state as the embedding.
        embeddings = result.last_hidden_state[:, 0, :].cpu().detach().numpy()
        lst = embeddings.flatten().tolist()
        # Yield rows matching the schema expected by the Pinecone connector:
        # id, vector, namespace, metadata (as a JSON string).
        yield [row.id, lst, '', '{}']
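Since we provisioned a GPU cluster earlier, you can also run the model on the GPU inside the UDF. The following is a rough sketch of that variation (create_embeddings_gpu is a hypothetical helper, not part of the original walkthrough); it assumes torch detects a CUDA device on the workers:

import torch
from transformers import AutoTokenizer, AutoModel

def create_embeddings_gpu(partitionData):
    # Use the worker's GPU when available, otherwise fall back to CPU.
    device = "cuda" if torch.cuda.is_available() else "cpu"
    tokenizer = AutoTokenizer.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")
    model = AutoModel.from_pretrained("sentence-transformers/all-MiniLM-L6-v2").to(device)

    for row in partitionData:
        inputs = tokenizer(str(row.document), padding=True, truncation=True,
                           return_tensors="pt", max_length=512).to(device)
        with torch.no_grad():
            result = model(**inputs)
        # Move the embedding back to the CPU before converting to a Python list.
        embeddings = result.last_hidden_state[:, 0, :].cpu().numpy()
        yield [row.id, embeddings.flatten().tolist(), '', '{}']

If you use this variant, pass create_embeddings_gpu to mapPartitions in the next step instead of create_embeddings.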

Applying the UDF to the data

A dataframe in Spark is a higher-level abstraction built on top of a more fundamental building block called an RDD, or Resilient Distributed Dataset. We're going to use the mapPartitions function, which gives us finer control over the execution of our UDF by applying it explicitly to each partition of the RDD.

embeddings = dataset_df.rdd.mapPartitions(create_embeddings)

Next, we’ll convert the resulting RDD back into a dataframe with the schema required by Pinecone:

from pyspark.sql.types import StructType, StructField, StringType, ArrayType, FloatType

schema = StructType([
    StructField("id", StringType(), True),
    StructField("vector", ArrayType(FloatType()), True),
    StructField("namespace", StringType(), True),
    StructField("metadata", StringType(), True),
])

embeddings_df = spark.createDataFrame(data=embeddings, schema=schema)

Upserting the embeddings

Lastly, we'll use the Pinecone Spark connector to save the embeddings to our index.


(
    embeddings_df.write
    .option("pinecone.apiKey", api_key)
    .option("pinecone.environment", environment)
    .option("pinecone.projectName", pinecone.whoami().projectname)
    .option("pinecone.indexName", index_name)
    .format("io.pinecone.spark.pinecone.Pinecone")
    .mode("append")
    .save()
)

The process of writing the embeddings to Pinecone should take approximately 15 seconds. When it completes, you'll see output similar to the following:

spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@...

pineconeOptions: scala.collection.immutable.Map[String,String] = Map(pinecone.apiKey -> <YOUR API KEY>, pinecone.environment -> us-west1-gcp, pinecone.projectName -> <YOUR PROJECT NAME>, pinecone.indexName -> news)

This means the process completed successfully and the embeddings have been stored in Pinecone.
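As an optional check (a sketch using the same pinecone-client index handle initialized earlier, not part of the original walkthrough), you can confirm the vectors landed in the index and run a quick similarity query:

# Confirm the number of vectors stored in the index.
print(index.describe_index_stats())

# Query the index with one of the embeddings we just created as a smoke test.
sample = embeddings_df.first()
results = index.query(vector=sample.vector, top_k=3)
print(results)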

Summary

Creating vector embeddings for large datasets can be challenging, but Databricks is a great tool for accomplishing the task. Databricks makes it easy to set up a GPU cluster and handle the required dependencies, allowing for efficient creation of embeddings at scale.

Databricks and Pinecone are the perfect combination for working with very large vector datasets. Pinecone provides a way to efficiently store and retrieve the vectors created by Databricks, making it easy and performant to work with a huge number of vectors. Overall, the combination of Databricks and Pinecone provides a powerful and effective solution for creating embeddings for very large datasets. By parallelizing the embedding generation and the data ingestion processes, we can create a fast and resilient pipeline that will be able to index and update large volumes of vectors.