Implement Search Feature In Your Flask App Using Elasticsearch

Elasticsearch may mean different things to different people, depending on their level of familiarity with the technology. At its core, Elasticsearch is a distributed, open-source search and analytics engine for all types of data. It is built on Apache Lucene and developed in Java. It is a NoSQL data store, meaning it keeps data as JSON documents rather than in relational tables, and we do not query it with the SQL we would use against a relational database. It is at the heart of the ELK Stack (Elasticsearch, Logstash, and Kibana), so much so that its name has become synonymous with the stack itself.


As you learned in the installation guide, support for full-text search is not standardized the way relational databases are. SQLAlchemy also does not natively support search, so we have to implement it ourselves.






Overview

In the Testing section of the localhost installation guide, you saw how to index a document and get the corresponding JSON data back. However, you may be curious to understand how everything works. The Elasticsearch documentation does a good job of explaining how indexing and parsing work. Let us cover some basic concepts of how it organizes data and its backend components.



Documents

Documents are the basic unit of JSON data that can be indexed in Elasticsearch. If you have interacted with rows in relational databases, a document is more or less like a row object. Each document has a unique ID and a given data type.

body={'test': 'this is the first test'}


Indices

An index is used to classify documents that have the same characteristics. It is what you would typically query against in Elasticsearch to retrieve logically related data. If you have a blog, for example, you can have one index for articles, another for comments, and so on. Each index has a name.

index='test_index'


Inverted Index

Elasticsearch uses a data structure called an inverted index that supports very fast full-text searches. An inverted index lists every unique word that appears in any document and identifies all of the documents each word occurs in. Let us look at this example to best understand what it is.

Inverted index

The inverted index does not really store items directly, but it instead splits each document up into individual search terms and then maps each term to the documents they occur in. By using inverted indices, Elasticsearch can quickly find the best matches for full-text searches from even large data sets.

When performing full-text searches, we are querying an inverted index and not the JSON documents that we defined when indexing. A cluster can have many inverted indices, because there is an inverted index for each full-text field per index. So if you have an index whose documents contain five full-text fields, you will have five inverted indices.
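To make the idea concrete, here is a minimal sketch of an inverted index in plain Python. It is only an illustration: the function name build_inverted_index and the sample documents are made up for this example, and splitting on whitespace is a crude stand-in for the text analysis Elasticsearch actually performs.

```python
from collections import defaultdict


def build_inverted_index(docs):
    """Map each lowercase term to the sorted list of document IDs containing it."""
    index = defaultdict(set)
    for doc_id, text in docs.items():
        # Naive analysis: lowercase and split on whitespace (an assumption,
        # not how Elasticsearch tokenizes text).
        for term in text.lower().split():
            index[term].add(doc_id)
    return {term: sorted(ids) for term, ids in index.items()}


# Hypothetical sample documents, keyed by document ID
docs = {
    1: 'testing elasticsearch connection',
    2: 'this is another test',
    3: 'another connection test',
}

inverted = build_inverted_index(docs)
print(inverted['test'])        # [2, 3]
print(inverted['connection'])  # [1, 3]
```

A search for a word then becomes a dictionary lookup on the term instead of a scan through every document.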



Cluster

A cluster is a group of one or more node instances that are connected. The effectiveness of Elasticsearch is in the distribution of tasks to each node in the cluster.



Nodes

A node is a single server that participates in the indexing and search capabilities of a cluster. A node can take on one of several roles:

  • Master node: Responsible for cluster-wide operations such as creating and deleting an index and adding or removing a node
  • Data node: Stores data and executes data-related operations such as search and aggregation
  • Client node: Forwards cluster requests to master and data nodes


Shards

It is possible to subdivide an index into multiple pieces called shards. Each shard is a fully functional and independent index that can be hosted on any node. By distributing documents across many shards and distributing those shards across multiple nodes, Elasticsearch can protect against hardware failures and increase query capacity as nodes are added to a cluster.
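One simple way to picture how documents end up spread across shards is to hash each document ID and take the remainder by the number of shards. The sketch below does exactly that. It is a simplification under stated assumptions: pick_shard and distribute are hypothetical helpers, and zlib.crc32 is only a stand-in hash, not the routing code Elasticsearch itself uses.

```python
import zlib


def pick_shard(doc_id, num_shards):
    """Return the 0-based shard number for a document ID."""
    # crc32 is a stand-in hash chosen because it is deterministic across runs.
    digest = zlib.crc32(str(doc_id).encode('utf-8'))
    return digest % num_shards


def distribute(doc_ids, num_shards):
    """Group document IDs by the shard they would be assigned to."""
    shards = {number: [] for number in range(num_shards)}
    for doc_id in doc_ids:
        shards[pick_shard(doc_id, num_shards)].append(doc_id)
    return shards


shards = distribute(range(1, 11), 3)
for number, members in shards.items():
    print(number, members)
```

Because the same ID always hashes to the same shard, a lookup by ID only needs to ask one shard, while a search can be run on all shards in parallel.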



Build A Simple Flask App

We are going to search data generated by users in an app. We therefore need to store this user data in a database that we can query. This section presumes you are already familiar with working with databases, as we will not go into the details of how to set up the application. You can refer to this completed project on GitHub.

Sample project

The key things that your application will need to have are:

  • A model or models used to store user data (the Post model has been used in the sample project)
  • A form that allows users to post something
  • A page that displays all the data users posted in the app

Your navigation bar does not necessarily need to have a search form as seen in the image above. In a subsequent section below, we will learn how to add one such that it will be available across all web pages of the app.



Understanding Elasticsearch

In the localhost installation guide, you learned how to install and configure Elasticsearch. We will need the Python client library while managing it, so make sure you install it in your virtual environment too:

(venv)$ pip3 install elasticsearch && pip3 freeze > requirements.txt

The first thing we need to do is to connect to Elasticsearch:

(venv)$ flask shell


>>> from elasticsearch import Elasticsearch
>>> es = Elasticsearch('http://localhost:9200')

# Document 1
>>> es.index(index='test', id=1, body={'text': 'testing elasticsearch connection'})

# Document 2
>>> es.index(index='test', id=2, body={'text': 'this is another test'})

Above, we have written two documents to the index test, each with a field called text. Let us search for the word test in that field.

>>> es.search(index='test', body={'query': {'match': {'text': 'test'}}})

The response from the es.search() call is a Python dictionary whose results are:

{
    'took': 16, 
    'timed_out': False, 
    '_shards': {
        'total': 1, 
        'successful': 1, 
        'skipped': 0, 
        'failed': 0
        }, 
    'hits': {
        'total': {
            'value': 1, 
            'relation': 'eq'
            }, 
        'max_score': 0.6548753, 
        'hits': [
                {
                    '_index': 'test', 
                    '_type': '_doc', 
                    '_id': '2', 
                    '_score': 0.6548753, 
                    '_source': {'text': 'this is another test'}
                }
            ]
        }
    }

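The query returned one document, with a relevance score of about 0.65. The score has no fixed maximum (a "perfect" score is not 1); it is only meaningful for ranking the results of the same query against each other. The returned document is the one that contains our search word. Document 1 was not returned because it contains testing rather than test, and the default analyzer does not reduce words to a common stem. Typically, if more than one document matches, the one with the highest score is the closest match to the words searched for. Documents that only partially match are still returned, but with a lower score. To delete an index, you can run: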

>>> es.indices.delete(index='test')
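Since the search response shown earlier in this section is an ordinary Python dictionary, pulling out the useful parts is plain dictionary access. The sketch below works on a trimmed copy of that response; summarize_hits is a hypothetical helper name used only for illustration.

```python
# Trimmed copy of the response shown above
response = {
    'hits': {
        'total': {'value': 1, 'relation': 'eq'},
        'max_score': 0.6548753,
        'hits': [
            {
                '_index': 'test',
                '_id': '2',
                '_score': 0.6548753,
                '_source': {'text': 'this is another test'},
            },
        ],
    },
}


def summarize_hits(response):
    """Return the total number of matches and (id, score, text) per hit."""
    total = response['hits']['total']['value']
    rows = [
        (hit['_id'], hit['_score'], hit['_source']['text'])
        for hit in response['hits']['hits']
    ]
    return total, rows


total, rows = summarize_hits(response)
print(total)  # 1
print(rows)   # [('2', 0.6548753, 'this is another test')]
```

Note that the document ID comes back as a string, even though it was indexed with the integer 2.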


Configure Elasticsearch

Like with other configurations, the connection URL for Elasticsearch is going to be sourced from an environment variable.

config.py: Configure Elasticsearch

class Config(object):
    # ...

    ELASTICSEARCH_URL = os.environ.get('ELASTICSEARCH_URL') or None

If this variable is not defined, the setting is None, and we use that as a signal to disable Elasticsearch. In certain situations, say during unit testing, we may not need Elasticsearch to run, so being able to disable it comes in handy. To set this variable, update the .env file as follows:

.env: Elasticsearch URL

ELASTICSEARCH_URL=http://localhost:9200
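The `or None` idiom in the config class treats a missing variable and an empty one the same way. The sketch below isolates that behaviour; read_elasticsearch_url is a hypothetical helper that accepts a dictionary in place of os.environ so the cases are easy to try out.

```python
import os


def read_elasticsearch_url(environ=None):
    """Return the configured URL, or None when it is missing or empty."""
    if environ is None:
        environ = os.environ
    # An empty string is falsy, so `or None` turns it into None as well.
    return environ.get('ELASTICSEARCH_URL') or None


print(read_elasticsearch_url({'ELASTICSEARCH_URL': 'http://localhost:9200'}))  # http://localhost:9200
print(read_elasticsearch_url({'ELASTICSEARCH_URL': ''}))  # None
print(read_elasticsearch_url({}))  # None
```

Either way, the rest of the application only has to check for None to know whether search is enabled.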

The challenge with working with Elasticsearch is that it is not wrapped by a Flask extension, so it cannot be initialized in the global scope as many other extensions are. The connection URL is only accessible through app.config, which becomes available once the application instance has been created.

app/__init__.py: Access configurations

# ... 
from elasticsearch import Elasticsearch


app = Flask(__name__)
app.config.from_object(Config)

# Access value from the config module
app.elasticsearch = Elasticsearch([app.config['ELASTICSEARCH_URL']]) \
    if app.config['ELASTICSEARCH_URL'] else None

# ...

If you are using blueprints and a factory function, the Elasticsearch configuration will only be accessible once the create_app() function has been invoked.

app/__init__.py: Elasticsearch with a factory function

# ...
from elasticsearch import Elasticsearch
# ...

def create_app(config_class=Config):
    app = Flask(__name__)
    app.config.from_object(config_class)

    app.elasticsearch = Elasticsearch([app.config['ELASTICSEARCH_URL']]) \
        if app.config['ELASTICSEARCH_URL'] else None

    # ...

    return app


Generic Implementation Of Search Functionality

The assumption here is that we are not going to limit ourselves to one model when searching. The search functionality is going to be open in the sense that any model can be used in the search query. Also, it is best not to limit ourselves to only Elasticsearch for this functionality. There are a handful of powerful other search engines that we can use. Our implementation is going to be open to a possible switch to another search engine.

The first attempt to get this rolling is to identify what model and what fields in this model we would like to index. How can this be done? Well, we can define an attribute called __searchable__ which lists all the fields we would need to be included in the index.

app/models.py: Identify model and fields to be indexed

# ...


class Post(db.Model):
    __searchable__ = ['body']
    # ...

Next, we can define all indexing, deletion, and querying of a model in a search module following the principle of separation of concerns.

app/search.py: Implement search functionality

from app import app

'''
If using the factory function, you can import the current_app from Flask as:

from flask import current_app

Then replace every instance of `app.` with `current_app.`
'''

def add_to_index(index, model):
    # Check if elasticsearch is configured; if not return nothing
    if not app.elasticsearch:
        return
    payload = {}
    # Add the fields to be searched to a payload after looping through 
    # The selected fields of the model
    for field in model.__searchable__:
        payload[field] = getattr(model, field)
    app.elasticsearch.index(index=index, id=model.id, body=payload)

To ensure that the application keeps running even when Elasticsearch has not been configured, we begin by checking whether app.elasticsearch is set, and return nothing if it is not. This is only a matter of convenience. If it is set, we loop through the fields listed in __searchable__ and collect them into a payload. The payload becomes the body of a document whose ID is that of the model (conveniently unique), stored in an index whose name we choose ourselves (you will see this later).

In the context above, all data in the body field of the Post model will be added as a document that can be searched. Refer to the section Understanding Elasticsearch to see how documents can be added to an index before a search query. Using the same ID for SQLAlchemy and Elasticsearch is super convenient when running searches since we can link the two databases.
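The payload-building step of add_to_index can be tried without a database or an Elasticsearch server. In the sketch below, FakePost is a hypothetical stand-in for the Post model and build_document is a made-up helper that returns the arguments add_to_index would pass on, instead of sending them anywhere.

```python
class FakePost:
    """Stand-in for the Post model; only 'body' is marked as searchable."""
    __searchable__ = ['body']

    def __init__(self, id, body, author):
        self.id = id
        self.body = body
        self.author = author


def build_document(index, model):
    """Collect the searchable fields of a model into an index request."""
    payload = {}
    for field in model.__searchable__:
        payload[field] = getattr(model, field)
    return {'index': index, 'id': model.id, 'body': payload}


post = FakePost(id=7, body='hello search', author='harrison')
document = build_document('posts', post)
print(document)  # {'index': 'posts', 'id': 7, 'body': {'body': 'hello search'}}
```

The author attribute is left out of the payload because it is not listed in __searchable__, which is exactly how a model controls what ends up in the index.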

app/search.py: Remove a document from an index

# ...

def remove_from_index(index, model):
    if not app.elasticsearch:
        return
    app.elasticsearch.delete(index=index, id=model.id)

The elasticsearch.delete() function deletes a document based on its ID, which is conveniently the same in both databases.

app/search.py: Query function

# ...

def query_index(index, query, page, per_page):
    if not app.elasticsearch:
        return [], 0
    search = app.elasticsearch.search(
        index=index,
        body={'query': {'multi_match': {'query': query, 'fields': ['*']}},
              'from': (page - 1) * per_page,
              'size': per_page})
    ids = [int(hit['_id']) for hit in search['hits']['hits']]
    return ids, search['hits']['total']['value']

The query_index function takes an index name and the query to search for, along with pagination controls. Unlike before, where you saw the use of match, here we are using multi_match, which can search across multiple fields. The * in the fields key tells Elasticsearch to look in all fields.

Unfortunately, Elasticsearch does not provide a pagination object as SQLAlchemy does, so the function adds its own: from is the number of results to skip and size is the number of results to return. The return statement returns two items: (1) a list of IDs from the search results and (2) the total number of results. Refer to the search results of the Understanding Elasticsearch section to learn more.
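The pagination arithmetic used in query_index can be checked on its own. The sketch below applies the same from and size calculation to a plain list; page_window and slice_page are hypothetical helper names, and the list of IDs is sample data.

```python
def page_window(page, per_page):
    """Translate a 1-based page number into Elasticsearch-style from/size."""
    return {'from': (page - 1) * per_page, 'size': per_page}


def slice_page(items, page, per_page):
    """Apply the same window to an in-memory list of results."""
    window = page_window(page, per_page)
    start = window['from']
    return items[start:start + window['size']]


# Sample result IDs, ordered from most to least relevant
ids = [5, 1, 6, 9, 10, 2, 3, 7, 8]

print(page_window(2, 5))       # {'from': 5, 'size': 5}
print(slice_page(ids, 1, 20))  # [5, 1, 6, 9, 10, 2, 3, 7, 8]
print(slice_page(ids, 1, 5))   # [5, 1, 6, 9, 10]
print(slice_page(ids, 2, 5))   # [2, 3, 7, 8]
print(slice_page(ids, 3, 5))   # []
```

A page past the end of the results simply comes back empty, while the total count stays the same.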

Let us test our work on a Flask shell:

(venv)$ flask shell

>>> from app.search import add_to_index, remove_from_index, query_index
>>> from app.models import Post
>>> for post in Post.query.all():
...     add_to_index('posts', post)
...
>>> query_index('posts', 'test one three the', 1, 20)
([5, 1, 6, 9, 10, 2, 3, 7, 8], 9)
>>> query_index('posts', 'test one three the', 1, 5)
([5, 1, 6, 9, 10], 9)
>>> query_index('posts', 'test one three the', 2, 5)
([2, 3, 7, 8], 9)
>>> query_index('posts', 'test one three the', 3, 5)
([], 9)

Our query returned a total of 9 results. When we asked for page 1 with 20 items, we got all 9 items. The subsequent examples have been used to show pagination the way we know from SQLAlchemy. After experimenting, we can delete the posts index as follows:

>>> app.elasticsearch.indices.delete(index='posts')


Combining The Search Functionality With SQLAlchemy

What we saw above is fantastic, but it is less than ideal. The search results should return the actual model(s) instead of IDs. With models, we can pass them to the templates for rendering. So, we need to find a way to replace the IDs with the actual models. Another obvious problem from our example above is that we have to explicitly issue an indexing call so that posts can be added or removed. A more convenient way would be for the application to automatically trigger such calls as soon as there is a change made to the SQLAlchemy database.



SearchableMixin Class

We are going to use a mixin class to solve the two challenges mentioned above. A mixin class provides method implementations for reuse by multiple child classes, but it is not meant to be used as a standalone class. The SearchableMixin class is going to act as a link between SQLAlchemy and Elasticsearch, automatically managing the full-text search index associated with a model.

app/models.py: SearchableMixin class

# ...
from app.search import add_to_index, remove_from_index, query_index


class SearchableMixin(object):
    @classmethod
    def search(cls, expression, page, per_page):
        ids, total = query_index(cls.__tablename__, expression, page, per_page)
        if total == 0:
            return cls.query.filter_by(id=0), 0
        when = {}
        for i in range(len(ids)):
            when[ids[i]] = i
        return cls.query.filter(cls.id.in_(ids)).order_by(
            db.case(when, value=cls.id)), total

The search() function uses a class method to associate it with a given class rather than a particular instance. Instead of using self as is normally the case with a class, notice how I use the cls to make it clear that this method receives a class and not an instance as its first argument. Once it is attached to a model, say the Post model, the search method will be invoked as Post.search() without needing an actual instance of the class Post.

To begin, notice that cls.__tablename__ is passed to query_index as the index name. This is a convention: the table name SQLAlchemy assigns to a model is also used as the index name. The method returns a query that yields the matching objects, together with the total number of results. We use case from SQLAlchemy to retrieve the objects by their IDs in the order the IDs were given. This matters because Elasticsearch returns results sorted from most to least relevant.
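The effect of the when dictionary can be seen without a database. The sketch below is a plain-Python analogy, not SQLAlchemy code: it maps each ID to its position in the search results and sorts some sample rows by that position, which is what ordering by the CASE expression achieves on the database side. The rows are made-up data.

```python
# IDs as returned by the search engine, most relevant first
ids = [5, 1, 6]

# Same mapping the search() method builds: ID -> position in the results
when = {}
for i in range(len(ids)):
    when[ids[i]] = i

# Rows as a database might return them without explicit ordering
rows = [
    {'id': 1, 'body': 'Another one test for you'},
    {'id': 5, 'body': 'test comment'},
    {'id': 6, 'body': 'Why the diss'},
]

# Sorting by the mapped position restores the relevance order
ordered = sorted(rows, key=lambda row: when[row['id']])
print([row['id'] for row in ordered])  # [5, 1, 6]
```

Without this step, the database would be free to return the matching rows in any order, and the relevance ranking would be lost.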

The CASE construct in SQL is a conditional object that acts somewhat analogously to an “if/then” construct in other languages. It returns an instance of Case. case() in its usual form is passed a series of “when” constructs, that is, a list of conditions and results as tuples:

from sqlalchemy import case, select

# users_table is assumed to be an already defined Table object
stmt = select(users_table).\
        where(
            case(
                (users_table.c.name == 'wendy', 'W'),
                (users_table.c.name == 'jack', 'J'),
                else_='E'
            )
        )

We loop through the list of IDs and record each ID's position in the when dictionary, which case() then uses to order the rows. Before Flask-SQLAlchemy V3.0.2, it was common to build when as a list of (ID, position) tuples rather than a dictionary:

class SearchableMixin(object):
    @classmethod
    def search(cls, expression, page, per_page):
        # ...
        when = []
        for i in range(len(ids)):
            when.append((ids[i], i))
        # ...

If a list is used with the latest version of Flask-SQLAlchemy, SQLAlchemy raises the error sqlalchemy.exc.ArgumentError: The "whens" argument to case(), when referring to a sequence of items, is now passed as a series of positional elements, rather than as a list.

To react to changes in the SQLAlchemy database, we can define class methods that run before and after a commit is made.

app/models.py: Before and after commit

class SearchableMixin(object):
    # ...

    @classmethod
    def before_commit(cls, session):
        session._changes = {
            'add': list(session.new),
            'update': list(session.dirty),
            'delete': list(session.deleted)
        }
    
    @classmethod
    def after_commit(cls, session):
        for obj in session._changes['add']:
            if isinstance(obj, SearchableMixin):
                add_to_index(obj.__tablename__, obj)
        for obj in session._changes['update']:
            if isinstance(obj, SearchableMixin):
                add_to_index(obj.__tablename__, obj)
        for obj in session._changes['delete']:
            if isinstance(obj, SearchableMixin):
                remove_from_index(obj.__tablename__, obj)
        session._changes = None

Just before a session is committed, the before_commit() handler allows us to check which objects have been added, modified or deleted through session.new, session.dirty and session.deleted respectively. These objects are not going to be available anymore after the commit is made. The session._changes dictionary allows us to save the objects and have them survive the commit, since we shall be using them to update the Elasticsearch index.

As soon as a session has been successfully committed, this is the proper time to make changes on the Elasticsearch side of things using after_commit(). We begin by iterating over what has been added, modified or deleted and make corresponding calls to the indexing functions in the search module for objects with SearchableMixin.
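The reason for taking a snapshot before the commit becomes clearer with a toy model. The sketch below uses a hypothetical FakeSession whose pending collections are emptied by commit(), and a dictionary standing in for the Elasticsearch index. It illustrates the pattern only and does not reproduce how SQLAlchemy sessions work internally.

```python
search_index = {}  # hypothetical in-memory "index": object ID -> text


def before_commit(session):
    # Snapshot the pending objects while they are still visible.
    session._changes = {
        'add': list(session.new),
        'update': list(session.dirty),
        'delete': list(session.deleted),
    }


def after_commit(session):
    # Apply the snapshot to the index once the commit has succeeded.
    for obj_id, text in session._changes['add'] + session._changes['update']:
        search_index[obj_id] = text
    for obj_id, _ in session._changes['delete']:
        search_index.pop(obj_id, None)
    session._changes = None


class FakeSession:
    """Stand-in for a database session; it only tracks pending objects."""

    def __init__(self):
        self.new, self.dirty, self.deleted = [], [], []
        self._changes = None

    def commit(self):
        before_commit(self)
        # Committing empties the pending collections, as described above.
        self.new, self.dirty, self.deleted = [], [], []
        after_commit(self)


session = FakeSession()
session.new.append((1, 'first post'))
session.new.append((2, 'second post'))
session.commit()

session.dirty.append((1, 'first post, edited'))
session.deleted.append((2, 'second post'))
session.commit()

print(search_index)  # {1: 'first post, edited'}
```

If after_commit() tried to read session.new directly, it would find the collections already empty, which is why the snapshot is stored on the session first.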

We can include a simple reindex() helper method that can allow us to refresh an index with all the data on the relational side. You may experience instances where you have to manually update the Elasticsearch index to ensure the latest changes are applied.

app/models.py: Reindexing

class SearchableMixin(object):
    # ...

    @classmethod
    def reindex(cls):
        for obj in cls.query:
            add_to_index(cls.__tablename__, obj)

Given that reindex is a class method, you can run Model.reindex() to update the Elasticsearch index. Finally, to ensure that SQLAlchemy listens to database change events, we can call the function db.event.listen() from SQLAlchemy. This function serves to call before_commit and after_commit methods before and after each commit respectively.

app/models.py: Listen to database changes

class SearchableMixin(object):
    # ...

db.event.listen(db.session, 'before_commit', SearchableMixin.before_commit)
db.event.listen(db.session, 'after_commit', SearchableMixin.after_commit)

To incorporate SearchableMixin into a model, we add it as a base class of the selected model as follows:

app/models.py: Integrate SearchableMixin into a model

class Post(SearchableMixin, db.Model):
    # ...

With this minor change to the Post model, we can maintain a full-text search index for posts. Let us begin by initializing the index from the posts currently in the database:

>>> Post.reindex()

And to search, we can do:

>>> query, total = Post.search('test the Japan', 1, 20)
>>> total
7
>>> query.all()
[Post: test comment, Post: Another one test for you, Post: Why the diss, Post: Japan the great country, Post: I am the greatest, Post: Two of the tests were not done well, Post: There has got to be the best way around it]


Define The Search Form

Everything we have done so far has been through the terminal. To complete this feature, we now need to provide a user-friendly form that allows users to search for posts in the application. By convention, search pages on the web pass the search term in the q argument of the URL. For example, to search for gitauharrison on Google, the URL pointing to the results would look like https://www.google.com/search?q=gitauharrison. Let us begin by defining such a form:

app/forms.py: Search form

from flask import request

# ...

class SearchForm(FlaskForm):
    q = StringField(
        'Search',
        validators=[DataRequired()],
        render_kw={'placeholder': 'Search ...'})

    def __init__(self, *args, **kwargs):
        if 'formdata' not in kwargs:
            kwargs['formdata'] = request.args
        if 'meta' not in kwargs:
            kwargs['meta'] = {'csrf': False}
        super(SearchForm, self).__init__(*args, **kwargs)

Unlike the forms you may have worked with so far, a search form typically sends a GET request rather than a POST request. The constructor, __init__(), provides the formdata and meta arguments when the caller does not pass them. formdata is where Flask-WTF gets its form submissions: by default it reads request.form, which holds POST data, while a GET request carries its data in request.args.

By default, all forms have CSRF protection enabled. However, for clickable search links to work, this feature needs to be disabled, which is why we set meta to {'csrf': False}. This tells Flask-WTF to bypass CSRF validation for this form.

Curious that this form lacks a submit button? Well, since it is a text field, a simple press of the Enter key on the keyboard will submit a user's input. That is why we do not necessarily need a submit button.
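Because the form is sent with GET, the search term travels in the URL's query string, which is what request.args exposes inside Flask. The sketch below shows the same idea using only the standard library; extract_search_term is a hypothetical helper, and the second URL is a made-up example.

```python
from urllib.parse import parse_qs, urlsplit


def extract_search_term(url):
    """Return the value of the q argument in a URL, or None if it is absent."""
    params = parse_qs(urlsplit(url).query)
    values = params.get('q', [])
    return values[0] if values else None


print(extract_search_term('https://www.google.com/search?q=gitauharrison'))  # gitauharrison
print(extract_search_term('/search?q=flask+search&page=2'))  # flask search
print(extract_search_term('/search'))  # None
```

Since the whole search is described by the URL, a results page can be bookmarked or shared as a link, which would not be possible with a POST request.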



Access The Search Form Before Each Request

In this application, we are going to put the search form in the navigation bar, meaning it will be located in the base.html file and appear across all other templates. To avoid possible duplication of code where we have to instantiate the SearchForm object in each route and pass it to relevant templates, we can use the before_request handler.

app/routes.py: Instantiating the search form before each request

from flask import g
from app.forms import SearchForm


@app.before_request
def before_request():
    g.search_form = SearchForm()

Flask provides the g object. It is a variable that acts as storage. Data stored in this variable can persist through the life of a request. What will happen is that when a call to this handler ends and Flask invokes the view function responsible for rendering the search form, the g object will remain unchanged and still have the form attached to it.

Note that the g variable is specific to each request. If your application is serving multiple requests to clients (browsers) at the same time, you can rely on this object to provide private storage for each request. You do not have to worry about one client's data getting mixed up with another client's.



Display The Search Form

This form is going to be in the reusable navigation bar.

app/templates/base.html: Display the search form

<nav class="navbar navbar-default">
    <div class="container">
        <div class="navbar-header">
            <button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#bs-example-navbar-collapse-1" aria-expanded="false">
                <span class="sr-only">Toggle navigation</span>
                <span class="icon-bar"></span>
                <span class="icon-bar"></span>
                <span class="icon-bar"></span>
            </button>
            <a class="navbar-brand" href=" {{ url_for('index') }} ">Add Search Feature</a>
        </div>
        <div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1">            
            <ul class="nav navbar-nav navbar-right">

                <!-- Search form -->
                {% if g.search_form %}
                    <form class="navbar-form navbar-left" action="{{ url_for('search') }}" method="get">
                        <div class="form-group">
                            {{ g.search_form.q(size=20, class_='form-control') }}
                        </div>
                    </form>
                {% endif %}
                <!-- End of the search form -->

            </ul>                       
        </div>
    </div>
</nav>

The form is displayed only when g.search_form has been defined. This prevents pages such as the error pages from showing it. Notice that the request method is GET and that the action attribute points specifically at the search view function. Typically, we would leave the action empty so that the form is submitted to the URL of the current page.



Render The Search Form

To complete the last functionality, we now need to define the search() view function that will handle search form submissions.

app/routes.py: Search view function

from flask import g, redirect, render_template, request, url_for
from app.models import Post

# ...

@app.route('/search')
def search():
    if not g.search_form.validate():
        return redirect(url_for('index'))
    page = request.args.get('page', 1, type=int)
    posts, total = Post.search(
        g.search_form.q.data, page, app.config['POSTS_PER_PAGE'])
    next_url = url_for('search', q=g.search_form.q.data, page=page + 1) \
        if total > page * app.config['POSTS_PER_PAGE'] else None
    prev_url = url_for('search', q=g.search_form.q.data, page=page - 1) \
        if page > 1 else None
    return render_template(
        'search.html',
        title='Search',
        total=total,
        posts=posts,
        next_url=next_url,
        prev_url=prev_url)

form.validate() is used here because it validates field values without checking how the data was submitted. Remember, our search form lacks a submit button and is sent with a GET request. form.validate_on_submit() cannot be used here because it only runs validation when the request uses a submission method such as POST.
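The conditions that decide whether next_url and prev_url exist in the view function reduce to simple arithmetic. The sketch below isolates them; pagination_flags is a hypothetical helper, and the numbers are sample values.

```python
def pagination_flags(total, page, per_page):
    """Return (has_next, has_prev) for a 1-based page of search results."""
    # There is a next page only if results remain beyond the current page.
    has_next = total > page * per_page
    # There is a previous page for every page after the first.
    has_prev = page > 1
    return has_next, has_prev


# 9 results shown 5 per page
print(pagination_flags(total=9, page=1, per_page=5))  # (True, False)
print(pagination_flags(total=9, page=2, per_page=5))  # (False, True)
```

When a flag is False, the view passes None as the URL, and the template can disable the corresponding link.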

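Pagination is handled by comparing the total number of results with the number of results shown so far: a next link exists only if there are more results beyond the current page, and a previous link exists only if the current page is not the first. Remember, Elasticsearch does not have inbuilt pagination as SQLAlchemy does. The search.html template will then look like this: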

app/templates/search.html: Display search results

{% extends 'base.html' %}

{% block app_context %}
<div class="row">
    <div class="col-md-12">
        <h1 class="text-center">{{ title }} ({{ total }})</h1>

        <div class="row">
            <div class="col-lg-12">
                <!-- User posts -->
                <table class="table table-hover">
                    {% for post in posts %}
                        <tr>
                            <td width="70px"><img src="{{ post.author.avatar(35) }}" /></td>
                            <td><strong>{{ post.author.username }}</strong> said {{ moment(post.timestamp).fromNow() }}: <br> {{ post.body }}</td>
                        </tr>
                    {% endfor %}
                </table>
                <!-- End of listing user posts -->

                <!-- Pagination -->
                <nav aria-label="...">
                    <ul class="pager">
                        <li class="previous{% if not prev_url %} disabled{% endif %}">
                            <a href="{{ prev_url or '#' }}">
                                <span aria-hidden="true">&larr;</span> Previous results
                            </a>
                        </li>
                        <li class="next{% if not next_url %} disabled{% endif %}">
                            <a href="{{ next_url or '#' }}">
                                Next results <span aria-hidden="true">&rarr;</span>
                            </a>
                        </li>
                    </ul>
                </nav>
                <!-- End of pagination -->
            </div>
        </div>
    </div>  
</div>
{% endblock %}

You should see this:

Search results



