Using the elasticsearch_dsl module in Python
Introduction
elasticsearch-dsl is built on top of elasticsearch-py and provides a more convenient, higher-level way to work with Elasticsearch.
Installation:
pip install elasticsearch-dsl

Connecting to Elasticsearch
from elasticsearch_dsl import connections, Search

es = connections.create_connection(hosts=["127.0.0.1:9200"], timeout=20)
print(es)
You can also give a connection an alias through the alias parameter and refer to the connection by that alias later. The default alias is default.
from elasticsearch_dsl import connections, Search

# Option 1: create the default connection
es = connections.create_connection(hosts=["127.0.0.1:9200"], timeout=20)
print(es)
# Option 2: create a connection registered under an alias
connections.create_connection(alias="my_new_connection", hosts=["127.0.0.1:9200"], timeout=20)
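elasticsearch_dsl.Search
The Search object represents the entire search request, including: queries, filters, aggregations, sort, pagination, additional parameters, and the associated client.
The API is designed to be chainable, that is, calls can be strung together with the . operator. Apart from aggregations, the Search object is immutable: every change returns a shallow copy that contains the change, and the original object is left as it was.
When creating a Search object, pass the Elasticsearch client (or a connection alias) as the using argument.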
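The practical consequence of immutability is that the return value of each call has to be kept (for example s = s.query(...)), otherwise the change is lost. The following is a minimal sketch of the copy-on-change pattern in plain Python. MiniSearch is a hypothetical toy class written for illustration; it is not how elasticsearch_dsl is implemented.

```python
import copy


class MiniSearch:
    """Toy stand-in for a chainable, immutable search builder (hypothetical)."""

    def __init__(self):
        self._index = None
        self._queries = []

    def _clone(self):
        # Shallow copy of the object, with a fresh list so the original is untouched
        new = copy.copy(self)
        new._queries = list(self._queries)
        return new

    def index(self, name):
        new = self._clone()
        new._index = name
        return new

    def query(self, query_type, **params):
        new = self._clone()
        new._queries.append({query_type: params})
        return new


base = MiniSearch().index("test_index")
narrowed = base.query("match", name="張三")

print(base._queries)      # the original object is unchanged
print(narrowed._queries)  # only the copy carries the new query
```

Because every method returns a new object, a shared base object can safely be reused to build several different searches.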
Example 1:
from elasticsearch_dsl import connections, Search

# Option 1: default connection
es = connections.create_connection(hosts=["127.0.0.1:9200"], timeout=20)
# Option 2: connection registered under an alias
connections.create_connection(alias="my_new_connection", hosts=["127.0.0.1:9200"], timeout=20)

# Pass the client object directly
res = Search(using=es).index("test_index").query()
for data in res:
    print(data.to_dict())
print("*" * 100)
# Or refer to the connection by its alias
res2 = Search(using="my_new_connection").index("test_index").query()
for data in res2:
    print(data.to_dict())

Example 2:
from elasticsearch_dsl import connections, Search

es = connections.create_connection(hosts=["127.0.0.1:9200"], timeout=20)

# No conditions: matches all documents
res = Search(using=es).index("test_index").query()
for data in res:
    print(data.to_dict())
print("*" * 100)
# Style 1: query type plus keyword arguments
res2 = Search(using=es).index("test_index").query("match", name="張三")  # results depend on the analyzer used for the field
for data in res2:
    print(data.to_dict())
print("*" * 100)
# Style 2: the same query written as a raw dict
res3 = Search(using=es).index("test_index").query({"match": {"name": "張三"}})
for data in res3:
    print(data.to_dict())

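Calling the execute() method sends the request to Elasticsearch:
response = res.execute()
In the examples above there is no need to call execute() explicitly, because iterating over a Search object executes it. For debugging, the to_dict() method serializes the Search object into a dict.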
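When debugging, it helps to compare the serialized request with the JSON you would send from Kibana or curl. The sketch below builds, in plain Python, the kind of body a single match query should serialize to and pretty-prints it. search_body is a hypothetical helper for illustration, not part of elasticsearch_dsl.

```python
import json


def search_body(query_type, **params):
    """Build the raw request body for a single query (hypothetical helper)."""
    return {"query": {query_type: params}}


body = search_body("match", name="張三")
# Pretty-print the body so it can be compared with a hand-written request
print(json.dumps(body, ensure_ascii=False, indent=2))
```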
The query method
Adds a query. The argument can be a Q object, one of the classes in the query module, or a query you spell out yourself (a query type name plus keyword arguments, or a raw dict).
Example 1:
from elasticsearch_dsl import connections, Search, Q

es = connections.create_connection(hosts=["127.0.0.1:9200"], timeout=20)

# Calling .query() several times combines the queries internally with the & operator
res = Search(using=es, index="test_index").query().query()
print(res.to_dict())

The filter method
To add a query in filter context, use the filter() method, which keeps this simple.
Example 1:
from elasticsearch_dsl import connections, Search, Q

es = connections.create_connection(hosts=["127.0.0.1:9200"], timeout=20)

# res = Search(using=es).index("test_index").filter({"match": {"name": "北"}})
# res = Search(using=es).index("test_index").filter("terms", tags=["name", "id"])
# Behind the scenes, filter() produces a bool query and puts the condition in its filter clause.
# The explicit equivalent is:
res = Search(using=es).index("test_index").query("bool", filter=[Q("terms", tags=["name", "id"])])
print(res)
for data in res:
    print(data.to_dict())
Example 2:
from elasticsearch_dsl import connections, Search, Q
import time

es = connections.create_connection(hosts=["127.0.0.1:9200"], timeout=20)

# Range filter combined with a match query
# res = Search(using=es, index="test_index").filter("range", timestamp={"gte": 0, "lt": time.time()}).query({"match": {"name": "北"}})
res = Search(using=es, index="test_index").filter("range", id={"gte": 1, "lte": 4}).query({"match": {"name": "北"}})
print(res)
for data in res:
    print(data.to_dict())
# Plain terms filter
res2 = Search(using=es, index="test_index").filter("terms", id=["2", "4"]).execute()
print(res2)
for data in res2:
    print(data.to_dict())
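For orientation, the range filter plus match query in Example 2 should end up as a single bool query in which the match sits in must (it contributes to the relevance score) and the range sits in filter (it only includes or excludes documents). The sketch below builds that shape in plain Python. bool_body is a hypothetical helper, and the exact serialization produced by the library may differ in detail.

```python
def bool_body(must=None, filter=None):
    """Build a raw bool query body (hypothetical helper, not part of elasticsearch_dsl)."""
    clauses = {}
    if must:
        clauses["must"] = must      # scored clauses
    if filter:
        clauses["filter"] = filter  # yes/no clauses, no effect on the score
    return {"query": {"bool": clauses}}


body = bool_body(
    must=[{"match": {"name": "北"}}],
    filter=[{"range": {"id": {"gte": 1, "lte": 4}}}],
)
print(body)
```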

Example 3:
from elasticsearch_dsl import connections, Search, Q

es = connections.create_connection(hosts=["127.0.0.1:9200"], timeout=20)

# Style 1: field name as a keyword argument
q = Q("range", age={"gte": 25, "lte": 27})
res = Search(using=es, index="account_info").query(q)
print(res.to_dict())
for data in res:
    print(data.to_dict())
print("*" * 100)
# Style 2: the same query with the arguments unpacked from a dict
q2 = Q("range", **{"age": {"gte": 25, "lte": 27}})
res2 = Search(using=es, index="account_info").query(q2)
print(res2.to_dict())
for data in res2:
    print(data.to_dict())

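The index method
Specifies the index (or indices) to search.
The using method
Specifies which Elasticsearch connection to use.
elasticsearch_dsl.query
The library provides a class for every Elasticsearch query type. All parameters are passed as keyword arguments and are serialized before being sent to Elasticsearch, which means there is a clear one-to-one mapping between a raw query and its DSL equivalent.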
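The one-to-one mapping can be pictured as "query type name becomes the outer key, keyword arguments become the inner dict". The following plain-Python sketch illustrates that idea only. raw_query is a hypothetical helper, not a function of the library.

```python
def raw_query(query_type, **params):
    """Map a query type and keyword arguments to the raw query dict (hypothetical helper)."""
    return {query_type: params}


multi = raw_query("multi_match", query="Ha", fields=["firstname", "lastname"])
single = raw_query("match", firstname={"query": "Hughes"})
print(multi)
print(single)
```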
Example:
from elasticsearch_dsl import connections, Search
from elasticsearch_dsl.query import MultiMatch, Match

es = connections.create_connection(hosts=["127.0.0.1:9200"], timeout=20)

# Equivalent to {"multi_match": {"query": "Ha", "fields": ["firstname", "lastname"]}}
m1 = MultiMatch(query="Ha", fields=["firstname", "lastname"])
res = Search(using=es, index="test_index").query(m1)
print(res)
for data in res:
    print(data.to_dict())
# Equivalent to {"match": {"firstname": {"query": "Hughes"}}}
m2 = Match(firstname={"query": "Hughes"})
res = Search(using=es, index="test_index").query(m2)
print(res)
for data in res:
    print(data.to_dict())
elasticsearch_dsl.Q
The Q shortcut builds a query instance from named arguments or from a raw dict. It usually takes the form Q("query type", field="xxx") or Q("query type", query="xxx", fields=["field1", "field2"]).
Example 1:
from elasticsearch_dsl import connections, Search, Q

es = connections.create_connection(hosts=["127.0.0.1:9200"], timeout=20)

# q = Q("match", city="Summerfield")
q = Q("multi_match", query="Summerfield", fields=["city", "firstname"])
res = Search(using=es, index="test_index").query(q)
print(res)
for data in res:
    print(data.to_dict())
Query objects can be combined with logical operators:
Q("match", title="python") | Q("match", title="django")
# {"bool": {"should": [...]}}
Q("match", title="python") & Q("match", title="django")
# {"bool": {"must": [...]}}
~Q("match", title="python")
# {"bool": {"must_not": [...]}}
Example 2:
from elasticsearch_dsl import connections, Search, Q

es = connections.create_connection(hosts=["127.0.0.1:9200"], timeout=20)

# OR: name matches either term
q = Q("match", name="張") | Q("match", name="北")
res = Search(using=es, index="test_index").query(q)
for data in res:
    print(data.to_dict(), data.name)
print("*" * 100)
# AND: name matches both terms
q = Q("match", name="張") & Q("match", name="北")
res = Search(using=es, index="test_index").query(q)
for data in res:
    print(data.to_dict(), data.name)
print("*" * 100)
# NOT: name does not match the term
q = ~Q("match", name="張")
res = Search(using=es, index="test_index").query(q)
for data in res:
    print(data.to_dict(), data.name)
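The three operators used in this example correspond to the should, must and must_not clauses of a bool query. A minimal plain-Python sketch of that correspondence follows. q_or, q_and and q_not are hypothetical helpers for illustration, and the library may simplify or flatten the resulting structure.

```python
def q_or(*queries):
    """a | b  ->  bool/should (hypothetical helper)."""
    return {"bool": {"should": list(queries)}}


def q_and(*queries):
    """a & b  ->  bool/must (hypothetical helper)."""
    return {"bool": {"must": list(queries)}}


def q_not(query):
    """~a  ->  bool/must_not (hypothetical helper)."""
    return {"bool": {"must_not": [query]}}


zhang = {"match": {"name": "張"}}
bei = {"match": {"name": "北"}}

print(q_or(zhang, bei))
print(q_and(zhang, bei))
print(q_not(zhang))
```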

Example 3:
from elasticsearch_dsl import connections, Search, Q

es = connections.create_connection(hosts=["127.0.0.1:9200"], timeout=20)

s = Search(using=es, index="account_info")
# constant_score query built from a raw dict
q = Q({"constant_score": {"filter": {"term": {"age": 25}}}})
res = s.query(q).execute()
for hit in res:
    print(hit.to_dict())
print("*" * 100)
# bool query: address must match, and at least one of the should clauses must match
q2 = Q("bool", must=[Q("match", address="山")], should=[Q("match", gender="男"), Q("match", employer="AAA")], minimum_should_match=1)
res2 = s.query(q2).execute()
for hit in res2:
    print(hit.to_dict())

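Nested fields
Sometimes you need to refer to a field inside another field, for example a multi-field (title.keyword) or address.city in a JSON document. For convenience, Q lets you use a double underscore '__' in place of the '.' in keyword argument names.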
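The reason for the double underscore is that a dot is not allowed in a Python keyword argument name. The renaming can be sketched in plain Python as follows. expand_field_names is a hypothetical helper that only illustrates the convention.

```python
def expand_field_names(**params):
    """Replace '__' with '.' in keyword argument names (hypothetical helper)."""
    return {name.replace("__", "."): value for name, value in params.items()}


term_params = expand_field_names(address__city="北京")
print({"term": term_params})
```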
Example:
from elasticsearch_dsl import connections, Search, Q

es = connections.create_connection(hosts=["127.0.0.1:9200"], timeout=20)

# res = Search(using=es, index="test_index").query("match", address__city="北京")
res = Search(using=es, index="test_index").filter("term", address__city="北京")
for data in res:
    print(data.to_dict(), data.name)
Queries
Example:
from elasticsearch_dsl import Search
from elasticsearch import Elasticsearch

# Connect with the low-level client
es = Elasticsearch(hosts=["127.0.0.1:9200"], sniffer_timeout=60, timeout=30)
# List all indices in the cluster
# With format="json" the result is a list of dicts; h="index" returns only the index name
index_name = es.cat.indices(format="json", h="index")
print(index_name)
# Search several indices
es_multi_index = Search(using=es, index=["personal_info_5000000", "grade", "test_index"])
print(es_multi_index.execute())
# Search a single index
es_one_index = Search(using=es, index="test_index")
print(es_one_index.execute())
print("*" * 100)
# Conditional query 1
es_search1 = es_one_index.filter("range", id={"gte": 1, "lt": 5})
print(es_search1.execute())
# Conditional query 2
es_search2 = es_one_index.filter("term", name="張")
print(es_search2.execute())
print("*" * 100)
# Convert to a dict
es_search3 = es_search2.to_dict()  # the request body
print(es_search3)
es_search4 = es_search2.execute().to_dict()  # the response
print(es_search4)

Sorting
Example:
from elasticsearch_dsl import connections, Search

es = connections.create_connection(hosts=["127.0.0.1:9200"], timeout=20)

s = Search(using=es, index="account_info")
# A leading "-" sorts in descending order
res = s.query().sort("-age").execute()
for data in res:
    print(data.to_dict())

Pagination
Pagination is specified through from and size, which are set with Python slice syntax.
Example:
from elasticsearch_dsl import connections, Search

es = connections.create_connection(hosts=["127.0.0.1:9200"], timeout=20)

s = Search(using=es, index="account_info")
res = s.query()[2:5].execute()  # {"from": 2, "size": 3}
for data in res:
    print(data.to_dict())
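A slice follows normal Python semantics: the start becomes from and the number of elements (stop minus start) becomes size, so a slice of 2:5 asks for three documents, not five. The arithmetic can be sketched as below. slice_to_from_size is a hypothetical helper, not a library function.

```python
def slice_to_from_size(s):
    """Convert a Python slice into from/size paging parameters (hypothetical helper)."""
    if s.stop is None:
        raise ValueError("an explicit stop is required in this sketch")
    start = s.start or 0
    return {"from": start, "size": s.stop - start}


print(slice_to_from_size(slice(2, 5)))   # {'from': 2, 'size': 3}
print(slice_to_from_size(slice(None, 10)))  # {'from': 0, 'size': 10}
```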

To access all documents that match a query, use the scan() method. scan() uses the Elasticsearch scan/scroll API; note that in this case the results are not sorted.
Example:
from elasticsearch_dsl import connections, Search

es = connections.create_connection(hosts=["127.0.0.1:9200"], timeout=20)

s = Search(using=es, index="account_info")
res = s.query()
for hit in res.scan():
    print(hit.age, hit.address)

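Aggregations
Use the A shortcut to define an aggregation. To nest aggregations, use the .bucket(), .metric() and .pipeline() methods.
bucket groups documents: the first argument is a name of your choosing for the group, the second is the aggregation type, and the third is the field to aggregate on.
metric works the same way. Metric types include sum, avg, max, min and so on; two of them return all of these values at once, stats and extended_stats, and the latter also returns values such as the variance.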
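In the raw request, nesting shows up as an aggs key inside the parent aggregation, keyed by the names you chose. The plain-Python sketch below builds such a body for a terms bucket on gender with a sum metric and a nested terms bucket. agg is a hypothetical helper, and the aggregation names are only examples.

```python
def agg(agg_type, field, **sub_aggs):
    """Build one raw aggregation, optionally with named sub-aggregations (hypothetical helper)."""
    body = {agg_type: {"field": field}}
    if sub_aggs:
        body["aggs"] = sub_aggs
    return body


aggs = {
    "per_gender": agg(
        "terms", "gender",
        sum_age=agg("sum", "age"),                # metric inside each gender bucket
        terms_balance=agg("terms", "balance"),    # nested bucket inside each gender bucket
    )
}
print(aggs)
```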
Example 1:
from elasticsearch_dsl import connections, Search, A

es = connections.create_connection(hosts=["127.0.0.1:9200"], timeout=20)

s = Search(using=es, index="account_info")
a = A("terms", field="gender")
# Aggregations modify the Search object in place, so no reassignment is needed
s.aggs.bucket("gender_terms", a)
res = s.execute()
for hit in res.aggregations.gender_terms:
    print(hit.to_dict())

Example 2:
from elasticsearch_dsl import connections, Search, A

es = connections.create_connection(hosts=["127.0.0.1:9200"], timeout=20)

s = Search(using=es, index="account_info")
# Group by gender, then add a metric and a nested bucket inside each group
s.aggs.bucket("per_gender", "terms", field="gender")
s.aggs["per_gender"].metric("sum_age", "sum", field="age")
s.aggs["per_gender"].bucket("terms_balance", "terms", field="balance")
res = s.execute()
for hit in res.aggregations.per_gender:
    print(hit.to_dict())


Example 3:
from elasticsearch_dsl import connections, Search, Q

es = connections.create_connection(hosts=["127.0.0.1:9200"], timeout=20)

s = Search(using=es, index="account_info")
res = s.aggs.bucket("aaa", "terms", field="gender").metric("avg_age", "avg", field="age")
print(res.to_dict())

Example 4: [aggregation with built-in ordering]
from elasticsearch_dsl import connections, Search, Q

es = connections.create_connection(hosts=["127.0.0.1:9200"], timeout=20)

"""
{
    'terms': {
        'field': 'age',
        'order': {
            '_count': 'desc'
        }
    }
}
"""
s = Search(using=es, index="account_info")
res = s.aggs.bucket("agg_age", "terms", field="age", order={"_count": "desc"})
print(res.to_dict())
response = s.execute()
for hit in response.aggregations.agg_age:
    print(hit.to_dict())
"""
{
    'terms': {
        'field': 'age',
        'order': {
            '_count': 'asc'
        }
    },
    'aggs': {
        'avg_age': {
            'avg': {
                'field': 'age'
            }
        }
    }
}
"""
s2 = Search(using=es, index="account_info")
res2 = s2.aggs.bucket("agg_age", "terms", field="age", order={"_count": "asc"}).metric("avg_age", "avg", field="age")
print(res2.to_dict())
response = s2.execute()
for hit in response.aggregations.agg_age:
    print(hit.to_dict())

Example 5:
from elasticsearch_dsl import connections, Search, Q

es = connections.create_connection(hosts=["127.0.0.1:9200"], timeout=20)

"""
{
    'aggs': {
        'avg_age': {
            'avg': {
                'field': 'age'
            }
        }
    }
}
"""
s = Search(using=es, index="account_info").query("range", age={"gte": 28})
res = s.aggs.metric("avg_age", "avg", field="age")
print(res.to_dict())
response = s.execute()
print(response)
# Iterating the response yields the matching documents, not the aggregation
for hit in response:
    print(hit.to_dict())
# The aggregation result itself is available under response.aggregations
print(response.aggregations.avg_age.value)

Highlighting
Example: [currently seems to have no effect; to be verified]
from elasticsearch_dsl import connections, Search, Q

es = connections.create_connection(hosts=["127.0.0.1:9200"], timeout=20)

s = Search(using=es, index="test_index")
res = s.highlight("id").execute().to_dict()
print(res)
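One likely reason the example above shows no effect is that it contains no query: highlighting marks up the terms that a query matched in a field, so with nothing matched there is nothing to mark. The sketch below shows, in plain Python, the kind of request body that would be expected to produce highlights (a match query on name plus a highlight section for the same field), together with a toy imitation of the default em tags. Both the body and the highlight_text helper are assumptions for illustration and have not been verified against the test_index data.

```python
# Assumed request body: a query on "name" and highlighting for the same field
body = {
    "query": {"match": {"name": "張"}},
    "highlight": {"fields": {"name": {}}},
}


def highlight_text(text, term, pre="<em>", post="</em>"):
    """Toy imitation of highlighting: wrap each occurrence of term in tags (hypothetical helper)."""
    return text.replace(term, f"{pre}{term}{post}")


print(body)
print(highlight_text("張三", "張"))
```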

Restricting returned fields with source
Example:
from elasticsearch_dsl import connections, Search, Q

es = connections.create_connection(hosts=["127.0.0.1:9200"], timeout=20)

s = Search(using=es, index="account_info")
res = s.query().execute()
for hit in res:
    print(hit.to_dict())
# Return only the listed fields
s2 = Search(using=es, index="account_info")
res2 = s2.query().source(["account_number", "address"]).execute()
for hit in res2:
    print(hit.to_dict())

Deleting
To delete the documents that match a query, call the delete method on the Search object instead of execute.
Example:
from elasticsearch_dsl import connections, Search, Q

es = connections.create_connection(hosts=["127.0.0.1:9200"], timeout=20)

s = Search(using=es, index="test_index")
res = s.query("match", name="張").delete()
print(res)

Case study
Creating the index:
from elasticsearch import Elasticsearch

# Connect with the low-level client
es = Elasticsearch(hosts=["127.0.0.1:9200"], sniffer_timeout=60, timeout=30)
body = {
    "mappings": {
        "properties": {
            "account_number": {"type": "integer"},
            "balance": {"type": "integer"},
            "firstname": {"type": "text"},
            "lastname": {"type": "text"},
            "age": {"type": "integer"},
            "gender": {"type": "keyword"},
            "address": {"type": "text"},
            "employer": {"type": "text"},
            "email": {"type": "text"},
            "province": {"type": "text"},
            "state": {"type": "text"}
        }
    }
}
# Create the index
es.indices.create(index="account_info", body=body)

Bulk-load sample data using Kibana:
POST account_info/_bulk
{"index": {"_index":"account_info"}}
{"account_number":1,"balance":20,"firstname":"三","lastname":"張","age":25,"gender":"男","address":"北京朝陽","employer":"AAA","email":"123@qq.com","province":"北京","state":"正常"}
{"index": {"_index":"account_info"}}
{"account_number":2,"balance":70,"firstname":"二","lastname":"張","age":26,"gender":"男","address":"北京海淀","employer":"AAA","email":"123@qq.com","province":"北京","state":"正常"}
{"index": {"_index":"account_info"}}
{"account_number":3,"balance":80,"firstname":"四","lastname":"張","age":27,"gender":"女","address":"遼寧朝陽","employer":"BBB","email":"123@qq.com","province":"遼寧","state":"正常"}
{"index": {"_index":"account_info"}}
{"account_number":4,"balance":60,"firstname":"五","lastname":"張","age":28,"gender":"男","address":"山東青島","employer":"AAA","email":"123@qq.com","province":"山東","state":"正常"}
{"index": {"_index":"account_info"}}
{"account_number":5,"balance":40,"firstname":"六","lastname":"張","age":29,"gender":"女","address":"山東濟南","employer":"AAA","email":"123@qq.com","province":"山東","state":"正常"}
{"index": {"_index":"account_info"}}
{"account_number":6,"balance":50,"firstname":"七","lastname":"張","age":30,"gender":"男","address":"河北唐山","employer":"BBB","email":"123@qq.com","province":"河北","state":"正常"}
{"index": {"_index":"account_info"}}
{"account_number":7,"balance":30,"firstname":"一","lastname":"張","age":31,"gender":"女","address":"河北石家莊","employer":"AAA","email":"123@qq.com","province":"河北","state":"正常"}


Querying by condition:
1. Find records with a balance between 40 and 70
from elasticsearch_dsl import connections, Search, Q

es = connections.create_connection(hosts=["127.0.0.1:9200"], timeout=20)

s = Search(using=es, index="account_info")
# balance between 40 and 70, inclusive
q = Q("range", balance={"gte": 40, "lte": 70})
res = s.query(q)
for data in res:
    print(data.to_dict())
print("Found %d records" % res.count())
2. Find male records with a balance between 40 and 70
from elasticsearch_dsl import connections, Search, Q

es = connections.create_connection(hosts=["127.0.0.1:9200"], timeout=20)

s = Search(using=es, index="account_info")
# balance between 40 and 70, inclusive
q1 = Q("range", balance={"gte": 40, "lte": 70})
# male
q2 = Q("term", gender="男")
# AND
q = q1 & q2
res = s.query(q)
for data in res:
    print(data.to_dict())
print("Found %d records" % res.count())
3. Find male records with province 北京 and age 25 or 30
from elasticsearch_dsl import connections, Search, Q

es = connections.create_connection(hosts=["127.0.0.1:9200"], timeout=20)

s = Search(using=es, index="account_info")
# Approach 1:
# province is 北京
q1 = Q("match", province="北京")
# male and aged 25 or 30
q2 = Q("bool", must=[Q("terms", age=[25, 30]), Q("term", gender="男")])
# AND
q = q1 & q2
res = s.query(q)
for data in res:
    print(data.to_dict())
print("Found %d records" % res.count())
print("*" * 100)
# Approach 2:
# province is 北京
q1 = Q("match", province="北京")
# aged 25 or 30
q2 = Q("term", age=25) | Q("term", age=30)
# male
q3 = Q("term", gender="男")
res = s.query(q1).query(q2).query(q3)  # chained query() calls are combined with & (AND)
for data in res:
    print(data.to_dict())
print("Found %d records" % res.count())
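A caveat about the match on province, related to the earlier remark about analyzers: the mapping above sets no analyzer, so the text fields would use the default standard analyzer, which emits one token per CJK character, and match combines tokens with OR by default. Under those assumptions, a match for 北京 would also hit 河北, because both contain 北. The following plain-Python simulation over the sample data illustrates the effect. tokens and match are simplified stand-ins written for this sketch, not Elasticsearch's actual analysis code.

```python
# (account_number, age, gender, province) taken from the sample data above
accounts = [
    (1, 25, "男", "北京"), (2, 26, "男", "北京"), (3, 27, "女", "遼寧"),
    (4, 28, "男", "山東"), (5, 29, "女", "山東"), (6, 30, "男", "河北"),
    (7, 31, "女", "河北"),
]


def tokens(text):
    """Simplified analyzer: one token per character (assumption for CJK text)."""
    return set(text)


def match(field_value, query):
    """Simplified match with OR semantics: any shared token counts as a hit."""
    return bool(tokens(field_value) & tokens(query))


hits = [
    number
    for number, age, gender, province in accounts
    if match(province, "北京") and age in (25, 30) and gender == "男"
]
print(hits)  # [1, 6]: account 6 is from 河北, not 北京
```

If only exact province values should match, options worth checking include a match_phrase query or mapping province as a keyword field and using term.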
4. Find female records whose address contains the character 山 and whose age is not between 25 and 28
from elasticsearch_dsl import connections, Search, Q

es = connections.create_connection(hosts=["127.0.0.1:9200"], timeout=20)

s = Search(using=es, index="account_info")
# address contains 山 and gender is female
q1 = Q("match", address="山") & Q("match", gender="女")
# age NOT between 25 and 28
q2 = ~Q("range", age={"gte": 25, "lte": 28})
# Apply q2 as a filter
# query and filter can be called in either order
res = s.filter(q2).query(q1)
for data in res:
    print(data.to_dict())
print("Found %d records" % res.count())
5. Aggregate by age, then compute the average balance for each age
Example:
from elasticsearch_dsl import connections, Search, A

es = connections.create_connection(hosts=["127.0.0.1:9200"], timeout=20)

s = Search(using=es, index="account_info")
# Group by age first, then take the average balance in each group
# size sets the maximum number of buckets returned (the default is 10)
# In practice the account data has far fewer than 100 distinct ages
q = A("terms", field="age", size=100).metric("age_per_balance", "avg", field="balance")
s.aggs.bucket("res", q)
# Execute and get the response
response = s.execute()
# "res" is the name given to the bucket above
# Each bucket's to_dict() looks like {'key': 25, 'doc_count': 1, 'age_per_balance': {'value': 20.0}},
# the same as what the REST API returns
for data in response.aggregations.res:
    print(data.to_dict())
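As a cross-check of what this aggregation should return, the same grouping can be computed in plain Python over the seven sample documents. This sketch only mirrors the shape of the buckets; it does not reproduce how Elasticsearch orders them.

```python
from collections import defaultdict

# (age, balance) pairs taken from the sample data above
rows = [(25, 20), (26, 70), (27, 80), (28, 60), (29, 40), (30, 50), (31, 30)]

# Group balances by age, like a terms aggregation on "age"
groups = defaultdict(list)
for age, balance in rows:
    groups[age].append(balance)

# Average per group, like an avg metric on "balance"
buckets = [
    {"key": age, "doc_count": len(values), "age_per_balance": {"value": sum(values) / len(values)}}
    for age, values in sorted(groups.items())
]
for bucket in buckets:
    print(bucket)
```

Since every age occurs exactly once in the sample data, each bucket has a doc_count of 1 and its average equals that document's balance.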

6. Aggregate by age range, then compute the average balance per gender for ages 25 to 28
Example:
from elasticsearch_dsl import connections, Search, A

es = connections.create_connection(hosts=["127.0.0.1:9200"], timeout=20)

s = Search(using=es, index="account_info")
# This time, build each aggregation with A and chain them
# A range aggregation takes a ranges parameter: a list of dicts with from and to
a1 = A("range", field="age", ranges=[{"from": 25, "to": 28}])
a2 = A("terms", field="gender")
a3 = A("avg", field="balance")
s.aggs.bucket("res", a1).bucket("gender_group", a2).metric("balance_avg", a3)
# Execute and get the response
response = s.execute()
# "res" is the name given to the bucket above
for data in response.aggregations.res:
    print(data.to_dict())
Note: the result does not include documents with age 28.
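Age 28 is missing because each range in a range aggregation includes its from value and excludes its to value. A short plain-Python sketch of that half-open interval, using the ages from the sample data (in_range_bucket is a hypothetical helper):

```python
def in_range_bucket(value, from_, to):
    """Half-open interval check: from_ is included, to is excluded (hypothetical helper)."""
    return from_ <= value < to


ages = [25, 26, 27, 28, 29, 30, 31]
selected = [age for age in ages if in_range_bucket(age, 25, 28)]
print(selected)  # [25, 26, 27]
```

To include 28 as well, the upper bound would have to be set to 29.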

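Summary:
If the value is a list, as with the must clause of bool or with terms, write field=[ ].
If the value is a dict, as with range, write field={xxx: yyy, ...}.
If the value is a single value, as with term or match, write field=value.
If several fields are searched, as with multi_match, add query="value to search for", fields=["field1", "field2", ...].
The logical relationship between conditions can then be expressed with repeated query and filter calls, or directly with Q("bool", must=[Q...], should=[Q...]), combined with the & | ~ operators.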

