Lucene源碼系列多值編碼壓縮算法實(shí)例詳解
背景
在Lucene中,涉及到索引文件生成的時(shí)候,會(huì)看到比較多的PackedInts.Encoder,PackedWriter,DirectWriter,DirectMonotonicWriter等等對(duì)多個(gè)long進(jìn)行壓縮編碼解碼的使用,但是它們之間有什么區(qū)別和聯(lián)系呢?本文就是詳細(xì)介紹說(shuō)明Lucene中對(duì)正整數(shù)(int或者long)數(shù)組的壓縮編碼解碼方式。
雖然限制了只對(duì)正整數(shù)有用,但是其他數(shù)值可以通過(guò)一些轉(zhuǎn)化,先轉(zhuǎn)成正整數(shù),然后再使用本文介紹的壓縮編碼。比如,負(fù)整數(shù)可以通過(guò)zigzag先做一次編碼,float或者double可以通過(guò)把二進(jìn)制按int或者long來(lái)解析進(jìn)行預(yù)處理。
特別說(shuō)明
本文中說(shuō)到的壓縮,壓縮編碼器,編碼器指的都是同一個(gè)東西。另外,本文相關(guān)的源碼主要都是在進(jìn)行一些位運(yùn)算,我會(huì)有注釋,但是如果實(shí)在看不太懂的話,可以自己舉例子走一遍源碼就清楚了。
本文的重點(diǎn)還是壓縮編碼的思想,以及幾個(gè)工具類的不同使用場(chǎng)景的理解。
前置知識(shí)
在單值壓縮中,我們已經(jīng)說(shuō)過(guò)了,壓縮其實(shí)就是去除無(wú)用的信息,而對(duì)于數(shù)值來(lái)說(shuō),高位的連續(xù)0其實(shí)就是無(wú)用的信息。
假設(shè)有l(wèi)ong數(shù)組中的值為:
long[] values = {10, 4, 9, 16, 132};
它們的十進(jìn)制和二進(jìn)制分別是:
十進(jìn)制 | 二進(jìn)制 |
---|---|
10 | 00000000 00000000 00000000 00000000 00000000 00000000 00000000 00001010 |
4 | 00000000 00000000 00000000 00000000 00000000 00000000 00000000 00000100 |
9 | 00000000 00000000 00000000 00000000 00000000 00000000 00000000 00001001 |
16 | 00000000 00000000 00000000 00000000 00000000 00000000 00000000 00010000 |
580 | 00000000 00000000 00000000 00000000 00000000 00000000 00000010 01000100 |
如果按照正常的二進(jìn)制編碼存儲(chǔ),一個(gè)long需要64bits,則values數(shù)組需要5*64=320bits。但是我們觀察所有value的二進(jìn)制,發(fā)現(xiàn)最大的580也只有最后10bits是有效的,因此,我們可以用10bits表示每一個(gè)value,這樣一共只需要50bits就能完成對(duì)values數(shù)組的存儲(chǔ),如下表所示:
十進(jìn)制 | 壓縮編碼 |
---|---|
10 | 00 00001010 |
4 | 00 00000100 |
9 | 00 00001001 |
16 | 00 00010000 |
580 | 10 10000100 |
所以,所謂的壓縮編碼其實(shí)就是只保留value的有效位。接下來(lái)先介紹兩個(gè)很重要的概念:
- bitsPerValue:要壓縮編碼的整數(shù)數(shù)組中,最大的值的二進(jìn)制有效位數(shù),如上面例子中的就是10。
- block:把用來(lái)存儲(chǔ)壓縮編碼后的value的空間分成一個(gè)個(gè)block,block大小一般是long或者byte。
Lucene中根據(jù)一個(gè)value編碼壓縮之后的存儲(chǔ)是否跨越多個(gè)block,分為兩類壓縮編解碼。下面我們以上面的例子來(lái)介紹這兩種編解碼的區(qū)別。
假設(shè)我們block的大小是int,也就是一個(gè)block是32位。在上面的例子中,每個(gè)value需要10bits存儲(chǔ)空間,所以一個(gè)int中能完整存儲(chǔ)3個(gè)value之后,還剩下2bits,兩個(gè)編碼的區(qū)別就是是否要使用這2bits,具體結(jié)果如下圖所示:
從上面的例子中,我們總結(jié)下兩種編碼的特點(diǎn):
單個(gè)value跨block(BulkOperationPacked和BulkOperationPacked*,*是從1-24。)
- 除了最后一個(gè)block,沒(méi)有浪費(fèi)存儲(chǔ)空間
- 增加解碼的復(fù)雜度
實(shí)現(xiàn)類:
bitsPerValue>24bitsPerValue>24bitsPerValue>24時(shí),使用BulkOperationPacked
bitsPerValue∈[1,24]bitsPerValue\in[1,24]bitsPerValue∈[1,24]時(shí),使用BulkOperationPacked*,*是從1到24。
單個(gè)value不跨block
- 存在空間浪費(fèi),浪費(fèi)程度和bitsPerValue有關(guān)
- 解碼比較簡(jiǎn)單
- 實(shí)現(xiàn)類:BulkOperationPackedSingleBlock
這兩種不同的編碼方式在Lucene中用兩個(gè)枚舉類來(lái)區(qū)分,PACKED和PACKED_SINGLE_BLOCK。
public enum Format { // 所有的值都是連續(xù)存儲(chǔ)在一起的,允許一個(gè)value編碼結(jié)果跨越多個(gè)block PACKED(0) { // 忽略不相關(guān)的代碼 }, @Deprecated // 一個(gè)value一個(gè)block PACKED_SINGLE_BLOCK(1) { // 忽略不相關(guān)的代碼 }; // 忽略不相關(guān)的代碼 }
特別需要注意的是,在9.0.0版本之前,Lucene中是嚴(yán)格實(shí)現(xiàn)了這兩種策略,但是從9.0.0開始,Lucene廢棄了PACKED_SINGLE_BLOCK,因此在本文中,我們重點(diǎn)介紹PACKED。
總覽
Lucene中對(duì)批量的正整數(shù)壓縮編碼設(shè)置了幾種可選的壓縮模式,有不同的內(nèi)存浪費(fèi)和不同的編解碼速度,實(shí)際上就是用空間換時(shí)間程度的差異:
- FASTEST:這種編碼速度允許浪費(fèi)7倍的內(nèi)存,是編碼速度最快的一種模式,在實(shí)現(xiàn)上基本就是一個(gè)block一個(gè)value。
- FAST:這種編碼速度允許浪費(fèi)50%的內(nèi)存,會(huì)選擇一種比較合適的bitsPerValue來(lái)獲取速度較快的編碼器。
- DEFAULT:這種編碼速度允許浪費(fèi)25%的內(nèi)存,編碼速度中規(guī)中矩。
- COMPACT:這種編碼不允許浪費(fèi)任何內(nèi)存,編碼速度是最慢的。
在PackedInts中有個(gè)工具方法fastestFormatAndBits,接受要編碼的數(shù)據(jù)量,bitsPerValue,以及預(yù)期可浪費(fèi)的最大內(nèi)存來(lái)調(diào)整bitsPerValue,從而獲得當(dāng)前條件下最快的編碼器。
// acceptableOverheadRatio:允許編碼結(jié)果浪費(fèi)的內(nèi)存比率 public static FormatAndBits fastestFormatAndBits( int valueCount, int bitsPerValue, float acceptableOverheadRatio) { if (valueCount == -1) { // 如果不清楚要編碼的數(shù)據(jù)量,就把valueCount設(shè)為-1,會(huì)按最大的數(shù)量來(lái)估計(jì) valueCount = Integer.MAX_VALUE; } // acceptableOverheadRatio不能小于COMPACT acceptableOverheadRatio = Math.max(COMPACT, acceptableOverheadRatio); // acceptableOverheadRatio不能超過(guò)FASTEST acceptableOverheadRatio = Math.min(FASTEST, acceptableOverheadRatio); // 每個(gè)value可以允許浪費(fèi)的比例 float acceptableOverheadPerValue = acceptableOverheadRatio * bitsPerValue; // 每個(gè)value最大可能使用的bit數(shù) int maxBitsPerValue = bitsPerValue + (int) acceptableOverheadPerValue; int actualBitsPerValue = -1; // block一般是int或者是long,所以8,16,32,64的編碼速度是比較快的 if (bitsPerValue <= 8 && maxBitsPerValue >= 8) { actualBitsPerValue = 8; } else if (bitsPerValue <= 16 && maxBitsPerValue >= 16) { actualBitsPerValue = 16; } else if (bitsPerValue <= 32 && maxBitsPerValue >= 32) { actualBitsPerValue = 32; } else if (bitsPerValue <= 64 && maxBitsPerValue >= 64) { actualBitsPerValue = 64; } else { actualBitsPerValue = bitsPerValue; } // 強(qiáng)制使用Format.PACKED,已經(jīng)不使用PACKED_SINGLE_BLOCK了 return new FormatAndBits(Format.PACKED, actualBitsPerValue); }
有了format和bitsPerValue之后,BulkOperation中使用策略模式實(shí)現(xiàn)了獲取相應(yīng)編碼器的邏輯:
public static Encoder getEncoder(Format format, int version, int bitsPerValue) { checkVersion(version); return BulkOperation.of(format, bitsPerValue); }
可以看到策略模式中就是獲取bitsPerValue對(duì)應(yīng)的提前創(chuàng)建好的編解碼器:
public static BulkOperation of(PackedInts.Format format, int bitsPerValue) { switch (format) { case PACKED: assert packedBulkOps[bitsPerValue - 1] != null; return packedBulkOps[bitsPerValue - 1]; case PACKED_SINGLE_BLOCK: // 棄用,不用看了 assert packedSingleBlockBulkOps[bitsPerValue - 1] != null; return packedSingleBlockBulkOps[bitsPerValue - 1]; default: throw new AssertionError(); } }
對(duì)于bitsPerValue的每個(gè)可能值,都有一個(gè)對(duì)應(yīng)的編碼器:
private static final BulkOperation[] packedBulkOps = new BulkOperation[] { new BulkOperationPacked1(), new BulkOperationPacked2(), new BulkOperationPacked3(), new BulkOperationPacked4(), new BulkOperationPacked5(), new BulkOperationPacked6(), new BulkOperationPacked7(), new BulkOperationPacked8(), new BulkOperationPacked9(), new BulkOperationPacked10(), new BulkOperationPacked11(), new BulkOperationPacked12(), new BulkOperationPacked13(), new BulkOperationPacked14(), new BulkOperationPacked15(), new BulkOperationPacked16(), new BulkOperationPacked17(), new BulkOperationPacked18(), new BulkOperationPacked19(), new BulkOperationPacked20(), new BulkOperationPacked21(), new BulkOperationPacked22(), new BulkOperationPacked23(), new BulkOperationPacked24(), new BulkOperationPacked(25), new BulkOperationPacked(26), new BulkOperationPacked(27), new BulkOperationPacked(28), new BulkOperationPacked(29), new BulkOperationPacked(30), new BulkOperationPacked(31), new BulkOperationPacked(32), new BulkOperationPacked(33), new BulkOperationPacked(34), new BulkOperationPacked(35), new BulkOperationPacked(36), new BulkOperationPacked(37), new BulkOperationPacked(38), new BulkOperationPacked(39), new BulkOperationPacked(40), new BulkOperationPacked(41), new BulkOperationPacked(42), new BulkOperationPacked(43), new BulkOperationPacked(44), new BulkOperationPacked(45), new BulkOperationPacked(46), new BulkOperationPacked(47), new BulkOperationPacked(48), new BulkOperationPacked(49), new BulkOperationPacked(50), new BulkOperationPacked(51), new BulkOperationPacked(52), new BulkOperationPacked(53), new BulkOperationPacked(54), new BulkOperationPacked(55), new BulkOperationPacked(56), new BulkOperationPacked(57), new BulkOperationPacked(58), new BulkOperationPacked(59), new BulkOperationPacked(60), new BulkOperationPacked(61), new BulkOperationPacked(62), new BulkOperationPacked(63), new BulkOperationPacked(64), };
前面我們已經(jīng)說(shuō)過(guò)了,PACKED_SINGLE_BLOCK被棄用了,因此我們下面重點(diǎn)介紹PACKED的情況。
編解碼
上面的BulkOperation中預(yù)置的編解碼器看著很復(fù)雜,其實(shí)是比較簡(jiǎn)單的。從名字中帶BulkOperation就知道,這是對(duì)數(shù)據(jù)的批量操作,也就是對(duì)數(shù)據(jù)的批量編解碼,下面我們來(lái)詳細(xì)介紹相關(guān)的類的邏輯。
BulkOperation
在BulkOperation中除了預(yù)置所有bitsPerValue對(duì)應(yīng)的編解碼器和根據(jù)bitsPerValue獲取對(duì)應(yīng)的編解碼器之外,還有一個(gè)比較重要的方法,涉及到一個(gè)概念:iteration。
在理解iteration之前,我們先想想,如果要編碼的數(shù)據(jù)量特別大,我們一次性全加載進(jìn)內(nèi)存,豈不是會(huì)占用很大的內(nèi)存空間,甚至可能出現(xiàn)OOM。因此,需要控制編碼過(guò)程中內(nèi)存的使用情況,這就涉及到了iteration。
各個(gè)編碼器在自己內(nèi)部會(huì)限定每個(gè)iteration最多能夠處理的value個(gè)數(shù),通過(guò)下面的方法可以算出在一定的內(nèi)存限制條件下最多可以使用多少個(gè)iteration,這樣就可以間接知道一次編碼可以處理多少個(gè)value了。
// ramBudget限制在編碼過(guò)程中使用的內(nèi)存上限,單位是字節(jié) public final int computeIterations(int valueCount, int ramBudget) { // byteBlockCount():一個(gè)iteration需要幾個(gè)byte存儲(chǔ)編碼后的結(jié)果 // byteValueCount(): 一個(gè)iteration需要幾個(gè)byte存儲(chǔ)待編碼的value個(gè)數(shù),value是都按long類型處理 // 這里先算出來(lái)ramBudget可以支撐的最大的iteration,并沒(méi)有考慮實(shí)際的valueCount,后面再調(diào)整 final int iterations = ramBudget / (byteBlockCount() + 8 * byteValueCount()); if (iterations == 0) { // 至少是1 return 1; } else if ((iterations - 1) * byteValueCount() >= valueCount) { // 如果不需要那么多的迭代次數(shù) // 根據(jù)真實(shí)的valueCount計(jì)算合適的iteration return (int) Math.ceil((double) valueCount / byteValueCount()); } else { return iterations; } }
BulkOperationPacked
BulkOperationPacked是一個(gè)通用的編解碼器實(shí)現(xiàn),它支持所有的bitsPerValue。
成員變量
需要重點(diǎn)理解下longBlockCount,longValueCount,byteBlockCount,byteValueCount四個(gè)變量的意思,具體這四個(gè)變量是得來(lái)的,邏輯在構(gòu)造器中。
private final int bitsPerValue; // 當(dāng)block是long的時(shí)候,一個(gè)iteration最少會(huì)使用幾個(gè)block private final int longBlockCount; // 一個(gè)iteration中,使用longBlockCount個(gè)block可以編碼的value數(shù)量 private final int longValueCount; // 當(dāng)block是byte的時(shí)候,一個(gè)iteration最少會(huì)使用幾個(gè)block private final int byteBlockCount; // 一個(gè)iteration中,使用byteBlockCount個(gè)block可以編碼的value數(shù)量 private final int byteValueCount; // 用來(lái)獲取value中有效位的掩碼 private final long mask; // mask的int格式 private final int intMask;
構(gòu)造器
在構(gòu)造器中,根據(jù)固定的規(guī)則計(jì)算longBlockCount,longValueCount,byteBlockCount,byteValueCount。
這里我不知道為什么使用這樣的規(guī)則,如果有朋友知道其中奧秘的可以評(píng)論分享。
public BulkOperationPacked(int bitsPerValue) { this.bitsPerValue = bitsPerValue; assert bitsPerValue > 0 && bitsPerValue <= 64; int blocks = bitsPerValue; while ((blocks & 1) == 0) { // 如果是2的倍數(shù)則除以2,直到不是2的倍數(shù) blocks >>>= 1; } // 一次迭代用幾個(gè)long this.longBlockCount = blocks; // 一次迭代可以處理幾個(gè)value this.longValueCount = 64 * longBlockCount / bitsPerValue; // 一次迭代用幾個(gè)byte int byteBlockCount = 8 * longBlockCount; // 一次迭代用處理幾個(gè)value int byteValueCount = longValueCount; while ((byteBlockCount & 1) == 0 && (byteValueCount & 1) == 0) { // 縮小成直到不是2的倍數(shù) byteBlockCount >>>= 1; byteValueCount >>>= 1; } this.byteBlockCount = byteBlockCount; this.byteValueCount = byteValueCount; // 算掩碼 if (bitsPerValue == 64) { this.mask = ~0L; } else { this.mask = (1L << bitsPerValue) - 1; } this.intMask = (int) mask; assert longValueCount * bitsPerValue == 64 * longBlockCount; }
編碼
根據(jù)block大小和要處理的value的類型的不同,一共有四個(gè)編碼方法:
以long為block的單位,編碼long數(shù)組
public void encode( long[] values, int valuesOffset, long[] blocks, int blocksOffset, int iterations) { // 初始化下一個(gè)block的值 long nextBlock = 0; // 當(dāng)前block中還剩下多少位可以用來(lái)編碼 int bitsLeft = 64; // longValueCount * iterations:總共可以處理的數(shù)據(jù) for (int i = 0; i < longValueCount * iterations; ++i) { // 當(dāng)前block使用bitsPerValue來(lái)進(jìn)行編碼 bitsLeft -= bitsPerValue; if (bitsLeft > 0) { // 當(dāng)前block放下當(dāng)前value后還有空間 // 把當(dāng)前value放到當(dāng)前block中 nextBlock |= values[valuesOffset++] << bitsLeft; } else if (bitsLeft == 0) { // 剛好可以存放最后一個(gè)數(shù)據(jù)的編碼結(jié)果 nextBlock |= values[valuesOffset++]; // 最后一個(gè)數(shù)據(jù)的編碼結(jié)果放在nextBlock中 blocks[blocksOffset++] = nextBlock; // 存儲(chǔ)block nextBlock = 0; // 重置block bitsLeft = 64; // 重置bitsLeft } else { // 需要跨block處理 nextBlock |= values[valuesOffset] >>> -bitsLeft; // 當(dāng)前block剩下的空間存儲(chǔ)部分value blocks[blocksOffset++] = nextBlock; // (1L << -bitsLeft) - 1):取出value剩余數(shù)據(jù)的掩碼 // (values[valuesOffset++] & ((1L << -bitsLeft) - 1)): 取出剩余數(shù)據(jù) // << (64 + bitsLeft):放到最前面 nextBlock = (values[valuesOffset++] & ((1L << -bitsLeft) - 1)) << (64 + bitsLeft); // 更新block可用空間 bitsLeft += 64; } } }
以long為block的單位,編碼int數(shù)組
和上面的方法邏輯其實(shí)一模一樣,只是在處理value的時(shí)候,先和0xFFFFFFFFL做按位與運(yùn)算轉(zhuǎn)成long。
public void encode( int[] values, int valuesOffset, long[] blocks, int blocksOffset, int iterations) { long nextBlock = 0; int bitsLeft = 64; for (int i = 0; i < longValueCount * iterations; ++i) { bitsLeft -= bitsPerValue; if (bitsLeft > 0) { nextBlock |= (values[valuesOffset++] & 0xFFFFFFFFL) << bitsLeft; } else if (bitsLeft == 0) { nextBlock |= (values[valuesOffset++] & 0xFFFFFFFFL); blocks[blocksOffset++] = nextBlock; nextBlock = 0; bitsLeft = 64; } else { // bitsLeft < 0 nextBlock |= (values[valuesOffset] & 0xFFFFFFFFL) >>> -bitsLeft; blocks[blocksOffset++] = nextBlock; nextBlock = (values[valuesOffset++] & ((1L << -bitsLeft) - 1)) << (64 + bitsLeft); bitsLeft += 64; } } }
以byte為block的單位,編碼long數(shù)組
public void encode( long[] values, int valuesOffset, byte[] blocks, int blocksOffset, int iterations) { int nextBlock = 0; int bitsLeft = 8; for (int i = 0; i < byteValueCount * iterations; ++i) { final long v = values[valuesOffset++]; assert PackedInts.unsignedBitsRequired(v) <= bitsPerValue; if (bitsPerValue < bitsLeft) { // 如果當(dāng)前block還可以容納value的編碼 nextBlock |= v << (bitsLeft - bitsPerValue); bitsLeft -= bitsPerValue; } else { // 除了當(dāng)前block可以容納之外,還剩下多少bits int bits = bitsPerValue - bitsLeft; // 當(dāng)前block剩下的空間把value的高位部分先存 blocks[blocksOffset++] = (byte) (nextBlock | (v >>> bits)); while (bits >= 8) { // 如果還需要處理的bits超過(guò)一個(gè)byte bits -= 8; blocks[blocksOffset++] = (byte) (v >>> bits); } // 剩下的bits不足一個(gè)byte bitsLeft = 8 - bits; // (v & ((1L << bits) - 1):取出當(dāng)前value的高bits位 // 存放在nextBlock的高bits位 nextBlock = (int) ((v & ((1L << bits) - 1)) << bitsLeft); } } assert bitsLeft == 8; }
以byte為block的單位,編碼int數(shù)組
和上面的方法邏輯一模一樣
public void encode( int[] values, int valuesOffset, byte[] blocks, int blocksOffset, int iterations) { int nextBlock = 0; int bitsLeft = 8; for (int i = 0; i < byteValueCount * iterations; ++i) { final int v = values[valuesOffset++]; assert PackedInts.bitsRequired(v & 0xFFFFFFFFL) <= bitsPerValue; if (bitsPerValue < bitsLeft) { nextBlock |= v << (bitsLeft - bitsPerValue); bitsLeft -= bitsPerValue; } else { int bits = bitsPerValue - bitsLeft; blocks[blocksOffset++] = (byte) (nextBlock | (v >>> bits)); while (bits >= 8) { bits -= 8; blocks[blocksOffset++] = (byte) (v >>> bits); } bitsLeft = 8 - bits; nextBlock = (v & ((1 << bits) - 1)) << bitsLeft; } } assert bitsLeft == 8; }
解碼
同樣地,和編碼一一對(duì)應(yīng),也有四個(gè)解碼方法:
以long為block的單位,解碼成long數(shù)組
public void decode( long[] blocks, int blocksOffset, long[] values, int valuesOffset, int iterations) { // 當(dāng)前block中還剩多少位未解碼 int bitsLeft = 64; for (int i = 0; i < longValueCount * iterations; ++i) { bitsLeft -= bitsPerValue; if (bitsLeft < 0) { // 跨block的解碼 values[valuesOffset++] = ((blocks[blocksOffset++] & ((1L << (bitsPerValue + bitsLeft)) - 1)) << -bitsLeft) | (blocks[blocksOffset] >>> (64 + bitsLeft)); bitsLeft += 64; } else { values[valuesOffset++] = (blocks[blocksOffset] >>> bitsLeft) & mask; } } }
以long為block的單位,解碼成int數(shù)組
邏輯和上面的方法一樣。
public void decode( long[] blocks, int blocksOffset, int[] values, int valuesOffset, int iterations) { if (bitsPerValue > 32) { throw new UnsupportedOperationException( "Cannot decode " + bitsPerValue + "-bits values into an int[]"); } int bitsLeft = 64; for (int i = 0; i < longValueCount * iterations; ++i) { bitsLeft -= bitsPerValue; if (bitsLeft < 0) { values[valuesOffset++] = (int) (((blocks[blocksOffset++] & ((1L << (bitsPerValue + bitsLeft)) - 1)) << -bitsLeft) | (blocks[blocksOffset] >>> (64 + bitsLeft))); bitsLeft += 64; } else { values[valuesOffset++] = (int) ((blocks[blocksOffset] >>> bitsLeft) & mask); } } }
以byte為block的單位,解碼成long數(shù)組
public void decode( byte[] blocks, int blocksOffset, long[] values, int valuesOffset, int iterations) { long nextValue = 0L; int bitsLeft = bitsPerValue; for (int i = 0; i < iterations * byteBlockCount; ++i) { final long bytes = blocks[blocksOffset++] & 0xFFL; if (bitsLeft > 8) { bitsLeft -= 8; nextValue |= bytes << bitsLeft; } else { int bits = 8 - bitsLeft; values[valuesOffset++] = nextValue | (bytes >>> bits); while (bits >= bitsPerValue) { bits -= bitsPerValue; values[valuesOffset++] = (bytes >>> bits) & mask; } bitsLeft = bitsPerValue - bits; nextValue = (bytes & ((1L << bits) - 1)) << bitsLeft; } } assert bitsLeft == bitsPerValue; }
以byte為block的單位,解碼成int數(shù)組
public void decode( byte[] blocks, int blocksOffset, int[] values, int valuesOffset, int iterations) { int nextValue = 0; int bitsLeft = bitsPerValue; for (int i = 0; i < iterations * byteBlockCount; ++i) { final int bytes = blocks[blocksOffset++] & 0xFF; if (bitsLeft > 8) { bitsLeft -= 8; nextValue |= bytes << bitsLeft; } else { int bits = 8 - bitsLeft; values[valuesOffset++] = nextValue | (bytes >>> bits); while (bits >= bitsPerValue) { bits -= bitsPerValue; values[valuesOffset++] = (bytes >>> bits) & intMask; } bitsLeft = bitsPerValue - bits; nextValue = (bytes & ((1 << bits) - 1)) << bitsLeft; } } assert bitsLeft == bitsPerValue; }
BulkOperationPacked*
BulkOperationPacked中的實(shí)現(xiàn)支持所有bitsPerValue的值,BulkOperationPacked*繼承了BulkOperationPacked,對(duì)于解碼的實(shí)現(xiàn)針對(duì)特定的bitsPerValue有了特定的實(shí)現(xiàn),性能比通用實(shí)現(xiàn)更高效(猜測(cè)是特殊實(shí)現(xiàn)沒(méi)有分支,避免了分支預(yù)測(cè)錯(cuò)誤的性能損耗),那為什么只對(duì)解碼做針對(duì)實(shí)現(xiàn),編碼怎么不一起實(shí)現(xiàn)?編碼也是可以做針對(duì)實(shí)現(xiàn)的,但是編碼只在構(gòu)建索引的時(shí)候使用一次,而解碼是會(huì)多次使用的,因此只對(duì)解碼做了特殊實(shí)現(xiàn)。那為什么只對(duì)bitsPerValue是25以下的做特殊實(shí)現(xiàn),這個(gè)不清楚,25以上不常見?
雖然Lucene中實(shí)現(xiàn)了24個(gè)BulkOperationPacked*,但是所有BulkOperationPacked*的主要邏輯都相同:一個(gè)iteration可以處理幾個(gè)value產(chǎn)生幾個(gè)block是已知的(在BulkOperationPacked的構(gòu)造器中計(jì)算得到),然后直接從這些block解碼得到所有的value,下面我們看一個(gè)例子就好:
當(dāng)bitsPerValue=2的時(shí)候,我們可以知道:
longBlockCount=1
longValueCount=32
byteBlockCount=1
byteValueCount=4
所以當(dāng)以long為block的單位時(shí),一個(gè)iteration可以從1個(gè)block中解析出32個(gè)value,當(dāng)以byte為block的單位是,可以從1個(gè)block中解析出4個(gè)value。
final class BulkOperationPacked2 extends BulkOperationPacked { public BulkOperationPacked2() { super(2); } @Override public void decode( long[] blocks, int blocksOffset, int[] values, int valuesOffset, int iterations) { for (int i = 0; i < iterations; ++i) { // 只會(huì)用到一個(gè)block final long block = blocks[blocksOffset++]; // 需要解析出32個(gè)value for (int shift = 62; shift >= 0; shift -= 2) { values[valuesOffset++] = (int) ((block >>> shift) & 3); } } } @Override public void decode( byte[] blocks, int blocksOffset, int[] values, int valuesOffset, int iterations) { for (int j = 0; j < iterations; ++j) { // 只會(huì)用到一個(gè)block final byte block = blocks[blocksOffset++]; // 需要解析出4個(gè)value values[valuesOffset++] = (block >>> 6) & 3; values[valuesOffset++] = (block >>> 4) & 3; values[valuesOffset++] = (block >>> 2) & 3; values[valuesOffset++] = block & 3; } } @Override public void decode( long[] blocks, int blocksOffset, long[] values, int valuesOffset, int iterations) { for (int i = 0; i < iterations; ++i) { // 只會(huì)用到一個(gè)block final long block = blocks[blocksOffset++]; // 需要解析出32個(gè)value for (int shift = 62; shift >= 0; shift -= 2) { values[valuesOffset++] = (block >>> shift) & 3; } } } @Override public void decode( byte[] blocks, int blocksOffset, long[] values, int valuesOffset, int iterations) { for (int j = 0; j < iterations; ++j) { // 只會(huì)用到一個(gè)block final byte block = blocks[blocksOffset++]; // 需要解析出4個(gè)value values[valuesOffset++] = (block >>> 6) & 3; values[valuesOffset++] = (block >>> 4) & 3; values[valuesOffset++] = (block >>> 2) & 3; values[valuesOffset++] = block & 3; } } }
應(yīng)用
有了上面批量編解碼的基礎(chǔ)邏輯,我們?cè)倏纯椿诰幗獯a器有哪些應(yīng)用,而這些應(yīng)用之間有什么區(qū)別,各自又是針對(duì)什么場(chǎng)景的?
PackedWriter
PackedWriter是已知所要處理的數(shù)據(jù)的bitsPerValue,根據(jù)這個(gè)bitsPerValue獲取對(duì)應(yīng)的編碼器,因此所有的數(shù)據(jù)都使用一樣的編碼器。
PackedWriter適用于所有數(shù)據(jù)量大小差不多的情況,如果數(shù)據(jù)中存在少量值比較大的,則會(huì)影響壓縮效果,因?yàn)樗衯alue是按最大的值來(lái)計(jì)算bitsPerValue的。
final class PackedWriter extends PackedInts.Writer { // 是否構(gòu)建結(jié)束 boolean finished; // 9.0.0之后的版本都是Packed final PackedInts.Format format; // 根據(jù)format和bitsPerValue決定encoder final BulkOperation encoder; // 編碼結(jié)果的buffer final byte[] nextBlocks; // 待編碼的value的buffer final long[] nextValues; // 通過(guò)編碼器計(jì)算得到的iterations final int iterations; // nextValues可以寫入的位置 int off; // 已處理的value的個(gè)數(shù) int written; PackedWriter( PackedInts.Format format, DataOutput out, int valueCount, int bitsPerValue, int mem) { super(out, valueCount, bitsPerValue); this.format = format; encoder = BulkOperation.of(format, bitsPerValue); iterations = encoder.computeIterations(valueCount, mem); // 創(chuàng)建合適的大小 nextBlocks = new byte[iterations * encoder.byteBlockCount()]; nextValues = new long[iterations * encoder.byteValueCount()]; off = 0; written = 0; finished = false; } @Override protected PackedInts.Format getFormat() { return format; } @Override public void add(long v) throws IOException { if (valueCount != -1 && written >= valueCount) { throw new EOFException("Writing past end of stream"); } // 把value臨時(shí)存儲(chǔ)到nextValues中,等到編碼 nextValues[off++] = v; if (off == nextValues.length) { // 如果nextValues滿了,則進(jìn)行編碼 flush(); } ++written; } @Override public void finish() throws IOException { if (valueCount != -1) { // 如果數(shù)據(jù)沒(méi)有達(dá)到預(yù)期的數(shù)量,則補(bǔ)0 while (written < valueCount) { add(0L); } } flush(); finished = true; } private void flush() throws IOException { // 編碼 encoder.encode(nextValues, 0, nextBlocks, 0, iterations); // 以byte作為block的單位,一共幾個(gè)byte final int blockCount = (int) format.byteCount(PackedInts.VERSION_CURRENT, off, bitsPerValue); // 編碼的結(jié)果持久化 out.writeBytes(nextBlocks, blockCount); // 清空nextValues Arrays.fill(nextValues, 0L); off = 0; } @Override public int ord() { return written - 1; } }
分段處理
需要說(shuō)明下按照類名原本應(yīng)該是分block的,但是在我們底層編解碼中涉及到block的概念,如果這里再來(lái)一個(gè)block,則可能會(huì)混淆,所以在這里我們可以理解是把要處理的數(shù)據(jù)流分成多段處理。PackedWriter其實(shí)也 可以理解是分段的(設(shè)定內(nèi)存閾值下可以處理的數(shù)據(jù)量),但是它每段都是使用一樣的編碼器,而這個(gè)小節(jié)要介紹的,每一段是可以使用不一樣的編碼器。
AbstractBlockPackedWriter
前面我們介紹PackedWriter的時(shí)候說(shuō),如果大批量數(shù)據(jù)中存在少量的數(shù)值較大的value會(huì)影響壓縮效果。針對(duì)這種情況的處理,Lucene按分段處理,每段包含部分value,如果存在少量的數(shù)值較大,則只會(huì)影響其所在的段中,不會(huì)影響所有的數(shù)據(jù)的編碼。具體我們看下源碼,注意下面的源碼中忽略一些無(wú)關(guān)緊要的代碼:
abstract class AbstractBlockPackedWriter { // 用來(lái)持久化編碼結(jié)果的 protected DataOutput out; // 暫存待編碼的value protected final long[] values; // 存儲(chǔ)編碼后的block protected byte[] blocks; protected int off; // 處理的第幾個(gè)value protected long ord; protected boolean finished; // blockSize:段大小 protected AbstractBlockPackedWriter(DataOutput out, int blockSize) { checkBlockSize(blockSize, MIN_BLOCK_SIZE, MAX_BLOCK_SIZE); reset(out); values = new long[blockSize]; } public void add(long l) throws IOException { checkNotFinished(); if (off == values.length) { // 如果values滿了,也就是已經(jīng)收集到了一段數(shù)據(jù)了,則進(jìn)行編碼 flush(); } values[off++] = l; ++ord; } public void finish() throws IOException { checkNotFinished(); if (off > 0) { // 如果有數(shù)據(jù)則flush flush(); } finished = true; } // flush中計(jì)算的就是當(dāng)前段的bitsPerValue,由子類實(shí)現(xiàn) protected abstract void flush() throws IOException; // 執(zhí)行編碼,在flush中被調(diào)用,bitsRequired是flush中計(jì)算得到的 protected final void writeValues(int bitsRequired) throws IOException { // 每一段都有自己的編碼器 final PackedInts.Encoder encoder = PackedInts.getEncoder(PackedInts.Format.PACKED, PackedInts.VERSION_CURRENT, bitsRequired); final int iterations = values.length / encoder.byteValueCount(); final int blockSize = encoder.byteBlockCount() * iterations; if (blocks == null || blocks.length < blockSize) { blocks = new byte[blockSize]; } if (off < values.length) { Arrays.fill(values, off, values.length, 0L); } encoder.encode(values, 0, blocks, 0, iterations); final int blockCount = (int) PackedInts.Format.PACKED.byteCount(PackedInts.VERSION_CURRENT, off, bitsRequired); // 持久化編碼后的數(shù)據(jù) out.writeBytes(blocks, blockCount); } }
AbstractBlockPackedWriter有兩個(gè)實(shí)現(xiàn)類,分別是通用實(shí)現(xiàn)和處理單調(diào)遞增數(shù)據(jù)的,其實(shí)就是針對(duì)不同的場(chǎng)景計(jì)算bitsPerValue的邏輯不一樣。
BlockPackedWriter
BlockPackedWriter是通用的分段實(shí)現(xiàn),它先找出最小值,所有待編碼的value都減去最小值,這樣就可以縮小bitsPerValue,達(dá)到更優(yōu)的編碼效果。
public final class BlockPackedWriter extends AbstractBlockPackedWriter { public BlockPackedWriter(DataOutput out, int blockSize) { super(out, blockSize); } @Override protected void flush() throws IOException { long min = Long.MAX_VALUE, max = Long.MIN_VALUE; for (int i = 0; i < off; ++i) { // 尋找最大值和最小值 min = Math.min(values[i], min); max = Math.max(values[i], max); } // 最大值和最小值的差值 final long delta = max - min; // 差值的bitsRequired,下面會(huì)把所有的value更新為value和min的差值,所以bitsRequired就是更新后的bitsPerValue int bitsRequired = delta == 0 ? 0 : PackedInts.unsignedBitsRequired(delta); if (bitsRequired == 64) { // 如果最大和最小的差值都需要64bits來(lái)存儲(chǔ),那就不用去更新value了 min = 0L; } else if (min > 0L) { // 讓min盡量小,因?yàn)閙in會(huì)用VLong編碼,這樣盡可能使用更少的字節(jié) min = Math.max(0L, max - PackedInts.maxValue(bitsRequired)); } // bitsRequired左移一位,最后一位是標(biāo)記位,標(biāo)記min是否是0 final int token = (bitsRequired << BPV_SHIFT) | (min == 0 ? MIN_VALUE_EQUALS_0 : 0); out.writeByte((byte) token); if (min != 0) { // VLong編碼min writeVLong(out, zigZagEncode(min) - 1); } if (bitsRequired > 0) { if (min != 0) { for (int i = 0; i < off; ++i) { // 所有的value更新為與最小值的差值 values[i] -= min; } } // 編碼并持久化 writeValues(bitsRequired); } off = 0; } }
MonotonicBlockPackedWriter
MonotonicBlockPackedWriter處理的是數(shù)據(jù)是遞增的情況,MonotonicBlockPackedWriter先算每?jī)蓚€(gè)value之間的平均步長(zhǎng),通過(guò)這個(gè)平均步長(zhǎng)就可以計(jì)算每個(gè)位置的期望expectValue,然后把value都更新為真實(shí)value和期望expectValue的差值,這樣可能會(huì)有負(fù)數(shù)的value,所以需要一步是找出理論上的最小值min,保證所有的value和expectValue差值都是正數(shù)。
public final class MonotonicBlockPackedWriter extends AbstractBlockPackedWriter { public MonotonicBlockPackedWriter(DataOutput out, int blockSize) { super(out, blockSize); } @Override protected void flush() throws IOException { // 計(jì)算平均步長(zhǎng) final float avg = off == 1 ? 0f : (float) (values[off - 1] - values[0]) / (off - 1); long min = values[0]; for (int i = 1; i < off; ++i) { // 計(jì)算出期望的最小值,保證根據(jù)平均步長(zhǎng)計(jì)算每個(gè)位置的期望值都是正數(shù) final long actual = values[i]; // 相同位置的期望value final long expected = expected(min, avg, i); if (expected > actual) { // 如果期望值大于實(shí)際值 // 調(diào)小期望的最小值 min -= (expected - actual); } } // 最大差值,用來(lái)計(jì)算bitsPerValue的 long maxDelta = 0; for (int i = 0; i < off; ++i) { // 所有的值都更新為和期望值的差值 values[i] = values[i] - expected(min, avg, i); maxDelta = Math.max(maxDelta, values[i]); } out.writeZLong(min); out.writeInt(Float.floatToIntBits(avg)); if (maxDelta == 0) { // 所有的值都一樣 out.writeVInt(0); } else { final int bitsRequired = PackedInts.bitsRequired(maxDelta); out.writeVInt(bitsRequired); // 編碼并持久化 writeValues(bitsRequired); } off = 0; } }
DirectWriter
DirectWriter比較特殊,它沒(méi)有使用BulkOperationPacked系列的編碼器。DirectWriter綜合考慮空間浪費(fèi)和編碼解碼的速度,并沒(méi)有支持全部的bitsPerValue,它支持的bitsPerValue的列表是:1, 2, 4, 8, 12, 16, 20, 24, 28, 32, 40, 48, 56, 64。
在存儲(chǔ)的時(shí)候,按照bitsPerValue不同的值分為3種處理邏輯,每種處理邏輯的block都是不一樣的:
bitsPerValue是8的倍數(shù)時(shí): 8, 16, 24, 32, 40, 48, 56, 64
- 40,48,56,64的時(shí)候,用long來(lái)存儲(chǔ)一個(gè)value,此時(shí)block就是long,并且一個(gè)block中只有一個(gè)value。
- 24,32的時(shí)候,用int來(lái)存儲(chǔ)一個(gè)value,此時(shí)block就是int,并且一個(gè)block中只有一個(gè)value。
- 16的時(shí)候,用short來(lái)存儲(chǔ)一個(gè)value,此時(shí)block就是short,并且一個(gè)block中只有一個(gè)value。
- 8的時(shí)候,用byte來(lái)存儲(chǔ)一個(gè)value,此時(shí)block就是byte,并且一個(gè)block中只有一個(gè)value。
bitsPerValue是1,2,4
用long來(lái)存儲(chǔ)多個(gè)value,此時(shí)block就是long,一個(gè)block中有64/bitsPerValue個(gè)value。
bitsPerValue是12,20,28
- 12:用int存儲(chǔ)兩個(gè)value,此時(shí)block就是int,一個(gè)block中有兩個(gè)value。
- 20,28:用long存儲(chǔ)兩個(gè)value,此時(shí)block就是long,一個(gè)block中有兩個(gè)value。
那如果是其他bitsPerValue的值,DirectWriter會(huì)找上述支持的列表中大于bitsPerValue的第一個(gè)來(lái)進(jìn)行處理,無(wú)非是壓縮率低一些。比如當(dāng)前的bitsPerValue=17,則DirectWriter會(huì)使用20的方案來(lái)處理。
public final class DirectWriter { // 每個(gè)value使用幾個(gè)bit存儲(chǔ) final int bitsPerValue; // 有幾個(gè)需要存儲(chǔ)的value final long numValues; // 數(shù)據(jù)存儲(chǔ)的目的地 final DataOutput output; // 統(tǒng)計(jì)已經(jīng)處理了幾個(gè)數(shù)據(jù) long count; // 是否已經(jīng)處理完成了 boolean finished; // nextValues的下一個(gè)寫入的位置 int off; // 存儲(chǔ)block final byte[] nextBlocks; // 臨時(shí)存儲(chǔ)數(shù)據(jù),待構(gòu)建成block final long[] nextValues; // DirectWriter支持的bitsPerValue參數(shù)列表 static final int[] SUPPORTED_BITS_PER_VALUE = new int[] {1, 2, 4, 8, 12, 16, 20, 24, 28, 32, 40, 48, 56, 64}; DirectWriter(DataOutput output, long numValues, int bitsPerValue) { this.output = output; this.numValues = numValues; this.bitsPerValue = bitsPerValue; // 可用的內(nèi)存限制,是nextBlocks和nextValues兩個(gè)數(shù)組合起來(lái)的大小限制 final int memoryBudgetInBits = Math.multiplyExact(Byte.SIZE, PackedInts.DEFAULT_BUFFER_SIZE); // nextValues的大小 int bufferSize = memoryBudgetInBits / (Long.SIZE + bitsPerValue); // 把bufferSize調(diào)整成下一個(gè)64的倍數(shù)的值 bufferSize = Math.toIntExact(bufferSize + 63) & 0xFFFFFFC0; nextValues = new long[bufferSize]; // add 7 bytes in the end so that any value could be written as a long nextBlocks = new byte[bufferSize * bitsPerValue / Byte.SIZE + Long.BYTES - 1]; } // 新增一個(gè)value public void add(long l) throws IOException { // 校驗(yàn)是否超出了一開始設(shè)定的value總數(shù) if (count >= numValues) { throw new EOFException("Writing past end of stream"); } // 把數(shù)據(jù)暫存在nextValues中 nextValues[off++] = l; // 如果nextValues滿了,則進(jìn)行flush編碼 if (off == nextValues.length) { flush(); } // 處理的value數(shù)+1 count++; } // 構(gòu)建一個(gè)block private void flush() throws IOException { if (off == 0) { return; } // 把off之后的所有位置設(shè)為0 Arrays.fill(nextValues, off, nextValues.length, 0L); // 將nextValues中的數(shù)據(jù)編碼存儲(chǔ)到nextBlocks中 encode(nextValues, off, nextBlocks, bitsPerValue); // 一共需要幾個(gè)byte存儲(chǔ)所有編碼好的數(shù)據(jù) final int blockCount = (int) PackedInts.Format.PACKED.byteCount(PackedInts.VERSION_CURRENT, off, bitsPerValue); // 存儲(chǔ)編碼后的數(shù)據(jù) output.writeBytes(nextBlocks, blockCount); off = 0; } private static void encode(long[] nextValues, int upTo, byte[] nextBlocks, int bitsPerValue) { if ((bitsPerValue & 7) == 0) {// bitsPerValue的值是8的倍數(shù): 8, 16, 24, 32, 40, 48, 56, 64 // 一個(gè)value需要幾個(gè)byte final int bytesPerValue = bitsPerValue / Byte.SIZE; // i是nextValues中的下標(biāo) // o是在nextBlocks中下一個(gè)寫入的下標(biāo) for (int i = 0, o = 0; i < upTo; ++i, o += bytesPerValue) { final long l = nextValues[i]; if (bitsPerValue > Integer.SIZE) { // 大于32位的都用long存 BitUtil.VH_LE_LONG.set(nextBlocks, o, l); } else if (bitsPerValue > Short.SIZE) { // 24,32都用int存 BitUtil.VH_LE_INT.set(nextBlocks, o, (int) l); } else if (bitsPerValue > Byte.SIZE) { // 16用short存 BitUtil.VH_LE_SHORT.set(nextBlocks, o, (short) l); } else { // 8用byte存 nextBlocks[o] = (byte) l; } } } else if (bitsPerValue < 8) {// bitsPerValue的值是 1, 2, 4 // 一個(gè)long可以存幾個(gè)value final int valuesPerLong = Long.SIZE / bitsPerValue; // o是在nextBlocks中的下一個(gè)可以寫入的位置 for (int i = 0, o = 0; i < upTo; i += valuesPerLong, o += Long.BYTES) { long v = 0; // 構(gòu)建一個(gè)block,一個(gè)block就是一個(gè)long for (int j = 0; j < valuesPerLong; ++j) { v |= nextValues[i + j] << (bitsPerValue * j); } BitUtil.VH_LE_LONG.set(nextBlocks, o, v); } } else {// bitsPerValue的值是 12, 20, 28,此時(shí)不是一個(gè)字節(jié)的整數(shù)倍,所以是2個(gè)2個(gè)處理 // 2個(gè)value需要幾個(gè)字節(jié) final int numBytesFor2Values = bitsPerValue * 2 / Byte.SIZE; // o是在nextBlocks中下一個(gè)可以寫入的位置 for (int i = 0, o = 0; i < upTo; i += 2, o += numBytesFor2Values) { final long l1 = nextValues[i]; final long l2 = nextValues[i + 1]; // 2個(gè)合成一個(gè) final long merged = l1 | (l2 << bitsPerValue); if (bitsPerValue <= Integer.SIZE / 2) { // 如果bitsPerValue=12,則以int的方式存兩個(gè)value BitUtil.VH_LE_INT.set(nextBlocks, o, (int) merged); } else { // bitsPerValue=20或者28,則以long的方式存兩個(gè)value BitUtil.VH_LE_LONG.set(nextBlocks, o, merged); } } } } public void finish() throws IOException { if (count != numValues) { throw new IllegalStateException( "Wrong number of values added, expected: " + numValues + ", got: " + count); } // 為nextValues中剩下的value編碼 flush(); // DirectWriter的編碼方案存在填充字節(jié) int paddingBitsNeeded; if (bitsPerValue > Integer.SIZE) { paddingBitsNeeded = Long.SIZE - bitsPerValue; } else if (bitsPerValue > Short.SIZE) { paddingBitsNeeded = Integer.SIZE - bitsPerValue; } else if (bitsPerValue > Byte.SIZE) { paddingBitsNeeded = Short.SIZE - bitsPerValue; } else { paddingBitsNeeded = 0; } final int paddingBytesNeeded = (paddingBitsNeeded + Byte.SIZE - 1) / Byte.SIZE; for (int i = 0; i < paddingBytesNeeded; i++) { output.writeByte((byte) 0); } finished = true; } // 獲取一個(gè)DirectWriter的實(shí)例 public static DirectWriter getInstance(DataOutput output, long numValues, int bitsPerValue) { if (Arrays.binarySearch(SUPPORTED_BITS_PER_VALUE, bitsPerValue) < 0) { throw new IllegalArgumentException( "Unsupported bitsPerValue " + bitsPerValue + ". Did you use bitsRequired?"); } return new DirectWriter(output, numValues, bitsPerValue); } // 如果bitsRequired是不支持的,則需要糾正 private static int roundBits(int bitsRequired) { int index = Arrays.binarySearch(SUPPORTED_BITS_PER_VALUE, bitsRequired); if (index < 0) { // 如果bitsRequired不是支持的參數(shù),則使用最接近且大于bitsRequired的參數(shù) return SUPPORTED_BITS_PER_VALUE[-index - 1]; } else { return bitsRequired; // 如果是支持的,則使用它自己 } } // 通過(guò)有符號(hào)的最大值計(jì)算需要的bitsPerValue public static int bitsRequired(long maxValue) { return roundBits(PackedInts.bitsRequired(maxValue)); } // 通過(guò)無(wú)符號(hào)的最大值計(jì)算需要的bitsPerValue public static int unsignedBitsRequired(long maxValue) { return roundBits(PackedInts.unsignedBitsRequired(maxValue)); } }
DirectMonotonicWriter
DirectMonotonicWriter對(duì)單調(diào)遞增的數(shù)據(jù)有預(yù)處理,邏輯和MonotonicBlockPackedWriter非常相似,編碼使用的是DirectWriter。
public final class DirectMonotonicWriter { public static final int MIN_BLOCK_SHIFT = 2; public static final int MAX_BLOCK_SHIFT = 22; final IndexOutput meta; final IndexOutput data; final long numValues; final long baseDataPointer; final long[] buffer; int bufferSize; long count; boolean finished; long previous = Long.MIN_VALUE; DirectMonotonicWriter(IndexOutput metaOut, IndexOutput dataOut, long numValues, int blockShift) { if (blockShift < MIN_BLOCK_SHIFT || blockShift > MAX_BLOCK_SHIFT) { throw new IllegalArgumentException( "blockShift must be in [" + MIN_BLOCK_SHIFT + "-" + MAX_BLOCK_SHIFT + "], got " + blockShift); } if (numValues < 0) { throw new IllegalArgumentException("numValues can't be negative, got " + numValues); } final long numBlocks = numValues == 0 ? 0 : ((numValues - 1) >>> blockShift) + 1; if (numBlocks > ArrayUtil.MAX_ARRAY_LENGTH) { throw new IllegalArgumentException( "blockShift is too low for the provided number of values: blockShift=" + blockShift + ", numValues=" + numValues + ", MAX_ARRAY_LENGTH=" + ArrayUtil.MAX_ARRAY_LENGTH); } this.meta = metaOut; this.data = dataOut; this.numValues = numValues; final int blockSize = 1 << blockShift; this.buffer = new long[(int) Math.min(numValues, blockSize)]; this.bufferSize = 0; this.baseDataPointer = dataOut.getFilePointer(); } private void flush() throws IOException { assert bufferSize != 0; // 估計(jì)數(shù)值遞增的平均步長(zhǎng) final float avgInc = (float) ((double) (buffer[bufferSize - 1] - buffer[0]) / Math.max(1, bufferSize - 1)); for (int i = 0; i < bufferSize; ++i) { // 按平均步長(zhǎng)算的當(dāng)前位置的期望值 final long expected = (long) (avgInc * (long) i); // 當(dāng)前真實(shí)值和期望值之差,注意此時(shí)buffer中可能存在負(fù)數(shù) buffer[i] -= expected; } // 尋找最小的值 long min = buffer[0]; for (int i = 1; i < bufferSize; ++i) { min = Math.min(buffer[i], min); } long maxDelta = 0; for (int i = 0; i < bufferSize; ++i) { buffer[i] -= min; // 經(jīng)過(guò)這一步,buffer中所有的都是正數(shù) maxDelta |= buffer[i]; } meta.writeLong(min); meta.writeInt(Float.floatToIntBits(avgInc)); meta.writeLong(data.getFilePointer() - baseDataPointer); if (maxDelta == 0) { // 經(jīng)過(guò)預(yù)處理之后,所有的值都相等 meta.writeByte((byte) 0); } else { // 使用DirectWriter來(lái)進(jìn)行編碼 final int bitsRequired = DirectWriter.unsignedBitsRequired(maxDelta); DirectWriter writer = DirectWriter.getInstance(data, bufferSize, bitsRequired); for (int i = 0; i < bufferSize; ++i) { writer.add(buffer[i]); } writer.finish(); meta.writeByte((byte) bitsRequired); } bufferSize = 0; } public void add(long v) throws IOException { if (v < previous) { // 如果不是遞增的,則不應(yīng)該使用DirectMonotonicWriter throw new IllegalArgumentException("Values do not come in order: " + previous + ", " + v); } if (bufferSize == buffer.length) { // buffer滿了 flush(); } buffer[bufferSize++] = v; previous = v; count++; } public void finish() throws IOException { if (count != numValues) { throw new IllegalStateException( "Wrong number of values added, expected: " + numValues + ", got: " + count); } if (finished) { throw new IllegalStateException("#finish has been called already"); } if (bufferSize > 0) { // 如果還有數(shù)據(jù),則先編碼 flush(); } finished = true; } }
總結(jié)
之前在看索引文件構(gòu)建的源碼時(shí),會(huì)經(jīng)常碰到對(duì)于正整數(shù)的批量壓縮應(yīng)用,而且有好幾個(gè)不同的類,當(dāng)時(shí)都是把它們當(dāng)成黑盒忽略,現(xiàn)在花了點(diǎn)時(shí)間,總算是把這些不同應(yīng)用場(chǎng)景都區(qū)分了。本文沒(méi)有羅列Lucene中全部的編碼壓縮應(yīng)用,后面如果有再碰到影響我看源碼的壓縮相關(guān)的,我會(huì)補(bǔ)充進(jìn)來(lái)。
以上就是Lucene源碼系列多值編碼壓縮算法實(shí)例詳解的詳細(xì)內(nèi)容,更多關(guān)于Lucene多值編碼壓縮算法的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Springboot接收?Form?表單數(shù)據(jù)的示例詳解
這篇文章主要介紹了Springboot接收?Form?表單數(shù)據(jù)的實(shí)例代碼,本文通過(guò)圖文實(shí)例代碼相結(jié)合給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-08-08關(guān)于注解式的分布式Elasticsearch的封裝案例
這篇文章主要介紹了關(guān)于注解式的分布式Elasticsearch的封裝案例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-01-01spring boot自定義404錯(cuò)誤信息的方法示例
這篇文章主要介紹了spring boot自定義404錯(cuò)誤信息的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考借鑒,下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧。2017-09-09IntelliJ IDEA基于Maven構(gòu)建Java項(xiàng)目
在 Java 開發(fā)中,使用 Maven 是一種廣泛采用的構(gòu)建工具,本文主要介紹了IntelliJ IDEA基于Maven構(gòu)建Java項(xiàng)目,具有一定的參考價(jià)值,感興趣的可以了解一下2024-03-03一文徹底弄懂零拷貝原理以及java實(shí)現(xiàn)
零拷貝(英語(yǔ): Zero-copy) 技術(shù)是指計(jì)算機(jī)執(zhí)行操作時(shí),CPU不需要先將數(shù)據(jù)從某處內(nèi)存復(fù)制到另一個(gè)特定區(qū)域,下面這篇文章主要給大家介紹了關(guān)于零拷貝原理以及java實(shí)現(xiàn)的相關(guān)資料,需要的朋友可以參考下2021-08-08Sentinel源碼解析入口類和SlotChain構(gòu)建過(guò)程詳解
這篇文章主要為大家介紹了Sentinel源碼解析入口類和SlotChain構(gòu)建過(guò)程詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-09-09