java kafka寫(xiě)入數(shù)據(jù)到HDFS問(wèn)題
java kafka寫(xiě)入數(shù)據(jù)到HDFS
安裝kafka,見(jiàn)我以前的文章
http://chabaoo.cn/server/2968144y7.htm
向Hdfs寫(xiě)入文件,控制臺(tái)會(huì)輸出以下錯(cuò)誤信息:
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=s00356746, access=WRITE, inode="/user":root:supergroup:drwxr-xr-x
從中很容易看出是因?yàn)楫?dāng)前執(zhí)行Spark Application的用戶(hù)沒(méi)有Hdfs“/user”目錄的寫(xiě)入權(quán)限。這個(gè)問(wèn)題無(wú)論是在Windows下還是Linux下提交Spark Application都經(jīng)常會(huì)遇到
如果是歐拉操作系統(tǒng)
需做如下處理
chattr -i etc/passwd chattr -i /etc/shadow chattr -i /etc/group chattr -i /etc/passwd- chattr -i /etc/shadow- chattr -i /etc/group- lsattr passwd* 都需要沒(méi)有 i 屬性
如果是Linux環(huán)境
將執(zhí)行操作的用戶(hù)添加到supergroup用戶(hù)組。
groupadd supergroup usermod -a -G supergroup s00356746
如果是Windows用戶(hù)
在hdfs namenode所在機(jī)器添加新用戶(hù),用戶(hù)名為執(zhí)行操作的Windows用戶(hù)名,然后將此用戶(hù)添加到supergroup用戶(hù)組。
adduser s00356746 groupadd supergroup usermod -a -G supergroup s00356746
這樣,以后每次執(zhí)行類(lèi)似操作可以將文件寫(xiě)入Hdfs中屬于s00356746用戶(hù)的目錄內(nèi),而不會(huì)出現(xiàn)上面的Exception。
生產(chǎn)者代碼
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class KafkaProducer {
private final Producer<String, String> producer;
public final static String TOPIC = "test";
private KafkaProducer(){
Properties props = new Properties();
//此處配置的是kafka的端口
props.put("metadata.broker.list", "10.175.118.105:9092");
//配置value的序列化類(lèi)
props.put("serializer.class", "kafka.serializer.StringEncoder");
//配置key的序列化類(lèi)
props.put("key.serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks","-1");
producer = new Producer<String, String>(new ProducerConfig(props));
}
void produce() {
int messageNo = 1000;
final int COUNT = 10000;
while (messageNo < COUNT) {
String key = String.valueOf(messageNo);
String data = "hello kafka message " + key;
producer.send(new KeyedMessage<String, String>(TOPIC, key ,data));
System.out.println(data);
messageNo ++;
}
}
public static void main( String[] args )
{
new KafkaProducer().produce();
}
}kafka寫(xiě)入Hdfs
package com.huawei.hwclouds.dbs.ops.huatuo.diagnosis.service.impl;
import com.huawei.hwclouds.dbs.common.exception.DBSErrorCode;
import com.huawei.hwclouds.dbs.common.exception.DBSException;
import com.huawei.hwclouds.dbs.constants.VolumeIoType;
import com.huawei.hwclouds.dbs.coremodel.model.dto.DBSInstanceDto;
import com.huawei.hwclouds.dbs.coremodel.model.dto.DBSNodeDto;
import com.huawei.hwclouds.dbs.coremodel.resource.dto.DBSResourceSpecDto;
import com.huawei.hwclouds.dbs.coremodel.resource.dto.DBSVolumeDto;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
public class KafkaToHdfs extends Thread {
private static String kafkaHost = null;
private static String kafkaGroup = null;
private static String kafkaTopic = null;
private static String hdfsUri = null;
private static String hdfsDir = null;
private static String hadoopUser = null;
private static Boolean isDebug = false;
private ConsumerConnector consumer = null;
private static Configuration hdfsConf = null;
private static FileSystem hadoopFS = null;
public static void main(String[] args) {
// if (args.length < 6) {
// useage();
// System.exit(0);
// }
// Map<String, String> user = new HashMap<String, String>();
// user = System.getenv();
// user.put("HADOOP_USER_NAME","hadoop");
// if (user.get("HADOOP_USER_NAME") == null) {
// System.out.println("請(qǐng)?jiān)O(shè)定hadoop的啟動(dòng)的用戶(hù)名,環(huán)境變量名稱(chēng):HADOOP_USER_NAME,對(duì)應(yīng)的值是hadoop的啟動(dòng)的用戶(hù)名");
// System.exit(0);
// } else {
// hadoopUser = user.get("HADOOP_USER_NAME");
// }
hadoopUser = "hadoop";
init(args);
System.out.println("開(kāi)始啟動(dòng)服務(wù)...");
hdfsConf = new Configuration();
try {
hdfsConf.set("fs.defaultFS", hdfsUri);
hdfsConf.set("dfs.support.append", "true");
hdfsConf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
hdfsConf.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
} catch (Exception e) {
System.out.println(e);
}
//創(chuàng)建好相應(yīng)的目錄
try {
hadoopFS = FileSystem.get(hdfsConf);
//如果hdfs的對(duì)應(yīng)的目錄不存在,則進(jìn)行創(chuàng)建
if (!hadoopFS.exists(new Path("/" + hdfsDir))) {
hadoopFS.mkdirs(new Path("/" + hdfsDir));
}
hadoopFS.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
KafkaToHdfs selfObj = new KafkaToHdfs();
selfObj.start();
System.out.println("服務(wù)啟動(dòng)完畢,監(jiān)聽(tīng)執(zhí)行中");
}
public void run() {
Properties props = new Properties();
props.put("zookeeper.connect", kafkaHost);
props.put("group.id", kafkaGroup);
props.put("zookeeper.session.timeout.ms", "10000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "smallest");
props.put("format", "binary");
props.put("auto.commit.enable", "true");
props.put("serializer.class", "kafka.serializer.StringEncoder");
ConsumerConfig consumerConfig = new ConsumerConfig(props);
this.consumer = Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(kafkaTopic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get(kafkaTopic).get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
String tmp = new String(it.next().message());
String fileContent = null;
if (!tmp.endsWith("\n"))
fileContent = new String(tmp + "\n");
else
fileContent = tmp;
debug("receive: " + fileContent);
try {
hadoopFS = FileSystem.get(hdfsConf);
String fileName = "/" + hdfsDir + "/" +
(new SimpleDateFormat("yyyy-MM-dd").format(Calendar.getInstance().getTime())) + ".txt";
Path dst = new Path(fileName);
if (!hadoopFS.exists(dst)) {
FSDataOutputStream output = hadoopFS.create(dst);
output.close();
}
InputStream in = new ByteArrayInputStream(fileContent.getBytes("UTF-8"));
OutputStream out = hadoopFS.append(dst);
IOUtils.copyBytes(in, out, 4096, true);
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
hadoopFS.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
consumer.shutdown();
}
private static void init(String[] args) {
kafkaHost = "10.175.118.105:2182";
kafkaGroup = "test-consumer-group";
kafkaTopic = "test";
hdfsUri = "hdfs://10.175.118.105:9000";
hdfsDir = "shxsh";
if (args.length > 5) {
if (args[5].equals("true")) {
isDebug = true;
}
}
debug("初始化服務(wù)參數(shù)完畢,參數(shù)信息如下");
debug("KAFKA_HOST: " + kafkaHost);
debug("KAFKA_GROUP: " + kafkaGroup);
debug("KAFKA_TOPIC: " + kafkaTopic);
debug("HDFS_URI: " + hdfsUri);
debug("HDFS_DIRECTORY: " + hdfsDir);
debug("HADOOP_USER: " + hadoopUser);
debug("IS_DEBUG: " + isDebug);
}
private static void debug(String str) {
if (isDebug) {
System.out.println(str);
}
}
private static void useage() {
System.out.println("* kafka寫(xiě)入到hdfs的Java工具使用說(shuō)明 ");
System.out.println("# java -cp kafkatohdfs.jar KafkaToHdfs KAFKA_HOST KAFKA_GROUP KAFKA_TOPIC HDFS_URI HDFS_DIRECTORY IS_DEBUG");
System.out.println("* 參數(shù)說(shuō)明:");
System.out.println("* KAFKA_HOST : 代表kafka的主機(jī)名或IP:port,例如:namenode:2181,datanode1:2181,datanode2:2181");
System.out.println("* KAFKA_GROUP : 代表kafka的組,例如:test-consumer-group");
System.out.println("* KAFKA_TOPIC : 代表kafka的topic名稱(chēng) ,例如:usertags");
System.out.println("* HDFS_URI : 代表hdfs鏈接uri ,例如:hdfs://namenode:9000");
System.out.println("* HDFS_DIRECTORY : 代表hdfs目錄名稱(chēng) ,例如:usertags");
System.out.println("* 可選參數(shù):");
System.out.println("* IS_DEBUG : 代表是否開(kāi)啟調(diào)試模式,true是,false否,默認(rèn)為false");
}
}
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
一篇文章帶你了解JavaSE的數(shù)據(jù)類(lèi)型
這篇文章主要給大家介紹了關(guān)于JavaSE的數(shù)據(jù)類(lèi)型,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-09-09
使用JPA主鍵@Id,@IdClass,@Embeddable,@EmbeddedId問(wèn)題
這篇文章主要介紹了使用JPA主鍵@Id,@IdClass,@Embeddable,@EmbeddedId問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-06-06
java工具類(lèi)StringUtils使用實(shí)例詳解
這篇文章主要為大家介紹了java工具類(lèi)StringUtils使用實(shí)例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-05-05
使用feign服務(wù)調(diào)用添加Header參數(shù)
這篇文章主要介紹了使用feign服務(wù)調(diào)用添加Header參數(shù)的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-06-06
JAVA正則表達(dá)式過(guò)濾文件的實(shí)現(xiàn)方法
這篇文章主要介紹了JAVA正則表達(dá)式過(guò)濾文件的實(shí)現(xiàn)方法的相關(guān)資料,希望通過(guò)本文大家能夠掌握理解這部分內(nèi)容,需要的朋友可以參考下2017-09-09
使用spring動(dòng)態(tài)獲取接口的不同實(shí)現(xiàn)類(lèi)
這篇文章主要介紹了使用spring動(dòng)態(tài)獲取接口的不同實(shí)現(xiàn)類(lèi),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-02-02
最簡(jiǎn)單的spring boot打包docker鏡像的實(shí)現(xiàn)
這篇文章主要介紹了最簡(jiǎn)單的spring boot打包docker鏡像的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-10-10

