Java和scala實現(xiàn) Spark RDD轉(zhuǎn)換成DataFrame的兩種方法小結(jié)
一:準備數(shù)據(jù)源
在項目下新建一個student.txt文件,里面的內(nèi)容為:
1,zhangsan,20 2,lisi,21 3,wanger,19 4,fangliu,18
二:實現(xiàn)
Java版:
1.首先新建一個student的Bean對象,實現(xiàn)序列化和toString()方法,具體代碼如下:
package com.cxd.sql; import java.io.Serializable; @SuppressWarnings("serial") public class Student implements Serializable { String sid; String sname; int sage; public String getSid() { return sid; } public void setSid(String sid) { this.sid = sid; } public String getSname() { return sname; } public void setSname(String sname) { this.sname = sname; } public int getSage() { return sage; } public void setSage(int sage) { this.sage = sage; } @Override public String toString() { return "Student [sid=" + sid + ", sname=" + sname + ", sage=" + sage + "]"; } }
2.轉(zhuǎn)換,具體代碼如下
package com.cxd.sql; import java.util.ArrayList; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; public class TxtToParquetDemo { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("TxtToParquet").setMaster("local"); SparkSession spark = SparkSession.builder().config(conf).getOrCreate(); reflectTransform(spark);//Java反射 dynamicTransform(spark);//動態(tài)轉(zhuǎn)換 } /** * 通過Java反射轉(zhuǎn)換 * @param spark */ private static void reflectTransform(SparkSession spark) { JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD(); JavaRDD<Student> rowRDD = source.map(line -> { String parts[] = line.split(","); Student stu = new Student(); stu.setSid(parts[0]); stu.setSname(parts[1]); stu.setSage(Integer.valueOf(parts[2])); return stu; }); Dataset<Row> df = spark.createDataFrame(rowRDD, Student.class); df.select("sid", "sname", "sage"). coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res"); } /** * 動態(tài)轉(zhuǎn)換 * @param spark */ private static void dynamicTransform(SparkSession spark) { JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD(); JavaRDD<Row> rowRDD = source.map( line -> { String[] parts = line.split(","); String sid = parts[0]; String sname = parts[1]; int sage = Integer.parseInt(parts[2]); return RowFactory.create( sid, sname, sage ); }); ArrayList<StructField> fields = new ArrayList<StructField>(); StructField field = null; field = DataTypes.createStructField("sid", DataTypes.StringType, true); fields.add(field); field = DataTypes.createStructField("sname", DataTypes.StringType, true); fields.add(field); field = DataTypes.createStructField("sage", DataTypes.IntegerType, true); fields.add(field); StructType schema = DataTypes.createStructType(fields); Dataset<Row> df = spark.createDataFrame(rowRDD, schema); df.coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res1"); } }
scala版本:
import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types.StringType import org.apache.spark.sql.types.StructField import org.apache.spark.sql.types.StructType import org.apache.spark.sql.Row import org.apache.spark.sql.types.IntegerType object RDD2Dataset { case class Student(id:Int,name:String,age:Int) def main(args:Array[String]) { val spark=SparkSession.builder().master("local").appName("RDD2Dataset").getOrCreate() import spark.implicits._ reflectCreate(spark) dynamicCreate(spark) } /** * 通過Java反射轉(zhuǎn)換 * @param spark */ private def reflectCreate(spark:SparkSession):Unit={ import spark.implicits._ val stuRDD=spark.sparkContext.textFile("student2.txt") //toDF()為隱式轉(zhuǎn)換 val stuDf=stuRDD.map(_.split(",")).map(parts⇒Student(parts(0).trim.toInt,parts(1),parts(2).trim.toInt)).toDF() //stuDf.select("id","name","age").write.text("result") //對寫入文件指定列名 stuDf.printSchema() stuDf.createOrReplaceTempView("student") val nameDf=spark.sql("select name from student where age<20") //nameDf.write.text("result") //將查詢結(jié)果寫入一個文件 nameDf.show() } /** * 動態(tài)轉(zhuǎn)換 * @param spark */ private def dynamicCreate(spark:SparkSession):Unit={ val stuRDD=spark.sparkContext.textFile("student.txt") import spark.implicits._ val schemaString="id,name,age" val fields=schemaString.split(",").map(fieldName => StructField(fieldName, StringType, nullable = true)) val schema=StructType(fields) val rowRDD=stuRDD.map(_.split(",")).map(parts⇒Row(parts(0),parts(1),parts(2))) val stuDf=spark.createDataFrame(rowRDD, schema) stuDf.printSchema() val tmpView=stuDf.createOrReplaceTempView("student") val nameDf=spark.sql("select name from student where age<20") //nameDf.write.text("result") //將查詢結(jié)果寫入一個文件 nameDf.show() } }
注:
1.上面代碼全都已經(jīng)測試通過,測試的環(huán)境為spark2.1.0,jdk1.8。
2.此代碼不適用于spark2.0以前的版本。
以上這篇Java和scala實現(xiàn) Spark RDD轉(zhuǎn)換成DataFrame的兩種方法小結(jié)就是小編分享給大家的全部內(nèi)容了,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
SpringBoot如何基于POI-tl和word模板導(dǎo)出龐大的Word文件
這篇文章主要介紹了SpringBoot如何基于POI-tl和word模板導(dǎo)出龐大的Word文件,poi-tl是一個基于Apache?POI的Word模板引擎,也是一個免費開源的Java類庫2022-08-08Java實戰(zhàn)項目練習之球館在線預(yù)約系統(tǒng)的實現(xiàn)
理論是遠遠不夠的,只有在實戰(zhàn)中才能獲得能力的提升,本篇文章手把手帶你用java+SpringBoot+maven+freemark+Mysql實現(xiàn)一個球館在線預(yù)約系統(tǒng),大家可以在過程中查缺補漏,提升水平2022-01-01注解@TableName,@TableField,pgsql的模式對應(yīng)方式
這篇文章主要介紹了注解@TableName,@TableField,pgsql的模式對應(yīng)方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-04-04Java關(guān)鍵字finally_動力節(jié)點Java學院整理
java關(guān)鍵字finally不管是否出現(xiàn)異常,finally子句總是在塊完成之前執(zhí)行。下面通過實現(xiàn)代碼給大家介紹Java關(guān)鍵字finally相關(guān)知識,需要的的朋友參考下吧2017-04-04SpringCloud Zuul服務(wù)功能與使用方法解析
這篇文章主要介紹了SpringCloud Zuul服務(wù)功能與使用方法解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-05-05springboot @RequiredArgsConstructor的概念與使用方式
這篇文章主要介紹了springboot @RequiredArgsConstructor的概念與使用方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-09-09