Java API如何實現(xiàn)向Hive批量導(dǎo)入數(shù)據(jù)
Java API實現(xiàn)向Hive批量導(dǎo)入數(shù)據(jù)
Java程序中產(chǎn)生的數(shù)據(jù),如果導(dǎo)入oracle或者mysql庫,可以通過jdbc連接insert批量操作完成,但是當前版本的hive并不支持批量insert操作,因為需要先將結(jié)果數(shù)據(jù)寫入hdfs文件,然后插入Hive表中。
package com.enn.idcard;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
* <p>Description: </p>
* @author kangkaia
* @date 2017年12月26日 下午1:42:24
*/
public class HiveJdbc {
public static void main(String[] args) throws IOException {
List<List> argList = new ArrayList<List>();
List<String> arg = new ArrayList<String>();
arg.add("12345");
arg.add("m");
argList.add(arg);
arg = new ArrayList<String>();
arg.add("54321");
arg.add("f");
argList.add(arg);
// System.out.println(argList.toString());
String dst = "/test/kk.txt";
createFile(dst,argList);
loadData2Hive(dst);
}
/**
* 將數(shù)據(jù)插入hdfs中,用于load到hive表中,默認分隔符是"\001"
* @param dst
* @param contents
* @throws IOException
*/
public static void createFile(String dst , List<List> argList) throws IOException{
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path dstPath = new Path(dst); //目標路徑
//打開一個輸出流
FSDataOutputStream outputStream = fs.create(dstPath);
StringBuffer sb = new StringBuffer();
for(List<String> arg:argList){
for(String value:arg){
sb.append(value).append("\001");
}
sb.deleteCharAt(sb.length() - 4);//去掉最后一個分隔符
sb.append("\n");
}
sb.deleteCharAt(sb.length() - 2);//去掉最后一個換行符
byte[] contents = sb.toString().getBytes();
outputStream.write(contents);
outputStream.close();
fs.close();
System.out.println("文件創(chuàng)建成功!");
}
/**
* 將HDFS文件load到hive表中
* @param dst
*/
public static void loadData2Hive(String dst) {
String JDBC_DRIVER = "org.apache.hive.jdbc.HiveDriver";
String CONNECTION_URL = "jdbc:hive2://server-13:10000/default;auth=noSasl";
String username = "admin";
String password = "admin";
Connection con = null;
try {
Class.forName(JDBC_DRIVER);
con = (Connection) DriverManager.getConnection(CONNECTION_URL,username,password);
Statement stmt = con.createStatement();
String sql = " load data inpath '"+dst+"' into table population.population_information ";
stmt.execute(sql);
System.out.println("loadData到Hive表成功!");
} catch (SQLException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}finally {
// 關(guān)閉rs、ps和con
if(con != null){
try {
con.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
}
注意:
本例使用mvn搭建,conf配置文件放在src/main/resources目錄下。
Hive提供的默認文件存儲格式有textfile、sequencefile、rcfile等。用戶也可以通過實現(xiàn)接口來自定義輸入輸?shù)奈募袷健?/p>
在實際應(yīng)用中,textfile由于無壓縮,磁盤及解析的開銷都很大,一般很少使用。Sequencefile以鍵值對的形式存儲的二進制的格式,其支持針對記錄級別和塊級別的壓縮。rcfile是一種行列結(jié)合的存儲方式(text file和sequencefile都是行表[row table]),其保證同一條記錄在同一個hdfs塊中,塊以列式存儲。一般而言,對于OLTP而言,行表優(yōu)勢大于列表,對于OLAP而言,列表的優(yōu)勢大于行表,特別容易想到當做聚合操作時,列表的復(fù)雜度將會比行表小的多,雖然單獨rcfile的列運算不一定總是存在的,但是rcfile的高壓縮率確實減少文件大小,因此實際應(yīng)用中,rcfile總是成為不二的選擇,達觀數(shù)據(jù)平臺在選擇文件存儲格式時也大量選擇了rcfile方案。
通過hdfs導(dǎo)入hive的表默認是textfile格式的,因此可以改變存儲格式,具體方法是先創(chuàng)建sequencefile、rcfile等格式的空表,然后重新插入數(shù)據(jù)即可。
insert overwrite table seqfile_table select * from textfile_table; …… insert overwrite table rcfile_table select * from textfile_table;
java 批量插入hive中轉(zhuǎn)在HDFS
稍微修改了下,這文章是通過將數(shù)據(jù)存盤后,加載到HIVE.
模擬數(shù)據(jù)放到HDFS然后加載到HIVE,請大家記得添加HIVE JDBC依賴否則會報錯。
加載前的數(shù)據(jù)表最好用外部表,否則會drop表的時候元數(shù)據(jù)會一起刪除!
<dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-jdbc</artifactId> <version>1.1.0</version> </dependency>
代碼
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class Demo {
public static void main(String[] args) throws Exception {
List<List> argList = new ArrayList<List>();
List<String> arg = new ArrayList<String>();
arg.add("12345");
arg.add("m");
argList.add(arg);
arg = new ArrayList<String>();
arg.add("54321");
arg.add("f");
argList.add(arg);
// System.out.println(argList.toString());
String dst = "/test/kk.txt";
createFile(dst,argList);
// loadData2Hive(dst);
}
/**
* 將數(shù)據(jù)插入hdfs中,用于load到hive表中,默認分隔符是"|"
* @param dst
* @param contents
* @throws IOException
* @throws Exception
* @throws InterruptedException
*/
public static void createFile(String dst , List<List> argList) throws IOException, InterruptedException, Exception{
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop:9000"),conf,"root");
Path dstPath = new Path(dst); //目標路徑
//打開一個輸出流
FSDataOutputStream outputStream = fs.create(dstPath);
StringBuffer sb = new StringBuffer();
for(List<String> arg:argList){
for(String value:arg){
sb.append(value).append("|");
}
sb.deleteCharAt(sb.length() - 1);//去掉最后一個分隔符
sb.append("\n");
}
byte[] contents = sb.toString().getBytes();
outputStream.write(contents);
outputStream.flush();;
outputStream.close();
fs.close();
System.out.println("文件創(chuàng)建成功!");
}
/**
* 將HDFS文件load到hive表中
* @param dst
*/
public static void loadData2Hive(String dst) {
String JDBC_DRIVER = "org.apache.hive.jdbc.HiveDriver";
String CONNECTION_URL = "jdbc:hive2://hadoop:10000/default";
String username = "root";
String password = "root";
Connection con = null;
try {
Class.forName(JDBC_DRIVER);
con = (Connection) DriverManager.getConnection(CONNECTION_URL,username,password);
Statement stmt = con.createStatement();
String sql = " load data inpath '"+dst+"' into table test ";//test 為插入的表
stmt.execute(sql);
System.out.println("loadData到Hive表成功!");
} catch (SQLException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}finally {
// 關(guān)閉rs、ps和con
if(con != null){
try {
con.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
}
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
java數(shù)據(jù)結(jié)構(gòu)基礎(chǔ):線性表
這篇文章主要介紹了Java的數(shù)據(jù)解構(gòu)基礎(chǔ),希望對廣大的程序愛好者有所幫助,同時祝大家有一個好成績,需要的朋友可以參考下,希望能給你帶來幫助2021-07-07
java?Long類型轉(zhuǎn)為json后數(shù)據(jù)損失精度的處理方式
這篇文章主要介紹了java?Long類型轉(zhuǎn)為json后數(shù)據(jù)損失精度的處理方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-01-01
spring cloud如何修復(fù)zuul跨域配置異常的問題
最近的開發(fā)過程中,使用spring集成了spring-cloud-zuul,在配置zuul跨域的時候遇到了問題,下面這篇文章主要給大家介紹了關(guān)于spring cloud如何修復(fù)zuul跨域配置異常的問題,需要的朋友可以參考借鑒,下面來一起看看吧。2017-09-09
使用Spring AOP實現(xiàn)MySQL數(shù)據(jù)庫讀寫分離案例分析(附demo)
分布式環(huán)境下數(shù)據(jù)庫的讀寫分離策略是解決數(shù)據(jù)庫讀寫性能瓶頸的一個關(guān)鍵解決方案,這篇文章主要介紹了使用Spring AOP實現(xiàn)MySQL數(shù)據(jù)庫讀寫分離案例分析(附demo),有興趣的可以了解一下。2017-01-01
Netty分布式ByteBuf使用的底層實現(xiàn)方式源碼解析
這篇文章主要為大家介紹了Netty分布式ByteBuf使用底層實現(xiàn)方式源碼解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-03-03
eclipse 如何創(chuàng)建 user library 方法詳解
這篇文章主要介紹了eclipse 如何創(chuàng)建 user library 方法詳解的相關(guān)資料,需要的朋友可以參考下2017-04-04

