Elasticsearch

Elasticsearch is a powerful open-source search engine and analytics platform that is widely used as a document store for keyword-based text search.

Pinecone is a vector database widely used for production applications — such as semantic search, recommenders, and threat detection — that require fast and fresh vector search at the scale of tens or hundreds of millions (or even billions) of embeddings. Although Pinecone offers hybrid search for keyword-aware semantic search, Pinecone is not a document store and does not replace Elasticsearch for keyword-only retrieval.

If you already use Elasticsearch and want to add Pinecone’s low-latency and large-scale vector search to your applications, this guide will show you how. You will see how to:

  • Add an embedding model to Elasticsearch
  • Transform text data into vector embeddings within Elasticsearch
  • Load those vector embeddings into Pinecone, with corresponding IDs and metadata.

Uploading the embedding model

We first need to upload the embedding model to our Elastic instance. To do so, we’ll use the [eland](https://github.com/elastic/eland) Elastic client. We’ll have to clone the "eland" repository and build the docker image before running it:

git clone [email protected]:elastic/eland.git
cd eland
docker build -t elastic/eland .

In this example, we’ll use the [sentence-transformers/msmarco-MiniLM-L-12-v3](https://huggingface.co/sentence-transformers/msmarco-MiniLM-L-12-v3) model from Hugging Face — although you could use any model you’d like. To upload the model to your Elasticsearch deployment, run the following command:

docker run -it --rm elastic/eland \
   eland_import_hub_model \
   --url https://<user>:<password>@<host>:<port>/ \
   --hub-model-id sentence-transformers/msmarco-MiniLM-L-12-v3 \
   --task-type text_embedding \
   --start

Note that you’ll have to replace the placeholders with your Elasticsearch instance user, password, host, and port. If you set up your own Elasticsearch instance, you would have already set the username and password when initially setting up the instance. If you’re using the hosted Elastic Stack, you can find the username and password in the "Security" section of the Elastic Stack console.

We can quickly test the uploaded model by running the following command in the Elasticsearch developer console:

POST /_ml/trained_models/sentence-transformers__msmarco-minilm-l-12-v3/deployment/_infer
{
 "docs": {
   "text_field": "Hello World!"
 }
}

We should get the following result:

{
 "predicted_value": [
   -0.06176435202360153,
   -0.008180409669876099,
   0.3309500813484192,
   0.38672536611557007,
   ...
 ]
}

This is the vector embedding for our query. We’re now ready to upload our dataset and apply the model to produce the vector embeddings.

Uploading the dataset

Next, upload a dataset of documents to Elasticsearch. In this example, we’ll use a subset of the MSMacro dataset. You can download the file or run the following command:

curl -O https://msmarco.blob.core.windows.net/msmarcoranking/msmarco-passagetest2019-top1000.tsv.gz
gunzip msmarco-passagetest2019-top1000.tsv

In this example, we’ll be using the hosted Elastic Stack, which makes it easier to use various integrations. We’ll use the "Upload" integration to load the data into an Elasticsearch index.

add-dataadd-data

We’ll drag the unzipped TSV file. The Upload integration will sample the data for us and show the following:

data-previewdata-preview

We’ll click the "Import" button and continue to name the index:

importimport

Once the import is complete, you’ll see the following:

import-completeimport-complete

Clicking "View index in Discover" will reveal the index view where we can look at the uploaded data:

discover-indexdiscover-index

Creating the embeddings

We’ve now created an index for our data. Next, we’ll create a pipeline to produce a vector embedding for each document. We’ll head to the Elasticsearch developer console and issue the following command to create the pipeline:

PUT _ingest/pipeline/produce-embeddings
{
 "description": "Vector embedding pipeline",
 "processors": [
   {
     "inference": {
       "model_id": "sentence-transformers__msmarco-minilm-l-12-v3",
       "target_field": "text_embedding",
       "field_map": {
         "text": "text_field"
       }
     }
   }
 ],
 "on_failure": [
   {
     "set": {
       "description": "Index document to 'failed-<index>'",
       "field": "_index",
       "value": "failed-{{{_index}}}"
     }
   },
   {
     "set": {
       "description": "Set error message",
       "field": "ingest.failure",
       "value": "{{_ingest.on_failure_message}}"
     }
   }
 ]
}

The "processor" definition tells Elasticsearch which model to use and which field to read from. The "on_failure" definition defines the failure behavior that Elasticsearch will apply — specifically, which error message to write and which file to write them into.

Once the embedding pipeline is created, we’ll re-index our "msmacro-raw" index, applying the embedding pipeline to produce the new embeddings. In the developer console, execute the following command:

POST _reindex?wait_for_completion=false
{
 "source": {
   "index": "msmacro-raw"
 },
 "dest": {
   "index": "msmacro-with-embeddings",
   "pipeline": "text-embeddings"
 }
}

This will kick off the embedding pipeline. We’ll get a task id which we can track with the following command:

GET _tasks/<task_id>

Looking at the index, we can see that the embeddings have been created in an object called "text_embeddings" under the field "predicted_value".

To make the loading process a bit easier, we’re going to pluck the "predicted_value" field and add it as its own column:

POST _reindex?wait_for_completion=false
{
 "source": {
   "index": "msmacro-with-embeddings"
 },
 "dest": {
   "index": "msmacro-with-embeddings-flat"
 },
 "script": {
   "source": "ctx._source.predicted_value = ctx._source.text_embedding.predicted_value"
 }
}

Next, we’ll load the embeddings into Pinecone. Since the index size is considerable, we’ll use Apache Spark to parallelize the process.

Moving the Elasticsearch index to Pinecone

In this example, we’ll be using Databricks to handle the process of loading Elasticsearch index to Pinecone. We’ll add the Elasticsearch Spark from Maven by navigating to the “Libraries” tab in the cluster settings view, and clicking “Install new”:

cluster-setupcluster-setup

Use the following Maven coordinates:

org.elasticsearch:elasticsearch-spark-30_2.12:8.5.2

We’ll add the Pinecone Databricks connectors from S3:

s3://pinecone-jars/spark-pinecone-uberjar.jar

Restart the cluster if needed. Next, we’ll create a new notebook, attach it to the cluster and import the required dependencies:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.elasticsearch.spark._

We’ll initialize the Spark context:

val spark = SparkSession.builder.appName("elasticSpark").master("local[*]").getOrCreate()

Next, we’ll read the index from Elasticsearch:

val df = (spark.read
     .format( "org.elasticsearch.spark.sql" )
     .option( "es.nodes",   "<ELASTIC_URL>" )
     .option( "es.net.http.auth.user", "<ELASTIC_USER>" )
     .option( "es.net.http.auth.pass", "<ELASTIC_PASSWORD>" )
     .option( "es.port",    443     )
     .option( "es.nodes.wan.only", "true" )
     .option("es.net.ssl", "true")
     .option("es.read.field.as.array.include","predicted_value:1")
     .load( "msmacro-with-embeddings")
 )

Note that to ensure the index is read correctly into the dataframe, we must specify that the “predicted_value” field is an array with a depth of 1, as shown below:

  .option("es.read.field.as.array.include","predicted_value:1")

Next, we’ll use the Pinecone Spark connector to load this dataframe into a Pinecone index. We’ll start by creating an index in the Pinecone console. Log in to the console and click “Create Index”. Then, name your index, and configure it to use 384 dimensions.

create-indexcreate-index

When you’re done configuring the index, click “Create Index”.

We have to do some prep work to get the dataframe ready for indexing. In order to index the original document with the embeddings we’ve created, we’ll create the following UDF which will encode the original document as a Base64 string. This will ensure the metadata object will remain a valid JSON object regardless of the content of the document.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
import java.util.Base64

val text_to_metadata = udf((text: String) => "{ \"document\" : \"" +  Base64.getEncoder.encodeToString(text.getBytes("UTF-8")) + "\" }")

We’ll apply the UDF and get rid of some unnecessary columns:

val clean_df = df.drop("text_embedding").withColumnRenamed("predicted_value", "vector").withColumn("metadata", text_to_metadata(col("text_field"))).withColumn("namespace", lit("")).drop("text_field")

Next, we’ll use the Pinecone Spark connector:

val pineconeOptions = Map(
  "pinecone.apiKey" -> "<PINECONE_API_KEY>",
  "pinecone.environment" -> "us-west1-gcp",
  "pinecone.projectName" -> "<PROJECT_IDENTIFIER>",
  "pinecone.indexName" -> "elastic-index"
)

clean_df.write
  .options(pineconeOptions)
  .format("io.pinecone.spark.pinecone.Pinecone")
  .mode(SaveMode.Append)
  .save()

Our vectors have been added to our Pinecone index!

To query the index, we’ll need to generate a vector embedding for our query first, using the sentence-transformers/msmarco-MiniLM-L-12-v3 model. Then, we’ll use the Pinecone client to issue the query. We'll do this in a Python notebook.

We’ll start by installing the required dependencies:

!pip install -qU pinecone-client sentence-transformers pandas

Next, we’ll set up the client:

import pinecone

# connect to pinecone environment
pinecone.init(
   api_key="<PINECONE API KEY>",
   environment="us-west1-gcp"
)

We’ll set up the index:

index_name = "elastic-index"
index = pinecone.Index(index_name)

We’ll create a helper function that will decode the encoded documents we get:

def decode_entries(entries):
   return list(map(lambda entry: {
       "id": entry["id"],
       "score": entry["score"],
       "document": base64.b64decode(entry["metadata"]["document"]).decode("UTF-8"),
   }, entries))

Next, we’ll create a function that will encode our query, query the index and convert the display the data using Pandas:

def queryIndex(query, num_results):
 vector = model.encode(query).tolist()
 result = index.query(vector, top_k=num_results, include_metadata=True)
 return pd.DataFrame(decode_entries(result.matches))

Finally, we’ll test our index:

display(queryIndex("star trek", 10))

Should yield the results:
resultsresults

Summary

In conclusion, by following the steps outlined in this post, you can easily upload an embedding model to Elasticsearch, ingest raw textual data, create the embeddings, and load them into Pinecone. With this approach, you can take advantage of the benefits of integrating Elasticsearch and Pinecone. As mentioned, while Elasticsearch is optimized for indexing documents, Pinecone provides vector storage and search capabilities that can handle hundreds of millions and even billions of vectors.