使用python生成大量數(shù)據(jù)寫入es數(shù)據(jù)庫并查詢操作(2)
前言 :
上一篇文章:如何使用python生成大量數(shù)據(jù)寫入es數(shù)據(jù)庫并查詢操作
模擬學(xué)生個人信息寫入es數(shù)據(jù)庫,包括姓名、性別、年齡、特點(diǎn)、科目、成績,創(chuàng)建時間。
方案一
在寫入數(shù)據(jù)時未提前創(chuàng)建索引mapping,而是每插入一條數(shù)據(jù)都包含了索引的信息。
示例代碼:【多線程寫入數(shù)據(jù)】【一次性寫入10000*1000條數(shù)據(jù)】 【本人親測耗時3266秒】
from elasticsearch import Elasticsearch from elasticsearch import helpers from datetime import datetime from queue import Queue import random import time import threading es = Elasticsearch(hosts='http://127.0.0.1:9200') # print(es) names = ['劉一', '陳二', '張三', '李四', '王五', '趙六', '孫七', '周八', '吳九', '鄭十'] sexs = ['男', '女'] age = [25, 28, 29, 32, 31, 26, 27, 30] character = ['自信但不自負(fù),不以自我為中心', '努力、積極、樂觀、拼搏是我的人生信條', '抗壓能力強(qiáng),能夠快速適應(yīng)周圍環(huán)境', '敢做敢拼,腳踏實(shí)地;做事認(rèn)真負(fù)責(zé),責(zé)任心強(qiáng)', '愛好所學(xué)專業(yè),樂于學(xué)習(xí)新知識;對工作有責(zé)任心;踏實(shí),熱情,對生活充滿激情', '主動性強(qiáng),自學(xué)能力強(qiáng),具有團(tuán)隊合作意識,有一定組織能力', '忠實(shí)誠信,講原則,說到做到,決不推卸責(zé)任', '有自制力,做事情始終堅持有始有終,從不半途而廢', '肯學(xué)習(xí),有問題不逃避,愿意虛心向他人學(xué)習(xí)', '愿意以謙虛態(tài)度贊揚(yáng)接納優(yōu)越者,權(quán)威者', '會用100%的熱情和精力投入到工作中;平易近人', '為人誠懇,性格開朗,積極進(jìn)取,適應(yīng)力強(qiáng)、勤奮好學(xué)、腳踏實(shí)地', '有較強(qiáng)的團(tuán)隊精神,工作積極進(jìn)取,態(tài)度認(rèn)真'] subjects = ['語文', '數(shù)學(xué)', '英語', '生物', '地理'] grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86, 96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86] create_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') def save_to_es(num): """ 批量寫入數(shù)據(jù)到es數(shù)據(jù)庫 :param num: :return: """ start = time.time() action = [ { "_index": "personal_info_10000000", "_type": "doc", "_id": i, "_source": { "id": i, "name": random.choice(names), "sex": random.choice(sexs), "age": random.choice(age), "character": random.choice(character), "subject": random.choice(subjects), "grade": random.choice(grades), "create_time": create_time } } for i in range(10000 * num, 10000 * num + 10000) ] helpers.bulk(es, action) end = time.time() print(f"{num}耗時{end - start}s!") def run(): global queue while queue.qsize() > 0: num = queue.get() print(num) save_to_es(num) if __name__ == '__main__': start = time.time() queue = Queue() # 序號數(shù)據(jù)進(jìn)隊列 for num in range(1000): queue.put(num) # 多線程執(zhí)行程序 consumer_lst = [] for _ in range(10): thread = threading.Thread(target=run) thread.start() consumer_lst.append(thread) for consumer in consumer_lst: consumer.join() end = time.time() print('程序執(zhí)行完畢!花費(fèi)時間:', end - start)
運(yùn)行結(jié)果:
自動創(chuàng)建的索引mapping:
GET personal_info_10000000/_mapping { "personal_info_10000000" : { "mappings" : { "properties" : { "age" : { "type" : "long" }, "character" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } } }, "create_time" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } } }, "grade" : { "type" : "long" }, "id" : { "type" : "long" }, "name" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } } }, "sex" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } } }, "subject" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } } } } } } }
方案二
1.順序插入5000000條數(shù)據(jù)
先創(chuàng)建索引personal_info_5000000,確定好mapping后,再插入數(shù)據(jù)。
新建索引并設(shè)置mapping信息:
PUT personal_info_5000000 { "settings": { "number_of_shards": 3, "number_of_replicas": 1 }, "mappings": { "properties": { "id": { "type": "long" }, "name": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 32 } } }, "sex": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 8 } } }, "age": { "type": "long" }, "character": { "type": "text", "analyzer": "ik_smart", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "subject": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "grade": { "type": "long" }, "create_time": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis" } } } }
查看新建索引信息:
GET personal_info_5000000 { "personal_info_5000000" : { "aliases" : { }, "mappings" : { "properties" : { "age" : { "type" : "long" }, "character" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } }, "analyzer" : "ik_smart" }, "create_time" : { "type" : "date", "format" : "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis" }, "grade" : { "type" : "long" }, "id" : { "type" : "long" }, "name" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 32 } } }, "sex" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 8 } } }, "subject" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } } } } }, "settings" : { "index" : { "routing" : { "allocation" : { "include" : { "_tier_preference" : "data_content" } } }, "number_of_shards" : "3", "provided_name" : "personal_info_50000000", "creation_date" : "1663471072176", "number_of_replicas" : "1", "uuid" : "5DfmfUhUTJeGk1k4XnN-lQ", "version" : { "created" : "7170699" } } } } }
開始插入數(shù)據(jù):
示例代碼: 【單線程寫入數(shù)據(jù)】【一次性寫入10000*500條數(shù)據(jù)】 【本人親測耗時7916秒】
from elasticsearch import Elasticsearch from datetime import datetime from queue import Queue import random import time import threading es = Elasticsearch(hosts='http://127.0.0.1:9200') # print(es) names = ['劉一', '陳二', '張三', '李四', '王五', '趙六', '孫七', '周八', '吳九', '鄭十'] sexs = ['男', '女'] age = [25, 28, 29, 32, 31, 26, 27, 30] character = ['自信但不自負(fù),不以自我為中心', '努力、積極、樂觀、拼搏是我的人生信條', '抗壓能力強(qiáng),能夠快速適應(yīng)周圍環(huán)境', '敢做敢拼,腳踏實(shí)地;做事認(rèn)真負(fù)責(zé),責(zé)任心強(qiáng)', '愛好所學(xué)專業(yè),樂于學(xué)習(xí)新知識;對工作有責(zé)任心;踏實(shí),熱情,對生活充滿激情', '主動性強(qiáng),自學(xué)能力強(qiáng),具有團(tuán)隊合作意識,有一定組織能力', '忠實(shí)誠信,講原則,說到做到,決不推卸責(zé)任', '有自制力,做事情始終堅持有始有終,從不半途而廢', '肯學(xué)習(xí),有問題不逃避,愿意虛心向他人學(xué)習(xí)', '愿意以謙虛態(tài)度贊揚(yáng)接納優(yōu)越者,權(quán)威者', '會用100%的熱情和精力投入到工作中;平易近人', '為人誠懇,性格開朗,積極進(jìn)取,適應(yīng)力強(qiáng)、勤奮好學(xué)、腳踏實(shí)地', '有較強(qiáng)的團(tuán)隊精神,工作積極進(jìn)取,態(tài)度認(rèn)真'] subjects = ['語文', '數(shù)學(xué)', '英語', '生物', '地理'] grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86, 96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86] create_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') # 添加程序耗時的功能 def timer(func): def wrapper(*args, **kwargs): start = time.time() res = func(*args, **kwargs) end = time.time() print('id{}共耗時約 {:.2f} 秒'.format(*args, end - start)) return res return wrapper @timer def save_to_es(num): """ 順序?qū)懭霐?shù)據(jù)到es數(shù)據(jù)庫 :param num: :return: """ body = { "id": num, "name": random.choice(names), "sex": random.choice(sexs), "age": random.choice(age), "character": random.choice(character), "subject": random.choice(subjects), "grade": random.choice(grades), "create_time": create_time } # 此時若索引不存在時會新建 es.index(index="personal_info_5000000", id=num, doc_type="_doc", document=body) def run(): global queue while queue.qsize() > 0: num = queue.get() print(num) save_to_es(num) if __name__ == '__main__': start = time.time() queue = Queue() # 序號數(shù)據(jù)進(jìn)隊列 for num in range(5000000): queue.put(num) # 多線程執(zhí)行程序 consumer_lst = [] for _ in range(10): thread = threading.Thread(target=run) thread.start() consumer_lst.append(thread) for consumer in consumer_lst: consumer.join() end = time.time() print('程序執(zhí)行完畢!花費(fèi)時間:', end - start)
運(yùn)行結(jié)果:
2.批量插入5000000條數(shù)據(jù)
先創(chuàng)建索引personal_info_5000000_v2,確定好mapping后,再插入數(shù)據(jù)。
新建索引并設(shè)置mapping信息:
PUT personal_info_5000000_v2 { "settings": { "number_of_shards": 3, "number_of_replicas": 1 }, "mappings": { "properties": { "id": { "type": "long" }, "name": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 32 } } }, "sex": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 8 } } }, "age": { "type": "long" }, "character": { "type": "text", "analyzer": "ik_smart", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "subject": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "grade": { "type": "long" }, "create_time": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis" } } } }
查看新建索引信息:
GET personal_info_5000000_v2 { "personal_info_5000000_v2" : { "aliases" : { }, "mappings" : { "properties" : { "age" : { "type" : "long" }, "character" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } }, "analyzer" : "ik_smart" }, "create_time" : { "type" : "date", "format" : "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis" }, "grade" : { "type" : "long" }, "id" : { "type" : "long" }, "name" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 32 } } }, "sex" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 8 } } }, "subject" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } } } } }, "settings" : { "index" : { "routing" : { "allocation" : { "include" : { "_tier_preference" : "data_content" } } }, "number_of_shards" : "3", "provided_name" : "personal_info_5000000_v2", "creation_date" : "1663485323617", "number_of_replicas" : "1", "uuid" : "XBPaDn_gREmAoJmdRyBMAA", "version" : { "created" : "7170699" } } } } }
批量插入數(shù)據(jù):
通過elasticsearch模塊導(dǎo)入helper,通過helper.bulk來批量處理大量的數(shù)據(jù)。首先將所有的數(shù)據(jù)定義成字典形式,各字段含義如下:
- _index對應(yīng)索引名稱,并且該索引必須存在。
- _type對應(yīng)類型名稱。
- _source對應(yīng)的字典內(nèi),每一篇文檔的字段和值,可有有多個字段。
示例代碼: 【程序中途異常,寫入4714000條數(shù)據(jù)】
from elasticsearch import Elasticsearch from elasticsearch import helpers from datetime import datetime from queue import Queue import random import time import threading es = Elasticsearch(hosts='http://127.0.0.1:9200') # print(es) names = ['劉一', '陳二', '張三', '李四', '王五', '趙六', '孫七', '周八', '吳九', '鄭十'] sexs = ['男', '女'] age = [25, 28, 29, 32, 31, 26, 27, 30] character = ['自信但不自負(fù),不以自我為中心', '努力、積極、樂觀、拼搏是我的人生信條', '抗壓能力強(qiáng),能夠快速適應(yīng)周圍環(huán)境', '敢做敢拼,腳踏實(shí)地;做事認(rèn)真負(fù)責(zé),責(zé)任心強(qiáng)', '愛好所學(xué)專業(yè),樂于學(xué)習(xí)新知識;對工作有責(zé)任心;踏實(shí),熱情,對生活充滿激情', '主動性強(qiáng),自學(xué)能力強(qiáng),具有團(tuán)隊合作意識,有一定組織能力', '忠實(shí)誠信,講原則,說到做到,決不推卸責(zé)任', '有自制力,做事情始終堅持有始有終,從不半途而廢', '肯學(xué)習(xí),有問題不逃避,愿意虛心向他人學(xué)習(xí)', '愿意以謙虛態(tài)度贊揚(yáng)接納優(yōu)越者,權(quán)威者', '會用100%的熱情和精力投入到工作中;平易近人', '為人誠懇,性格開朗,積極進(jìn)取,適應(yīng)力強(qiáng)、勤奮好學(xué)、腳踏實(shí)地', '有較強(qiáng)的團(tuán)隊精神,工作積極進(jìn)取,態(tài)度認(rèn)真'] subjects = ['語文', '數(shù)學(xué)', '英語', '生物', '地理'] grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86, 96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86] create_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') # 添加程序耗時的功能 def timer(func): def wrapper(*args, **kwargs): start = time.time() res = func(*args, **kwargs) end = time.time() print('id{}共耗時約 {:.2f} 秒'.format(*args, end - start)) return res return wrapper @timer def save_to_es(num): """ 批量寫入數(shù)據(jù)到es數(shù)據(jù)庫 :param num: :return: """ action = [ { "_index": "personal_info_5000000_v2", "_type": "_doc", "_id": i, "_source": { "id": i, "name": random.choice(names), "sex": random.choice(sexs), "age": random.choice(age), "character": random.choice(character), "subject": random.choice(subjects), "grade": random.choice(grades), "create_time": create_time } } for i in range(10000 * num, 10000 * num + 10000) ] helpers.bulk(es, action) def run(): global queue while queue.qsize() > 0: num = queue.get() print(num) save_to_es(num) if __name__ == '__main__': start = time.time() queue = Queue() # 序號數(shù)據(jù)進(jìn)隊列 for num in range(500): queue.put(num) # 多線程執(zhí)行程序 consumer_lst = [] for _ in range(10): thread = threading.Thread(target=run) thread.start() consumer_lst.append(thread) for consumer in consumer_lst: consumer.join() end = time.time() print('程序執(zhí)行完畢!花費(fèi)時間:', end - start)
運(yùn)行結(jié)果:
3.批量插入50000000條數(shù)據(jù)
先創(chuàng)建索引personal_info_5000000_v2,確定好mapping后,再插入數(shù)據(jù)。
此過程是在上面批量插入的前提下進(jìn)行優(yōu)化,采用python生成器。
建立索引和mapping同上,直接上代碼:
示例代碼: 【程序中途異常,寫入3688000條數(shù)據(jù)】
from elasticsearch import Elasticsearch from elasticsearch import helpers from datetime import datetime from queue import Queue import random import time import threading es = Elasticsearch(hosts='http://127.0.0.1:9200') # print(es) names = ['劉一', '陳二', '張三', '李四', '王五', '趙六', '孫七', '周八', '吳九', '鄭十'] sexs = ['男', '女'] age = [25, 28, 29, 32, 31, 26, 27, 30] character = ['自信但不自負(fù),不以自我為中心', '努力、積極、樂觀、拼搏是我的人生信條', '抗壓能力強(qiáng),能夠快速適應(yīng)周圍環(huán)境', '敢做敢拼,腳踏實(shí)地;做事認(rèn)真負(fù)責(zé),責(zé)任心強(qiáng)', '愛好所學(xué)專業(yè),樂于學(xué)習(xí)新知識;對工作有責(zé)任心;踏實(shí),熱情,對生活充滿激情', '主動性強(qiáng),自學(xué)能力強(qiáng),具有團(tuán)隊合作意識,有一定組織能力', '忠實(shí)誠信,講原則,說到做到,決不推卸責(zé)任', '有自制力,做事情始終堅持有始有終,從不半途而廢', '肯學(xué)習(xí),有問題不逃避,愿意虛心向他人學(xué)習(xí)', '愿意以謙虛態(tài)度贊揚(yáng)接納優(yōu)越者,權(quán)威者', '會用100%的熱情和精力投入到工作中;平易近人', '為人誠懇,性格開朗,積極進(jìn)取,適應(yīng)力強(qiáng)、勤奮好學(xué)、腳踏實(shí)地', '有較強(qiáng)的團(tuán)隊精神,工作積極進(jìn)取,態(tài)度認(rèn)真'] subjects = ['語文', '數(shù)學(xué)', '英語', '生物', '地理'] grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86, 96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86] create_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') # 添加程序耗時的功能 def timer(func): def wrapper(*args, **kwargs): start = time.time() res = func(*args, **kwargs) end = time.time() print('id{}共耗時約 {:.2f} 秒'.format(*args, end - start)) return res return wrapper @timer def save_to_es(num): """ 使用生成器批量寫入數(shù)據(jù)到es數(shù)據(jù)庫 :param num: :return: """ action = ( { "_index": "personal_info_5000000_v3", "_type": "_doc", "_id": i, "_source": { "id": i, "name": random.choice(names), "sex": random.choice(sexs), "age": random.choice(age), "character": random.choice(character), "subject": random.choice(subjects), "grade": random.choice(grades), "create_time": create_time } } for i in range(10000 * num, 10000 * num + 10000) ) helpers.bulk(es, action) def run(): global queue while queue.qsize() > 0: num = queue.get() print(num) save_to_es(num) if __name__ == '__main__': start = time.time() queue = Queue() # 序號數(shù)據(jù)進(jìn)隊列 for num in range(500): queue.put(num) # 多線程執(zhí)行程序 consumer_lst = [] for _ in range(10): thread = threading.Thread(target=run) thread.start() consumer_lst.append(thread) for consumer in consumer_lst: consumer.join() end = time.time() print('程序執(zhí)行完畢!花費(fèi)時間:', end - start)
運(yùn)行結(jié)果:
到此這篇關(guān)于使用python生成大量數(shù)據(jù)寫入es數(shù)據(jù)庫并查詢操作(2)的文章就介紹到這了,更多相關(guān)python生成 數(shù)據(jù) 內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
python實(shí)現(xiàn)高斯判別分析算法的例子
今天小編就為大家分享一篇python實(shí)現(xiàn)高斯判別分析算法的例子,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-12-12詳解Python prometheus_client使用方式
本文主要介紹了Python prometheus_client使用方式,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-02-02python生成每日報表數(shù)據(jù)(Excel)并郵件發(fā)送的實(shí)例
今天小編就為大家分享一篇python生成每日報表數(shù)據(jù)(Excel)并郵件發(fā)送的實(shí)例,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-02-02Python實(shí)現(xiàn)將圖像轉(zhuǎn)換為ASCII字符圖
使用Python進(jìn)行圖像處理,非??旖莘奖悖喍處仔写a就可以實(shí)現(xiàn)功能強(qiáng)大的效果。在這篇文章中,我們將使用Python將圖像轉(zhuǎn)換為ASCII字符照,感興趣的可以了解一下2022-08-08pycharm工具連接mysql數(shù)據(jù)庫失敗問題
這篇文章主要介紹了pycharm工具連接mysql數(shù)據(jù)庫失敗問題及解決方法,非常不錯大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-04-04python動態(tài)文本進(jìn)度條的實(shí)例代碼
這篇文章主要介紹了python動態(tài)文本進(jìn)度條的實(shí)例代碼,代碼簡單易懂,非常不錯,具有一定的參考借鑒價值,需要的朋友可以參考下2020-01-01python encrypt 實(shí)現(xiàn)AES加密的實(shí)例詳解
在本篇文章里小編給大家分享的是關(guān)于python encrypt 實(shí)現(xiàn)AES加密的實(shí)例內(nèi)容,有興趣的朋友們可以參考下。2020-02-02