亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Python如何把Spark數(shù)據(jù)寫入ElasticSearch

 更新時(shí)間:2020年04月18日 16:13:17   作者:阿布gogo  
這篇文章主要介紹了Python如何把Spark數(shù)據(jù)寫入ElasticSearch,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下

這里以將Apache的日志寫入到ElasticSearch為例,來(lái)演示一下如何使用Python將Spark數(shù)據(jù)導(dǎo)入到ES中。

實(shí)際工作中,由于數(shù)據(jù)與使用框架或技術(shù)的復(fù)雜性,數(shù)據(jù)的寫入變得比較復(fù)雜,在這里我們簡(jiǎn)單演示一下。

如果使用Scala或Java的話,Spark提供自帶了支持寫入ES的支持庫(kù),但Python不支持。所以首先你需要去這里下載依賴的ES官方開(kāi)發(fā)的依賴包包。

下載完成后,放在本地目錄,以下面命令方式啟動(dòng)pyspark:

pyspark --jars elasticsearch-hadoop-6.4.1.jar

如果你想pyspark使用Python3,請(qǐng)?jiān)O(shè)置環(huán)境變量:

export PYSPARK_PYTHON=/usr/bin/python3
理解如何寫入ES的關(guān)鍵是要明白,ES是一個(gè)JSON格式的數(shù)據(jù)庫(kù),它有一個(gè)必須的要求。數(shù)據(jù)格式必須采用以下格式

{ "id: { the rest of your json}}

往下會(huì)展示如何轉(zhuǎn)換成這種格式。

解析Apache日志文件
我們將Apache的日志文件讀入,構(gòu)建Spark RDD。然后我們寫一個(gè)parse()函數(shù)用正則表達(dá)式處理每條日志,提取我們需要的字

rdd = sc.textFile("/home/ubuntu/walker/apache_logs")
regex='^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+)\s?(\S+)?\s?(\S+)?" (\d{3}|-) (\d+|-)\s?"?([^"]*)"?\s?"?([^"]*)?"?$'

p=re.compile(regex)
def parse(str):
  s=p.match(str)
  d = {}
  d['ip']=s.group(1)
  d['date']=s.group(4)
  d['operation']=s.group(5)
  d['uri']=s.group(6)
  return d

換句話說(shuō),我們剛開(kāi)始從日志文件讀入RDD的數(shù)據(jù)類似如下:

['83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"']

然后我們使用map函數(shù)轉(zhuǎn)換每條記錄:

rdd2 = rdd.map(parse)

rdd2.take(1)

[{'date': '17/May/2015:10:05:03 +0000', 'ip': '83.149.9.216', 'operation': 'GET', 'uri': '/presentations/logstash-monitorama-2013/images/kibana-search.png'}]

現(xiàn)在看起來(lái)像JSON,但并不是JSON字符串,我們需要使用json.dumps將dict對(duì)象轉(zhuǎn)換。

我們同時(shí)增加一個(gè)doc_id字段作為整個(gè)JSON的ID。在配置ES中我們?cè)黾尤缦屡渲谩癳s.mapping.id”: “doc_id”告訴ES我們將這個(gè)字段作為ID。

這里我們使用SHA算法,將這個(gè)JSON字符串作為參數(shù),得到一個(gè)唯一ID。
計(jì)算結(jié)果類似如下,可以看到ID是一個(gè)很長(zhǎng)的SHA數(shù)值。

rdd3.take(1)

[('a5b086b04e1cc45fb4a19e2a641bf99ea3a378599ef62ba12563b75c', '{"date": "17/May/2015:10:05:03 +0000", "ip": "83.149.9.216", "operation": "GET", "doc_id": "a5b086b04e1cc45fb4a19e2a641bf99ea3a378599ef62ba12563b75c", "uri": "/presentations/logstash-monitorama-2013/images/kibana-search.png"}')]

現(xiàn)在我們需要制定ES配置,比較重要的兩項(xiàng)是:

  • “es.resource” : ‘walker/apache': "walker"是索引,apache是類型,兩者一般合稱索引
  • “es.mapping.id”: “doc_id”: 告訴ES那個(gè)字段作為整個(gè)文檔的ID,也就是查詢結(jié)果中的_id

其他的配置自己去探索。

然后我們使用saveAsNewAPIHadoopFile()將RDD寫入到ES。這部分代碼對(duì)于所有的ES都是一樣的,比較固定,不需要理解每一個(gè)細(xì)節(jié)

es_write_conf = {
    "es.nodes" : "localhost",
    "es.port" : "9200",
    "es.resource" : 'walker/apache',
    "es.input.json": "yes",
    "es.mapping.id": "doc_id"
  }
    
rdd3.saveAsNewAPIHadoopFile(
    path='-',
   outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_write_conf)

rdd3 = rdd2.map(addID)

def addId(data):
  j=json.dumps(data).encode('ascii', 'ignore')
  data['doc_id'] = hashlib.sha224(j).hexdigest()
  return (data['doc_id'], json.dumps(data))

最后我們可以使用curl進(jìn)行查詢

curl http://localhost:9200s/walker/apache/_search?pretty=true&?q=*
{
    "_index" : "walker",
    "_type" : "apache",
    "_id" : "227e977849bfd5f8d1fca69b04f7a766560745c6cb3712c106d590c2",
    "_score" : 1.0,
    "_source" : {
     "date" : "17/May/2015:10:05:32 +0000",
     "ip" : "91.177.205.119",
     "operation" : "GET",
     "doc_id" : "227e977849bfd5f8d1fca69b04f7a766560745c6cb3712c106d590c2",
     "uri" : "/favicon.ico"
    }

如下是所有代碼:

import json
import hashlib
import re

def addId(data):
  j=json.dumps(data).encode('ascii', 'ignore')
  data['doc_id'] = hashlib.sha224(j).hexdigest()
  return (data['doc_id'], json.dumps(data))

def parse(str):
  s=p.match(str)
  d = {}
  d['ip']=s.group(1)
  d['date']=s.group(4)
  d['operation']=s.group(5)
  d['uri']=s.group(6)
  return d  

regex='^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+)\s?(\S+)?\s?(\S+)?" (\d{3}|-) (\d+|-)\s?"?([^"]*)"?\s?"?([^"]*)?"?$'

p=re.compile(regex)

rdd = sc.textFile("/home/ubuntu/walker/apache_logs")

rdd2 = rdd.map(parse)

rdd3 = rdd2.map(addID)

es_write_conf = {
    "es.nodes" : "localhost",
    "es.port" : "9200",
    "es.resource" : 'walker/apache',
    "es.input.json": "yes",
    "es.mapping.id": "doc_id"
  }
   
rdd3.saveAsNewAPIHadoopFile(
    path='-',
   outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_write_conf)

也可以這么封裝,其實(shí)原理是一樣的

import hashlib
import json
from pyspark import Sparkcontext

def make_md5(line):
  md5_obj=hashlib.md5()
  md5_obj.encode(line)
  return md5_obj.hexdigest()

def parse(line):
  dic={}
  l = line.split('\t')
  doc_id=make_md5(line)
  dic['name']=l[1]
  dic['age'] =l[2]
  dic['doc_id']=doc_id
  return dic  #記得這邊返回的是字典類型的,在寫入es之前要記得dumps

def saveData2es(pdd, es_host, port,index, index_type, key):
  """
  把saprk的運(yùn)行結(jié)果寫入es
  :param pdd: 一個(gè)rdd類型的數(shù)據(jù)
  :param es_host: 要寫es的ip
  :param index: 要寫入數(shù)據(jù)的索引
  :param index_type: 索引的類型
  :param key: 指定文檔的id,就是要以文檔的那個(gè)字段作為_(kāi)id
  :return:
  """
  #實(shí)例es客戶端記得單例模式
  if es.exist.index(index):
    es.index.create(index, 'spo')
  es_write_conf = {
    "es.nodes": es_host,
    "es.port": port,
    "es.resource": index/index_type,
    "es.input.json": "yes",
    "es.mapping.id": key
  }

  (pdd.map(lambda _dic: ('', json.dumps(_dic))))  #這百年是為把這個(gè)數(shù)據(jù)構(gòu)造成元組格式,如果傳進(jìn)來(lái)的_dic是字典則需要jdumps,如果傳進(jìn)來(lái)之前就已經(jīng)dumps,這便就不需要dumps了
  .saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_write_conf)
  )
if __name__ == '__main__':
  #實(shí)例化sp對(duì)象
  sc=Sparkcontext()
  #文件中的呢內(nèi)容一行一行用sc的讀取出來(lái)
  json_text=sc.textFile('./1.txt')
  #進(jìn)行轉(zhuǎn)換
  json_data=json_text.map(lambda line:parse(line))

  saveData2es(json_data,'127.0.01','9200','index_test','index_type','doc_id')

  sc.stop()

看到了把,面那個(gè)例子在寫入es之前加了一個(gè)id,返回一個(gè)元組格式的,現(xiàn)在這個(gè)封裝指定_id就會(huì)比較靈活了

以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。

相關(guān)文章

  • python 正則表達(dá)式貪婪模式與非貪婪模式原理、用法實(shí)例分析

    python 正則表達(dá)式貪婪模式與非貪婪模式原理、用法實(shí)例分析

    這篇文章主要介紹了python 正則表達(dá)式貪婪模式與非貪婪模式原理、用法,結(jié)合實(shí)例形式詳細(xì)分析了python 正則表達(dá)式貪婪模式與非貪婪模式的功能、原理、用法及相關(guān)操作注意事項(xiàng),需要的朋友可以參考下
    2019-10-10
  • Spark處理數(shù)據(jù)排序問(wèn)題如何避免OOM

    Spark處理數(shù)據(jù)排序問(wèn)題如何避免OOM

    這篇文章主要介紹了Spark處理數(shù)據(jù)排序問(wèn)題如何避免OOM,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-05-05
  • opencv實(shí)現(xiàn)文檔矯正

    opencv實(shí)現(xiàn)文檔矯正

    這篇文章主要為大家詳細(xì)介紹了opencv實(shí)現(xiàn)文檔矯正功能,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2022-08-08
  • Pandas自定義選項(xiàng)option設(shè)置

    Pandas自定義選項(xiàng)option設(shè)置

    pandas有一個(gè)option系統(tǒng)可以控制pandas的展示情況,一般來(lái)說(shuō)我們不需要進(jìn)行修改,但是不排除特殊情況下的修改需求。本文將會(huì)詳細(xì)講解pandas中的option設(shè)置,感興趣的可以了解下
    2021-07-07
  • Python實(shí)現(xiàn)自動(dòng)玩貪吃蛇程序

    Python實(shí)現(xiàn)自動(dòng)玩貪吃蛇程序

    這篇文章主要介紹了通過(guò)Python實(shí)現(xiàn)的簡(jiǎn)易的自動(dòng)玩貪吃蛇游戲的小程序,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)一學(xué)
    2022-01-01
  • Python中NumPy的ufuncs函數(shù)實(shí)例

    Python中NumPy的ufuncs函數(shù)實(shí)例

    這篇文章主要介紹了Python中NumPy的ufuncs函數(shù)實(shí)例,NumPy是一個(gè)開(kāi)源的Python科學(xué)計(jì)算庫(kù),使用NumPy,就可以很自然地使用數(shù)組和矩陣,本文主要介紹Python Numpy ufuncs通用函數(shù),需要的朋友可以參考下
    2023-07-07
  • python實(shí)現(xiàn)DES加密解密方法實(shí)例詳解

    python實(shí)現(xiàn)DES加密解密方法實(shí)例詳解

    這篇文章主要介紹了python實(shí)現(xiàn)DES加密解密方法,以實(shí)例形式較為詳細(xì)的分析了基于Python實(shí)現(xiàn)的DES加密與解密技巧,需要的朋友可以參考下
    2015-06-06
  • 使用Python構(gòu)建Hopfield網(wǎng)絡(luò)的教程

    使用Python構(gòu)建Hopfield網(wǎng)絡(luò)的教程

    這篇文章主要介紹了使用Python構(gòu)建Hopfield網(wǎng)絡(luò)的教程,本文來(lái)自于IBM官方網(wǎng)站的技術(shù)文檔,需要的朋友可以參考下
    2015-04-04
  • python文件讀寫并使用mysql批量插入示例分享(python操作mysql)

    python文件讀寫并使用mysql批量插入示例分享(python操作mysql)

    這篇文章主要介紹了python文件讀寫并使用mysql批量插入示例,可以學(xué)習(xí)到python操作mysql數(shù)據(jù)庫(kù)的方法,需要的朋友可以參考下
    2014-02-02
  • 用Python爬取指定關(guān)鍵詞的微博

    用Python爬取指定關(guān)鍵詞的微博

    這篇文章主要介紹了用Python爬取指定關(guān)鍵詞的微博,下面文章圍繞Python爬取指定關(guān)鍵詞的微博的相關(guān)資料展開(kāi)詳細(xì)內(nèi)容,需要的朋友可以參考一下
    2021-11-11

最新評(píng)論