Efficiently create, ingest, and update vector embeddings at scale with Databricks and Pinecone.

Setup guide

This guide shows you how to efficiently create, ingest, and update vector embeddings at scale with Databricks and Pinecone.

  • Databricks: Vector embedding at scale

    Databricks is a Unified Analytics Platform on top of Apache Spark. The primary advantage of using Spark is its ability to distribute workloads across a cluster of machines. By adding more machines or increasing the number of cores on each machine, it is easy to horizontally scale a cluster to handle computationally intensive tasks like vector embedding, where parallelization can save many hours of precious computation time and resources. Leveraging GPUs with Spark can produce even better results — enjoying the benefits of the fast computation of a GPU combined with parallelization will ensure optimal performance.

    In addition to its parallel processing capabilities, Spark allows developers to write code in popular languages like Python and Scala, which Spark then optimizes for parallel execution under the covers. This makes it easier for developers to focus on the data processing itself, rather than worrying about the details of distributed computing (a short illustrative sketch follows this list).

  • Pinecone: Vector search at scale

    Pinecone is a vector database that makes it easy to build high-performance vector search applications. It offers a number of key benefits for dealing with vector embeddings at scale, including ultra-low query latency at any scale, live index updates when you add, edit, or delete data, and the ability to combine vector search with metadata filtering or keyword search for more relevant results. Pinecone can easily handle very large scales of hundreds of millions and even billions of vector embeddings. Additionally, Pinecone is fully managed, so it’s easy to use and scale.
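
To make the first point concrete, here is a minimal PySpark sketch of the pattern this guide relies on: you write ordinary Python, and Spark splits the data into partitions and runs your function on them in parallel across the cluster. The numbers and the function are purely illustrative; the spark session object is available by default in a Databricks notebook.

    Python
    # Illustrative only: Spark distributes the partitions across the cluster
    # and applies the Python function to each element in parallel.
    rdd = spark.sparkContext.parallelize(range(1000), numSlices=8)
    squared = rdd.map(lambda x: x * x)
    print(squared.take(5))  # [0, 1, 4, 9, 16]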

In this guide, you’ll create embeddings based on the sentence-transformers/all-MiniLM-L6-v2 model from Hugging Face, but the approach demonstrated here should work with any model and dataset you may choose.

1. Set up a Spark cluster

  1. Create a Spark cluster.

    To speed up the creation of your embeddings, use a GPU-enabled instance.

  2. Install the Pinecone Spark connector as a library.

    On AWS Databricks or Google Cloud Databricks, select File path/S3 as the library source and JAR as the library type, and then use the following S3 URL:

    s3://pinecone-jars/0.2.1/spark-pinecone-uberjar.jar  
    

On Azure Databricks, download the compiled JAR file from the S3 bucket, select DBFS as the library source and JAR as the library type, and then upload the JAR file.

2. Load the dataset into partitions

As your example dataset, you’ll use a collection of news articles from Hugging Face’s datasets library.

  1. Create a new notebook attached to your cluster.

  2. Install dependencies:

    %pip install datasets transformers pinecone-client torch  
    
  3. Load the dataset:

    Python
    from datasets import load_dataset  
    dataset_name = "allenai/multinews_sparse_max"  
    dataset = load_dataset(dataset_name, split="train")  
    
  4. Convert the dataset from the Hugging Face format and repartition it:

    Python
    # to_parquet writes through the /dbfs fuse mount; Spark then reads the same file via its DBFS path.
    dataset.to_parquet("/dbfs/tmp/dataset_parquet.pq")
    num_workers = 10
    dataset_df = spark.read.parquet("/tmp/dataset_parquet.pq").repartition(num_workers)
    

    Once the repartition is complete, you get back a DataFrame, which is a distributed collection of the data organized into named columns. It is conceptually equivalent to a table in a relational database or a dataframe in R/Python, but with richer optimizations under the hood. After repartitioning, each partition holds a roughly equal share of the original data, which lets the embedding work in the next section be spread evenly across the workers.

  5. The dataset doesn’t have identifiers associated with each document, so add them:

    Python
    from pyspark.sql.types import StringType  
    from pyspark.sql.functions import monotonically_increasing_id  
    dataset_df = dataset_df.withColumn("id", monotonically_increasing_id().cast(StringType()))  
    

    As its name suggests, withColumn adds a column to the dataframe, containing a simple increasing identifier that you cast to a string. A quick sanity check on the partitioned dataframe and the new column is sketched below.
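
Before moving on, you can optionally confirm the partition count and inspect the new id column. This is a minimal sketch that assumes the dataset_df dataframe created above; the exact values shown will depend on your data.

    Python
    # Optional sanity check (assumes dataset_df from the steps above).
    print(dataset_df.rdd.getNumPartitions())  # expected: 10, matching num_workers

    # Inspect the schema and a couple of ids to confirm the new string column.
    dataset_df.printSchema()
    dataset_df.select("id").show(2, truncate=False)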

3. Create the vector embeddings

In this step, you will create a UDF (User-Defined Function) to create the embeddings, using the AutoTokenizer and AutoModel classes from the Hugging Face transformers library. The UDF will be applied to each partition in a dataframe and executed on each row in each partition. The UDF will tokenize the document using AutoTokenizer and then pass the result to the model (in this case, sentence-transformers/all-MiniLM-L6-v2). Finally, you'll produce the embeddings by taking the first token's vector from the model's last hidden state.

Once the UDF is created, it can be applied to the dataframe's partitions to transform the data. The Python UDF is shipped to the Spark workers, where it transforms each partition. Because Spark transformations are lazy, the embedding work only runs when the results are materialized, which in this guide happens when the embeddings are written to Pinecone.

  1. Create a UDF (User-Defined Function) to create the embeddings, using the AutoTokenizer and AutoModel classes from the Hugging Face transformers library:

    Python
    from transformers import AutoTokenizer, AutoModel

    def create_embeddings(partitionData):
        # Load the tokenizer and model once per partition rather than once per row.
        tokenizer = AutoTokenizer.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")
        model = AutoModel.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")
        for row in partitionData:
            document = str(row.document)
            inputs = tokenizer(document, padding=True, truncation=True, return_tensors="pt", max_length=512)
            result = model(**inputs)
            # Take the first token's vector from the last hidden state as the embedding.
            embeddings = result.last_hidden_state[:, 0, :].cpu().detach().numpy()
            lst = embeddings.flatten().tolist()
            # Yield rows in the Pinecone schema: id, values, namespace, metadata, sparse_values.
            yield [row.id, lst, "", "{}", None]
    
  2. Apply the UDF to the data:

    Python
    embeddings = dataset_df.rdd.mapPartitions(create_embeddings)  
    

    A dataframe in Spark is a higher-level abstraction built on top of a more fundamental building block called an RDD - or Resilient Distributed Dataset. Here, you use the mapPartitions function, which provides finer control over the execution of the UDF by explicitly applying it to each partition of the RDD.

  3. Convert the resulting RDD back into a dataframe with the schema required by Pinecone (a quick check of the result is sketched after this step):

    Python
    from pyspark.sql.types import StructType, StructField, StringType, ArrayType, FloatType, IntegerType  
    schema = StructType([  
        StructField("id",StringType(),True),  
        StructField("values",ArrayType(FloatType()),True),  
        StructField("namespace",StringType(),True),  
        StructField("metadata", StringType(), True),  
        StructField("sparse_values", StructType([  
            StructField("indices", ArrayType(IntegerType(), False), False),  
            StructField("values", ArrayType(FloatType(), False), False)  
        ]), True)  
    ])  
    embeddings_df = spark.createDataFrame(data=embeddings,schema=schema)  
    
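
Before writing to Pinecone, you can optionally confirm that the dataframe matches the expected schema and that the vectors have the 384 dimensions produced by all-MiniLM-L6-v2. This is a minimal sketch that assumes the embeddings_df dataframe created above; note that calling first() triggers the embedding UDF on at least one partition, so it takes a moment.

    Python
    # Optional check (assumes embeddings_df from the step above).
    embeddings_df.printSchema()

    # Inspect one row; the values list should contain 384 floats.
    first_row = embeddings_df.select("id", "values").first()
    print(first_row["id"], len(first_row["values"]))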

4. Save the embeddings in Pinecone

  1. Get your Pinecone API key and environment from the Pinecone console.

  2. Initialize the connection to Pinecone:

    Python
    import pinecone  
    api_key = "<YOUR_PINECONE_API_KEY>"  
    environment = "<YOUR-ENVIRONMENT>"  
    pinecone.init(api_key=api_key, environment=environment)  
    
  3. Create an index for your embeddings:

    Python
    index_name = 'news'  
    # If the index already exists, delete it so it can be recreated cleanly.
    if index_name in pinecone.list_indexes():
        pinecone.delete_index(index_name)
    # Create the index with the 384-dimensional output of all-MiniLM-L6-v2, then connect to it.
    pinecone.create_index(name=index_name, dimension=384)
    index = pinecone.Index(index_name=index_name)
    
  4. Use the Pinecone Spark connector to save the embeddings to your index:

    Python
    (  
        embeddings_df.write  
        .option("pinecone.apiKey", api_key)  
        .option("pinecone.environment", environment)  
        .option("pinecone.projectName", pinecone.whoami().projectname)  
        .option("pinecone.indexName", index_name)  
        .format("io.pinecone.spark.pinecone.Pinecone")  
        .mode("append")  
        .save()  
    )  
    

The process of writing the embeddings to Pinecone should take approximately 15 seconds. When it completes, you’ll see the following:

spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@41638051  
pineconeOptions: scala.collection.immutable.Map[String,String] = Map(pinecone.apiKey -> <YOUR API KEY>, pinecone.environment -> us-west1-gcp, pinecone.projectName -> <YOUR PROJECT NAME>, pinecone.indexName -> "news")  

This means the process was completed successfully and the embeddings have been stored in Pinecone.
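
To confirm that the vectors landed in the index, you can optionally check the index statistics from the same notebook. This is a minimal sketch that reuses the index object created earlier; the exact count depends on your dataset.

    Python
    # Optional verification (assumes the index object created in step 3 above).
    stats = index.describe_index_stats()
    print(stats)  # total_vector_count should match the number of rows written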

Summary

Creating vector embeddings for large datasets can be challenging, but Databricks is a great tool to accomplish the task. Databricks makes it easy to set up a GPU cluster and handle the required dependencies, allowing for efficient creation of embeddings at scale.

Databricks and Pinecone are the perfect combination for working with very large vector datasets. Pinecone provides a way to efficiently store and retrieve the vectors created by Databricks, making it easy and performant to work with a huge number of vectors. Overall, the combination of Databricks and Pinecone provides a powerful and effective solution for creating embeddings for very large datasets. By parallelizing the embedding generation and the data ingestion processes, you can create a fast and resilient pipeline that will be able to index and update large volumes of vectors.
