Version: 2.18

Creating Pipelines

Learn the general principles of creating a pipeline.

You can use these instructions to create both indexing and query pipelines.

This task uses an example of a semantic document search pipeline.

Prerequisites

For each component you want to use in your pipeline, you must know the names of its input and output. You can check them on the documentation page for a specific component or in the component's run() method. For more information, see Components: Input and Output.

Steps to Create a Pipeline

1. Import dependencies

Import all the dependencies, such as the Pipeline, the Document object, the Document Store, and all the components you want to use in your pipeline. For example, to create a semantic document search pipeline, you need the Document object, the Pipeline, the Document Store, Embedders, and a Retriever:

python
from haystack import Document, Pipeline
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever

2. Initialize components

Initialize the components, passing any parameters you want to configure:

python
document_store = InMemoryDocumentStore(embedding_similarity_function="cosine")
text_embedder = SentenceTransformersTextEmbedder()
retriever = InMemoryEmbeddingRetriever(document_store=document_store)

3. Create the pipeline

python
query_pipeline = Pipeline()

4. Add components

Add components to the pipeline one by one. The order in which you do this doesn't matter:

python
## General syntax: query_pipeline.add_component("component_name", component_instance)

## Here is an example of how you'd add the components initialized in step 2 above:
query_pipeline.add_component("text_embedder", text_embedder)
query_pipeline.add_component("retriever", retriever)

## Alternatively, you can create the components directly in the add_component() call.
## Use this instead of the two lines above, not in addition to them, because component names must be unique:
query_pipeline.add_component("text_embedder", SentenceTransformersTextEmbedder())
query_pipeline.add_component("retriever", InMemoryEmbeddingRetriever(document_store=document_store))

5. Connect components

Connect the components by indicating which output of a component should be connected to the input of the next component. If a component has only one input or output and the connection is obvious, you can just pass the component name without specifying the input or output.

To understand what inputs are expected to run your pipeline, use the pipeline's .inputs() method. See a detailed example in the Pipeline Inputs section below.

Here's a more visual explanation within the code:

python
## This is the syntax to connect components. Here you're connecting output1 of component1 to input1 of component2:
pipeline.connect("component1.output1", "component2.input1")

## If both components have only one output and input, you can just pass their names:
pipeline.connect("component1", "component2")

## If one of the components has only one output but the other has multiple inputs,
## you can pass just the name of the component with a single output, but for the component with
## multiple inputs, you must specify which input you want to connect

## Here, component1 has only one output, but component2 has multiple inputs:
pipeline.connect("component1", "component2.input1")

## And here's how it should look for the semantic document search pipeline we're using as an example:
query_pipeline.connect("text_embedder.embedding", "retriever.query_embedding")
## Because query_embedding is the only InMemoryEmbeddingRetriever input that matches the embedding output, this is also correct:
query_pipeline.connect("text_embedder.embedding", "retriever")

You need to link all the components together, connecting them gradually in pairs. Here's an explicit example for the pipeline we're assembling:

python
## Imagine this pipeline has four components: text_embedder, retriever, prompt_builder and llm.
## Here's how you would connect them into a pipeline:

query_pipeline.connect("text_embedder.embedding", "retriever")
query_pipeline.connect("retriever", "prompt_builder.documents")
query_pipeline.connect("prompt_builder", "llm")

6. Run the pipeline

Wait for the pipeline to validate the components and connections. If everything is OK, you can now run the pipeline. You can call Pipeline.run() in two ways: by passing a dictionary that maps component names to their inputs, or by passing just the inputs directly. When you pass the inputs directly, the pipeline resolves them to the correct components.

python
## Here's one way of calling the run() method
results = pipeline.run({"component1": {"input1_value": value1, "input2_value": value2}})

## The inputs can also be passed directly without specifying component names
results = pipeline.run({"input1_value": value1, "input2_value": value2})

## This is how you'd run the semantic document search pipeline we're using as an example:
query = "Here comes the query text"
results = query_pipeline.run({"text_embedder": {"text": query}})

Pipeline Inputs

If you need to understand what component inputs are expected to run your pipeline, use the pipeline's .inputs() method. It lists the component inputs that are not already fed by a connection and marks each one as mandatory or optional.

This is how it works:

python
## A short pipeline example that converts webpages into documents
from haystack import Pipeline
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.fetchers import LinkContentFetcher
from haystack.components.converters import HTMLToDocument
from haystack.components.writers import DocumentWriter

document_store = InMemoryDocumentStore()
fetcher = LinkContentFetcher()
converter = HTMLToDocument()
writer = DocumentWriter(document_store=document_store)

pipeline = Pipeline()
pipeline.add_component(instance=fetcher, name="fetcher")
pipeline.add_component(instance=converter, name="converter")
pipeline.add_component(instance=writer, name="writer")

pipeline.connect("fetcher.streams", "converter.sources")
pipeline.connect("converter.documents", "writer.documents")

## Requesting a list of required inputs
pipeline.inputs()

## {'fetcher': {'urls': {'type': typing.List[str], 'is_mandatory': True}},
##  'converter': {'meta': {'type': typing.Union[typing.Dict[str, typing.Any], typing.List[typing.Dict[str, typing.Any]], NoneType],
##                         'is_mandatory': False,
##                         'default_value': None},
##                'extraction_kwargs': {'type': typing.Optional[typing.Dict[str, typing.Any]],
##                                      'is_mandatory': False,
##                                      'default_value': None}},
##  'writer': {'policy': {'type': typing.Optional[haystack.document_stores.types.policy.DuplicatePolicy],
##                        'is_mandatory': False,
##                        'default_value': None}}}

From the above response, you can see that the urls input is mandatory for LinkContentFetcher. This is how you would then run this pipeline:

python
pipeline.run(
    data={"fetcher": {"urls": ["https://docs.haystack.deepset.ai/docs/pipelines"]}}
)

Example

The following example walks you through creating a RAG pipeline.

python
# import necessary dependencies
from haystack import Pipeline, Document
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.retrievers import InMemoryBM25Retriever
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.builders import ChatPromptBuilder
from haystack.utils import Secret
from haystack.dataclasses import ChatMessage

# create a document store and write documents to it
document_store = InMemoryDocumentStore()
document_store.write_documents([
    Document(content="My name is Jean and I live in Paris."),
    Document(content="My name is Mark and I live in Berlin."),
    Document(content="My name is Giorgio and I live in Rome."),
])

# A prompt corresponds to an NLP task and contains instructions for the model. Here, the pipeline will go through each Document to figure out the answer.
prompt_template = [
    ChatMessage.from_system(
        """
        Given these documents, answer the question.
        Documents:
        {% for doc in documents %}
            {{ doc.content }}
        {% endfor %}
        Question:
        """
    ),
    ChatMessage.from_user(
        "{{question}}"
    ),
    ChatMessage.from_system("Answer:")
]

# create the components adding the necessary parameters
retriever = InMemoryBM25Retriever(document_store=document_store)
prompt_builder = ChatPromptBuilder(template=prompt_template, required_variables="*")
llm = OpenAIChatGenerator(api_key=Secret.from_env_var("OPENAI_API_KEY"), model="gpt-4o-mini")

# Create the pipeline and add the components to it. The order doesn't matter.
# At this stage, the Pipeline validates the components without running them yet.
rag_pipeline = Pipeline()
rag_pipeline.add_component("retriever", retriever)
rag_pipeline.add_component("prompt_builder", prompt_builder)
rag_pipeline.add_component("llm", llm)

# Arrange pipeline components in the order you need them. If a component has more than one input or output, indicate which output you want to connect to which input using the format ("component_name.output_name", "component_name.input_name").
rag_pipeline.connect("retriever", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "llm")

# Run the pipeline by specifying the first component in the pipeline and passing its mandatory inputs. Optionally, you can pass inputs to other components.
question = "Who lives in Paris?"
results = rag_pipeline.run(
    {
        "retriever": {"query": question},
        "prompt_builder": {"question": question},
    }
)

print(results["llm"]["replies"])

Here's what a visualized Mermaid graph of this pipeline would look like:


[Figure: RAG pipeline diagram with three connected components. InMemoryBM25Retriever receives a query string and outputs documents, ChatPromptBuilder combines the documents with a question input to create prompt messages, and OpenAIChatGenerator processes the messages to produce replies. Each component box displays its class name and optional input parameters.]