java實現(xiàn)cassandra高級操作之分頁實例(有項目具體需求)
上篇博客講到了cassandra的分頁,相信大家會有所注意:下一次的查詢依賴上一次的查詢(上一次查詢的最后一條記錄的全部主鍵),不像mysql那樣靈活,所以只能實現(xiàn)上一頁、下一頁這樣的功能,不能實現(xiàn)第多少頁那樣的功能(硬要實現(xiàn)的話性能就太低了)。
我們先看看驅(qū)動官方給的分頁做法
如果一個查詢得到的記錄數(shù)太大,一次性返回回來,那么效率非常低,并且很有可能造成內(nèi)存溢出,使得整個應(yīng)用都奔潰。所以了,驅(qū)動對結(jié)果集進行了分頁,并返回適當?shù)哪骋豁摰臄?shù)據(jù)。
一、設(shè)置抓取大?。⊿etting the fetch size)
抓取大小指的是一次從cassandra獲取到的記錄數(shù),換句話說,就是每一頁的記錄數(shù);我們能夠在創(chuàng)建cluster實例的時候給它的fetch size指定一個默認值,如果沒有指定,那么默認是5000
// At initialization:
Cluster cluster = Cluster.builder()
.addContactPoint("127.0.0.1")
.withQueryOptions(new QueryOptions().setFetchSize(2000))
.build();
// Or at runtime:
cluster.getConfiguration().getQueryOptions().setFetchSize(2000);
另外,statement上也能設(shè)置fetch size
Statement statement = new SimpleStatement("your query");
statement.setFetchSize(2000);
如果statement上設(shè)置了fetch size,那么statement的fetch size將起作用,否則則是cluster上的fetch size起作用。
注意:設(shè)置了fetch size并不意味著cassandra總是返回準確的結(jié)果集(等于fetch size),它可能返回比fetch size稍微多一點或者少一點的結(jié)果集。
二、結(jié)果集迭代
fetch size限制了每一頁返回的結(jié)果集的數(shù)量,如果你迭代某一頁,驅(qū)動會在后臺自動的抓取下一頁的記錄。如下例,fetch size = 20:
默認情況下,后臺自動抓取發(fā)生在最后一刻,也就是當某一頁的記錄被迭代完的時候。如果需要更好的控制,ResultSet接口提供了以下方法:
getAvailableWithoutFetching() and isFullyFetched() to check the current state;
fetchMoreResults() to force a page fetch;
以下是如何使用這些方法提前預(yù)取下一頁,以避免在某一頁迭代完后才抓取下一頁造成的性能下降:
ResultSet rs = session.execute("your query");
for (Row row : rs) {
if (rs.getAvailableWithoutFetching() == 100 && !rs.isFullyFetched())
rs.fetchMoreResults(); // this is asynchronous
// Process the row ...
System.out.println(row);
}
三、保存并重新使用分頁狀態(tài)
有時候,將分頁狀態(tài)保存起來,對以后的恢復(fù)是非常有用的,想象一下:有一個無狀態(tài)Web服務(wù),顯示結(jié)果列表,并顯示下一頁的鏈接,當用戶點擊這個鏈接的時候,我們需要執(zhí)行與之前完全相同的查詢,除了迭代應(yīng)該從上一頁停止的位置開始;相當于記住了上一頁迭代到了哪了,那么下一頁從這里開始即可。
為此,驅(qū)動程序會暴露一個PagingState對象,該對象表示下一頁被提取時我們在結(jié)果集中的位置。
ResultSet resultSet = session.execute("your query");
// iterate the result set...
PagingState pagingState = resultSet.getExecutionInfo().getPagingState();
// PagingState對象可以被序列化成字符串或字節(jié)數(shù)組
String string = pagingState.toString();
byte[] bytes = pagingState.toBytes();
PagingState對象被序列化后的內(nèi)容可以持久化存儲起來,也可用作分頁請求的參數(shù),以備后續(xù)再次被利用,反序列化成對象即可:
PagingState.fromBytes(byte[] bytes); PagingState.fromString(String str);
請注意,分頁狀態(tài)只能使用完全相同的語句重復(fù)使用(相同的查詢,相同的參數(shù))。而且,它是一個不透明的值,只是用來存儲一個可以被重新使用的狀態(tài)值,如果嘗試修改其內(nèi)容或?qū)⑵涫褂迷诓煌恼Z句上,驅(qū)動程序會拋出錯誤。
具體我們來看下代碼,下例是模擬頁面分頁的請求,實現(xiàn)遍歷teacher表中的全部記錄:
接口:
import java.util.Map;
import com.datastax.driver.core.PagingState;
public interface ICassandraPage
{
Map<String, Object> page(PagingState pagingState);
}
主體代碼:
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.datastax.driver.core.PagingState;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;
import com.huawei.cassandra.dao.ICassandraPage;
import com.huawei.cassandra.factory.SessionRepository;
import com.huawei.cassandra.model.Teacher;
public class CassandraPageDao implements ICassandraPage
{
private static final Session session = SessionRepository.getSession();
private static final String CQL_TEACHER_PAGE = "select * from mycas.teacher;";
@Override
public Map<String, Object> page(PagingState pagingState)
{
final int RESULTS_PER_PAGE = 2;
Map<String, Object> result = new HashMap<String, Object>(2);
List<Teacher> teachers = new ArrayList<Teacher>(RESULTS_PER_PAGE);
Statement st = new SimpleStatement(CQL_TEACHER_PAGE);
st.setFetchSize(RESULTS_PER_PAGE);
// 第一頁沒有分頁狀態(tài)
if (pagingState != null)
{
st.setPagingState(pagingState);
}
ResultSet rs = session.execute(st);
result.put("pagingState", rs.getExecutionInfo().getPagingState());
//請注意,我們不依賴RESULTS_PER_PAGE,因為fetch size并不意味著cassandra總是返回準確的結(jié)果集
//它可能返回比fetch size稍微多一點或者少一點,另外,我們可能在結(jié)果集的結(jié)尾
int remaining = rs.getAvailableWithoutFetching();
for (Row row : rs)
{
Teacher teacher = this.obtainTeacherFromRow(row);
teachers.add(teacher);
if (--remaining == 0)
{
break;
}
}
result.put("teachers", teachers);
return result;
}
private Teacher obtainTeacherFromRow(Row row)
{
Teacher teacher = new Teacher();
teacher.setAddress(row.getString("address"));
teacher.setAge(row.getInt("age"));
teacher.setHeight(row.getInt("height"));
teacher.setId(row.getInt("id"));
teacher.setName(row.getString("name"));
return teacher;
}
}
測試代碼:
import java.util.Map;
import com.datastax.driver.core.PagingState;
import com.huawei.cassandra.dao.ICassandraPage;
import com.huawei.cassandra.dao.impl.CassandraPageDao;
public class PagingTest
{
public static void main(String[] args)
{
ICassandraPage cassPage = new CassandraPageDao();
Map<String, Object> result = cassPage.page(null);
PagingState pagingState = (PagingState) result.get("pagingState");
System.out.println(result.get("teachers"));
while (pagingState != null)
{
// PagingState對象可以被序列化成字符串或字節(jié)數(shù)組
System.out.println("==============================================");
result = cassPage.page(pagingState);
pagingState = (PagingState) result.get("pagingState");
System.out.println(result.get("teachers"));
}
}
}
我們來看看Statement的setPagingState(pagingState)方法:

四、偏移查詢
保存分頁狀態(tài),能夠保證從某一頁移動到下一頁很好地運行(也可以實現(xiàn)上一頁),但是它不滿足隨機跳躍,比如直接跳到第10頁,因為我們不知道第10頁的前一頁的分頁狀態(tài)。像這樣需要偏移查詢的特點,并不被cassandra原生支持,理由是偏移查詢效率低下(性能與跳過的行數(shù)呈線性反比),所以cassandra官方不鼓勵使用偏移量。如果非要實現(xiàn)偏移查詢,我們可以在客戶端模擬實現(xiàn)。但是性能還是呈線性反比,也就說偏移量越大,性能越低,如果性能在我們的接受范圍內(nèi),那還是可以實現(xiàn)的。例如,每一頁顯示10行,最多顯示20頁,這就意味著,當顯示第20頁的時候,最多需要額外的多抓取190行,但這也不會對性能造成太大的降低,所以數(shù)據(jù)量不大的話,模擬實現(xiàn)偏移查詢還是可以的。
舉個例子,假設(shè)每頁顯示10條記錄,fetch size 是50,我們請求第12頁(也就是第110行到第119行):
1、第一次執(zhí)行查詢,結(jié)果集包含0到49行,我們不需要用到它,只需要分頁狀態(tài);
2、用第一次查詢得到的分頁狀態(tài),執(zhí)行第二次查詢;
3、用第二次查詢得到的分頁狀態(tài),執(zhí)行第三次查詢。結(jié)果集包含100到149行;
4、用第三次查詢得到的結(jié)果集,先過濾掉前10條記錄,然后讀取10條記錄,最后丟棄剩下的記錄,讀取的10條記錄則是第12頁需要顯示的記錄。
我們需要嘗試著找到最佳的fetch size來達到最佳平衡:太小就意味著后臺更多的查詢;太大則意味著返回了更大的信息量以及更多不需要的行。
另外,cassandra本身不支持偏移量查詢。在滿足性能的前提下,客戶端模擬偏移量的實現(xiàn)只是一種妥協(xié)。官方建議如下:
1、使用預(yù)期的查詢模式來測試代碼,以確保假設(shè)是正確的
2、設(shè)置最高頁碼的硬限制,以防止惡意用戶觸發(fā)跳過大量行的查詢
五、總結(jié)
Cassandra對分頁的支持有限,上一頁、下一頁比較好實現(xiàn)。不支持偏移量的查詢,硬要實現(xiàn)的話,可以采用客戶端模擬的方式,但是這種場景最好不要用在cassandra上,因為cassandra一般而言是用來解決大數(shù)據(jù)問題,而偏移量查詢一旦數(shù)據(jù)量太大,性能就不敢恭維了。
在我的項目中,索引修復(fù)用到了cassandra的分頁,場景如下:cassandra的表不建二級索引,用elasticsearch實現(xiàn)cassandra表的二級索引,那么就會涉及到索引的一致性修復(fù)的問題,這里就用到了cassandra的分頁,對cassandra的某張表進行全表遍歷,逐條與elasticsearch中的數(shù)據(jù)進行匹對,若elasticsearch中不存在,則在elasticsearch中新增,若存在而又不一致,則在elasticsearch中修復(fù)。具體elasticsearch怎么樣實現(xiàn)cassandra的索引功能,在我后續(xù)博客中會專門的講解,這里就不多說了。而在cassandra表進行全表遍歷的時候就需要用到分頁,因為表中數(shù)據(jù)量太大,億級別的數(shù)據(jù)不可能一次全部加載到內(nèi)存中。
以上就是本文的全部內(nèi)容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
SpringBoot使用 druid 連接池來優(yōu)化分頁語句
這篇文章主要介紹了SpringBoot使用 druid 連接池來優(yōu)化分頁語句,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-11-11
Spring中的@PropertySource注解源碼詳細解析
這篇文章主要介紹了Spring中的@PropertySource注解源碼詳細解析,@PropertySource注解,標注在配置類@Configuration上面,下面主要分析一下@PropertySource注解的處理過程,也就是怎么把配置信息從.properies文件放到environment中的,需要的朋友可以參考下2024-01-01
Springboot+WebSocket+Netty實現(xiàn)在線聊天/群聊系統(tǒng)
這篇文章主要實現(xiàn)在好友添加、建群、聊天對話、群聊功能,使用Java作為后端語言進行支持,界面友好,開發(fā)簡單,文章中有詳細的代碼示例供大家參考,需要的朋友可以參考下2023-08-08
Java 枚舉類和自定義枚舉類和enum聲明及實現(xiàn)接口的操作
這篇文章主要介紹了Java 枚舉類和自定義枚舉類和enum聲明及實現(xiàn)接口的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-02-02

