这两天了解了一下Flask和Elasticsearch的基本用法,就想着去把两者给结合起来,简单尝试搞了一个小的demo。
安装配置
安装主要是注意配置的修改,特别是es和kibana的连接等等。
- MySql
这个感觉没啥可说的,主要是注意表结构以及数据的生成,后面有提供语句。
- 创建网络
docker network create es
- Elasticsearch7.17.5
docker run --name search -it -d -v /etc/elasticsearch:/usr/share/elasticsearch/config -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" --net es docker.elastic.co/elasticsearch/elasticsearch:7.17.5
- kibana:7.17.5
注意和上面版本号对应,这个可视化工具可以不安装。
docker run --name kb -it -d -v /etc/kibana/config:/usr/share/kibana/config --net es -p 5601:5601 docker.elastic.co/kibana/kibana:7.17.5
创建项目
数据生成
- 创建表结构
create table test.students
(
id bigint auto_increment primary key,
sname varchar(20) not null,
sgender tinyint(1) not null,
age int not null,
scontend varchar(20) not null,
isDelete tinyint(1) not null,
lastTime datetime(6) not null,
createTime datetime(6) not null,
sgrade_id bigint not null,
);
- 导入数据
INSERT INTO test.students (id, sname, sgender, age, scontend, isDelete, lastTime, createTime, sgrade_id) VALUES (1, '薛', 0, 20, '我叫薛', 0, '2022-05-28 12:38:44.488083', '2022-05-28 20:35:20', 4);
INSERT INTO test.students (id, sname, sgender, age, scontend, isDelete, lastTime, createTime, sgrade_id) VALUES (2, '陆', 1, 30, '我叫陆', 0, '2017-12-01 00:00:00', '2022-05-28 20:35:20', 4)
INSERT INTO test.students (id, sname, sgender, age, scontend, isDelete, lastTime, createTime, sgrade_id) VALUES (3, '范', 1, 40, '我叫范', 0, '2017-12-01 00:00:00', '2022-05-28 20:35:20', 4);
INSERT INTO test.students (id, sname, sgender, age, scontend, isDelete, lastTime, createTime, sgrade_id) VALUES (4, '向', 0, 18, '我叫向', 0, '2017-12-01 00:00:00', '2022-05-28 20:35:20', 4);
INSERT INTO test.students (id, sname, sgender, age, scontend, isDelete, lastTime, createTime, sgrade_id) VALUES (5, '阳', 0, 16, '我叫阳', 0, '2017-12-01 00:00:00', '2022-05-28 20:35:20', 4);
INSERT INTO test.students (id, sname, sgender, age, scontend, isDelete, lastTime, createTime, sgrade_id) VALUES (6, '孔', 1, 50, '我叫孔', 0, '2017-12-01 00:00:00', '2022-05-28 20:35:20', 4);
INSERT INTO test.students (id, sname, sgender, age, scontend, isDelete, lastTime, createTime, sgrade_id) VALUES (7, '孟', 1, 60, '我叫孟', 1, '2017-12-01 00:00:00', '2022-05-28 20:35:20', 3);
INSERT INTO test.students (id, sname, sgender, age, scontend, isDelete, lastTime, createTime, sgrade_id) VALUES (8, '孙', 0, 70, '我叫', 1, '2017-12-01 00:00:00', '2022-05-28 20:35:20', 3);
INSERT INTO test.students (id, sname, sgender, age, scontend, isDelete, lastTime, createTime, sgrade_id) VALUES (9, '郝', 1, 33, '我叫郝', 0, '2017-12-01 00:00:00', '2022-05-28 20:35:20', 3);
INSERT INTO test.students (id, sname, sgender, age, scontend, isDelete, lastTime, createTime, sgrade_id) VALUES (10, '夏', 1, 34, '我叫夏', 0, '2017-12-01 00:00:00', '2022-05-28 20:35:20', 3);
创建Flask项目
- requirements.txt
certifi==2022.6.15
click==8.1.3
colorama==0.4.5
elastic-transport==8.1.2
elasticsearch==8.3.1
Flask==2.1.3
importlib-metadata==4.12.0
itsdangerous==2.1.2
Jinja2==3.1.2
MarkupSafe==2.1.1
pip==21.3.1
PyMySQL==1.0.2
setuptools==60.2.0
typing-extensions==4.3.0
urllib3==1.26.10
Werkzeug==2.1.2
wheel==0.37.1
zipp==3.8.1
es 配置
- config.py 配置信息
# database
mysql_host = "localhost"
mysql_port = 3306
mysql_user = "YourUser"
mysql_password = "YourPassword"
mysql_db = "test"
mysql_charset = "utf8"
# elasticsearch
es_host = "http://127.0.0.1:9200"
- DataDeal.py 用于把Mysql中的数据导入ES
import pymysql
from elasticsearch import Elasticsearch
from config import *
def get_data():
conn = pymysql.connect(host=mysql_host, port=mysql_port, user=mysql_user, password=mysql_password, database=mysql_db)
cursor = conn.cursor()
sql = "select * from students"
cursor.execute(sql)
row_names = [x[0] for x in cursor.description]
print(row_names,end="\n\n")
results = cursor.fetchall()
conn.close()
return row_names,results
def create_es_data():
es = Elasticsearch(hosts=es_host)
try:
row_names,results = get_data()
for row_name in row_names:
print(row_name)
for row in results:
print(row)
message = {
"id": row[0],
"name": row[1],
"gender":row[2],
"age":row[3],
"scontent":row[4],
"isDelete":row[5],
"lasttime":row[6],
"createtime":row[7],
"grade":row[8]
}
# 建立索引
es.index(index="students",document=message)
except Exception as e:
print("Error:" + str(e))
if __name__ == "__main__":
create_es_data()
- ESDataQuery.py
from elasticsearch import Elasticsearch
from es.config import *
class elasticsearch():
def __init__(self,index_name):
self.es=Elasticsearch(hosts=es_host)
self.index_name=index_name
def search(self,query,count:int=30):
ds={
"query":{
"match_phrase":{
"name":query
}
}
}
match_data=self.es.search(index=self.index_name,body=ds,size=count)
return match_data
Flask 配置
- app.py
import json
from flask import *
from markupsafe import escape
from es.EsDataQuery import elasticsearch
app = Flask(__name__)
@app.route("/get_es/<query>")
def get_es(query):
print("input_query",query)
es=elasticsearch(index_name="students")
data=es.search(query)
print(data)
address_data=data["hits"]["hits"]
print(address_data)
address_list=[]
for item in address_data:
address_list.append(item["_source"])
new_json=json.dumps(address_list,ensure_ascii=False)
return app.response_class(new_json,content_type="application/json")
Test
http://127.0.0.1:5000/get_es/孙