Rust整合Elasticsearch的詳細過程(收藏)
全文搜索Elasticsearch是什么
Lucene:Java實現(xiàn)的搜索引擎類庫
- 易擴展
- 高性能
- 僅限Java開發(fā)
- 不支持水平擴展
Elasticsearch:基于Lucene開發(fā)的分布式搜索和分析引擎
- 支持分布式、水平擴展
- 提高RestfulAPI,可被任何語言調用
Elastic Stack是什么
ELK(Elastic Stack):Elasticsearch結合Kibana、Logstash、Beats實現(xiàn)日志數(shù)據(jù)分析、實時監(jiān)控
Elasticsearch:負責存儲、搜索、分析數(shù)據(jù)Kibana:數(shù)據(jù)可視化Logstash、Beats:數(shù)據(jù)抓?。ㄒ话阌肈ebezium、Flink、RisingWave…)
Elasticsearch能做什么
實時數(shù)據(jù)分析:支持對實時數(shù)據(jù)進行索引和分析,可快速處理大量的日志、指標和事件數(shù)據(jù)
實時監(jiān)控:對系統(tǒng)指標、業(yè)務數(shù)據(jù)和用戶行為進行實時監(jiān)控
電商搜索:為電商平臺提供商品搜索功能,幫助用戶快速找到所需的商品
知識庫搜索:為企業(yè)內部的文檔、知識庫和業(yè)務數(shù)據(jù)提供搜索功能,提高員工的工作效率
Elasticsearch 索引
傳統(tǒng)數(shù)據(jù)庫使用正向索引,依據(jù)id構建B+樹,根據(jù)索引id查快,對于非索引文檔如商品描述查需要全表掃描
倒排索引:將文檔分為詞條和id進行存儲,先查文檔獲取id,再根據(jù)id查數(shù)據(jù)庫
- 文檔(Document):每條數(shù)據(jù)就是一個Json文檔
- 詞條(Term):文檔按語義分成的詞語
索引(Index):相同類型文檔的集合
映射(Mapping):索引中的文檔約束信息
字段(Fielf):Json文檔中的字段
DSL:Json風格的請求語句,用來實現(xiàn)CRUD
Docker安裝Elasticsearch、Kibana、IK
1、先創(chuàng)建自定義網(wǎng)絡
使用默認
bridge只能通過ip通信,這里加入了自定義網(wǎng)絡,自定義網(wǎng)絡可以自動解析容器名
- docker network ls查看已有網(wǎng)絡
- 創(chuàng)建自定義網(wǎng)絡docker network create pub-network
- 手動連接網(wǎng)絡docker network connect pub-network container_name_or_id
- 刪除網(wǎng)絡docker network rm network_name_or_idid
2、創(chuàng)建文件夾
mkdir -p /opt/es/data mkdir -p /opt/es/plugins mkdir -p /opt/es/logs
3、授權
chmod -R 777 /opt/es/data chmod -R 777 /opt/es/logs
安裝IK分詞器
由于ES對中文分詞無法理解語義,需要IK插件
https://release.infinilabs.com/analysis-ik/stable/
Elasticsearch、Kibana、IK所有版本保持一致,解壓后使用shell工具將整個文件夾上傳到/opt/es/plugins
離線部署Elasticsearch、Kibana
在能訪問的地方拉取鏡像
docker pull elasticsearch:8.15.2 docker pull kibana:8.15.2
這里使用wsl,wsl進入wsl,然后進入win的D盤
cd /mnt/d
打包鏡像,這個文件可以在win D盤找到
docker save elasticsearch:8.15.2 > elasticsearch.tar docker save kibana:8.15.2 > kibana.tar
使用shell工具如Windterm上傳文件
加載鏡像
docker load -i elasticsearch.tar docker load -i kibana.tar
查看鏡像
docker images
然后命令部署或者docker-compose部署即可
命令部署Elasticsearch、Kibana
部署Elasticsearch
docker run -d \ --name es \ --network pub-network \ --restart always \ -p 9200:9200 \ -p 9300:9300 \ -e "xpack.security.enabled=false" \ -e "discovery.type=single-node" \ -e "http.cors.enabled=true" \ -e "http.cors.allow-origin:*" \ -e "ES_JAVA_OPTS=-Xms512m -Xmx512m" \ -v /opt/es/data:/usr/share/elasticsearch/data \ -v /opt/es/plugins:/usr/share/elasticsearch/plugins \ -v /opt/es/logs:/usr/share/elasticsearch/logs \ --privileged=true \ elasticsearch:8.15.2
xpack.security.enabled=false禁用密碼登錄
如果要使用token: -e "xpack.security.enrollment.enabled=true" \
docker部署一般用于開發(fā),不要為難自己,使用token會有很多問題,生產環(huán)境再開,使用SSl需要證書
部署Kibana
docker run -d \ --name kibana \ --network pub-network \ --restart always \ -p 5601:5601 \ -e CSP_STRICT=false \ -e I18N_LOCALE=zh-CN \ kibana:8.15.2
報錯kibana 服務器尚未準備就緒,是因為配置了ELASTICSEARCH_HOSTS
docker-compose部署Elasticsearch、Kibana
es:
image: elasticsearch:8.15.2
container_name: es
network_mode: pub-network
restart: always
ports:
# 9200:對外暴露的端口
- 9200:9200
# 9300:節(jié)點間通信端口
- 9300:9300
environment:
# 禁用密碼登錄
xpack.security.enabled: 'false'
# 單節(jié)點運行
discovery.type: single-node
# 允許跨域
http.cors.enabled: 'true'
# 允許所有訪問
http.cors.allow-origin: '*'
# 堆內存大小
ES_JAVA_OPTS: '-Xms512m -Xmx512m'
volumes:
# 數(shù)據(jù)掛載
- /opt/es/data:/usr/share/elasticsearch/data
# 插件掛載
- /opt/es/plugins:/usr/share/elasticsearch/plugins
# 日志掛載
- /opt/es/logs:/usr/share/elasticsearch/logs
# 允許root用戶運行
privileged: true
kibana:
image: kibana:8.15.2
container_name: kibana
network_mode: pub-network
restart: always
ports:
- 5601:5601
environment:
# 禁用安全檢查
CSP_STRICT: 'false'
# 設置中文
I18N_LOCALE: zh-CN
networks:
pub-network:
name: pub-network部署
docker-compose up -d
刪除Elasticsearch、Kibana
docker rm -f es docker rm -f kibana
開啟安全配置(可選,如果要用密碼和token)
es8開始需要密碼訪問,kibana通過token訪問
# 生成密碼 docker exec -it es /usr/share/elasticsearch/bin/elasticsearch-reset-password -u elastic # 生成kibana訪問token docker exec -it es /usr/share/elasticsearch/bin/elasticsearch-create-enrollment-token -s kibana
訪問Elasticsearch、Kibana
Elasticsearch:127.0.0.1:9200,看到以下界面就部署成功了

Kibana:127.0.0.1:5601看到以下界面就部署成功了
訪問:http://127.0.0.1:9200/.kibana跨域查看有沒有發(fā)現(xiàn)可視化工具kibana

我們選擇手動配置,使用http://es:9200,我們沒有配置ssl只能用http,容器名為es

在終端運行命令查看日志中的驗證碼
docker logs kibana

使用

GET /_analyze
{
"analyzer": "ik_max_word",
"text": "好好學習天天向上"
}如果一個字為一個詞條,就說明分詞插件IK沒裝好,重新安裝后重啟容器docker restart es

分詞原理
依據(jù)字典進行分詞
對于一些新詞語,如鋁合金鍵盤被稱為“鋁坨坨”,詞典中沒有這個詞語,會將其逐字分詞

分詞流程
- 1、
character filters:字符過濾器,進行原始處理,如轉換編碼、去停用詞、轉小寫 - 2、
tokenizer:分詞器,將文本流進行分詞為詞條 - 3、
tokenizer filter:將詞條進行進一步處理,如同義詞處理、拼音處理
擴展詞庫
在IK插件config/IKAnalyzer.cfg.xml中添加
<?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd"> <properties> <comment>IK Analyzer 擴展配置</comment> <!--用戶可以在這里配置自己的擴展字典 --> <entry key="ext_dict">ext.dic</entry> <!--用戶可以在這里配置自己的擴展停止詞字典--> <entry key="ext_stopwords">stopword.dic</entry> <!--用戶可以在這里配置遠程擴展字典 --> <!-- <entry key="remote_ext_dict">words_location</entry> --> <!--用戶可以在這里配置遠程擴展停止詞字典--> <!-- <entry key="remote_ext_stopwords">words_location</entry> --> </properties>
停用詞庫
例如敏感詞
<?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd"> <properties> <comment>IK Analyzer 擴展配置</comment> <!--用戶可以在這里配置自己的擴展字典 --> <entry key="ext_stopwords">stopword.dic</entry> </properties>
使用
生產使用可以用AI、ELP進行分詞
修改配置,添加擴展詞庫和停用詞庫
vim /opt/es/plugins/elasticsearch-analysis-ik-8.15.2/config/IKAnalyzer.cfg.xml
這里新建一個詞庫
touch /opt/es/plugins/elasticsearch-analysis-ik-8.15.2/config/ext.dic
編輯擴展詞庫
vim /opt/es/plugins/elasticsearch-analysis-ik-8.15.2/config/ext.dic
添加分詞
鋁坨坨
編輯停用詞庫
vim /opt/es/plugins/elasticsearch-analysis-ik-8.15.2/config/stopword.dic
添加
的
重啟ES
docker restart es
測試分詞
GET /_analyze{ "analyzer": "ik_max_word", "text": "重重的鋁坨坨"}可以看到擴展詞庫的“鋁坨坨”被分詞識別出來了,“的”沒有被分詞

分詞作用
- 創(chuàng)建倒排索引時對文檔分詞
- 用戶搜索時對輸入的內容分詞
IK分詞模式
- ik_smart:智能切分,粗粒度
- ik_max_word:最細切分,細粒度
DSL 索引操作
- 僅允許GET, PUT, DELETE, HEAD
- mapping:對索引庫中文檔的約束,常見的屬性有
- type:字段數(shù)據(jù)類型
- 字符串:text(可分詞的文本)、keyword(不分詞的精確值,合在一起有意義的詞,如國家、品牌)
- 數(shù)值:long、integer、short、byte、double、float
- 布爾:boolean
- 日期:date
- 對象:object
- index:是否創(chuàng)建倒排索引,默認true
- analyzer:使用哪種分詞器
- properties:字段的子字段
- type:字段數(shù)據(jù)類型
添加索引庫,每次寫入操作版本都會+1,如添加(POST)、更新(PUT)
索引庫mgr
PUT /mgr
{
"mappings": {
"properties": {
"info": {
"type": "text",
"analyzer": "ik_smart"
},
"email": {
"type": "keyword",
"index": false
},
"name": {
"type": "object",
"properties": {
"firstName": {
"type": "keyword"
},
"lastName": {
"type": "keyword"
}
}
}
}
}
}查詢索引庫
GET /mgr
更新索引庫(索引庫禁止修改,因為索引庫建立倒排索引后無法修改,只能添加新字段)
PUT /mgr/_mapping
{
"properties":{
"age":{
"type":"integer"
}
}
}刪除索引庫
DELETE /mgr
DSL文檔操作
添加文檔
索引庫mgr/文檔/文檔id
POST /mgr/_doc/1
{
"info": "鋁坨坨鍵盤",
"email": "11111@gmail.com",
"name": {
"firstName": "C",
"lastName": "I"
}
}查詢文檔
GET /mgr/_doc/1
更新文檔
全量更新,刪除舊文檔,添加新文檔
如果文檔id不存在則與添加文檔功能相同
PUT /mgr/_doc/1
{
"info": "鋁坨坨鍵盤",
"email": "222@gmail.com",
"name": {
"firstName": "C",
"lastName": "I"
}
}增量更新(局部更新)
指定
_update,指定文檔doc
POST /mgr/_update/1{ "doc": { "email": "333@gmail.com" }}刪除文檔
DELETE /mgr/_doc/1
Rust客戶端操作Elasticsearch
添加Cargo.toml
elasticsearch = "8.15.0-alpha.1"
# 序列化和反序列化數(shù)據(jù)
serde = { version = "1.0.127", features = ["derive"] }
# 序列化JSON
serde_json = "1.0.128"
tokio = { version = "1", features = ["full"] }
# 異步鎖
once_cell = "1.20.2"添加環(huán)境變量.env
# 指定當前配置文件 RUN_MODE=development
添加配置settings\development.toml
debug = true # 指定開發(fā)環(huán)境配置 profile = "development" [es] host = "127.0.0.1"
獲取配置config\es.rs
use serde::Deserialize;
#[derive(Debug, Deserialize, Clone)]
pub struct EsConfig {
host: String,
port: u16,
}
impl EsConfig {
// 獲取redis連接地址
pub fn get_url(&self) -> String {
format!("http://{host}:{port}", host = self.host, port = self.port)
}
}將配置存放到AppConfig
#[derive(Debug, Deserialize, Clone)]
pub struct AppConfig {
pub es:EsConfig,
}
impl AppConfig {
pub fn read(env_src: Environment) -> Result<Self, config::ConfigError> {
// 獲取配置文件目錄
let config_dir = get_settings_dir()?;
info!("config_dir: {:#?}", config_dir);
// 獲取配置文件環(huán)境
let run_mode = std::env::var("RUN_MODE")
.map(|env| Profile::from_str(&env).map_err(|e| ConfigError::Message(e.to_string())))
.unwrap_or_else(|_e| Ok(Profile::Dev))?;
// 當前配置文件名
let profile_filename = format!("{run_mode}.toml");
// 獲取配置
let config = config::Config::builder()
// 添加默認配置
.add_source(config::File::from(config_dir.join("default.toml")))
// 添加自定義前綴配置
.add_source(config::File::from(config_dir.join(profile_filename)))
// 添加環(huán)境變量
.add_source(env_src)
.build()?;
info!("Successfully read config profile: {run_mode}.");
// 反序列化
config.try_deserialize()
}
}
// 獲取配置文件目錄
pub fn get_settings_dir() -> Result<std::path::PathBuf, ConfigError> {
Ok(get_project_root()
.map_err(|e| ConfigError::Message(e.to_string()))?
.join("settings"))
}
#[cfg(test)]
mod tests {
use crate::config::profile::Profile;
use self::env::get_env_source;
pub use super::*;
#[test]
pub fn test_profile_to_string() {
// 設置dev模式
let profile: Profile = Profile::try_from("development").unwrap();
println!("profile: {:#?}", profile);
assert_eq!(profile, Profile::Dev)
}
#[test]
pub fn test_read_app_config_prefix() {
// 讀取配置
let config = AppConfig::read(get_env_source("APP")).unwrap();
println!("config: {:#?}", config);
}
}將配置存放到全局constant\mod.rs
// 環(huán)境變量前綴
pub const ENV_PREFIX: &str = "APP";
// 配置
pub static CONFIG: Lazy<crate::config::AppConfig> = Lazy::new(||
crate::config::AppConfig::read(get_env_source(ENV_PREFIX)).unwrap()
);加載配置文件client\builder.rs
use crate::config::AppConfig;
// 傳輸配置文件到客戶端
pub trait ClientBuilder: Sized {
fn build_from_config(config: &AppConfig) -> Result<Self,InfraError>;
}Es客戶端client\es.rs
InfraError為自定義錯誤,請修改為你想要的錯誤,如標準庫錯誤
// 類型別名
pub type EsClient = Arc<Elasticsearch>;
// 加載配置文件
pub trait EsClientExt: Sized {
fn build_from_config(config: &AppConfig) -> impl Future<Output = Result<Self, InfraError>>;
}
impl EsClientExt for EsClient {
async fn build_from_config(config: &AppConfig) -> Result<Self, InfraError> {
// 1、使用single_node方式創(chuàng)建client
// let transport = Transport::single_node(&config.es.get_url()).unwrap();
// let client = Elasticsearch::new(transport);
// Ok(Arc::new(client))
// 2、使用builder方式創(chuàng)建client,可以添加多個url
let url = config.es.get_url();
let url_parsed = url
.parse::<elasticsearch::http::Url>()
.map_err(|_| InfraError::OtherError("url err".to_string()))?;
let conn_pool = SingleNodeConnectionPool::new(url_parsed);
let transport = TransportBuilder::new(conn_pool)
.disable_proxy()
.build()
.map_err(|_| InfraError::OtherError("transport err".to_string()))?;
let client = Elasticsearch::new(transport);
Ok(Arc::new(client))
}
}測試client\es.rs,所有請求在body()中定義DSL語句,通過send()發(fā)送
#[cfg(test)]
mod tests {
use elasticsearch::{ cat::CatIndicesParts, DeleteParts, IndexParts, UpdateParts };
use serde_json::json;
use super::*;
use crate::constant::CONFIG;
#[tokio::test]
async fn test_add_document() {
let client_result = EsClient::build_from_config(&CONFIG).await;
assert!(client_result.is_ok());
let client = client_result.unwrap();
let response = client
.index(IndexParts::IndexId("mgr", "1"))
.body(
json!({
"id": 1,
"user": "cci",
"post_date": "2024-01-15T00:00:00Z",
"message": "Trying out Elasticsearch, so far so good?"
})
)
.send().await;
assert!(response.is_ok());
let response = response.unwrap();
assert!(response.status_code().is_success());
}
#[tokio::test]
async fn test_get_indices() {
let client_result = EsClient::build_from_config(&CONFIG).await;
assert!(client_result.is_ok());
let client = client_result.unwrap();
let get_index_response = client
.cat()
.indices(CatIndicesParts::Index(&["*"]))
.send().await;
assert!(get_index_response.is_ok());
}
#[tokio::test]
async fn test_update_document() {
let client_result = EsClient::build_from_config(&CONFIG).await;
assert!(client_result.is_ok());
let client = client_result.unwrap();
let update_response = client
.update(UpdateParts::IndexId("mgr", "1"))
.body(
json!({
"doc": {
"message": "Updated message"
}
})
)
.send().await;
assert!(update_response.is_ok());
let update_response = update_response.unwrap();
assert!(update_response.status_code().is_success());
}
#[tokio::test]
async fn test_delete_document() {
let client_result = EsClient::build_from_config(&CONFIG).await;
assert!(client_result.is_ok());
let client = client_result.unwrap();
let delete_response = client.delete(DeleteParts::IndexId("mgr", "1")).send().await;
assert!(delete_response.is_ok());
let delete_response = delete_response.unwrap();
assert!(delete_response.status_code().is_success());
}
}使用流程
// 1、創(chuàng)建client
let client_result = EsClient::build_from_config(&CONFIG).await;
assert!(client_result.is_ok());
let client = client_result.unwrap();
// 2、定義DSL語句
let mut body: Vec<JsonBody<_>> = Vec::with_capacity(4);
// 添加文檔
body.push(json!({"index": {"_id": "1"}}).into());
body.push(
json!({
"id": 1,
"user": "kimchy",
"post_date": "2009-11-15T00:00:00Z",
"message": "Trying out Elasticsearch, so far so good?"
}).into()
);
// 添加文檔
body.push(json!({"index": {"_id": "2"}}).into());
body.push(
json!({
"id": 2,
"user": "forloop",
"post_date": "2020-01-08T00:00:00Z",
"message": "Bulk indexing with the rust client, yeah!"
}).into()
);
// 3、發(fā)送請求
let response = client.bulk(BulkParts::Index("mgr")).body(body).send().await.unwrap();項目地址:https://github.com/VCCICCV/MGR
分析數(shù)據(jù)結構
mapping要考慮的問題:字段名、數(shù)據(jù)類型、是否參與搜索(建立倒排索引"index":false,默認true)、是否分詞(參與搜索的字段,text分詞,keyword、數(shù)據(jù)類型不分詞)、分詞器
- 地理坐標:
- geo_point:由經度(longitude)和緯度(latitude)確定的一個點,如
[ 13.400544, 52.530286 ] - geo_shape:由多個
geo_point組成的幾何圖形,如一條線[[13.0, 53.0], [14.0, 52.0]]
- geo_point:由經度(longitude)和緯度(latitude)確定的一個點,如
copy_to:將多個字段組合為一個字段進行索引 Rust客戶端操作索引庫
生產環(huán)境不要使用
unwrap()
這里演示在請求正文中操作,使用send()
Transport支持的方法Method:
Get:獲取資源Put:創(chuàng)建或更新資源(全量更新)Post:創(chuàng)建或更新資源(部分更新)Delete:刪除資源Head:獲取頭信息
send()請求正文需要包含的參數(shù):
method:必須path:必須headers:必須query_string:可選body:可選timeout:可選
添加索引庫
#[tokio::test]
async fn test_create_index() {
// 1、創(chuàng)建client
let client_result = EsClient::build_from_config(&CONFIG).await;
assert!(client_result.is_ok());
let client = client_result.unwrap();
// 2、定義DSL語句
let index_name = "mgr";
let index_definition =
json!({
"mappings":{
"properties":{
"age":{
"type":"integer"
}
}
}
});
let body = Some(serde_json::to_vec(&index_definition).unwrap());
let path = format!("/{}", index_name);
let headers = HeaderMap::new();
let query_string = None;
let timeout = None;
let method = Method::Put;
// 3、發(fā)送請求
let response = client.send::<Vec<u8>, ()>(
method,
&path,
headers,
query_string,
body,
timeout
).await;
assert!(response.is_ok());
let response = response.unwrap();
assert_eq!(response.status_code().is_success(), true);
}你也可以將其簡化
#[tokio::test]
async fn test_create_index() {
// 1、創(chuàng)建client
let client = EsClient::build_from_config(&CONFIG).await.unwrap();
// 2、定義DSL
let index_definition =
json!({
"mappings":{
"properties":{
"age":{
"type":"integer"
}
}
}
});
// 3、發(fā)送請求
let response = client.send::<Vec<u8>, ()>(
Method::Put,
format!("/mgr").as_str(),
HeaderMap::new(),
None,
Some(index_definition.to_string().as_bytes().to_vec()),
None
).await;
assert!(response.is_ok());
let response = response.unwrap();
assert_eq!(response.status_code().is_success(), true);
}查詢索引庫是否存在
#[tokio::test]
async fn test_query_index() {
// 1、創(chuàng)建 client
let client = EsClient::build_from_config(&CONFIG).await.unwrap();
// 2、定義查詢 DSL 語句
let query = json!({
"query": {
"match_all": {}
}
});
// 3、發(fā)送請求
let response = client.send::<Vec<u8>, ()>(
Method::Get,
format!("/mgr/_search").as_str(),
HeaderMap::new(),
None,
Some(query.to_string().as_bytes().to_vec()),
None
).await;
assert!(response.is_ok());
let response = response.unwrap();
println!("{:?}", response);
assert_eq!(response.status_code().is_success(), true);
}也可以不定義DSL查詢
#[tokio::test]
async fn test_query_index2() {
// 1、創(chuàng)建 client
let client = EsClient::build_from_config(&CONFIG).await.unwrap();
// 2、發(fā)送請求
let response = client.send::<Vec<u8>, ()>(
Method::Get,
format!("/mgr").as_str(),
HeaderMap::new(),
None,
None,
None
).await;
assert!(response.is_ok());
let response = response.unwrap();
println!("{:?}", response);
assert_eq!(response.status_code().is_success(), true);
}更新索引庫
#[tokio::test]
async fn test_update_index() {
// 1、創(chuàng)建 client
let client = EsClient::build_from_config(&CONFIG).await.unwrap();
// 2、定義查詢 DSL 語句
let update_content = json!({
"properties":{
"age":{
"type":"integer"
}
}
});
// 3、發(fā)送請求
let response = client.send::<Vec<u8>, ()>(
Method::Put,
format!("/mgr/_mapping").as_str(),
HeaderMap::new(),
None,
Some(update_content.to_string().as_bytes().to_vec()),
None
).await;
assert!(response.is_ok());
let response = response.unwrap();
println!("{:?}", response);
assert_eq!(response.status_code().is_success(), true);
}刪除索引庫
#[tokio::test]
async fn test_delete_index() {
// 1、創(chuàng)建 client
let client = EsClient::build_from_config(&CONFIG).await.unwrap();
// 2、發(fā)送請求
let response = client.send::<(), ()>(
Method::Delete,
format!("/mgr").as_str(),
HeaderMap::new(),
None,
None,
None
).await;
assert!(response.is_ok());
let response = response.unwrap();
assert_eq!(response.status_code().is_success(), true);
}Rust客戶端操作文檔
添加文檔
#[tokio::test]
async fn test_create_doc() {
// 1、創(chuàng)建 client
let client = EsClient::build_from_config(&CONFIG).await.unwrap();
// 2、定義查詢 DSL 語句
let doc_content =
json!({
"id": "1",
"user": "kimchy",
"post_date": "2009-11-15T00:00:00Z",
"message": "Trying out Elasticsearch, so far so good?"
});
// 3、發(fā)送請求
let response = client.send::<Vec<u8>, ()>(
Method::Post,
format!("/mgr/_doc/1").as_str(),
HeaderMap::new(),
None,
Some(doc_content.to_string().as_bytes().to_vec()),
None
).await;
assert!(response.is_ok());
let response = response.unwrap();
println!("{:?}", response);
assert_eq!(response.status_code().is_success(), true);
}查詢文檔是否存在
#[tokio::test]
async fn test_get_doc() {
// 1、創(chuàng)建 client
let client = EsClient::build_from_config(&CONFIG).await.unwrap();
// 2、發(fā)送請求
let response = client.send::<Vec<u8>, ()>(
Method::Get,
format!("/mgr/_doc/1").as_str(),
HeaderMap::new(),
None,
None,
None
).await;
assert!(response.is_ok());
let response = response.unwrap();
println!("{:?}", response);
assert_eq!(response.status_code().is_success(), true);
}更新文檔
#[tokio::test]
async fn test_update_doc() {
// 1、創(chuàng)建 client
let client = EsClient::build_from_config(&CONFIG).await.unwrap();
// 2、定義查詢 DSL 語句
let doc_content =
json!({
"doc": {
"message": "Updated message"
}
});
// 3、發(fā)送請求
let response = client.send::<Vec<u8>, ()>(
Method::Post,
format!("/mgr/_update/1").as_str(),
HeaderMap::new(),
None,
Some(doc_content.to_string().as_bytes().to_vec()),
None
).await;
assert!(response.is_ok());
let response = response.unwrap();
println!("{:?}", response);
assert_eq!(response.status_code().is_success(), true);
}刪除文檔
#[tokio::test]
async fn test_delete_doc() {
// 1、創(chuàng)建 client
let client = EsClient::build_from_config(&CONFIG).await.unwrap();
// 2、發(fā)送請求
let response = client.send::<Vec<u8>, ()>(
Method::Delete,
format!("/mgr/_doc/1").as_str(),
HeaderMap::new(),
None,
None,
None
).await;
assert!(response.is_ok());
let response = response.unwrap();
println!("{:?}", response);
assert_eq!(response.status_code().is_success(), true);
}批量添加文檔
#[tokio::test]
async fn test_bulk_add_to_mgr() {
// 1、創(chuàng)建client
let client_result = EsClient::build_from_config(&CONFIG).await;
assert!(client_result.is_ok());
let client = client_result.unwrap();
// 2、定義DSL語句
let mut body: Vec<JsonBody<_>> = Vec::with_capacity(4);
// 添加第一個操作和文檔
body.push(json!({"index": {"_id": "1"}}).into());
body.push(
json!({
"id": 1,
"user": "kimchy",
"post_date": "2009-11-15T00:00:00Z",
"message": "Trying out Elasticsearch, so far so good?"
}).into()
);
// 添加第二個操作和文檔
body.push(json!({"index": {"_id": "2"}}).into());
body.push(
json!({
"id": 2,
"user": "forloop",
"post_date": "2020-01-08T00:00:00Z",
"message": "Bulk indexing with the rust client, yeah!"
}).into()
);
// 3、發(fā)送請求
let response = client.bulk(BulkParts::Index("mgr")).body(body).send().await.unwrap();
assert!(response.status_code().is_success());
}Rust客戶端操作搜索
這里演示在請求體body中進行API調用
- 查詢所有:查出所有數(shù)據(jù)
- 全文檢索查詢(full text):利用分詞器對內容分詞,從倒排索引庫中查詢
- match_query
- multi_match_query
- 精確查詢:根據(jù)精確值查詢,如integer、keyword、日期
- id
- range:根據(jù)值的范圍查詢
- term:根據(jù)詞條精確值查詢
- 地理坐標查詢(geo):根據(jù)經緯度查詢
- geo_distance:查詢geo_point指定距離范圍內的所有文檔
- geo_bounding_box:查詢geo_point值落在某個矩形范圍內的所有文檔
- 復合查詢(compound):將上述條件組合起來
查詢所有
默認10條
#[tokio::test]
async fn test_search_match_all() {
// 1、創(chuàng)建 client
let client = EsClient::build_from_config(&CONFIG).await.unwrap();
// 2. 執(zhí)行搜索
let response = client
.search(SearchParts::Index(&["mgr"]))
.from(0)
.size(5)
.body(
json!({
"query": {
"match_all": {
}
}
})
)
.send().await
.unwrap();
// 3. 解析響應
let response_body = response.json::<Value>().await.unwrap();
// 搜索耗時
let took = response_body["took"].as_i64().unwrap();
println!("took: {}ms", took);
// 搜索結果
for hit in response_body["hits"]["hits"].as_array().unwrap() {
println!("{:?}", hit["_source"]);
}
}等價于
GET /mgr/_search
{
"query": {
"match_all": {}
}
}全文搜索
message為文檔中的字段
#[tokio::test]
async fn test_search_match() {
// 1、創(chuàng)建 client
let client = EsClient::build_from_config(&CONFIG).await.unwrap();
// 2. 執(zhí)行搜索
let response = client
.search(SearchParts::Index(&["mgr"]))
.from(0)
.size(5)
.body(
json!({
"query": {
"match": {
"message": "good"
}
}
})
)
.send().await
.unwrap();
// 3. 解析響應
let response_body = response.json::<Value>().await.unwrap();
// 搜索耗時
let took = response_body["took"].as_i64().unwrap();
println!("took: {}ms", took);
// 搜索結果
for hit in response_body["hits"]["hits"].as_array().unwrap() {
println!("{:?}", hit["_source"]);
}
}相當于
GET /mgr/_search
{
"query": {
"match": {
"message": "good"
}
}
}多字段查詢
多字段查詢效率低,一般在創(chuàng)建時使用copy_to到一個字段中
#[tokio::test]
async fn test_search_multi_match() {
// 1、創(chuàng)建 client
let client = EsClient::build_from_config(&CONFIG).await.unwrap();
// 2. 執(zhí)行搜索
let response = client
.search(SearchParts::Index(&["mgr"]))
.from(0)
.size(5)
.body(
json!({
"query": {
"multi_match": {
"query": "good",
"fields": [
"message",
"user"
]
}
}
})
)
.send().await
.unwrap();
// 3. 解析響應
let response_body = response.json::<Value>().await.unwrap();
// 搜索耗時
let took = response_body["took"].as_i64().unwrap();
println!("took: {}ms", took);
// 搜索結果
for hit in response_body["hits"]["hits"].as_array().unwrap() {
println!("{:?}", hit["_source"]);
}
}相當于
GET /mgr/_search
{
"query": {
"multi_match": {
"query": "good",
"fields": [
"message",
"user"
]
}
}
}根據(jù)范圍查詢(range)
gte大于等于,lte小于等于;gt大于lt小于
#[tokio::test]
async fn test_search_range() {
// 1、創(chuàng)建 client
let client = EsClient::build_from_config(&CONFIG).await.unwrap();
// 2. 執(zhí)行搜索
let response = client
.search(SearchParts::Index(&["mgr"]))
.from(0)
.size(5)
.body(
json!({
"query": {
"range": {
"id": {
"gte": 1,
"lte": 1
}
}
}
})
)
.send().await
.unwrap();
// 3. 解析響應
let response_body = response.json::<Value>().await.unwrap();
// 搜索耗時
let took = response_body["took"].as_i64().unwrap();
println!("took: {}ms", took);
// 搜索結果
for hit in response_body["hits"]["hits"].as_array().unwrap() {
println!("{:?}", hit["_source"]);
}
}相當于
GET /mgr/_search
{
"query": {
"range": {
"id": {
"gte": 1,
"lte": 1
}
}
}
}根據(jù)詞條精確查詢(term)
#[tokio::test]
async fn test_search_term() {
// 1、創(chuàng)建 client
let client = EsClient::build_from_config(&CONFIG).await.unwrap();
// 2. 執(zhí)行搜索
let response = client
.search(SearchParts::Index(&["mgr"]))
.from(0)
.size(5)
.body(
json!({
"query": {
"term": {
"user": "kimchy"
}
}
})
)
.send().await
.unwrap();
// 3. 解析響應
let response_body = response.json::<Value>().await.unwrap();
// 搜索耗時
let took = response_body["took"].as_i64().unwrap();
println!("took: {}ms", took);
// 搜索結果
for hit in response_body["hits"]["hits"].as_array().unwrap() {
println!("{:?}", hit["_source"]);
}
}相當于
GET /mgr/_search
{
"query": {
"term": {
"user": "kimchy"
}
}
}根據(jù)地理距離查詢
GET /mgr/_search
{
"query": {
"geo_distance": {
"distance": "100km",
"location": "31.04, 45.12"
}
}
}根據(jù)指定矩形范圍查詢
左上經緯度與右下經緯度
geo為文檔中的字段
GET /mgr/_search
{
"query": {
"geo_bounding_box": {
"geo": {
"top_left": {
"lon": 124.45,
"lat": 32.11
},
"bottom_right": {
"lon": 125.12,
"lat": 30.21
}
}
}
}
}復合查詢
查詢時文檔會對搜索詞條的關聯(lián)度打分_score,返回結果時按照降序排列
關聯(lián)度計算方法
- TF-IDF算法(ES5.0之前)
TF(詞條頻率)= 詞條出現(xiàn)次數(shù)/文檔中詞條總數(shù)
IDF(逆文檔頻率)=log(文檔總數(shù)/包含詞條的文檔總數(shù))
score = ∑(??=1,??)(TF*IDF):將詞條頻率與逆文檔頻率相乘再求和
- BM25算法(ES5.0之后)
默認采用BM25算法:考慮了TF、IDF、文檔長度等因素,能夠平衡長短文的關聯(lián)度

function_score修改關聯(lián)度
指定文檔和算分函數(shù)
GET /mgr/_search
{
"query": {
"function_score": {
"query": {
"match": {// 查詢方法
"message": "good"
}
},
"functions": [ // 算分函數(shù)
{
"filter": {// 只有符合過濾條件的才被計算
"term": {// 根據(jù)詞條精確查詢
"id": 1
}
},
"weight": 3 // 指定加權函數(shù)
}
],
// 加權模式:相乘
"boost_mode": "multiply"
}
}
}weight:給定常量值,還可以指定以下值field_value_factor:用文檔中的指定字段值作為函數(shù)結果random_score:隨機生成一個值script_score:自定義計算公式boost_mode:加權模式,multiply與原來的_score相乘,還可以配置:replace:替換原來的_scoresum:求和avg:取平均值min:取最小值max:取最大值
相當于
#[tokio::test]
async fn test_function_score_query() {
// 1、創(chuàng)建 client
let client = EsClient::build_from_config(&CONFIG).await.unwrap();
// 2. 執(zhí)行搜索
let response = client
.search(SearchParts::Index(&["mgr"]))
.from(0)
.size(5)
.body(
json!({
"query": {
"function_score": {
"query": {
"match": {// 查詢方法
"message": "good"
}
},
"functions": [ // 算分函數(shù)
{
"filter": {// 只有符合過濾條件的才被計算
"term": {// 根據(jù)詞條精確查詢
"id": 1
}
},
"weight": 3 // 指定加權函數(shù)
}
],
// 加權模式:相乘
"boost_mode": "multiply"
}
}
})
)
.send().await
.unwrap();
// 3. 解析響應
let response_body = response.json::<Value>().await.unwrap();
// 搜索耗時
let took = response_body["took"].as_i64().unwrap();
println!("took: {}ms", took);
// 搜索結果
for hit in response_body["hits"]["hits"].as_array().unwrap() {
println!("{:?}", hit["_source"]);
}
}boolean query 布爾查詢
布爾查詢是一個或多個子句查詢的組合,組合方式有
must:必須匹配每個子查詢,類似于“與”should:選擇性匹配子查詢,類似于“或”must_not:必須不匹配,不參與算分,類似于“非”filter:必須匹配,
查詢message中包含rust,post_date不小于2020年1月1日的文檔
GET /mgr/_search
{
"query": {
"bool": {
"must": [
{
"match_phrase": {
"message": "rust"
}
}
],
"must_not": [
{
"range": {
"post_date": {
"lt": "2020-01-01T00:00:00Z"
}
}
}
]
}
}
}搜索結果處理
排序
GET /mgr/_search
{
"query": {
"match_all": {}
},
"sort": [
{
"id": "desc"http:// ASC升序,DESC降序
}
]
}地理位置排序
GET /mgr/_search
{
"query": {
"match_all": {}
},
"sort": [
{
"_geo_distance":{
"FIELD": {
"lat": 40,// 緯度
"lon": -70// 經度
},
"order":"asc",// 排序方式
"unit":"km" // 單位
}
}
]
}分頁
1、from+size分頁查詢(默認10條數(shù)據(jù))
GET /mgr/_search
{
"query": {
"match_all": {}
},
"from":1,// 分頁開始位置
"size":10,// 期望獲取的文檔總數(shù)
"sort": [
{
"id": "desc"http:// ASC升序,DESC降序
}
]
}深度分頁問題:一般將ES作為分布式部署,當需要"from"=990,"size"=10查數(shù)據(jù)時:
1、先從每個數(shù)據(jù)分片上查詢前1000條數(shù)據(jù)
2、將所有節(jié)點的結果聚合,在內存中重新排序選出前1000條文檔
3、在這1000條文檔中選取"from"=990,"size"=10的數(shù)據(jù)
如果搜索頁數(shù)過深,或者結果集(from+size)越大,對內存和CPU的消耗越高,因此ES設定的查詢上限是
10000
深度分頁解決方案:
2、search after分頁查詢:分頁時排序,從上一次的排序值開始查詢下一頁文檔(只能向后查詢)
3、scroll分頁查詢:將排序數(shù)據(jù)形成快照,保存在內存中(內存消耗大,官方不推薦)
高亮處理
搜索鍵盤時關鍵字高亮

highlight指定高亮字段
默認搜索字段和高亮字段匹配才高亮
GET /mgr/_search
{
"query": {
"match": {
"message":"rust"http:// 搜索message中包含rust的文檔
}
},
"highlight":{
"fields":{
"message":{// 指定高亮字段
"require_field_match":"false"http:// 搜索字段和高亮字段可以不匹配
}
}
}
}數(shù)據(jù)聚合
聚合(aggregations)可以實現(xiàn)對文檔數(shù)據(jù)的統(tǒng)計、分析、運算,聚合分類:
- 桶(Buket):用來對數(shù)據(jù)分組
- https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket.html
- TermAggregation:按文檔字段或詞條值分組
- Date Histogram:按日期階梯分組,如一周為一組
- 度量(Metric):用于計算一些值,如最大值、最小值、平均值
- https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics.html
- Avg:求平均值
- Max:求最大值
- Min:求最小值
- Sum:求和
- Stats:同時求Max、Min、Avg、Sum等
- 管道(pipeline):以其他聚合的結果作為聚合的基礎
- https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-pipeline.html
桶(Buket)
Buket默認統(tǒng)計其中的文檔數(shù)量_count,并且按照降序排序
GET /mgr/_search
{
"size":0,// 文檔大小,結果不包含文檔,只包含聚合結果
"aggs": {//指定聚合
"idAgg": {// 聚合名
"terms": {// 精確查詢
"field":"id",// 指定字段
"order":{
"_count":"asc"http:// 按升序排序
}
}
}
}
}度量(Metric)
GET /mgr/_search
{
"size":0,// 文檔大小,結果不包含文檔,只包含聚合結果
"aggs": {//指定聚合
"idAgg": {// 聚合名
"terms": {// 精確查詢
"field":"id",// 指定字段
"size":20
},
"aggs":{// 子聚合
"score_stats":{// 聚合名
"max":{//聚合類型,min、max、avg等
"field":"score"http:// 聚合字段
}
}
}
}
}
}自動補全
拼音補全
如果你想要通過拼音補全,請下載解壓拼音分詞器上傳到/opt/es/plugins目錄然后重啟es
https://github.com/infinilabs/analysis-pinyin/releases
- 補全字段必須是
completion類型 - 拼音分詞需要自定義分詞器
進行拼音分詞:創(chuàng)建索引并設置字段類型為completion,同時指定先分詞再根據(jù)詞條過濾(如果不自定義分詞器,默認將每個漢字單獨分為拼音,所以先分詞詞條再進行拼音處理),其他設置見github倉庫
PUT /test
{
"settings": {// 設置
"analysis": {
"analyzer": {// 設置分詞器
"my_analyzer": {// 分詞器名
"filters": [
"lowercase",// 轉小寫
"stop"http:// 去停用詞
],
"tokenizer": "ik_max_word", // 分詞器
"filter": "py" // 過濾時進行拼音
}
}
},
"filter": { // 自定義tokenizer filter
"py": { // 過濾器名稱
"type": "pinyin", // 過濾器類型,這里是pinyin
"keep_full_pinyin": false,// 是否保留完整的拼音形式
"keep_joined_full_pinyin": true,// 是否保留連接起來的完整拼音形式
"keep_original": true,// 是否保留原始的文本內容
"limit_first_letter_length": 16,// 限制拼音首字母的長度為 16
"remove_duplicated_term": true,// 是否移除重復的詞條
"none_chinese_pinyin_tokenize": false// 不對非中文字符進行拼音分詞
}
}
},
"mappings": {
"properties": {
"user": {
"type": "completion"
}
}
}
}不進行拼音分詞:創(chuàng)建索引并設置字段類型為completion
PUT /test
{
"mappings": {
"properties": {
"user": {
"type": "completion"
}
}
}
}添加文檔
POST /test/_doc/1
{
"id": 1,
"message": "Trying out Elasticsearch, so far so good?",
"post_date": "2009-11-15T00:00:00Z",
"user": "kimchy"
}根據(jù)關鍵字查詢補全
GET /test/_search
{
"suggest": {
"YOUR_SUGGESTION": {// 指定自動補全查詢名字
"text": "k",// 關鍵字前綴
"completion": {// 自動補全類型
"field": "user",// 補全字段
"skip_duplicates": true,// 是否跳過重復的建議
"size": 10 // 獲取前10條結果
}
}
}
}所有代碼地址:https://github.com/VCCICCV/MGR/blob/main/auth/infrastructure/src/client/es.rs
到此這篇關于Rust整合Elasticsearch的詳細過程(收藏)的文章就介紹到這了,更多相關Rust整合Elasticsearch內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

