from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, ArrayType, FloatType, StringType, LongType
# Your Pinecone API key, index name, and an optional source tag
# (replace the placeholder strings with your own values)
api_key = "PINECONE_API_KEY"
index_name = "PINECONE_INDEX_NAME"
source_tag = "PINECONE_SOURCE_TAG"
COMMON_SCHEMA = StructType([
    # Required unique ID for each vector
    StructField("id", StringType(), False),
    # Optional namespace to upsert into
    StructField("namespace", StringType(), True),
    # Required dense vector values
    StructField("values", ArrayType(FloatType(), False), False),
    # Optional metadata, encoded as a JSON string
    StructField("metadata", StringType(), True),
    # Optional sparse values for hybrid (dense + sparse) indexes
    StructField("sparse_values", StructType([
        StructField("indices", ArrayType(LongType(), False), False),
        StructField("values", ArrayType(FloatType(), False), False)
    ]), True)
])
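# For reference, a record in the input JSON files might look like the
# following (illustrative values; "sparse_values" can be omitted when the
# index is dense-only, and "metadata" is a JSON-encoded string per the
# schema above):
#
# {
#   "id": "v1",
#   "namespace": "example-namespace",
#   "values": [0.1, 0.2, 0.3],
#   "metadata": "{\"genre\": \"drama\"}",
#   "sparse_values": {
#     "indices": [10, 42],
#     "values": [0.5, 0.25]
#   }
# }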
# Initialize Spark session
spark = SparkSession.builder \
    .appName("StreamUpsertExample") \
    .config("spark.sql.shuffle.partitions", 3) \
    .master("local") \
    .getOrCreate()
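# NOTE: "local" runs Spark on a single thread; "local[*]" would use all
# available cores on the machine.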
# Read the stream of JSON files from the input directory, applying the common schema
lines = spark.readStream \
    .option("multiLine", True) \
    .option("mode", "PERMISSIVE") \
    .schema(COMMON_SCHEMA) \
    .json("path/to/input/directory/")
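# In PERMISSIVE mode, rows that fail to parse are kept with their
# unparseable fields set to null rather than failing the whole query.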
# Write the stream to Pinecone using the defined options
upsert = lines.writeStream \
    .format("io.pinecone.spark.pinecone.Pinecone") \
    .option("pinecone.apiKey", api_key) \
    .option("pinecone.indexName", index_name) \
    .option("pinecone.sourceTag", source_tag) \
    .option("checkpointLocation", "path/to/checkpoint/dir") \
    .outputMode("append") \
    .start()
upsert.awaitTermination()
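# awaitTermination() blocks the driver until the streaming query stops or
# fails. To wait with a timeout (in seconds) and then stop the query
# manually, one could do instead:
#
# upsert.awaitTermination(60)
# upsert.stop()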