Use the Spark-Pinecone connector to efficiently create, ingest, and update vector embeddings at scale with Databricks and Pinecone.

Install the Spark-Pinecone connector

  1. Install the Spark-Pinecone connector as a library. (A programmatic alternative using the Databricks REST API is sketched after these steps.)
  2. Configure the library as follows:
    1. Select File path/S3 as the Library Source.

    2. Enter the S3 URI for the Pinecone assembly JAR file:

      s3://pinecone-jars/1.1.0/spark-pinecone-uberjar.jar  
      

      Databricks platform users must use the Pinecone assembly JAR listed above to ensure that the proper dependencies are installed.

    3. Click Install.
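
If you prefer to attach the JAR to a cluster programmatically rather than through the UI, the sketch below calls the Databricks Libraries REST API (POST /api/2.0/libraries/install). The workspace URL, access token, and cluster ID are placeholders, and the request shape is an assumption to verify against your workspace's API version.

import requests

# Placeholders -- replace with your workspace URL, personal access token, and cluster ID
DATABRICKS_HOST = "https://<your-workspace>.cloud.databricks.com"
DATABRICKS_TOKEN = "DATABRICKS_PERSONAL_ACCESS_TOKEN"
CLUSTER_ID = "YOUR_CLUSTER_ID"

# Ask Databricks to attach the Pinecone assembly JAR to the cluster as a library
response = requests.post(
    f"{DATABRICKS_HOST}/api/2.0/libraries/install",
    headers={"Authorization": f"Bearer {DATABRICKS_TOKEN}"},
    json={
        "cluster_id": CLUSTER_ID,
        "libraries": [
            {"jar": "s3://pinecone-jars/1.1.0/spark-pinecone-uberjar.jar"}
        ],
    },
)
response.raise_for_status()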

Batch upsert

To batch upsert embeddings to Pinecone:

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, ArrayType, FloatType, StringType, LongType

# Your API key and index name
api_key = "PINECONE_API_KEY"
index_name = "PINECONE_INDEX_NAME"
source_tag = "PINECONE_SOURCE_TAG"

COMMON_SCHEMA = StructType([
    StructField("id", StringType(), False),
    StructField("namespace", StringType(), True),
    StructField("values", ArrayType(FloatType(), False), False),
    StructField("metadata", StringType(), True),
    StructField("sparse_values", StructType([
        StructField("indices", ArrayType(LongType(), False), False),
        StructField("values", ArrayType(FloatType(), False), False)
    ]), True)
])

# Initialize Spark
spark = SparkSession.builder.getOrCreate()

# Read the file and apply the schema
df = spark.read \
    .option("multiLine", value = True) \
    .option("mode", "PERMISSIVE") \
    .schema(COMMON_SCHEMA) \
    .json("src/test/resources/sample.jsonl")

# Show the DataFrame to confirm the read was successful
df.show()

# Write the DataFrame to Pinecone in batches
df.write \
    .option("pinecone.apiKey", api_key) \
    .option("pinecone.indexName", index_name) \
    .option("pinecone.sourceTag", source_tag) \
    .format("io.pinecone.spark.pinecone.Pinecone") \
    .mode("append") \
    .save()
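
The sample.jsonl path above comes from the connector's test resources. If you don't have a file on hand, you can build a small DataFrame in memory with the same schema instead of calling spark.read; the records below are hypothetical, and note that metadata is passed as a JSON-encoded string.

# Hypothetical records matching COMMON_SCHEMA; metadata is a JSON string
sample_rows = [
    {
        "id": "v1",
        "namespace": "example-namespace",
        "values": [0.1, 0.2, 0.3],
        "metadata": '{"genre": "drama"}',
        "sparse_values": {"indices": [10, 45], "values": [0.5, 0.5]},
    },
    {
        "id": "v2",
        "namespace": "example-namespace",
        "values": [0.4, 0.5, 0.6],
        "metadata": None,
        "sparse_values": None,
    },
]

df = spark.createDataFrame(sample_rows, COMMON_SCHEMA)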

For a guide on how to set up batch upserts, refer to the Databricks integration page.

Stream upsert

To stream upsert embeddings to Pinecone:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, ArrayType, FloatType, StringType, LongType
import os

# Your API key and index name
api_key = "PINECONE_API_KEY"
index_name = "PINECONE_INDEX_NAME"
source_tag = "PINECONE_SOURCE_TAG"

COMMON_SCHEMA = StructType([
    StructField("id", StringType(), False),
    StructField("namespace", StringType(), True),
    StructField("values", ArrayType(FloatType(), False), False),
    StructField("metadata", StringType(), True),
    StructField("sparse_values", StructType([
        StructField("indices", ArrayType(LongType(), False), False),
        StructField("values", ArrayType(FloatType(), False), False)
    ]), True)
])

# Initialize Spark session
spark = SparkSession.builder \
    .appName("StreamUpsertExample") \
    .config("spark.sql.shuffle.partitions", 3) \
    .master("local") \
    .getOrCreate()

# Read the stream of JSON files from the input directory, applying the schema
lines = spark.readStream \
    .option("multiLine", True) \
    .option("mode", "PERMISSIVE") \
    .schema(COMMON_SCHEMA) \
    .json("path/to/input/directory/")

# Write the stream to Pinecone using the defined options
upsert = lines.writeStream \
    .format("io.pinecone.spark.pinecone.Pinecone") \
    .option("pinecone.apiKey", api_key) \
    .option("pinecone.indexName", index_name) \
    .option("pinecone.sourceTag", source_tag) \
    .option("checkpointLocation", "path/to/checkpoint/dir") \
    .outputMode("append") \
    .start()

upsert.awaitTermination()
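
After either job finishes, you can confirm that the records landed by querying index statistics with the Pinecone Python SDK. This is a minimal sketch, assuming the pinecone client package is installed on the cluster and reusing the api_key and index_name values from above.

from pinecone import Pinecone

# Connect to the index and print vector counts per namespace
pc = Pinecone(api_key=api_key)
index = pc.Index(index_name)
print(index.describe_index_stats())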

Learn more