Use the Spark-Pinecone connector to create, ingest, and update vector embeddings at scale with Databricks and Pinecone.

Install the Spark-Pinecone connector

  1. Install the Spark-Pinecone connector as a library.
  2. Configure the library as follows:
    1. Select File path/S3 as the Library Source.

    2. Enter the S3 URI for the Pinecone assembly JAR file:


      Databricks platform users must use the Pinecone assembly JAR listed above to ensure that the proper dependencies are installed.

    3. Click Install.
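If you prefer to script the installation instead of clicking through the UI, the same JAR can be attached through the Databricks Libraries REST API (`POST /api/2.0/libraries/install`). The sketch below is a minimal example that only builds the request with the Python standard library. The workspace host, token, cluster ID, and JAR URI are placeholders you must supply, and `build_install_request` is a hypothetical helper name. Check the endpoint against your workspace's API documentation before relying on it.

```python
import json
import urllib.request


def build_install_request(host: str, token: str, cluster_id: str, jar_uri: str) -> urllib.request.Request:
    """Build (but do not send) a Databricks Libraries API request that installs a JAR on a cluster.

    Hypothetical helper: all arguments are placeholders supplied by the caller.
    """
    payload = {
        "cluster_id": cluster_id,
        # The S3 URI of the Pinecone assembly JAR goes here.
        "libraries": [{"jar": jar_uri}],
    }
    return urllib.request.Request(
        url=f"{host.rstrip('/')}/api/2.0/libraries/install",
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


if __name__ == "__main__":
    req = build_install_request(
        host="https://YOUR_WORKSPACE.cloud.databricks.com",  # placeholder
        token="DATABRICKS_TOKEN",  # placeholder
        cluster_id="CLUSTER_ID",  # placeholder
        jar_uri="S3_URI_OF_PINECONE_ASSEMBLY_JAR",  # placeholder: the URI from step 2
    )
    print(req.full_url, req.get_method())
    # To actually send the request: urllib.request.urlopen(req)
```

Building the request separately from sending it keeps the payload easy to inspect before anything touches the cluster.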

Batch upsert

To batch upsert embeddings to Pinecone:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, ArrayType, FloatType, StringType, LongType

# Your API key, index name, and source tag
api_key = "PINECONE_API_KEY"
index_name = "PINECONE_INDEX_NAME"
source_tag = "PINECONE_SOURCE_TAG"

# Each record needs an id and dense values; namespace, metadata, and sparse_values may be null
COMMON_SCHEMA = StructType([
    StructField("id", StringType(), False),
    StructField("namespace", StringType(), True),
    StructField("values", ArrayType(FloatType(), False), False),
    StructField("metadata", StringType(), True),
    StructField("sparse_values", StructType([
        StructField("indices", ArrayType(LongType(), False), False),
        StructField("values", ArrayType(FloatType(), False), False)
    ]), True)
])

# Initialize Spark
spark = SparkSession.builder.getOrCreate()

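# Read the file and apply the schema
df = spark.read \
    .option("multiLine", value = True) \
    .option("mode", "PERMISSIVE") \
    .schema(COMMON_SCHEMA) \
    .json("path/to/input.json")  # Placeholder: replace with the path to your input file

# Display the rows to confirm the read succeeded
df.show()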
# Write the DataFrame to Pinecone in batches
df.write \
    .option("pinecone.apiKey", api_key) \
    .option("pinecone.indexName", index_name) \
    .option("pinecone.sourceTag", source_tag) \
    .format("io.pinecone.spark.pinecone.Pinecone") \
    .mode("append") \
    .save()

For a guide on how to set up batch upserts, refer to the Databricks integration page.
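The batch example reads a file whose records must match `COMMON_SCHEMA`. The following sketch uses only the Python standard library to produce such a file; `make_record` and `write_records` are hypothetical helpers, and the output file name is a placeholder. Because the reader sets `multiLine`, Spark parses each file as a single JSON document, so the sketch writes all records as one JSON array, which Spark expands into one row per element. The length of each dense `values` list must match the dimension of your Pinecone index.

```python
import json
from pathlib import Path
from typing import Optional


def make_record(record_id: str, values: list[float], namespace: Optional[str] = None,
                metadata: Optional[dict] = None,
                sparse: Optional[tuple[list[int], list[float]]] = None) -> dict:
    """Build one record shaped like COMMON_SCHEMA (hypothetical helper)."""
    record = {
        "id": record_id,
        "namespace": namespace,
        "values": [float(v) for v in values],
        # COMMON_SCHEMA declares metadata as a string, so serialize it to JSON text.
        "metadata": json.dumps(metadata) if metadata is not None else None,
        "sparse_values": None,
    }
    if sparse is not None:
        indices, sparse_vals = sparse
        # Each sparse index needs exactly one matching value.
        if len(indices) != len(sparse_vals):
            raise ValueError("sparse indices and values must have the same length")
        record["sparse_values"] = {
            "indices": [int(i) for i in indices],
            "values": [float(v) for v in sparse_vals],
        }
    return record


def write_records(path: Path, records: list[dict]) -> None:
    """Write all records as one JSON array, one file per batch (hypothetical helper)."""
    path.write_text(json.dumps(records, indent=2), encoding="utf-8")


if __name__ == "__main__":
    records = [
        make_record("v1", [0.1, 0.2, 0.3], namespace="example-namespace", metadata={"genre": "drama"}),
        make_record("v2", [0.4, 0.5, 0.6], sparse=([1, 7], [0.5, 0.25])),
    ]
    write_records(Path("input.json"), records)  # placeholder output path
```

Serializing metadata up front keeps the file aligned with the string-typed `metadata` column rather than relying on Spark to coerce a nested object.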

Stream upsert

To stream upsert embeddings to Pinecone:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, ArrayType, FloatType, StringType, LongType

# Your API key, index name, and source tag
api_key = "PINECONE_API_KEY"
index_name = "PINECONE_INDEX_NAME"
source_tag = "PINECONE_SOURCE_TAG"

# Each record needs an id and dense values; namespace, metadata, and sparse_values may be null
COMMON_SCHEMA = StructType([
    StructField("id", StringType(), False),
    StructField("namespace", StringType(), True),
    StructField("values", ArrayType(FloatType(), False), False),
    StructField("metadata", StringType(), True),
    StructField("sparse_values", StructType([
        StructField("indices", ArrayType(LongType(), False), False),
        StructField("values", ArrayType(FloatType(), False), False)
    ]), True)
])

# Initialize Spark session
spark = SparkSession.builder \
    .appName("StreamUpsertExample") \
    .config("spark.sql.shuffle.partitions", 3) \
    .master("local") \
    .getOrCreate()

# Read a stream of JSON files from the input directory, applying the schema
lines = spark.readStream \
    .option("multiLine", True) \
    .option("mode", "PERMISSIVE") \
    .schema(COMMON_SCHEMA) \
    .json("path/to/input/directory/")  # Placeholder: replace with your input directory

# Write the stream to Pinecone using the defined options
upsert = lines.writeStream \
    .format("io.pinecone.spark.pinecone.Pinecone") \
    .option("pinecone.apiKey", api_key) \
    .option("pinecone.indexName", index_name) \
    .option("pinecone.sourceTag", source_tag) \
    .option("checkpointLocation", "path/to/checkpoint/dir") \
    .outputMode("append") \
    .start()

# Block until the streaming query stops
upsert.awaitTermination()

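Spark's file-based streaming source expects each new file to appear in the input directory atomically; the Structured Streaming guide notes that a file move usually achieves this. The sketch below uses a hypothetical helper named `drop_batch` that writes a batch of records (one JSON array per file, matching the `multiLine` read option) to a temporary file in the input directory's parent and then renames it into place with `os.replace`, so the stream never sees a half-written file. It assumes a local or POSIX-style filesystem where that rename is atomic; object stores such as S3 behave differently, so check how your storage handles renames.

```python
import json
import os
import tempfile
from pathlib import Path


def drop_batch(input_dir: Path, batch_name: str, records: list[dict]) -> Path:
    """Atomically place one JSON file of records into the streaming input directory.

    Hypothetical helper: assumes input_dir and its parent share one filesystem.
    """
    input_dir.mkdir(parents=True, exist_ok=True)
    final_path = input_dir / f"{batch_name}.json"
    # Write to a temp file outside the watched directory so Spark never reads a partial file.
    fd, tmp_name = tempfile.mkstemp(suffix=".tmp", dir=str(input_dir.parent))
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        json.dump(records, f)
    # A same-filesystem rename makes the complete file appear in one step.
    os.replace(tmp_name, final_path)
    return final_path


if __name__ == "__main__":
    drop_batch(
        Path("path/to/input/directory"),  # placeholder: the directory the stream reads
        "batch-0001",
        [{"id": "v1", "namespace": None, "values": [0.1, 0.2, 0.3],
          "metadata": None, "sparse_values": None}],
    )
```

Each call produces one new file, which the running query picks up as part of its next micro-batch.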

