DataFrame:通過(guò)SparkSql將scala類(lèi)轉(zhuǎn)為DataFrame的方法
更新時(shí)間:2019年01月29日 14:33:24 作者:silentwolfyh
今天小編就為大家分享一篇DataFrame:通過(guò)SparkSql將scala類(lèi)轉(zhuǎn)為DataFrame的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
如下所示:
import java.text.DecimalFormat
import com.alibaba.fastjson.JSON
import com.donews.data.AppConfig
import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode, DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory
/**
* Created by silentwolf on 2016/6/3.
*/
/**
 * One row of the user-tag result table: a user id plus 27 score columns
 * (gender split, age brackets, and interest categories). Field names are
 * upper-case on purpose — they become the column names of the DataFrame
 * that is written out to the USER_TAGS table.
 */
case class UserTag(SUUID: String,
                   // gender split
                   MAN: Float,
                   WOMAN: Float,
                   // age brackets
                   AGE10_19: Float,
                   AGE20_29: Float,
                   AGE30_39: Float,
                   AGE40_49: Float,
                   AGE50_59: Float,
                   // interest categories
                   GAME: Float,
                   MOVIE: Float,
                   MUSIC: Float,
                   ART: Float,
                   POLITICS_NEWS: Float,
                   FINANCIAL: Float,
                   EDUCATION_TRAINING: Float,
                   HEALTH_CARE: Float,
                   TRAVEL: Float,
                   AUTOMOBILE: Float,
                   HOUSE_PROPERTY: Float,
                   CLOTHING_ACCESSORIES: Float,
                   BEAUTY: Float,
                   IT: Float,
                   BABY_PRODUCT: Float,
                   FOOD_SERVICE: Float,
                   HOME_FURNISHING: Float,
                   SPORTS: Float,
                   OUTDOOR_ACTIVITIES: Float,
                   MEDICINE: Float)
/**
 * Spark job that derives per-user tag vectors (gender/age demographics plus
 * interest-category scores) for every distinct `suuid` found in a Parquet
 * source, and persists the result to HBase through the Phoenix connector.
 *
 * Flow: load Parquet -> register temp table -> UDF emits a JSON blob per
 * suuid -> JSON parsed into typed [[UserTag]] rows -> projected with
 * appkey/date columns -> written to the USER_TAGS table.
 */
object UserTagTable {
  // FIX: original obtained the logger from UserOverviewFirst.getClass — a
  // copy-paste slip; log under this object's own class.
  val LOG = LoggerFactory.getLogger(UserTagTable.getClass)

  /** Root HDFS path under which the Parquet report data lives. */
  val REP_HOME = s"${AppConfig.HDFS_MASTER}/${AppConfig.HDFS_REP}"

  /**
   * Entry point.
   *
   * @param args args(0) = appkey, args(1) = end date (yyyy-MM-dd).
   *             The start date is currently hard-coded to 2016-04-10.
   */
  def main(args: Array[String]) {
    val conf: com.typesafe.config.Config = ConfigFactory.load()
    val sc = new SparkContext()
    val sqlContext = new SQLContext(sc)
    // FIX: original only checked args.length == 0, so a single-argument
    // invocation crashed at args(1) with ArrayIndexOutOfBoundsException;
    // both arguments are required.
    if (args.length < 2) {
      println("請(qǐng)輸入: appkey , StartTime : 2016-04-10 ,StartEnd :2016-04-11")
    } else {
      val appkey = args(0)
      val lastdate = args(1)
      val df1: DataFrame = loadDataFrame(sqlContext, appkey, "2016-04-10", lastdate)
      df1.registerTempTable("suuidTable")
      sqlContext.udf.register("taginfo", (a: String) => userTagInfo(a))
      sqlContext.udf.register("intToString", (b: Long) => intToString(b))
      import sqlContext.implicits._
      // Key step: pair each distinct suuid with the JSON produced by the
      // "taginfo" UDF, then parse that JSON into a typed UserTag row so the
      // RDD can be converted to a DataFrame with named columns.
      sqlContext.sql(" select distinct(suuid) AS suuid,taginfo(suuid) from suuidTable group by suuid").map { case Row(suuid: String, taginfo: String) =>
        val t = JSON.parseObject(taginfo)
        UserTag(suuid,
          t.getFloat("man"), t.getFloat("woman"),
          t.getFloat("age10_19"), t.getFloat("age20_29"), t.getFloat("age30_39"),
          t.getFloat("age40_49"), t.getFloat("age50_59"),
          t.getFloat("game"), t.getFloat("movie"), t.getFloat("music"),
          t.getFloat("art"), t.getFloat("politics_news"), t.getFloat("financial"),
          t.getFloat("education_training"), t.getFloat("health_care"),
          t.getFloat("travel"), t.getFloat("automobile"), t.getFloat("house_property"),
          t.getFloat("clothing_accessories"), t.getFloat("beauty"),
          t.getFloat("IT"), t.getFloat("baby_Product"), t.getFloat("food_service"),
          t.getFloat("home_furnishing"), t.getFloat("sports"),
          t.getFloat("outdoor_activities"), t.getFloat("medicine")
        )
      }.toDF().registerTempTable("resultTable")
      // Project the tag columns together with the job parameters so the
      // output table carries APPKEY and DATE alongside each user's scores.
      val resultDF = sqlContext.sql(s"select '$appkey' AS APPKEY, '$lastdate' AS DATE,SUUID ,MAN,WOMAN,AGE10_19,AGE20_29,AGE30_39 ," +
        "AGE40_49 ,AGE50_59,GAME,MOVIE,MUSIC,ART,POLITICS_NEWS,FINANCIAL,EDUCATION_TRAINING,HEALTH_CARE,TRAVEL,AUTOMOBILE," +
        "HOUSE_PROPERTY,CLOTHING_ACCESSORIES,BEAUTY,IT,BABY_PRODUCT ,FOOD_SERVICE ,HOME_FURNISHING ,SPORTS ,OUTDOOR_ACTIVITIES ," +
        "MEDICINE from resultTable WHERE SUUID IS NOT NULL")
      // Overwrite the USER_TAGS HBase table via the Phoenix Spark connector.
      resultDF.write.mode(SaveMode.Overwrite).options(
        Map("table" -> "USER_TAGS", "zkUrl" -> conf.getString("Hbase.url"))
      ).format("org.apache.phoenix.spark").save()
    }
  }

  /** Renders a Long as its decimal string; registered as the "intToString" UDF. */
  def intToString(suuid: Long): String = {
    suuid.toString
  }

  /**
   * Produces a JSON object of randomly generated tag scores for one user.
   * Gender and age-bracket scores each sum to 1.0 (two-decimal rounding);
   * the 20 interest-category scores are independent draws in [0, 1).
   *
   * @param num1 the suuid — currently unused; kept so the registered UDF
   *             signature stays `String => String`.
   */
  def userTagInfo(num1: String): String = {
    import java.text.DecimalFormatSymbols
    import java.util.Locale
    // FIX: a default-locale DecimalFormat emits ',' as the decimal
    // separator in many locales, which makes every .toFloat below throw;
    // pin the symbols to Locale.US so "0.00" always uses '.'.
    val de = new DecimalFormat("0.00", DecimalFormatSymbols.getInstance(Locale.US))
    def draw(scale: Double): Float = de.format(math.random * scale).toFloat
    val man = de.format(math.random).toFloat
    val woman = de.format(1 - man).toFloat
    val age10_19 = draw(0.2)
    val age20_29 = draw(0.2)
    val age30_39 = draw(0.2)
    val age40_49 = draw(0.2)
    // Last bracket takes the remainder so the five brackets sum to 1.0.
    val age50_59 = de.format(1 - age10_19 - age20_29 - age30_39 - age40_49).toFloat
    // Key casing ("IT", "baby_Product") must match what main() reads back.
    val categories = Seq(
      "game", "movie", "music", "art", "politics_news", "financial",
      "education_training", "health_care", "travel", "automobile",
      "house_property", "clothing_accessories", "beauty", "IT",
      "baby_Product", "food_service", "home_furnishing", "sports",
      "outdoor_activities", "medicine").map(k => k -> draw(1))
    val fields = Seq(
      "man" -> man, "woman" -> woman,
      "age10_19" -> age10_19, "age20_29" -> age20_29, "age30_39" -> age30_39,
      "age40_49" -> age40_49, "age50_59" -> age50_59) ++ categories
    fields.map { case (k, v) => s""""$k":$v""" }.mkString("{", ",", "}")
  }

  /**
   * Loads the appstatistic Parquet data set and filters it down to one
   * appkey and an inclusive day range.
   *
   * @param ctx      SQLContext used for reading.
   * @param appkey   application key to select.
   * @param startDay first day (inclusive, yyyy-MM-dd).
   * @param endDay   last day (inclusive, yyyy-MM-dd).
   */
  def loadDataFrame(ctx: SQLContext, appkey: String, startDay: String, endDay: String): DataFrame = {
    val path = s"$REP_HOME/appstatistic"
    ctx.read.parquet(path)
      .filter(s"timestamp is not null and appkey='$appkey' and day>='$startDay' and day<='$endDay'")
  }
}
以上這篇DataFrame:通過(guò)SparkSql將scala類(lèi)轉(zhuǎn)為DataFrame的方法就是小編分享給大家的全部?jī)?nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
實(shí)例分析python3實(shí)現(xiàn)并發(fā)訪(fǎng)問(wèn)水平切分表
在本文中小編給大家整理了關(guān)于python3實(shí)現(xiàn)并發(fā)訪(fǎng)問(wèn)水平切分表的相關(guān)知識(shí)點(diǎn)以及實(shí)例代碼,有興趣的朋友們參考下。2018-09-09
從零學(xué)python系列之新版本導(dǎo)入httplib模塊報(bào)ImportError解決方案
在使用新版python打開(kāi)舊版本代碼的時(shí)候,可能會(huì)有些報(bào)錯(cuò)或者不兼容的情況出現(xiàn),今天我們就來(lái)分析其中的一種情況2014-05-05
Python PyYAML庫(kù)解析YAML文件使用詳解
這篇文章主要為大家介紹了Python PyYAML庫(kù)解析YAML文件使用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-11-11
利用python腳本提取Abaqus場(chǎng)輸出數(shù)據(jù)的代碼
這篇文章主要介紹了利用python腳本提取Abaqus場(chǎng)輸出數(shù)據(jù),利用python腳本對(duì)Abaqus進(jìn)行數(shù)據(jù)提取時(shí),要對(duì)python腳本做前步的導(dǎo)入處理,本文通過(guò)實(shí)例代碼詳細(xì)講解需要的朋友可以參考下2022-11-11
python對(duì)ip地址進(jìn)行排序、分類(lèi)的方法詳解
這篇文章主要介紹了python對(duì)ip地址進(jìn)行排序、分類(lèi)的方法詳解,IP協(xié)議全稱(chēng)為“網(wǎng)際互連協(xié)議Internet?Protocol”,IP協(xié)議是TCP/IP體系中的網(wǎng)絡(luò)層協(xié)議,需要的朋友可以參考下2023-07-07

