Reading headerless txt files with Spark SQL
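When Spark SQL reads a headerless txt file through the CSV reader without being given a schema, it names the columns automatically as `_c0`, `_c1`, and so on, and reads every column as a string. Because of this, the DataFrame also can't be converted into a typed Dataset with `df.as(...)`. The bean (case) class's property names don't match the DataFrame's column names, so Spark throws an exception instead.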
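The name mismatch can be illustrated without Spark. The plain-Java sketch below is hypothetical (`DefaultColumnNames` is not a Spark class): it generates the `_cN` names the CSV reader uses when there is neither a header nor a schema, and lists which bean properties have no column with the same name. It compares names exactly, whereas Spark resolves column names case-insensitively by default, so treat it only as an illustration of the idea.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: mimics, in plain Java, the name check that makes df.as(...) fail.
// It does not call Spark; it only reproduces the default naming convention.
final class DefaultColumnNames {

    // Names the CSV reader assigns when there is no header and no schema: _c0, _c1, ...
    static List<String> defaults(int columnCount) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < columnCount; i++) {
            names.add("_c" + i);
        }
        return names;
    }

    // Bean properties with no column of exactly the same name (each would be unresolvable).
    static Set<String> unresolved(List<String> columns, List<String> beanProperties) {
        Set<String> missing = new LinkedHashSet<>(beanProperties);
        missing.removeAll(columns);
        return missing;
    }
}
```

For a three-column file read into the `City` bean, all three properties (`cityid`, `cityname`, `area`) come back unresolved against `_c0`, `_c1`, `_c2`, which is why the conversion only works once the columns carry the bean's names.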
In that case you can attach a schema when building the DataFrame from an RDD of rows:
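import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Define the schema: a single nullable string column named "word"
List<StructField> fields = new ArrayList<>();
fields.add(DataTypes.createStructField("word", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(fields);
// RowRDD is a JavaRDD<Row>; createDataFrame(rdd, schema) applies the schema to it
Dataset<Row> dataFrame = sparkSession.createDataFrame(RowRDD, schema); // rdd -> dataframe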
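The snippet assumes `RowRDD` already exists as a `JavaRDD<Row>`. One common way to get it, assuming the raw lines are available as a `JavaRDD<String>` named `lines` (for example from `sparkSession.read().textFile(path).javaRDD()`), is `lines.map(line -> RowFactory.create((Object[]) line.split("\t", -1)))`. The plain-Java helper below (hypothetical name `TabLine`) sketches only the per-line split, with a field-count check so that a malformed line fails loudly instead of producing a short row.

```java
import java.util.Arrays;

// Hypothetical helper: the per-line step before RowFactory.create(...).
// Splits one tab-delimited line into exactly `expected` fields.
final class TabLine {
    static String[] fields(String line, int expected) {
        // limit -1 keeps trailing empty strings, so "a\tb\t" yields 3 fields instead of 2
        String[] parts = line.split("\t", -1);
        if (parts.length != expected) {
            throw new IllegalArgumentException(
                    "expected " + expected + " fields but got " + parts.length + ": " + Arrays.toString(parts));
        }
        return parts;
    }
}
```

For the one-column `word` schema above, each row would be `RowFactory.create((Object[]) TabLine.fields(line, 1))`. Every value stays a `String`, so this only matches schemas whose columns are all `StringType`; numeric columns would need converting (e.g. with `Long.valueOf`) before the row is built.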
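However, an existing DataFrame's schema can't be updated in place: DataFrames are immutable, and `toDF(...)` only returns a renamed copy with the column types unchanged. So the schema has to be supplied when the file is loaded, by passing the prepared StructType to `schema()`. For example: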
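// Define the schema: two nullable string columns, in the same order as the file's columns
List<StructField> fields = new ArrayList<>();
fields.add(DataTypes.createStructField("word", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("cnt", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(fields);
DataFrameReader reader = sparkSession.read();
Dataset<Row> citydf = reader
        .option("delimiter", "\t")  // fields are tab-separated
        .option("header", false)    // the file has no header line (false is also the default)
        .schema(schema)             // use these names/types instead of _c0, _c1, ...
        // csv() selects the CSV source itself, so a preceding format("text") would have no effect;
        // backslashes in a Java string literal must be escaped
        .csv("D:\\project\\sparkDemo\\inputs\\city_info.txt");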
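That raises a new problem: when there are many files to load, building a field list by hand for every one of them gets tedious.
Instead, we can wrap the StructType construction in a method that takes the target field names and data types. Those names and types can be taken from the bean (case) classes.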
A helper for building the StructType
package src.main.utils;

import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// javax.activation ships with Java 8; on Java 11+ it needs the JavaBeans Activation Framework dependency
import javax.activation.UnsupportedDataTypeException;

public class SchemaMaker {
    // field name -> type name; LinkedHashMap keeps insertion order, which becomes the column order
    private final LinkedHashMap<String, String> schemaMap = new LinkedHashMap<>();
    private final List<String> valueTypes =
            Stream.of("string", "integer", "double", "long").collect(Collectors.toList());

    public SchemaMaker() {
    }

    // Each inner list must be [fieldName, fieldType]
    public SchemaMaker(ArrayList<ArrayList<String>> dataList) {
        for (ArrayList<String> data : dataList) {
            if (data.size() != 2) {
                throw new RuntimeException("Each entry must have 2 elements: the field name, then the field type");
            }
            add(data.get(0), data.get(1));
        }
    }

    public void add(String fieldName, String fieldType) {
        // Locale.ROOT avoids locale-specific lowercasing (e.g. "INTEGER" under a Turkish locale)
        String type = fieldType.toLowerCase(Locale.ROOT);
        if (!valueTypes.contains(type)) {
            throw new RuntimeException("Unsupported data type " + fieldType + ", expected one of " + valueTypes);
        }
        schemaMap.put(fieldName, type);
    }

    private DataType getDataType(String typeValue) throws UnsupportedDataTypeException {
        switch (typeValue) {
            case "string":  return DataTypes.StringType;
            case "integer": return DataTypes.IntegerType;
            case "long":    return DataTypes.LongType;
            case "double":  return DataTypes.DoubleType;
            default:        throw new UnsupportedDataTypeException(typeValue);
        }
    }

    public StructType getStructType() throws UnsupportedDataTypeException {
        // Build a fresh list on every call; appending to a shared field list would duplicate
        // every column when getStructType() is called more than once
        List<StructField> fields = new ArrayList<>();
        for (Map.Entry<String, String> entry : schemaMap.entrySet()) {
            fields.add(DataTypes.createStructField(entry.getKey(), getDataType(entry.getValue()), true));
        }
        return DataTypes.createStructType(fields);
    }
}
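If the goal is only to avoid hand-building `StructField` lists, Spark 2.3 and later also accept a DDL-formatted schema string through `DataFrameReader.schema(String)`. The sketch below (hypothetical class `DdlSchema`, plain Java, no Spark dependency) keeps SchemaMaker's four type names but replaces the if/else chain with a lookup table and emits a DDL string instead of a `StructType`.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical alternative to SchemaMaker: map the same four type names to Spark SQL
// DDL type keywords and build a DDL schema string such as "`word` STRING, `cnt` BIGINT".
final class DdlSchema {
    private static final Map<String, String> DDL_TYPES = new LinkedHashMap<>();
    static {
        DDL_TYPES.put("string", "STRING");   // StringType
        DDL_TYPES.put("integer", "INT");     // IntegerType
        DDL_TYPES.put("long", "BIGINT");     // LongType
        DDL_TYPES.put("double", "DOUBLE");   // DoubleType
    }

    private final StringJoiner columns = new StringJoiner(", ");

    DdlSchema add(String fieldName, String fieldType) {
        String ddlType = DDL_TYPES.get(fieldType.toLowerCase(Locale.ROOT));
        if (ddlType == null) {
            throw new IllegalArgumentException(
                    "unsupported type " + fieldType + ", expected one of " + DDL_TYPES.keySet());
        }
        // Backticks quote the column name, so names that clash with SQL keywords stay valid
        columns.add("`" + fieldName + "` " + ddlType);
        return this;
    }

    String toDdl() {
        return columns.toString();
    }
}
```

With this, `reader.schema(new DdlSchema().add("word", "string").add("cnt", "string").toDdl())` would stand in for the hand-built StructType. The lookup table makes adding a type a one-line change, at the cost of relying on Spark's DDL parser rather than the `DataTypes` factory methods.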
On top of that, add one more wrapper that builds the StructType from the field information returned by `SomeBean.class.getDeclaredFields()`:
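// Needs in the enclosing class: java.lang.reflect.Field, java.lang.reflect.Modifier, java.util.ArrayList,
// javax.activation.UnsupportedDataTypeException, org.apache.spark.sql.types.StructType
// Field name -> column name; simple type name (String, Long, Integer, Double) -> column type.
// Columns are matched to the file by position, so the field order must match the file's column order.
public static StructType getStructType(Field[] fields) throws UnsupportedDataTypeException {
    ArrayList<ArrayList<String>> lists = new ArrayList<>();
    for (Field field : fields) {
        // Skip static fields (e.g. serialVersionUID) and compiler-generated fields; they are not columns
        if (Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
            continue;
        }
        String name = field.getName();
        // getTypeName() returns e.g. "java.lang.Long"; keep the part after the last '.'
        // (the regex "\\." matches a literal dot; "\." would not compile)
        String[] typeSplit = field.getAnnotatedType().getType().getTypeName().split("\\.");
        String type = typeSplit[typeSplit.length - 1];
        ArrayList<String> tmpList = new ArrayList<>();
        tmpList.add(name);
        tmpList.add(type);
        lists.add(tmpList);
    }
    SchemaMaker schemaMaker = new SchemaMaker(lists);
    return schemaMaker.getStructType();
}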
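Because the CSV reader assigns columns by position, the order of the fields handed to `getStructType` must match the file. The Javadoc for `Class.getDeclaredFields()` says the returned elements are not sorted and not in any particular order, so relying on declaration order is an assumption about the JVM. A hypothetical helper like `OrderedFields` below takes the column order explicitly and looks each field up by name.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: return a bean's fields in an explicitly given column order,
// instead of relying on the unspecified order of getDeclaredFields().
final class OrderedFields {
    static Field[] inOrder(Class<?> beanClass, String... columnOrder) throws NoSuchFieldException {
        List<Field> result = new ArrayList<>();
        for (String name : columnOrder) {
            Field f = beanClass.getDeclaredField(name); // throws NoSuchFieldException for unknown names
            if (Modifier.isStatic(f.getModifiers())) {
                throw new IllegalArgumentException(name + " is static and cannot be a column");
            }
            result.add(f);
        }
        return result.toArray(new Field[0]);
    }
}
```

`getStructType(OrderedFields.inOrder(City.class, "cityid", "cityname", "area"))` would then pin the column order regardless of what `getDeclaredFields()` returns; the caller has to handle the checked `NoSuchFieldException`, which also catches typos in the column list.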
The bean (case) class definitions
// Requires import java.io.Serializable in the enclosing class.
// Encoders.bean(...) needs a public no-arg constructor plus a getter/setter pair per property,
// so each class gets an explicit no-arg constructor next to the all-args one.
public static class City implements Serializable {
    private Long cityid;
    private String cityname;
    private String area;

    public City() { }

    public City(Long cityid, String cityname, String area) {
        this.cityid = cityid;
        this.cityname = cityname;
        this.area = area;
    }

    public Long getCityid() { return cityid; }
    public void setCityid(Long cityid) { this.cityid = cityid; }
    public String getCityname() { return cityname; }
    public void setCityname(String cityname) { this.cityname = cityname; }
    public String getArea() { return area; }
    public void setArea(String area) { this.area = area; }
}

public static class Product implements Serializable {
    private Long productid;
    private String product;
    private String product_from;

    public Product() { }

    public Product(Long productid, String product, String product_from) {
        this.productid = productid;
        this.product = product;
        this.product_from = product_from;
    }

    public Long getProductid() { return productid; }
    public void setProductid(Long productid) { this.productid = productid; }
    public String getProduct() { return product; }
    public void setProduct(String product) { this.product = product; }
    public String getProduct_from() { return product_from; }
    public void setProduct_from(String product_from) { this.product_from = product_from; }
}

public static class UserVisitAction implements Serializable {
    private String date;
    private Long user_id;
    private String session_id;
    private Long page_id;
    private String action_time;
    private String search_keyword;
    private Long click_category_id;
    private Long click_product_id;
    private String order_category_ids;
    private String order_product_ids;
    private String pay_category_ids;
    private String pay_product_ids;
    private Long city_id;

    public UserVisitAction() { }

    public UserVisitAction(String date, Long user_id, String session_id, Long page_id, String action_time,
                           String search_keyword, Long click_category_id, Long click_product_id,
                           String order_category_ids, String order_product_ids, String pay_category_ids,
                           String pay_product_ids, Long city_id) {
        this.date = date;
        this.user_id = user_id;
        this.session_id = session_id;
        this.page_id = page_id;
        this.action_time = action_time;
        this.search_keyword = search_keyword;
        this.click_category_id = click_category_id;
        this.click_product_id = click_product_id;
        this.order_category_ids = order_category_ids;
        this.order_product_ids = order_product_ids;
        this.pay_category_ids = pay_category_ids;
        this.pay_product_ids = pay_product_ids;
        this.city_id = city_id;
    }

    public String getDate() { return date; }
    public void setDate(String date) { this.date = date; }
    public Long getUser_id() { return user_id; }
    public void setUser_id(Long user_id) { this.user_id = user_id; }
    public String getSession_id() { return session_id; }
    public void setSession_id(String session_id) { this.session_id = session_id; }
    public Long getPage_id() { return page_id; }
    public void setPage_id(Long page_id) { this.page_id = page_id; }
    public String getAction_time() { return action_time; }
    public void setAction_time(String action_time) { this.action_time = action_time; }
    public String getSearch_keyword() { return search_keyword; }
    public void setSearch_keyword(String search_keyword) { this.search_keyword = search_keyword; }
    public Long getClick_category_id() { return click_category_id; }
    public void setClick_category_id(Long click_category_id) { this.click_category_id = click_category_id; }
    public Long getClick_product_id() { return click_product_id; }
    public void setClick_product_id(Long click_product_id) { this.click_product_id = click_product_id; }
    public String getOrder_category_ids() { return order_category_ids; }
    public void setOrder_category_ids(String order_category_ids) { this.order_category_ids = order_category_ids; }
    public String getOrder_product_ids() { return order_product_ids; }
    public void setOrder_product_ids(String order_product_ids) { this.order_product_ids = order_product_ids; }
    public String getPay_category_ids() { return pay_category_ids; }
    public void setPay_category_ids(String pay_category_ids) { this.pay_category_ids = pay_category_ids; }
    public String getPay_product_ids() { return pay_product_ids; }
    public void setPay_product_ids(String pay_product_ids) { this.pay_product_ids = pay_product_ids; }
    public Long getCity_id() { return city_id; }
    public void setCity_id(Long city_id) { this.city_id = city_id; }
}
The main program
// Spark imports: org.apache.spark.sql.{DataFrameReader, Dataset, Encoders, Row}, org.apache.spark.sql.types.StructType
DataFrameReader reader = sparkSession.read();
StructType citySchema = getStructType(City.class.getDeclaredFields());
StructType productSchema = getStructType(Product.class.getDeclaredFields());
StructType actionSchema = getStructType(UserVisitAction.class.getDeclaredFields());

// The files are tab-separated and have no header line, so header must be false (the default);
// with header=true the first data row would be swallowed as a header.
// csv() selects the CSV source itself, so a format("text") call before it has no effect.
Dataset<Row> citydf = reader.option("delimiter", "\t").option("header", false)
        .schema(citySchema).csv("D:\\project\\sparkDemo\\inputs\\city_info.txt");
Dataset<Row> productdf = reader.option("delimiter", "\t").option("header", false)
        .schema(productSchema).csv("D:\\project\\sparkDemo\\inputs\\product_info.txt");
Dataset<Row> actiondf = reader.option("delimiter", "\t").option("header", false)
        .schema(actionSchema).csv("D:\\project\\sparkDemo\\inputs\\user_visit_action.txt");

Dataset<City> cityDataset = citydf.as(Encoders.bean(City.class)); // convert to a typed Dataset
// cityDataset.show();

// Write each DataFrame to MySQL; mode("overwrite") replaces the target table if it exists
citydf.write().format("jdbc").option("url", "jdbc:mysql://172.20.143.219:3306/test")
        .option("driver", "com.mysql.cj.jdbc.Driver").option("user", "root")
        .option("password", "mysql").option("dbtable", "city_info").mode("overwrite").save();
productdf.write().format("jdbc").option("url", "jdbc:mysql://172.20.143.219:3306/test")
        .option("driver", "com.mysql.cj.jdbc.Driver").option("user", "root")
        .option("password", "mysql").option("dbtable", "product_info").mode("overwrite").save();
actiondf.write().format("jdbc").option("url", "jdbc:mysql://172.20.143.219:3306/test")
        .option("driver", "com.mysql.cj.jdbc.Driver").option("user", "root")
        .option("password", "mysql").option("dbtable", "user_visit_action").mode("overwrite").save();
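The three JDBC writes repeat the same connection options. A sketch of building them once, assuming a hypothetical helper `JdbcOptions` (plain Java); Spark's `DataFrameWriter.options(java.util.Map<String, String>)` can then apply the whole map in one call.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: the JDBC options shared by the writes above, built in one place.
// Pass the connection values in (ideally from configuration) rather than hard-coding them.
final class JdbcOptions {
    static Map<String, String> forTable(String url, String user, String password, String table) {
        Map<String, String> opts = new HashMap<>();
        opts.put("url", url);
        opts.put("driver", "com.mysql.cj.jdbc.Driver"); // MySQL Connector/J 8 driver class, as in the article
        opts.put("user", user);
        opts.put("password", password);
        opts.put("dbtable", table);                      // target table name
        return opts;
    }
}
```

Each write would then read `citydf.write().format("jdbc").options(JdbcOptions.forTable(url, "root", "mysql", "city_info")).mode("overwrite").save();`, where `url` holds the JDBC URL. Keeping the credentials in one place also makes them easier to move out of the source code later.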
Once the bean classes are defined this way, txt files can be read and processed in batches.
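PS: Don't load a file whose layout you haven't confirmed (column order, delimiter, whether it has a header line). A wrong schema does not necessarily fail: in the CSV reader's default PERMISSIVE mode, values that can't be parsed into the declared type become null, and with mode("overwrite") the JDBC write then replaces the existing table with that bad data.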