亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

kafka生產(chǎn)實(shí)踐(詳解)

 更新時(shí)間:2017年08月02日 09:20:15   投稿:jingxian  
下面小編就為大家?guī)?lái)一篇kafka生產(chǎn)實(shí)踐(詳解)。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧

1.引言

最近接觸到一個(gè)APP流量分析的項(xiàng)目,類(lèi)似于友盟。涉及到幾個(gè)C端(客戶(hù)端)高并發(fā)的接口,這幾個(gè)接口主要用于C端數(shù)據(jù)的提交。在沒(méi)有任何緩沖的情況下,一個(gè)接口涉及到5張表的提交。壓測(cè)的結(jié)果很不理想,主要瓶頸就在與RDS的交互。

一臺(tái)雙核,16G機(jī)子,單實(shí)例,jdbc最大連接數(shù)100,吞吐量竟然只有50TPS。

能想到的改造方案就是引入一層緩沖,讓C端接口不與RDS直接交互,很自然就想到了rabbitmq,但是rabbitmq對(duì)分布式的支持比較一般,我們的數(shù)據(jù)體量也比較大,所以我們借鑒了友盟,引入了kafka,Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),起初在不做任何kafka優(yōu)化的時(shí)候,簡(jiǎn)單地將C端提交的數(shù)據(jù)直接send到單節(jié)點(diǎn)kafka,就這樣,我們的吞吐量達(dá)到了100TPS.還是有點(diǎn)小驚喜的。

最近一段時(shí)間研究了一下kafka,對(duì)一些參數(shù)進(jìn)行調(diào)整,目前接口的吞吐量已經(jīng)達(dá)到220TPS,寫(xiě)這篇文章主要想記錄一下自己優(yōu)化和部署經(jīng)歷。

2.kafka簡(jiǎn)介

kafka的結(jié)構(gòu)圖

這張圖很好的詮釋了kafka的結(jié)構(gòu),但是遺漏了一點(diǎn),就是group的概念,我這里補(bǔ)充一下,一個(gè)組可以包含多個(gè)consumer對(duì)多個(gè)topic進(jìn)行消費(fèi),但是不同組的消費(fèi)都是獨(dú)立的。

也就是說(shuō)同一個(gè)topic的同一條消息可以被不同組的consumer消費(fèi)。

我這里的主要的優(yōu)化途徑就是將kafka集群化,多partition化,使其并發(fā)度更高。

集群化都很好理解,那什么是多partition?

partition是topic的一個(gè)概念,即對(duì)topic進(jìn)行分組,不同partition之間的消費(fèi)相互獨(dú)立,并且有序。并且一個(gè)partiton只能被一個(gè)消費(fèi)者消費(fèi),所以咯,假如topic只有一個(gè)partition的話(huà),那么消費(fèi)者實(shí)例不能大于一個(gè),那實(shí)例再多也沒(méi)用,受限于kafka的partition。

上面都是講消費(fèi),其實(shí)send操作也是一樣的,要保證有序必然要等上一個(gè)發(fā)送ack之后,下一個(gè)發(fā)送才能進(jìn)行,如果只有一個(gè)partition,那send之后的ack的等待時(shí)間必然會(huì)阻塞下面一次send,設(shè)計(jì)多個(gè)partition之后,可以同時(shí)往多個(gè)partition發(fā)送消息,自然吞吐量也就上去。

3.kafka集群的搭建以及參數(shù)配置

集群搭建

準(zhǔn)備兩臺(tái)機(jī)子,然后去官網(wǎng)(http://kafka.apache.org/downloads)下載一個(gè)包。通過(guò)scp到服務(wù)器上,解壓進(jìn)入config目錄,編輯server.config.

第一臺(tái)機(jī)子配置(172.18.240.36):

broker.id=0 每臺(tái)服務(wù)器的broker.id都不能相同


#hostname
host.name=172.18.240.36

#在log.retention.hours=168 下面新增下面三項(xiàng)
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880

#設(shè)置zookeeper的連接端口
zookeeper.connect=172.18.240.36:4001
#默認(rèn)partition數(shù)
num.partitions=2

第二臺(tái)機(jī)子配置(172.18.240.62):

broker.id=1 每臺(tái)服務(wù)器的broker.id都不能相同

#hostname
host.name=172.18.240.62

#在log.retention.hours=168 下面新增下面三項(xiàng)
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880

#設(shè)置zookeeper的連接端口
zookeeper.connect=172.18.240.36:4001
#默認(rèn)partition數(shù)
num.partitions=2

新增或者修改成以上配置。

對(duì)了,在此之前請(qǐng)先安裝zookeeper,如果你用的是zookeeper集群的話(huà),zookeeper.connect可以填寫(xiě)多個(gè),中間用逗號(hào)隔開(kāi)。

然后啟動(dòng)

nohup ./kafka-server-start.sh ../config/server.properties 1>/dev/null 2>&1 &

測(cè)試一下:

在第一臺(tái)機(jī)子上開(kāi)啟一個(gè)producer

./kafka-console-producer.sh --broker-list 172.18.240.36:9092 --topic test-test

在第二臺(tái)機(jī)子上開(kāi)啟一個(gè)consumer

./kafka-console-consumer.sh --bootstrap-server 172.18.240.62:9092 --topic test-test --from-beginning

第一臺(tái)機(jī)子發(fā)送一條消息

第二臺(tái)機(jī)子立馬收到消息

這樣kafka的集群部署就完成了。就下來(lái)我們來(lái)看看,java的客戶(hù)端代碼如何編寫(xiě)。

4.kafka客戶(hù)端代碼示例

我這里的工程是建立在spring boot 之下的,僅供參考。

在 application.yml下添加如下配置:

kafka:
 consumer:
 default:
  server: 172.18.240.36:9092,172.18.240.62:9092
  enableAutoCommit: false
  autoCommitIntervalMs: 100
  sessionTimeoutMs: 15000
  groupId: data_analysis_group
  autoOffsetReset: latest
 producer:
 default:
  server: 172.18.240.36:9092,172.18.240.62:9092
  retries: 0
  batchSize: 4096
  lingerMs: 1
  bufferMemory: 40960

添加兩個(gè)配置類(lèi)

package com.dtdream.analysis.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;

import java.util.HashMap;
import java.util.Map;

@ConfigurationProperties(
  prefix = "kafka.consumer.default"
)
@EnableKafka
@Configuration
public class KafkaConsumerConfig {


 private static final Logger log = LoggerFactory.getLogger(KafkaConsumerConfig.class);
 private static String autoCommitIntervalMs;

 private static String sessionTimeoutMs;

 private static Class keyDeserializerClass = StringDeserializer.class;

 private static Class valueDeserializerClass = StringDeserializer.class;

 private static String groupId = "test-group";

 private static String autoOffsetReset = "latest";

 private static String server;

 private static boolean enableAutoCommit;

 public static String getServer() {
  return server;
 }

 public static void setServer(String server) {
  KafkaConsumerConfig.server = server;
 }

 public static boolean isEnableAutoCommit() {
  return enableAutoCommit;
 }

 public static void setEnableAutoCommit(boolean enableAutoCommit) {
  KafkaConsumerConfig.enableAutoCommit = enableAutoCommit;
 }

 public static String getAutoCommitIntervalMs() {
  return autoCommitIntervalMs;
 }

 public static void setAutoCommitIntervalMs(String autoCommitIntervalMs) {
  KafkaConsumerConfig.autoCommitIntervalMs = autoCommitIntervalMs;
 }

 public static String getSessionTimeoutMs() {
  return sessionTimeoutMs;
 }

 public static void setSessionTimeoutMs(String sessionTimeoutMs) {
  KafkaConsumerConfig.sessionTimeoutMs = sessionTimeoutMs;
 }

 public static Class getKeyDeserializerClass() {
  return keyDeserializerClass;
 }

 public static void setKeyDeserializerClass(Class keyDeserializerClass) {
  KafkaConsumerConfig.keyDeserializerClass = keyDeserializerClass;
 }

 public static Class getValueDeserializerClass() {
  return valueDeserializerClass;
 }

 public static void setValueDeserializerClass(Class valueDeserializerClass) {
  KafkaConsumerConfig.valueDeserializerClass = valueDeserializerClass;
 }

 public static String getGroupId() {
  return groupId;
 }

 public static void setGroupId(String groupId) {
  KafkaConsumerConfig.groupId = groupId;
 }

 public static String getAutoOffsetReset() {
  return autoOffsetReset;
 }

 public static void setAutoOffsetReset(String autoOffsetReset) {
  KafkaConsumerConfig.autoOffsetReset = autoOffsetReset;
 }


 @Bean
 public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
  ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  factory.setConsumerFactory(consumerFactory());
  factory.setConcurrency(10);
  factory.getContainerProperties().setPollTimeout(3000);
  factory.setRecordFilterStrategy(new RecordFilterStrategy<String, String>() {
   @Override
   public boolean filter(ConsumerRecord<String, String> consumerRecord) {
    log.debug("partition is {},key is {},topic is {}",
      consumerRecord.partition(), consumerRecord.key(), consumerRecord.topic());
    return false;
   }
  });
  return factory;
 }

 private ConsumerFactory<String, String> consumerFactory() {
  return new DefaultKafkaConsumerFactory<>(consumerConfigs());
 }

 private Map<String, Object> consumerConfigs() {
  Map<String, Object> propsMap = new HashMap<>();
  propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
  propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
  propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
  propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
  propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass);
  propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass);
  propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
  propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
  return propsMap;

 }


 /* @Bean
 public Listener listener() {
  return new Listener();
 }*/
}
package com.dtdream.analysis.config;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * Created with IntelliJ IDEA.
 * User: chenqimiao
 * Date: 2017/7/24
 * Time: 9:43
 * To change this template use File | Settings | File Templates.
 */
@ConfigurationProperties(
  prefix = "kafka.producer.default",
  ignoreInvalidFields = true
)//注入一些屬性域
@EnableKafka
@Configuration//使得@Bean注解生效
public class KafkaProducerConfig {
 private static String server;
 private static Integer retries;
 private static Integer batchSize;
 private static Integer lingerMs;
 private static Integer bufferMemory;
 private static Class keySerializerClass = StringSerializer.class;
 private static Class valueSerializerClass = StringSerializer.class;

 private Map<String, Object> producerConfigs() {
  Map<String, Object> props = new HashMap<>();
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
  props.put(ProducerConfig.RETRIES_CONFIG, retries);
  props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
  props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
  props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);
  return props;
 }

 private ProducerFactory<String, String> producerFactory() {
  return new DefaultKafkaProducerFactory<>(producerConfigs());
 }

 public static String getServer() {
  return server;
 }

 public static void setServer(String server) {
  KafkaProducerConfig.server = server;
 }

 public static Integer getRetries() {
  return retries;
 }

 public static void setRetries(Integer retries) {
  KafkaProducerConfig.retries = retries;
 }

 public static Integer getBatchSize() {
  return batchSize;
 }

 public static void setBatchSize(Integer batchSize) {
  KafkaProducerConfig.batchSize = batchSize;
 }

 public static Integer getLingerMs() {
  return lingerMs;
 }

 public static void setLingerMs(Integer lingerMs) {
  KafkaProducerConfig.lingerMs = lingerMs;
 }

 public static Integer getBufferMemory() {
  return bufferMemory;
 }

 public static void setBufferMemory(Integer bufferMemory) {
  KafkaProducerConfig.bufferMemory = bufferMemory;
 }

 public static Class getKeySerializerClass() {
  return keySerializerClass;
 }

 public static void setKeySerializerClass(Class keySerializerClass) {
  KafkaProducerConfig.keySerializerClass = keySerializerClass;
 }

 public static Class getValueSerializerClass() {
  return valueSerializerClass;
 }

 public static void setValueSerializerClass(Class valueSerializerClass) {
  KafkaProducerConfig.valueSerializerClass = valueSerializerClass;
 }

 @Bean(name = "kafkaTemplate")
 public KafkaTemplate<String, String> kafkaTemplate() {
  return new KafkaTemplate<>(producerFactory());
 }
}

利用kafkaTemplate即可完成發(fā)送。

@Autowired
private KafkaTemplate<String,String> kafkaTemplate;


 @RequestMapping(
   value = "/openApp",
   method = RequestMethod.POST,
   produces = MediaType.APPLICATION_JSON_UTF8_VALUE,
   consumes = MediaType.APPLICATION_JSON_UTF8_VALUE
 )
 @ResponseBody
 public ResultDTO openApp(@RequestBody ActiveLogPushBo activeLogPushBo, HttpServletRequest request) {

  logger.info("openApp: activeLogPushBo {}, dateTime {}", JSONObject.toJSONString(activeLogPushBo),new DateTime().toString("yyyy-MM-dd HH:mm:ss.SSS"));

  String ip = (String) request.getAttribute("ip");

  activeLogPushBo.setIp(ip);

  activeLogPushBo.setDate(new Date());

  //ResultDTO resultDTO = dataCollectionService.collectOpenInfo(activeLogPushBo);

  kafkaTemplate.send("data_collection_open",JSONObject.toJSONString(activeLogPushBo));

  // logger.info("openApp: resultDTO {} ,dateTime {}", resultDTO.toJSONString(),new DateTime().toString("yyyy-MM-dd HH:mm:ss.SSS"));

  return new ResultDTO().success();
 }

kafkaTemplate的send方法會(huì)更根據(jù)你指定的key進(jìn)行hash,再對(duì)partition數(shù)進(jìn)行去模,最后決定發(fā)送到那一個(gè)分區(qū),假如沒(méi)有指定key,那send方法對(duì)分區(qū)的選擇是隨機(jī)。具體怎么隨機(jī)的話(huà),這里就不展開(kāi)講了,有興趣的同學(xué)可以自己看源碼,我們可以交流交流。

接著配置一個(gè)監(jiān)聽(tīng)器

package com.dtdream.analysis.listener;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;

import java.util.Optional;
@Component
public class Listener {

 private Logger logger = LoggerFactory.getLogger(this.getClass());

 @KafkaListener(topics = {"test-topic"})
 public void listen(ConsumerRecord<?, ?> record) {
  Optional<?> kafkaMessage = Optional.ofNullable(record.value());
  if (kafkaMessage.isPresent()) {
   Object message = kafkaMessage.get();
   logger.info("message is {} ", message);
  }
 }
}

@KafkaListener其實(shí)可以具體指定消費(fèi)哪個(gè)分區(qū),如果不指定的話(huà),并且只有一個(gè)消費(fèi)者實(shí)例,那么這個(gè)實(shí)例會(huì)消費(fèi)所有的分區(qū)的消息。

消費(fèi)者的數(shù)量是一定要少于partition的數(shù)量的,不然沒(méi)有任何意義。會(huì)出現(xiàn)消費(fèi)者過(guò)剩的情況。

消費(fèi)者數(shù)量和partition數(shù)量的多與少,會(huì)動(dòng)態(tài)影響消費(fèi)節(jié)點(diǎn)所消費(fèi)的partition數(shù)目,最終會(huì)在整個(gè)集群中達(dá)到一種動(dòng)態(tài)平衡。

5.總結(jié)

理論上只要cpu核心數(shù)無(wú)限,那么partition數(shù)也可以無(wú)上限,與此同時(shí)消費(fèi)者節(jié)點(diǎn)和生產(chǎn)者節(jié)點(diǎn)也可以無(wú)上限,最終會(huì)使單個(gè)topic的并發(fā)無(wú)上限。單機(jī)的cpu的核心數(shù)總是會(huì)達(dá)到一個(gè)上限,kafka作為分布式系統(tǒng),可以很好利用集群的運(yùn)算能力,進(jìn)行動(dòng)態(tài)擴(kuò)展,在DT時(shí)代,應(yīng)該會(huì)慢慢成為主流吧。

以上這篇kafka生產(chǎn)實(shí)踐(詳解)就是小編分享給大家的全部?jī)?nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • Spring?Security實(shí)現(xiàn)統(tǒng)一登錄與權(quán)限控制的示例代碼

    Spring?Security實(shí)現(xiàn)統(tǒng)一登錄與權(quán)限控制的示例代碼

    這篇文章主要介紹了Spring?Security實(shí)現(xiàn)統(tǒng)一登錄與權(quán)限控制,本文通過(guò)示例代碼重點(diǎn)看一下統(tǒng)一認(rèn)證中心和業(yè)務(wù)網(wǎng)關(guān)的建設(shè),需要的朋友可以參考下
    2022-03-03
  • IDEA快捷鍵和各種實(shí)用功能小結(jié)

    IDEA快捷鍵和各種實(shí)用功能小結(jié)

    這篇文章主要介紹了IDEA快捷鍵總結(jié)和各種實(shí)用功能,包括IDEA中內(nèi)容輔助鍵和快捷鍵,修改自動(dòng)補(bǔ)全快捷鍵,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2022-08-08
  • 心動(dòng)嗎?正大光明的免費(fèi)使用IntelliJ IDEA商業(yè)版

    心動(dòng)嗎?正大光明的免費(fèi)使用IntelliJ IDEA商業(yè)版

    這篇文章主要介紹了正大光明的免費(fèi)使用IntelliJ IDEA商業(yè)版,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2020-02-02
  • Springcloud seata nacos環(huán)境搭建過(guò)程圖解

    Springcloud seata nacos環(huán)境搭建過(guò)程圖解

    這篇文章主要介紹了Springcloud seata nacos環(huán)境搭建過(guò)程圖解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-03-03
  • 教你快速搭建sona服務(wù)及idea使用sona的方法

    教你快速搭建sona服務(wù)及idea使用sona的方法

    Sonar 是一個(gè)用于代碼質(zhì)量管理的開(kāi)放平臺(tái)。通過(guò)插件機(jī)制,Sonar 可以集成不同的測(cè)試工具,代碼分析工具,以及持續(xù)集成工具,本文給大家分享搭建sona服務(wù)及idea使用sona的方法,感興趣的朋友一起看看吧
    2021-06-06
  • java高并發(fā)ScheduledThreadPoolExecutor與Timer區(qū)別

    java高并發(fā)ScheduledThreadPoolExecutor與Timer區(qū)別

    這篇文章主要為大家介紹了java高并發(fā)ScheduledThreadPoolExecutor與Timer區(qū)別,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-10-10
  • SpringBoot整合RabbitMQ處理死信隊(duì)列和延遲隊(duì)列

    SpringBoot整合RabbitMQ處理死信隊(duì)列和延遲隊(duì)列

    這篇文章將通過(guò)示例為大家詳細(xì)介紹SpringBoot整合RabbitMQ時(shí)如何處理死信隊(duì)列和延遲隊(duì)列,文中的示例代碼講解詳細(xì),需要的可以參考一下
    2022-05-05
  • SpringBoot實(shí)現(xiàn)列表數(shù)據(jù)導(dǎo)出為Excel文件

    SpringBoot實(shí)現(xiàn)列表數(shù)據(jù)導(dǎo)出為Excel文件

    這篇文章主要為大家詳細(xì)介紹了在Spring?Boot框架中如何將列表數(shù)據(jù)導(dǎo)出為Excel文件,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以了解下
    2024-02-02
  • SpringBoot3結(jié)合Vue3實(shí)現(xiàn)用戶(hù)登錄功能

    SpringBoot3結(jié)合Vue3實(shí)現(xiàn)用戶(hù)登錄功能

    最近項(xiàng)目需求搭建一個(gè)結(jié)合Vue.js前端框架和Spring Boot后端框架的登錄系統(tǒng),本文主要介紹了SpringBoot3結(jié)合Vue3實(shí)現(xiàn)用戶(hù)登錄功能,具有一定的參考價(jià)值,感興趣的可以了解一下
    2024-03-03
  • 淺談Java中的克隆close()和賦值引用的區(qū)別

    淺談Java中的克隆close()和賦值引用的區(qū)別

    下面小編就為大家?guī)?lái)一篇淺談Java中的克隆close()和賦值引用的區(qū)別。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2016-09-09

最新評(píng)論