SpringBoot操作spark處理hdfs文件的操作方法
SpringBoot操作spark處理hdfs文件

1、導(dǎo)入依賴
<!-- spark依賴-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.2.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.2.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-mllib -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.12</artifactId>
<version>3.2.2</version>
</dependency>2、配置spark信息
建立一個配置文件,配置spark信息
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//將文件交于spring管理
@Configuration
public class SparkConfig {
//使用yml中的配置
@Value("${spark.master}")
private String sparkMaster;
@Value("${spark.appName}")
private String sparkAppName;
@Value("${hdfs.user}")
private String hdfsUser;
@Value("${hdfs.path}")
private String hdfsPath;
@Bean
public SparkConf sparkConf() {
SparkConf conf = new SparkConf();
conf.setMaster(sparkMaster);
conf.setAppName(sparkAppName);
// 添加HDFS配置
conf.set("fs.defaultFS", hdfsPath);
conf.set("spark.hadoop.hdfs.user",hdfsUser);
return conf;
}
@Bean
public SparkSession sparkSession() {
return SparkSession.builder()
.config(sparkConf())
.getOrCreate();
}
}3、controller和service
controller類
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import xyz.zzj.traffic_main_code.service.SparkService;
@RestController
@RequestMapping("/spark")
public class SparkController {
@Autowired
private SparkService sparkService;
@GetMapping("/run")
public String runSparkJob() {
//讀取Hadoop HDFS文件
String filePath = "hdfs://192.168.44.128:9000/subwayData.csv";
sparkService.executeHadoopSparkJob(filePath);
return "Spark job executed successfully!";
}
}處理地鐵數(shù)據(jù)的service
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import xyz.zzj.traffic_main_code.service.SparkReadHdfs;
import java.io.IOException;
import java.net.URI;
import static org.apache.spark.sql.functions.*;
@Service
public class SparkReadHdfsImpl implements SparkReadHdfs {
private final SparkSession spark;
@Value("${hdfs.user}")
private String hdfsUser;
@Value("${hdfs.path}")
private String hdfsPath;
@Autowired
public SparkReadHdfsImpl(SparkSession spark) {
this.spark = spark;
}
/**
* 讀取HDFS上的CSV文件并上傳到HDFS
* @param filePath
*/
@Override
public void sparkSubway(String filePath) {
try {
// 設(shè)置Hadoop配置
JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
Configuration hadoopConf = jsc.hadoopConfiguration();
hadoopConf.set("fs.defaultFS", hdfsPath);
hadoopConf.set("hadoop.user.name", hdfsUser);
// 讀取HDFS上的文件
Dataset<Row> df = spark.read()
.option("header", "true") // 指定第一行是列名
.option("inferSchema", "true") // 自動推斷列的數(shù)據(jù)類型
.csv(filePath);
// 顯示DataFrame的所有數(shù)據(jù)
// df.show(Integer.MAX_VALUE, false);
// 對DataFrame進(jìn)行清洗和轉(zhuǎn)換操作
// 檢查缺失值
df.select("number", "people", "dateTime").na().drop().show();
// 對數(shù)據(jù)進(jìn)行類型轉(zhuǎn)換
Dataset<Row> df2 = df.select(
col("number").cast(DataTypes.IntegerType),
col("people").cast(DataTypes.IntegerType),
to_date(col("dateTime"), "yyyy年MM月dd日").alias("dateTime")
);
// 去重
Dataset<Row> df3 = df2.dropDuplicates();
// 數(shù)據(jù)過濾,確保people列沒有負(fù)數(shù)
Dataset<Row> df4 = df3.filter(col("people").geq(0));
// df4.show();
// 數(shù)據(jù)聚合,按dateTime分組,統(tǒng)計每天的總客流量
Dataset<Row> df6 = df4.groupBy("dateTime").agg(sum("people").alias("total_people"));
// df6.show();
sparkForSubway(df6,"/time_subwayData.csv");
//數(shù)據(jù)聚合,獲取每天人數(shù)最多的地鐵number
Dataset<Row> df7 = df4.groupBy("dateTime").agg(max("people").alias("max_people"));
sparkForSubway(df7,"/everyday_max_subwayData.csv");
//數(shù)據(jù)聚合,計算每天的客流強(qiáng)度:每天總people除以632840
Dataset<Row> df8 = df4.groupBy("dateTime").agg(sum("people").divide(632.84).alias("strength"));
sparkForSubway(df8,"/everyday_strength_subwayData.csv");
} catch (Exception e) {
e.printStackTrace();
}
}
private static void sparkForSubway(Dataset<Row> df6, String hdfsPath) throws IOException {
// 保存處理后的數(shù)據(jù)到HDFS
df6.coalesce(1)
.write().mode("overwrite")
.option("header", "true")
.csv("hdfs://192.168.44.128:9000/time_subwayData");
// 創(chuàng)建Hadoop配置
Configuration conf = new Configuration();
// 獲取FileSystem實例
FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.44.128:9000"), conf);
// 定義臨時目錄和目標(biāo)文件路徑
Path tempDir = new Path("/time_subwayData");
FileStatus[] files = fs.listStatus(tempDir);
// 檢查目標(biāo)文件是否存在,如果存在則刪除
Path targetFile1 = new Path(hdfsPath);
if (fs.exists(targetFile1)) {
fs.delete(targetFile1, true); // true 表示遞歸刪除
}
for (FileStatus file : files) {
if (file.isFile() && file.getPath().getName().startsWith("part-")) {
Path targetFile = new Path(hdfsPath);
fs.rename(file.getPath(), targetFile);
}
}
// 刪除臨時目錄
fs.delete(tempDir, true);
}
}4、運(yùn)行
- 項目運(yùn)行完后,打開瀏覽器
- spark處理地鐵數(shù)據(jù)
- http://localhost:8686/spark/dispose
- 觀察spark和hdfs
- http://192.168.44.128:8099/
- http://192.168.44.128:9870/explorer.html#/

到此這篇關(guān)于SpringBoot操作spark處理hdfs文件的文章就介紹到這了,更多相關(guān)SpringBoot spark處理hdfs文件內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
如何解決創(chuàng)建maven工程時,產(chǎn)生“找不到插件的錯誤”問題
這篇文章主要介紹了如何解決創(chuàng)建maven工程時,產(chǎn)生“找不到插件的錯誤”問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-12-12
SpringCloud Alibaba Nacos 整合SpringBoot A
這篇文章主要介紹了SpringCloud Alibaba Nacos 整合SpringBoot Admin實戰(zhàn),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-12-12
jackson json序列化實現(xiàn)首字母大寫,第二個字母需小寫
這篇文章主要介紹了jackson json序列化實現(xiàn)首字母大寫,第二個字母需小寫方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-06-06
Spring MVC中使用Google kaptcha驗證碼的方法詳解
kaptcha 是一個非常實用的驗證碼生成工具。有了它,你可以生成各種樣式的驗證碼,因為它是可配置的,下面這篇文章主要給大家介紹了關(guān)于Spring MVC中使用Google kaptcha驗證碼的方法,需要的朋友可以參考借鑒,下面來一起看看吧。2017-10-10
Java中的List接口實現(xiàn)類LinkList和ArrayList詳解
這篇文章主要介紹了Java中的List接口實現(xiàn)類LinkList和ArrayList詳解,List接口繼承自Collection接口,是單列集合的一個重要分支,實現(xiàn)了List接口的對象稱為List集合,在List集合中允許出現(xiàn)重復(fù)的元素,所有的元素是以一種線性方式進(jìn)行存儲的,需要的朋友可以參考下2024-01-01
SpringBoot整合Dubbo+Zookeeper實現(xiàn)RPC調(diào)用
這篇文章主要給大家介紹了Spring Boot整合Dubbo+Zookeeper實現(xiàn)RPC調(diào)用的步驟詳解,文中有詳細(xì)的代碼示例,對我們的學(xué)習(xí)或工作有一定的幫助,需要的朋友可以參考下2023-07-07
解決springboot讀取application.properties中文亂碼問題
初用properties,讀取java properties文件的時候如果value是中文,會出現(xiàn)亂碼的問題,所以本文小編將給大家介紹如何解決springboot讀取application.properties中文亂碼問題,需要的朋友可以參考下2023-11-11

