Sample Code for Integrating Kafka + Storm into Spring Boot
Preface
Business requirements called for integrating Storm and Kafka into a Spring Boot project: other services write their logs to a Kafka topic, and Storm processes that topic in real time for data monitoring and other statistics. Tutorials on this are scarce, so this post covers how to integrate Storm + Kafka into Spring Boot, along with the pitfalls I ran into.
Tools and Environment
1. Java: JDK 1.8
2. IDE: IntelliJ IDEA 2017
3. Maven for project and dependency management
4. Spring Boot 1.5.8.RELEASE
Requirements
1. Why integrate into Spring Boot?
To manage the various microservices uniformly through Spring Boot and to avoid multiple scattered configurations.
2. Approach and rationale
Use Spring Boot to manage the beans needed by Kafka, Storm, Redis, and the rest. Other services ship their logs to Kafka; Kafka feeds the logs to Storm in real time, and the corresponding processing happens in the Storm bolts. (A sketch of the producing side follows.)
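For context, the producing side can be as simple as sending each log line through spring-kafka's KafkaTemplate. Below is a minimal sketch; the class name, topic name, and payload format are my assumptions, not taken from the original project:

```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

/**
 * Hypothetical producing side: a service that ships log lines to the
 * Kafka topic the Storm topology subscribes to.
 */
@Service
public class LogProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public LogProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendLog(String logLine) {
        // The topic name must match the one the KafkaSpout consumes
        // (kafka.default-topic below); "service-logs" is a placeholder.
        kafkaTemplate.send("service-logs", logLine);
    }
}
```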
Problems Encountered
1. Spring Boot has no off-the-shelf integration for Storm.
2. When starting via Spring Boot, it was unclear how to trigger topology submission.
3. Submitting the topology hit the "could not find leader nimbus from seed hosts [localhost]" problem.
4. Storm bolts could not obtain Spring-managed bean instances through annotations.
Approach
Before integrating, you need to know how Spring Boot starts up and is configured (this post assumes you already know and have used Storm, Kafka, and Spring Boot).
Examples of integrating Storm with Spring Boot are rare online, but the requirement exists, so integrate we must.
First, import the required jars:
```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.1.1</version>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    <exclusions>
        <exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion>
        <exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-actuator</artifactId></exclusion>
        <exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <exclusions>
        <exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-hadoop</artifactId>
    <version>2.5.0.RELEASE</version>
    <exclusions>
        <exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion>
        <exclusion><groupId>commons-logging</groupId><artifactId>commons-logging</artifactId></exclusion>
        <exclusion><groupId>io.netty</groupId><artifactId>netty</artifactId></exclusion>
        <exclusion><groupId>org.codehaus.jackson</groupId><artifactId>jackson-core-asl</artifactId></exclusion>
        <exclusion><groupId>org.apache.curator</groupId><artifactId>curator-client</artifactId></exclusion>
        <exclusion><groupId>org.codehaus.jettison</groupId><artifactId>jettison</artifactId></exclusion>
        <exclusion><groupId>org.codehaus.jackson</groupId><artifactId>jackson-mapper-asl</artifactId></exclusion>
        <exclusion><groupId>org.codehaus.jackson</groupId><artifactId>jackson-jaxrs</artifactId></exclusion>
        <exclusion><groupId>org.xerial.snappy</groupId><artifactId>snappy-java</artifactId></exclusion>
        <exclusion><groupId>org.codehaus.jackson</groupId><artifactId>jackson-xc</artifactId></exclusion>
        <exclusion><groupId>com.google.guava</groupId><artifactId>guava</artifactId></exclusion>
        <exclusion><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId></exclusion>
        <exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion>
        <exclusion><groupId>javax.servlet</groupId><artifactId>servlet-api</artifactId></exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.10</version>
    <exclusions>
        <exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.2.4</version>
    <exclusions>
        <exclusion><groupId>log4j</groupId><artifactId>log4j</artifactId></exclusion>
        <exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion>
        <exclusion><groupId>io.netty</groupId><artifactId>netty</artifactId></exclusion>
        <exclusion><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId></exclusion>
        <exclusion><groupId>com.google.guava</groupId><artifactId>guava</artifactId></exclusion>
        <exclusion><groupId>org.apache.hadoop</groupId><artifactId>hadoop-annotations</artifactId></exclusion>
        <exclusion><groupId>org.apache.hadoop</groupId><artifactId>hadoop-yarn-common</artifactId></exclusion>
        <exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.7.3</version>
    <exclusions>
        <exclusion><groupId>commons-logging</groupId><artifactId>commons-logging</artifactId></exclusion>
        <exclusion><groupId>org.apache.curator</groupId><artifactId>curator-client</artifactId></exclusion>
        <exclusion><groupId>org.codehaus.jackson</groupId><artifactId>jackson-mapper-asl</artifactId></exclusion>
        <exclusion><groupId>org.codehaus.jackson</groupId><artifactId>jackson-core-asl</artifactId></exclusion>
        <exclusion><groupId>log4j</groupId><artifactId>log4j</artifactId></exclusion>
        <exclusion><groupId>org.xerial.snappy</groupId><artifactId>snappy-java</artifactId></exclusion>
        <exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion>
        <exclusion><groupId>com.google.guava</groupId><artifactId>guava</artifactId></exclusion>
        <exclusion><groupId>org.apache.hadoop</groupId><artifactId>hadoop-auth</artifactId></exclusion>
        <exclusion><groupId>commons-lang</groupId><artifactId>commons-lang</artifactId></exclusion>
        <exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion>
        <exclusion><groupId>javax.servlet</groupId><artifactId>servlet-api</artifactId></exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-examples</artifactId>
    <version>2.7.3</version>
    <exclusions>
        <exclusion><groupId>commons-logging</groupId><artifactId>commons-logging</artifactId></exclusion>
        <exclusion><groupId>io.netty</groupId><artifactId>netty</artifactId></exclusion>
        <exclusion><groupId>com.google.guava</groupId><artifactId>guava</artifactId></exclusion>
        <exclusion><groupId>log4j</groupId><artifactId>log4j</artifactId></exclusion>
        <exclusion><groupId>javax.servlet</groupId><artifactId>servlet-api</artifactId></exclusion>
    </exclusions>
</dependency>
<!-- storm -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>${storm.version}</version>
    <scope>${provided.scope}</scope>
    <exclusions>
        <exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId></exclusion>
        <exclusion><groupId>javax.servlet</groupId><artifactId>servlet-api</artifactId></exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>1.1.1</version>
    <exclusions>
        <exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion>
    </exclusions>
</dependency>
```
The exclusions are there because the project's build suffered from multiple conflicting transitive dependencies. The Storm version (the ${storm.version} property) is 1.1.0. The Spring Boot dependencies are:
```xml
<!-- spring boot -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
    <exclusions>
        <exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<dependency>
    <groupId>org.mybatis.spring.boot</groupId>
    <artifactId>mybatis-spring-boot-starter</artifactId>
    <version>${mybatis-spring.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
</dependency>
```
P.S.: This dependency list reflects the needs of my particular project and is not minimal; treat it purely as a reference.
Project structure (the original post shows screenshots of the module layout here):
- config: configuration files for the different environments
- the remaining packages: the Spring Boot implementation classes and other build components
When we start Spring Boot, though, we run into the first problem. Before this integration I had barely touched Storm, and I found that once everything is wired into Spring Boot, starting the application does not trigger the topology-submission function at all. I had assumed starting Spring Boot was enough, waited half an hour with nothing happening, and only then realized the submission was never triggered.
To solve this, my first idea was: start Spring Boot -> have a Kafka listener on the topic start the topology. The problem is that the Kafka listener would keep firing and re-submit the topology repeatedly, which is clearly not what we want. After some reading I found that Spring can execute a method once the application context has finished loading, which was exactly the lifeline I needed. So the flow for triggering the topology became:
Start Spring Boot -> run the post-startup trigger method -> submit the topology.
The implementation looks like this:
```java
import java.util.Collections;

import kafka.api.OffsetRequest;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;

/**
 * @author Leezer
 * @date 2017/12/28
 * Submits the topology automatically once Spring has finished loading.
 **/
@Configuration
@Component
public class AutoLoad implements ApplicationListener<ContextRefreshedEvent> {

    private static String BROKERZKSTR;
    private static String TOPIC;
    private static String HOST;
    private static String PORT;

    public AutoLoad(@Value("${storm.brokerZkstr}") String brokerZkstr,
                    @Value("${zookeeper.host}") String host,
                    @Value("${zookeeper.port}") String port,
                    @Value("${kafka.default-topic}") String topic) {
        BROKERZKSTR = brokerZkstr;
        HOST = host;
        TOPIC = topic;
        PORT = port;
    }

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        try {
            // Instantiate the TopologyBuilder.
            TopologyBuilder topologyBuilder = new TopologyBuilder();
            // Set up the spout; the parallelism hint controls how many threads it gets on the cluster.
            BrokerHosts brokerHosts = new ZkHosts(BROKERZKSTR);
            // Configure the Kafka topic to subscribe to, plus the ZooKeeper node path and id.
            SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, TOPIC, "/storm", "s32");
            spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
            spoutConfig.zkServers = Collections.singletonList(HOST);
            spoutConfig.zkPort = Integer.parseInt(PORT);
            // Start reading from the latest Kafka offset.
            spoutConfig.startOffsetTime = OffsetRequest.LatestTime();
            KafkaSpout receiver = new KafkaSpout(spoutConfig);
            topologyBuilder.setSpout("kafka-spout", receiver, 1).setNumTasks(2);
            topologyBuilder.setBolt("alarm-bolt", new AlarmBolt(), 1).setNumTasks(2)
                    .shuffleGrouping("kafka-spout");
            Config config = new Config();
            config.setDebug(false);
            /* Number of worker slots this topology claims on the Storm cluster. One slot maps to
               one worker process on a supervisor node. If you request more slots than the cluster
               physically has, the topology may submit successfully but never actually run; once
               other topologies are killed and slots are freed, it will start running normally. */
            config.setNumWorkers(1);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("kafka-spout", config, topologyBuilder.createTopology());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```
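For reference, the four keys injected through @Value above live in the property files loaded by the main class (storm.properties and friends, shown further down). A minimal sketch with placeholder values I am assuming for a single-node setup:

```properties
# Placeholder values, adjust to your environment.
storm.brokerZkstr=127.0.0.1:2181
zookeeper.host=127.0.0.1
zookeeper.port=2181
# The topic your services log to; the name here is assumed.
kafka.default-topic=service-logs
```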
Note: because the project starts on the embedded Tomcat, you may see the following error at startup:
```
[Tomcat-startStop-1] ERROR o.a.c.c.ContainerBase - A child container failed during start
java.util.concurrent.ExecutionException: org.apache.catalina.LifecycleException: Failed to start component [StandardEngine[Tomcat].StandardHost[localhost].TomcatEmbeddedContext[]]
    at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:1.8.0_144]
    at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_144]
    at org.apache.catalina.core.ContainerBase.startInternal(ContainerBase.java:939) [tomcat-embed-core-8.5.23.jar:8.5.23]
    at org.apache.catalina.core.StandardHost.startInternal(StandardHost.java:872) [tomcat-embed-core-8.5.23.jar:8.5.23]
    at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:150) [tomcat-embed-core-8.5.23.jar:8.5.23]
    at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1419) [tomcat-embed-core-8.5.23.jar:8.5.23]
    at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1409) [tomcat-embed-core-8.5.23.jar:8.5.23]
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [?:1.8.0_144]
    at java.util.concurrent.FutureTask.run(FutureTask.java) [?:1.8.0_144]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_144]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_144]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_144]
```
This happens because one of the imported jars drags in a servlet-api older than the embedded Tomcat's. Find the offender in the Maven dependency tree (for example with `mvn dependency:tree -Dincludes=javax.servlet:servlet-api`) and exclude it:
```xml
<exclusion>
    <groupId>javax.servlet</groupId>
    <artifactId>servlet-api</artifactId>
</exclusion>
```
Restart after that and the error goes away.
During startup you may also see:
```
org.apache.storm.utils.NimbusLeaderNotFoundException: Could not find leader nimbus from seed hosts [localhost]. Did you specify a valid list of nimbus hosts for config nimbus.seeds?
    at org.apache.storm.utils.NimbusClient.getConfiguredClientAs(NimbusClient.java:90)
```
I puzzled over this for a long time. Everything online blamed a bad Storm configuration, but my Storm is deployed on a server and I had no local configuration; by rights the client should read the configuration from the server, yet it did not. After several failed attempts I finally noticed that, for building a cluster, Storm provides a corresponding local cluster:
```java
LocalCluster cluster = new LocalCluster();
```
This is for local testing, so use it when testing locally. When deploying to a server, replace:
```java
cluster.submitTopology("kafka-spout", config, topologyBuilder.createTopology());
// with:
StormSubmitter.submitTopology("kafka-spout", config, topologyBuilder.createTopology());
```
to submit the topology. A sketch combining the two modes is below.
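A minimal sketch of switching between the two submission modes; the boolean flag is a hypothetical knob, while the two submit calls are exactly the ones shown above:

```java
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

public final class TopologySubmitter {

    /**
     * Submits the topology either to an embedded LocalCluster (for
     * development) or to the real cluster via StormSubmitter.
     */
    public static void submit(boolean local, Config config, TopologyBuilder builder) throws Exception {
        if (local) {
            // Embedded cluster: no nimbus needed, runs inside this JVM.
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("kafka-spout", config, builder.createTopology());
        } else {
            // Real cluster: requires nimbus.seeds etc. to be reachable.
            StormSubmitter.submitTopology("kafka-spout", config, builder.createTopology());
        }
    }
}
```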
This resolves problems 1 through 3 listed above.
Problem 4 is about using bean instances inside a bolt: even after registering the class with @Component, I could not get the instance injected. My guess is that when we build and submit the topology via:
```java
topologyBuilder.setBolt("alarm-bolt", new AlarmBolt(), 1).setNumTasks(2).shuffleGrouping("kafka-spout");
```
Storm serializes the bolt we pass in and later runs its lifecycle methods, such as prepare, on the worker side:
```java
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
    StormLauncher stormLauncher = StormLauncher.getStormLauncher();
    dataRepositorys = (AlarmDataRepositorys) stormLauncher.getBean("alarmdataRepositorys");
}
```
So Spring itself never instantiates the bolt; it is deserialized in Storm's own worker threads, which is why injection cannot supply the bean. (I am not entirely sure about the details here; if anyone knows better, please share.)
But the whole point of using Spring Boot is to have it manage these tangled objects, so this problem bothered me for a long time. Eventually I wondered whether we could fetch instances from the application context with getBean, and started defining things accordingly.
For example, suppose I need a service inside the bolt:
```java
/**
 * @author Leezer
 * @date 2017/12/27
 * Stores failure counts in Redis.
 **/
@Service("alarmdataRepositorys")
public class AlarmDataRepositorys extends RedisBase implements IAlarmDataRepositorys {

    private static final String ERRO = "erro";

    /**
     * @param type error type
     * @param key  key
     * @return the error count
     **/
    @Override
    public String getErrNumFromRedis(String type, String key) {
        if (type == null || key == null) {
            return null;
        } else {
            // primaryStringRedisTemplate is inherited from RedisBase.
            ValueOperations<String, String> valueOper = primaryStringRedisTemplate.opsForValue();
            return valueOper.get(String.format("%s:%s:%s", ERRO, type, key));
        }
    }

    /**
     * @param type  error type
     * @param key   key
     * @param value value to store
     **/
    @Override
    public void setErrNumToRedis(String type, String key, String value) {
        try {
            ValueOperations<String, String> valueOper = primaryStringRedisTemplate.opsForValue();
            valueOper.set(String.format("%s:%s:%s", ERRO, type, key), value,
                    Dictionaries.ApiKeyDayOfLifeCycle, TimeUnit.SECONDS);
        } catch (Exception e) {
            logger.info(Dictionaries.REDIS_ERROR_PREFIX + String.format("failed to store key %s in redis", key));
        }
    }
}
```
Because the bean name is specified explicitly here, the bolt can retrieve it with getBean inside prepare and then perform the required operations. A fuller sketch of the bolt follows.
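For completeness, here is a sketch of what the full AlarmBolt could look like. The original only shows prepare, so the execute body and output-field handling are illustrative assumptions:

```java
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class AlarmBolt extends BaseRichBolt {

    private OutputCollector collector;
    private AlarmDataRepositorys dataRepositorys;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        // Look the bean up by name rather than relying on injection,
        // since this runs in a worker thread outside Spring's control.
        StormLauncher stormLauncher = StormLauncher.getStormLauncher();
        dataRepositorys = (AlarmDataRepositorys) stormLauncher.getBean("alarmdataRepositorys");
    }

    @Override
    public void execute(Tuple input) {
        // Illustrative only: take the log line emitted by the KafkaSpout
        // and record it as an error occurrence in Redis.
        String logLine = input.getString(0);
        dataRepositorys.setErrNumToRedis("alarm", "lastLog", logLine);
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This bolt is a sink: it emits nothing downstream.
    }
}
```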
Kafka then delivers the subscribed topic's messages to my bolt for processing. The getBean method used above is defined on the Spring Boot main class:
```java
@SpringBootApplication
@EnableTransactionManagement
@ComponentScan({"service", "storm"})
@EnableMongoRepositories(basePackages = {"storm"})
@PropertySource(value = {"classpath:service.properties",
        "classpath:application.properties",
        "classpath:storm.properties"})
@ImportResource(locations = {
        "classpath:/configs/spring-hadoop.xml",
        "classpath:/configs/spring-hbase.xml"})
public class StormLauncher extends SpringBootServletInitializer {

    // Thread-safe holder for the launcher instance.
    private volatile static StormLauncher stormLauncher;

    // The application context.
    private ApplicationContext context;

    public static void main(String[] args) {
        SpringApplicationBuilder application = new SpringApplicationBuilder(StormLauncher.class);
        // application.web(false).run(args); would start Spring Boot without the web container.
        application.run(args);
        StormLauncher s = new StormLauncher();
        s.setApplicationContext(application.context());
        setStormLauncher(s);
    }

    private static void setStormLauncher(StormLauncher stormLauncher) {
        StormLauncher.stormLauncher = stormLauncher;
    }

    public static StormLauncher getStormLauncher() {
        return stormLauncher;
    }

    @Override
    protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
        return application.sources(StormLauncher.class);
    }

    /**
     * Gets the application context.
     *
     * @return the application context
     */
    public ApplicationContext getApplicationContext() {
        return context;
    }

    /**
     * Sets the application context.
     *
     * @param appContext the context
     */
    private void setApplicationContext(ApplicationContext appContext) {
        this.context = appContext;
    }

    /**
     * Gets a bean by its custom name.
     *
     * @param name the name
     * @return the bean
     */
    public Object getBean(String name) {
        return context.getBean(name);
    }

    /**
     * Gets a bean by class.
     *
     * @param <T>   the type parameter
     * @param clazz the clazz
     * @return the bean
     */
    public <T> T getBean(Class<T> clazz) {
        return context.getBean(clazz);
    }

    /**
     * Gets a bean by name and class.
     *
     * @param <T>   the type parameter
     * @param name  the name
     * @param clazz the clazz
     * @return the bean
     */
    public <T> T getBean(String name, Class<T> clazz) {
        return context.getBean(name, clazz);
    }
}
```
That completes the integration of Storm and Kafka into Spring Boot. I will put the Kafka and other configuration up on GitHub.
One more pitfall, this time with the Kafka client:
```
Async loop died! java.lang.NoSuchMethodError: org.apache.kafka.common.network.NetworkSend.
```
The project complains about the kafka client because storm-kafka depends on the 0.8-era Kafka client, while NetworkSend in this form exists only from 0.9 onward; the client you bundle must match the Kafka version you are integrating with. (See the snippet below.)
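That is exactly why the pom at the top excludes kafka-clients from storm-kafka and declares the client explicitly. A minimal sketch, assuming a 0.10.1.1 broker (use whatever version matches yours):

```xml
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>1.1.1</version>
    <exclusions>
        <!-- Drop the old transitive client. -->
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- Pin kafka-clients to the version of your broker. -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.1.1</version>
</dependency>
```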
The integration itself is fairly simple, but references were scarce and I had only just started with Storm, so it took a lot of thinking; I am writing it down here for the record.
Project address - GitHub
That is all for this article; I hope it helps with your learning.