亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Spark處理trick總結分析

 更新時間:2022年12月11日 16:01:00   作者:花折亦無情  
這篇文章主要為大家介紹了Spark處理trick總結分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

前言

最近做了很多數據清洗以及摸底的工作,由于處理的數據很大,所以采用了spark進行輔助處理,期間遇到了很多問題,特此記錄一下,供大家學習。

由于比較熟悉python, 所以筆者采用的是pyspark,所以下面給的demo都是基于pyspark,其實其他語言腳本一樣,重在學習思想,具體實現改改對應的API即可。

這里盡可能的把一些坑以及實現技巧以demo的形式直白的提供出來,順序不分先后。有了這些demo,大家在實現自己各種各樣需求尤其是一些有難度需求的時候,就可以參考了,當然了有時間筆者后續(xù)還會更新一些demo,感興趣的同學可以關注下。

trick

首先說一個最基本思想:能map絕不reduce。

換句話說當在實現某一需求時,要盡可能得用map類的算子,這是相當快的。但是聚合類的算子通常來說是相對較慢,如果我們最后不得不用聚合類算子的時候,我們也要把這一步邏輯看看能不能盡可能的往后放,而把一些諸如過濾什么的邏輯往前放,這樣最后的數據量就會越來越少,再進行聚合的時候就會快很多。如果反過來,那就得不償失了,雖然最后實現的效果是一樣的,但是時間差卻是數量級的。

  • 常用API

這里列一下我們最常用的算子

rdd = rdd.filter(lambda x: fun(x))
rdd = rdd.map(lambda x: fun(x))
rdd = rdd.flatMap(lambda x: fun(x))
rdd = rdd.reduceByKey(lambda a, b: a + b)

filter: 過濾,滿足條件的返回True, 需要過濾的返回False。

map: 每條樣本做一些共同的操作。

flatMap: 一條拆分成多條返回,具體的是list。

reduceByKey: 根據key進行聚合。

  • 聚合

一個最常見的場景就是需要對某一個字段進行聚合:假設現在我們有一份流水表,其每一行數據就是一個用戶的一次點擊行為,那現在我們想統計一下每個用戶一共點擊了多少次,更甚至我們想拿到每個用戶點擊過的所有item集合。偽代碼如下:

def get_key_value(x):
  user = x[0]
  item = x[1]
  return (user, [item])
rdd = rdd.map(lambda x: get_key_value(x))
rdd = rdd.reduceByKey(lambda a, b: a + b)

首先我們先通過get_key_value函數將每條數據轉化成(key, value)的形式,然后通過reduceByKey聚合算子進行聚合,它就會把相同key的數據聚合在一起,說到這里,大家可能不覺得有什么?這算什么trick!其實筆者這里想展示的是get_key_value函數返回形式:[item] 。

為了對比,這里筆者再列一下兩者的區(qū)別:

def get_key_value(x):
  user = x[0]
  item = x[1]
  return (user, [item])
def get_key_value(x):
  user = x[0]
  item = x[1]
  return (user, item)

可以看到第一個的value是一個列表,而第二個就是單純的item,我們看reduceByKey這里我們用的具體聚合形式是相加,列表相加就是得到一個更大的列表即:

所以最后我們就拿到了:每個用戶點擊過的所有item集合,具體的是一個列表。

  • 抽樣、分批

在日常中我們需要抽樣出一部分數據進行數據分析或者實驗,甚至我們需要將數據等分成多少份,一份一份用(后面會說),這個時候怎么辦呢?

當然了spark也有類似sample這樣的抽樣算子

那其實我們也可以實現,而且可以靈活控制等分等等且速度非常快,如下:

def get_prefix(x, num):
    prefix = random.randint(1, num)
    return [x, num]
def get_sample(x):
    prefix = x[1]
    if prefix == 1:
        return True
    else:
        return False
rdd = rdd.map(lambda x: get_prefix(x, num))
rdd = rdd.filter(lambda x: get_sample(x))

假設我們需要抽取1/10的數據出來,總的思路就是先給每個樣本打上一個[1,10]的隨機數,然后只過濾出打上1的數據即可。

以此類推,我們還可以得到3/10的數據出來,那就是在過濾的時候,取出打上[1,2,3]的即可,當然了[4,5,6]也行,只要取三個就行。

  • 笛卡爾積

有的時候需要在兩個集合之間做笛卡爾積,假設這兩個集合是A和B即兩個rdd。

首先spark已經提供了對應的API即cartesian,具體如下:

rdd_cartesian = rdd_A.cartesian(rdd_B)

其更具體的用法和返回形式大家可以找找相關博客,很多,筆者這里不再累述。

但是其速度非常慢

尤其當rdd_A和rdd_B比較大的時候,這個時候怎么辦呢?

這個時候我們可以借助廣播機制,其實已經有人也用了這個trick:

http://chabaoo.cn/article/203197.htm

首先說一下spark中的廣播機制,假設一個變量被申請為了廣播機制,那么其實是緩存了一個只讀的變量在每臺機器上,假設當前rdd_A比較小,rdd_B比較大,那么我可以把rdd_A轉化為廣播變量,然后用這個廣播變量和每個rdd_B中的每個元素都去做一個操作,進而實現笛卡爾積的效果,好了,筆者給一下pyspark的實現:

def ops(A, B):
    pass
def fun(A_list, B):
    result = []
    for cur_A in A_list:
        result.append(cur_A + B)
    return result
rdd_A = sc.broadcast(rdd_A.collect())
rdd_cartesian = rdd_B.flatMap(lambda x: fun(rdd_A.value, x))

可以看到我們先把rdd_A轉化為廣播變量,然后通過flatMap,將rdd_A和所有rdd_B中的單個元素進行操作,具體是什么操作大家可以在ops函數中自己定義自己的邏輯。

關于spark的廣播機制更多講解,大家也可以找找文檔,很多的,比如:

https://www.cnblogs.com/Lee-yl/p/9777857.html

但目前為止,其實還沒有真真結束,從上面我們可以看到,rdd_A被轉化為了廣播變量,但是其有一個重要的前提:那就是rdd_A比較小。但是當rdd_A比較大的時候,我們在轉化的過程中,就會報內存錯誤,當然了可以通過增加配置:

spark.driver.maxResultSize=10g

但是如果rdd_A還是極其大呢?換句話說rdd_A和rdd_B都是非常大的,哪一個做廣播變量都是不合適的,怎么辦呢?

其實我們一部分一部分的做。假設我們把rdd_A拆分成10份,這樣的話,每一份的量級就降下來了,然后把每一份轉化為廣播變量且都去和rdd_B做笛卡爾積,最后再匯總一下就可以啦。

有了想法,那么怎么實現呢?

分批大家都會了,如上。但是這里面會有另外一個問題,那就是這個廣播變量名會被重復利用,在進行下一批廣播變量的時候,需要先銷毀,再創(chuàng)建,demo如下:

def ops(A, B):
    pass
def fun(A_list, B):
    result = []
    for cur_A in A_list:
        result.append(cur_A + B)
    return result
def get_rdd_cartesian(rdd_A, rdd_B):   
    rdd_cartesian = rdd_B.flatMap(lambda x: fun(rdd_A.value, x))
    return rdd_cartesian
for i in range(len(rdd_A_batch))
    qb_rdd_temp = rdd_A_batch[i]
    qb_rdd_temp = sc.broadcast(qb_rdd_temp.collect())
    rdd_cartesian_batch = get_rdd_cartesian(qb_rdd_temp, rdd_B)
    dw.saveToTable(rdd_cartesian_batch, tdw_table, "p_" + ds, overwrite=False)
    qb_rdd_temp.unpersist()

可以看到,最主要的就是unpersist()

  • 廣播變量應用之向量索引

說到廣播機制,這里就再介紹一個稍微復雜的demo,乘熱打鐵。

做算法的同學,可能經常會遇到向量索引這一場景:即每一個item被表征成一個embedding,然后兩個item的相似度便可以基于embedding的余弦相似度進行量化。向量索引是指假設來了一個query,候選池子里面假設有幾百萬的doc,最終目的就是要從候選池子中挑選出與query最相似的n個topk個doc。

關于做大規(guī)模數量級的索引已經有很多現成好的API可以用,最常見的包比如有faiss。如果還不熟悉faiss的同學,可以先簡單搜一下其基本用法,看看demo,很簡單。

好啦,假設現在query的量級是10w,doc的量級是100w,面對這么大的量級,我們當然是想通過spark來并行處理,加快計算流程。那么該怎么做呢?

這時我們便可以使用spark的廣播機制進行處理啦,而且很顯然doc應該是廣播變量,因為每一個query都要和全部的doc做計算。

廢話不多說,直接看實現

首先建立doc索引:

# 獲取index embedding,并collect,方便后續(xù)建立索引
index_embedding_list = index_embedding_rdd.collect()
all_ids = np.array([row[1] for row in index_embedding_list], np.str)
all_vectors = np.array([str_to_vec(row[2]) for row in index_embedding_list], np.float32)
del(index_embedding_list)
#faiss.normalize_L2(all_vectors)
print(all_ids[:2])
print(all_vectors[:2])
print("all id size: {}, all vec shape: {}".format(len(all_ids), all_vectors.shape))
# 建立index索引,并轉化為廣播變量
faiss_index = FaissIndex(all_ids, all_vectors, self.args.fast_mode, self.args.nlist, self.args.nprobe)
del(all_vectors)
del(all_ids)
print("broadcast start")
bc_faiss_index = self.sc.broadcast(faiss_index)
print("broadcast done")

這里的index_embedding_rdd就是doc的embedding,可以看到先要collect,然后建立索引。

建立完索引后,就可以開始計算了,但是這里會有一個問題就是query的量級也是比較大的,如果一起計算可能會OM,所以我們分批次進行即batch:

# 開始檢索
# https://blog.csdn.net/wx1528159409/article/details/125879542
query_embedding_rdd = query_embedding_rdd.repartition(300)
top_n = 5
batch_size = 1000
query_sim_rdd = query_embedding_rdd.mapPartitions(
              lambda iters: batch_get_nearest_ids(
                iters, bc_faiss_index, top_n, batch_size
                )
)

假設query_embedding_rdd是全部query的embedding,為了實現batch,我們先將query_embedding_rdd進行分區(qū)repartition,然后每個batch進行,可以看到核心就是batch_get_nearest_ids這個函數:

def batch_get_nearest_ids(iters, bc_faiss_index, top_n, batch_size):
    import mkl
    mkl.get_max_threads()
    res = list()
    rows = list()
    for it in iters:
        rows.append(it)
        if len(rows) >= batch_size:
            batch_res = __batch_get_nearest_ids(rows, bc_faiss_index, top_n)
            res.extend(batch_res)
            rows = list()
    if rows:
        batch_res = __batch_get_nearest_ids(rows, bc_faiss_index, top_n)
        res.extend(batch_res)
    return res

從這里可以清楚的看到就是組batch,組夠一個batch后就可以給當前這個batch內的query進行計算最相似的候選啦即__batch_get_nearest_ids這個核心函數:

def __batch_get_nearest_ids(rows, bc_faiss_index, top_n):
    import mkl
    mkl.get_max_threads()
    import faiss
    embs = [str_to_vec(row[3]) for row in rows]
    vec = np.array(embs, np.float32)
    #faiss.normalize_L2(vec)
    similarities, dst_ids = bc_faiss_index.value.batch_search(vec, top_n)
    batch_res = list()
    for i in range(len(rows)):
        batch_res.append([str("\\t".join([rows[i][1], rows[i][2]])), "$$$".join(["\\t".join(dst.split("\\t")+[str(round(sim, 2))]) for dst, sim in zip(dst_ids[i], similarities[i])])])
    return batch_res

這里就是真真的調用faiss的索引API進行召回啦,當然了batch_res這個就是結果,自己可以想怎么定義都行,筆者這里不僅返回了召回的item,還返回了query自身的一些信息。

  • 注意點

在map的時候,不論是self的類成員還是類方法都要放到外面,不要放到類里面,不然會報錯

總結

總之,在用spark做任何需求之前,一定要牢記能map就map,盡量不要聚合算子,實在不行就盡可能放到最后。

以上就是Spark處理trick總結分析的詳細內容,更多關于Spark處理trick的資料請關注腳本之家其它相關文章!

相關文章

最新評論