[root@CentOS ~]# pip install elasticsearch
2.1 在Flask中配置Elasticsearch
首先在Flask配置类config.py中添加:
# Base configuration class shared by all environments
class Config(object):
    """Common settings inherited by the environment-specific configurations."""

    # Address of the Elasticsearch node. The ELASTICSEARCH_URL environment
    # variable wins; an unset or empty variable falls back to the default.
    ELASTICSEARCH_URL = os.environ.get('ELASTICSEARCH_URL') or '192.168.40.128:9200'
然后在应用程序工厂函数app/__init__.py中添加一个elasticsearch属性:
from elasticsearch import Elasticsearch
def create_app(config_name=None):
    """Factory Pattern: Create Flask app.

    Args:
        config_name: Key forwarded to ``configure_app`` to select which
            configuration is loaded.

    Returns:
        The configured Flask application instance.
    """
    app = Flask(__name__)
    configure_app(app, config_name)
    # A factory has to hand the application back; without this the caller
    # would receive None.
    return app
def configure_app(app, config_name):
    """Configures App.

    Loads the configuration object selected by ``config_name``, attaches an
    Elasticsearch client to the app, then runs the configuration's own
    ``init_app`` hook.

    Args:
        app: The Flask application being set up.
        config_name: Key into the module-level ``config`` mapping.
    """
    app.config.from_object(config[config_name])
    # Full-text search: expose the client as ``app.elasticsearch``. It is
    # None when no URL is configured, so the search helpers can no-op.
    # ``.get`` avoids a KeyError for config classes that omit the setting.
    es_url = app.config.get('ELASTICSEARCH_URL')
    app.elasticsearch = Elasticsearch([es_url]) if es_url else None
    config[config_name].init_app(app)
2.2 全文检索抽象化
创建app/utils/elasticsearch.py:
from flask import current_app
from elasticsearch.exceptions import NotFoundError
def add_to_index(index, model):
    """Index (or re-index) ``model`` in the Elasticsearch index ``index``.

    Does nothing when the current app has no Elasticsearch client. On the
    first insert, when the index does not exist yet, the index is created
    and a mapping using the ik Chinese analyzer is installed for every
    field listed in ``model.__searchable__``.

    Args:
        index: Index name; also used as the document type.
        model: Object exposing ``id`` and ``__searchable__`` (an iterable of
            attribute names to index).
    """
    es = current_app.elasticsearch
    if not es:
        return

    # The blog content is mostly Chinese, so the ik analysis plugin is used.
    # The mapping has to be configured before the first document goes in.
    if not es.indices.exists(index=index):  # first insert: index not created yet
        # Create the index. ignore=400 presumably tolerates an
        # "already exists" response from a concurrent creator; confirm
        # against the client documentation.
        es.indices.create(index=index, ignore=400)
        # ik template. Every field is assumed to be of type "text"; pass a
        # richer description through __searchable__ if that ever changes.
        chinese_field_config = {
            "type": "text",
            "analyzer": "ik_max_word",
            "search_analyzer": "ik_max_word",
        }
        properties = {
            field: dict(chinese_field_config)
            for field in model.__searchable__
        }
        mapping = {
            index: {
                "properties": properties,
            },
        }
        es.indices.put_mapping(index=index, doc_type=index, body=mapping)

    # Insert the new document.
    payload = {field: getattr(model, field) for field in model.__searchable__}
    es.index(index=index, doc_type=index, id=model.id, body=payload)
def remove_from_index(index, model):
    """Delete the document for ``model`` from the Elasticsearch index ``index``.

    Does nothing when the current app has no Elasticsearch client. A
    document that is already absent is treated as successfully removed.

    Args:
        index: Index name; also used as the document type.
        model: Object whose ``id`` identifies the document to delete.
    """
    if not current_app.elasticsearch:
        return
    try:
        current_app.elasticsearch.delete(index=index, doc_type=index, id=model.id)
    except NotFoundError:
        # Removal is best-effort: a missing document already matches the
        # desired end state, so only leave a trace for debugging.
        current_app.logger.debug(
            'Document %s not found in index %s; nothing to remove',
            model.id, index)
def query_index(index, query):
    """Run a full-text search for ``query`` against ``index``.

    Args:
        index: Index name; also used as the document type.
        query: The user's search expression.

    Returns:
        A 3-tuple ``(ids, scores, highlights)``:
        ``ids`` is a list of matching document ids as strings, ``scores``
        is the parallel list of relevance scores, and ``highlights`` is the
        '+'-joined tokens the ik analyzer produced for ``query``.
        When the current app has no Elasticsearch client the result is
        ``([], [], '')``, so callers can always unpack three values.
    """
    es = current_app.elasticsearch
    if not es:
        # Same arity as the normal path; the previous ``[], 0`` broke
        # callers that unpack (ids, scores, highlights).
        return [], [], ''

    # Ask the ik analyzer which keywords it splits the query into; the
    # front end highlights these words with a regular expression.
    analyze_body = {
        "analyzer": "ik_max_word",
        "text": query,
    }
    tokens = es.indices.analyze(index=index, body=analyze_body)
    highlights = '+'.join(item['token'] for item in tokens['tokens'])

    # Matching records. Elasticsearch returns only 10 hits by default, so
    # an explicit size is given to fetch more.
    search = es.search(
        index=index, doc_type=index,
        body={
            'query': {
                'multi_match': {
                    'query': query,
                    'fields': ['*'],
                },
            },
            "size": 1000,
        })
    hits = search['hits']['hits']
    ids = [str(hit['_id']) for hit in hits]
    scores = [hit['_score'] for hit in hits]
    return ids, scores, highlights
然后,再创建app/models/search.py,里面添加一个SearchableMixin类,以后哪个数据模型要实现全文检索功能的话,就继承这个类即可
from app.utils.elasticsearch import add_to_index, remove_from_index, query_index
class SearchableMixin(object):
@classmethod
def search(cls, expression, queryset_manager='objects'):
ids, scores, highlights = query_index(cls._meta['collection'], expression)
queryset = getattr(cls, queryset_manager) # 比如博客类,可以只返回 已发布 的博客
# 更新数据库,因为后续 queryset(id__in=ids) 会按定义的字段进行排序,不会按 ids或scores 的顺序
for id, score in zip(ids, scores):
queryset(id=id).update(es_score=score) # Atomic updates, 不要用 save() 会很慢,因为定义了 post_save() 信号
queryset(id=id).update(es_highlights=highlights)