亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Java分布式學(xué)習(xí)之Kafka消息隊列

 更新時間:2022年07月28日 11:33:13   作者:kaico2018  
Kafka是由Apache軟件基金會開發(fā)的一個開源流處理平臺,由Scala和Java編寫。Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),它可以處理消費者在網(wǎng)站中的所有動作流數(shù)據(jù)

介紹

Apache Kafka 是分布式發(fā)布-訂閱消息系統(tǒng),在 kafka官網(wǎng)上對 kafka 的定義:一個分布式發(fā)布-訂閱消息傳遞系統(tǒng)。 它最初由LinkedIn公司開發(fā),Linkedin于2010年貢獻給了Apache基金會并成為頂級開源項目。Kafka是一種快速、可擴展的、設(shè)計內(nèi)在就是分布式的,分區(qū)的和可復(fù)制的提交日志服務(wù)。

注意:Kafka并沒有遵循JMS規(guī)范(),它只提供了發(fā)布和訂閱通訊方式。

kafka中文官網(wǎng):http://kafka.apachecn.org/quickstart.html

Kafka核心相關(guān)名稱

  1. Broker:Kafka節(jié)點,一個Kafka節(jié)點就是一個broker,多個broker可以組成一個Kafka集群
  2. Topic:一類消息,消息存放的目錄即主題,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能夠同時負(fù)責(zé)多個topic的分發(fā)
  3. massage: Kafka中最基本的傳遞對象。
  4. Partition:topic物理上的分組,一個topic可以分為多個partition,每個partition是一個有序的隊列。Kafka里面實現(xiàn)分區(qū),一個broker就是表示一個區(qū)域。
  5. Segment:partition物理上由多個segment組成,每個Segment存著message信息
  6. Producer : 生產(chǎn)者,生產(chǎn)message發(fā)送到topic
  7. Consumer : 消費者,訂閱topic并消費message, consumer作為一個線程來消費
  8. Consumer Group:消費者組,一個Consumer Group包含多個consumer
  9. Offset:偏移量,理解為消息 partition 中消息的索引位置

主題和隊列的區(qū)別:

隊列是一個數(shù)據(jù)結(jié)構(gòu),遵循先進先出原則

kafka集群安裝

參考官方文檔:https://kafka.apachecn.org/quickstart.html

  • 每臺服務(wù)器上安裝jdk1.8環(huán)境
  • 安裝Zookeeper集群環(huán)境
  • 安裝kafka集群環(huán)境
  • 運行環(huán)境測試

安裝jdk環(huán)境和zookeeper這里不詳述了。

kafka為什么依賴于zookeeper:kafka會將mq信息存放到zookeeper上,為了使整個集群能夠方便擴展,采用zookeeper的事件通知相互感知。

kafka集群安裝步驟:

1、下載kafka的壓縮包,下載地址:https://kafka.apachecn.org/downloads.html

2、解壓安裝包

tar -zxvf kafka_2.11-1.0.0.tgz

3、修改kafka的配置文件 config/server.properties

配置文件修改內(nèi)容:

  • zookeeper連接地址:zookeeper.connect=192.168.1.19:2181
  • 監(jiān)聽的ip,修改為本機的iplisteners=PLAINTEXT://192.168.1.19:9092
  • kafka的brokerid,每臺broker的id都不一樣broker.id=0

4、依次啟動kafka

./kafka-server-start.sh -daemon config/server.properties

kafka使用

kafka文件存儲

topic是邏輯上的概念,而partition是物理上的概念,每個partition對應(yīng)于一個log文件,該log文件中存儲的就是Producer生成的數(shù)據(jù)。Producer生成的數(shù)據(jù)會被不斷追加到該log文件末端,為防止log文件過大導(dǎo)致數(shù)據(jù)定位效率低下,Kafka采取了分片和索引機制,將每個partition分為多個segment,每個segment包括:“.index”文件、“.log”文件和.timeindex等文件。這些文件位于一個文件夾下,該文件夾的命名規(guī)則為:topic名稱+分區(qū)序號。

例如:執(zhí)行命令新建一個主題,分三個區(qū)存放放在三個broker中:

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic kaico

  • 一個partition分為多個segment
  • .log 日志文件
  • .index 偏移量索引文件
  • .timeindex 時間戳索引文件
  • 其他文件(partition.metadata,leader-epoch-checkpoint)

Springboot整合kafka

maven依賴

 <dependencies>
        <!-- springBoot集成kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <!-- SpringBoot整合Web組件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>

yml配置

# kafka
spring:
  kafka:
    # kafka服務(wù)器地址(可以多個)
#    bootstrap-servers: 192.168.212.164:9092,192.168.212.167:9092,192.168.212.168:9092
    bootstrap-servers: www.kaicostudy.com:9092,www.kaicostudy.com:9093,www.kaicostudy.com:9094
    consumer:
      # 指定一個默認(rèn)的組名
      group-id: kafkaGroup1
      # earliest:當(dāng)各分區(qū)下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
      # latest:當(dāng)各分區(qū)下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產(chǎn)生的該分區(qū)下的數(shù)據(jù)
      # none:topic各分區(qū)都存在已提交的offset時,從offset后開始消費;只要有一個分區(qū)不存在已提交的offset,則拋出異常
      auto-offset-reset: earliest
      # key/value的反序列化
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      # key/value的序列化
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 批量抓取
      batch-size: 65536
      # 緩存容量
      buffer-memory: 524288
      # 服務(wù)器地址
      bootstrap-servers: www.kaicostudy.com:9092,www.kaicostudy.com:9093,www.kaicostudy.com:9094

生產(chǎn)者

@RestController
public class KafkaController {
	/**
	 * 注入kafkaTemplate
	 */
	@Autowired
	private KafkaTemplate<String, String> kafkaTemplate;
	/**
	 * 發(fā)送消息的方法
	 *
	 * @param key
	 *            推送數(shù)據(jù)的key
	 * @param data
	 *            推送數(shù)據(jù)的data
	 */
	private void send(String key, String data) {
		// topic 名稱 key   data 消息數(shù)據(jù)
		kafkaTemplate.send("kaico", key, data);
	}
	// test 主題 1 my_test 3
	@RequestMapping("/kafka")
	public String testKafka() {
		int iMax = 6;
		for (int i = 1; i < iMax; i++) {
			send("key" + i, "data" + i);
		}
		return "success";
	}
}

消費者

@Component
public class TopicKaicoConsumer {
    /**
     * 消費者使用日志打印消息
     */
    @KafkaListener(topics = "kaico") //監(jiān)聽的主題
    public void receive(ConsumerRecord<?, ?> consumer) {
        System.out.println("topic名稱:" + consumer.topic() + ",key:" +
                consumer.key() + "," +
                "分區(qū)位置:" + consumer.partition()
                + ", 下標(biāo)" + consumer.offset());
        //輸出key對應(yīng)的value的值
        System.out.println(consumer.value());
    }
}

到此這篇關(guān)于Java分布式學(xué)習(xí)之Kafka消息隊列的文章就介紹到這了,更多相關(guān)Java Kafka內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java中LinkedList詳解和使用示例_動力節(jié)點Java學(xué)院整理

    Java中LinkedList詳解和使用示例_動力節(jié)點Java學(xué)院整理

    LinkedList 是一個繼承于AbstractSequentialList的雙向鏈表。它也可以被當(dāng)作堆棧、隊列或雙端隊列進行操作。接下來通過示例代碼給大家詳細(xì)介紹java中l(wèi)inkedlist的使用,需要的朋友參考下吧
    2017-05-05
  • 帶大家深入了解Spring事務(wù)

    帶大家深入了解Spring事務(wù)

    Spring框架提供統(tǒng)一的事務(wù)抽象,通過統(tǒng)一的編程模型使得應(yīng)用程序可以很容易地在不同的事務(wù)框架之間進行切換. 在學(xué)習(xí)Spring事務(wù)前,我們先對數(shù)據(jù)庫事務(wù)進行簡單的介紹。,需要的朋友可以參考下
    2021-05-05
  • Mybatis延遲加載原理和延遲加載配置詳解

    Mybatis延遲加載原理和延遲加載配置詳解

    這篇文章主要介紹了Mybatis延遲加載原理和延遲加載配置詳解,MyBatis中的延遲加載,也稱為懶加載,是指在進行表的關(guān)聯(lián)查詢時,按照設(shè)置延遲規(guī)則推遲對關(guān)聯(lián)對象的select查詢,需要的朋友可以參考下
    2023-10-10
  • httpclient的CPool定義方法詳解

    httpclient的CPool定義方法詳解

    這篇文章主要為大家介紹了httpclient的CPool定義方法詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-10-10
  • 使用String類型小數(shù)值轉(zhuǎn)換為Long類型

    使用String類型小數(shù)值轉(zhuǎn)換為Long類型

    這篇文章主要介紹了使用String類型小數(shù)值轉(zhuǎn)換為Long類型操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-09-09
  • Java中文件的讀寫方法之IO流詳解

    Java中文件的讀寫方法之IO流詳解

    這篇文章主要介紹了Java中文件的讀寫方法之IO流詳解,本文中的代碼所涉及到的路徑或者文件都是本人的,大家得換成自己的,并且大家可以在網(wǎng)上自行找一些材料作為讀入或者寫入的材料,不過路徑最好是英文的,不要包含中文,防止JVM讀取失敗
    2022-04-04
  • JAVA十大排序算法之堆排序詳解

    JAVA十大排序算法之堆排序詳解

    這篇文章主要介紹了java中的冒泡排序,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考
    2021-08-08
  • JAVA設(shè)計模式之解釋器模式詳解

    JAVA設(shè)計模式之解釋器模式詳解

    這篇文章主要介紹了JAVA設(shè)計模式之解釋器模式詳解,解釋器模式是類的行為模式,給定一個語言之后,解釋器模式可以定義出其文法的一種表示,并同時提供一個解釋器,需要的朋友可以參考下
    2015-04-04
  • springboot如何集成Swagger2

    springboot如何集成Swagger2

    這篇文章主要介紹了springboot集成Swagger2的方法,幫助大家更好的理解和使用springboot框架,感興趣的朋友可以了解下
    2020-12-12
  • java不通過配置文件初始化logger示例

    java不通過配置文件初始化logger示例

    這篇文章主要介紹了java不通過配置文件初始化logger示例,需要的朋友可以參考下
    2014-05-05

最新評論