鎖超時發(fā)現parallelStream并行流線程上下文坑解決
detached entity passed to persist問題
就我之前因為在處理jpa持久化對象上下文時,spring jpa關于線程池異步執(zhí)行導致detached entity passed to persist問題排查和解決
我這邊有個批量插入用戶OpenUser和應用OpenApp關聯關系數據的操作,由于耗時較長時間,所以準備用線程池異步執(zhí)行操作,然而卻遇到了一個jpa的detached entity passed to persist問題,我這邊的操作是批量保存一個OpenAppUser關聯關系表,所以需要先獲得對應OpenUser和OpenApp的引用,再設置到關聯對象OpenAppUser里,然后在保存,我這邊是先通過userRepository.findById(userId)獲取到OpenUser,然后openAppUser.setOpenUser(openUser),在執(zhí)行appUserRepository.save(openAppUser);時發(fā)生了如標題上的錯誤,說是OpenUser對象處于游離態(tài),無法保存。
經過排查,我這邊是因為OpenAppUser類里設置了@ManyToOne(cascade = CascadeType.ALL)級聯OpenUser,所以在保存OpenAppUser的時候會級聯操作OpenUser,本來在沒有開線程異步的情況下,因為OpenUser之前通過findById查出來了,所以在jpa的PersistenceContext里是有該OpenUser的脫管對象的,這時候就不會報錯,而在線程異步的情況下context里確沒有該脫管對象了
(這里說明一下,為啥不開線程有,開了線程沒有?)因為spring-boot默認jpa.open-in-view=true,會使用ThreadLocal在當前線程里保存EntityManager上下文信息,所以在整個controller里都是使用的同一個context
PersistenceContext持久性上下文有兩種類型
- 事務范圍的持久性上下文;當我們在事務中執(zhí)行任何操作時,EntityManager 會檢查持久性上下文。 如果存在,則將使用它。否則,它將創(chuàng)建一個持久性上下文
- 擴展范圍的持久性上下文;擴展持久性上下文可以跨越多個事務。我們可以在沒有事務的情況下持久化實體,但不能在沒有事務的情況下刷新它。
在@PersistenceContext注解里type可以指定范圍:PersistenceContextType.TRANSACTION;PersistenceContextType.EXTENDED
而當我們用線程池異步的時候,拿不到之前的EntityManager的配置信息,而spring jpa repository默認的方法上都會自帶一個事務,所以在執(zhí)行完userRepository.findById(userId)獲取到OpenUser之后,會commit,而commit操作會clear掉EntityManager里保存的脫管對象OpenUser,等到appUserRepository.save(openAppUser);保存的時候,由于引用的OpenUser已經沒有在PersistenceContext上下文里了,不是脫管對象了(具體可以看EntityState entityState = getEntityState( entity, entityName, entityEntry, source );里面的實現,有幾種判斷條件,是不是脫管對象,有沒有id、version等等屬性),就會報detached entity passed to persist這個異常
所以根據實際情況,我們只要參考open-in-view=true產生對應的OpenEntityManagerInViewInterceptor攔截器改造一下自己線程里的PersistenceContext上下文生效范圍,就可以解決該異常了
parallelStream并行流
parallelStream并行流給我的印象就是會讀不到父線程的上下文的,所以應該在父線程里的事務和在parallelStream里的事務應該是區(qū)分的,而不是共用同一個事務的,然而今天因為一個鎖超時的問題,發(fā)現并沒有那么簡單,下面我們一步一步來驗證。
鎖超時場景
具體的業(yè)務我不講了,就說下偽代碼
@PostMapping("/saveUser") @Transactional public void saveUser(@RequestBody List<Complex> list) { list.parallelStream().forEach(complex->{ Integer appId = complex.getAppId(); Integer userId = complex.getUserId(); GeneratedKeyHolder keyHolder = new GeneratedKeyHolder(); String sql = "insert ignore into open_app_user (app_id, open_id, user_status, creator, modifier, create_time, modify_time, status, version) values ("+appId+","+userId+",0,1,1,now(),now(),1,1)"; int id = jdbcTemplate.update(con -> con.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS), keyHolder); }); //todo 業(yè)務邏輯... }
這里我有個批量保存的邏輯,需要先保存一個中間表open_app_user表(該表app_id和open_id是聯合唯一鍵)獲得id,拿到用戶的open_app_user_id后再進行其他業(yè)務邏輯,這里按我原來的理解是雖然我在controller的方法上加了@Transactional注解,但是parallelStream里的事務應該都是獨立的,不會是同一個事務,所以即使有數據重復,第一個線程插入后,第二個線程也只會插入失?。ú粫箦e,因為我加了ignore),所以即使并行也不會有問題的,然而卻發(fā)生了鎖超時的問題。
查看鎖超時以及定位的操作可以看我前面的文章,通過查找mysql的 http://chabaoo.cn/article/259480.htm
select * from information_schema.INNODB_TRX; select * from performance_schema.data_lock_waits; select * from performance_schema.data_locks;
定位到了這里,然而我也百思不得其解,為啥會鎖超時呢,這里應該都是馬上執(zhí)行就馬上釋放了啊,難道是其中的事務沒有提交?
因為現在都是spring的聲明式事務管理,spring是在有@Transactional注解的情況下,執(zhí)行完了才提交事務,在沒有@Transactional注解的情況下,每個方法都差不多可以理解成原子,比如我上面的jdbcTemplate.update()這個方法就是一個事務,執(zhí)行完了就直接提交事務了。
驗證
因為spring是把事務上下文放在ThreadLocal里了,主要是用TransactionSynchronizationManager這個類來管理,所以我寫了一個demo來進行驗證
@GetMapping("/get") @Transactional public String get() { List<Complex> list = new ArrayList<>(); for (int i = 0; i < 10; i++) { list.add(new Complex(1, 1)); } list.parallelStream().forEach(complex->{ Map<Object, Object> resourceMap = TransactionSynchronizationManager.getResourceMap(); System.err.println("count:"+resourceMap.size()); Integer appId = complex.getAppId(); Integer userId = complex.getUserId(); String sql = "insert ignore into open_app_user (app_id, open_id, user_status, creator, modifier, create_time, modify_time, status, version) values ("+appId+","+userId+",0,1,1,now(),now(),1,1)"; int update = jdbcTemplate.update(sql); }); return "hello, world! "; }
有趣的事情發(fā)生了,我在注釋掉@Transactional注解時,代碼里resourceMap.size()返回的內容是竟然不一樣,因為我的list有10條記錄,差不多就是10個并行,然而我的輸出卻是:
count:1
count:0
count:0
count:0
count:0
count:0
count:0
count:0
count:0
count:0
沒有注釋掉@Transactional注解時,輸出是:
count:2
count:0
count:0
count:0
count:0
count:0
count:0
count:0
count:0
count:0
并且還會出現鎖超時的現象,奇怪的地方就是為啥我用的parallelStream會有線程上下文里的值,我并沒有做什么操作,而且10個并行里只有一個(這里并不是說明固定只有一次,下面會說明)獲得了線程上下文的信息
測試
我又進一步測試,偽代碼改成:
@GetMapping("/get") public void get() { List<Complex> list = new ArrayList<>(); for (int i = 0; i < 10; i++) { list.add(new Complex(1, 1)); } ThreadLocal local = new ThreadLocal(); local.set("parent_set_value"); list.parallelStream().forEach(complex->{ System.err.println(local.get()); }); }
結果如我所料,輸出為:
parent_set_value
null
null
null
null
null
null
null
null
null
使用parallelStream并不完全都是另開了線程,其中有一個是屬于主線程的,可以使用System.err.println(Thread.currentThread().getName());查看當前線程的名稱,我發(fā)現parallelStream會把當前主線程也作為一個執(zhí)行線程去執(zhí)行任務
后面我再去了解了一下parallelStream的實現,在這個方法上的注解里第一句話有個單詞是possibly,是“可能”返回并行流,原來參與并行處理的線程有主線程以及ForkJoinPool中的worker線程,所以parallelStream是有兩種情況的,一是可能只一個線程并發(fā)執(zhí)行,二是多個線程并行執(zhí)行,而我這里導致鎖超時,就是因為用到了主線程,所以在并行插入的時候,有個處理有事務上下文,導致一直沒有提交事務(@Transactional注釋方法的方法沒有跑完,這里也不可能跑完),所以其他線程的插入就一直等待這個,產生了鎖超時報錯
以上就是鎖超時發(fā)現parallelStream并行流線程上下文坑解決的詳細內容,更多關于parallelStream并行流線程坑的資料請關注腳本之家其它相關文章!
相關文章
SpringBoot Controller Post接口單元測試示例
今天小編就為大家分享一篇關于SpringBoot Controller Post接口單元測試示例,小編覺得內容挺不錯的,現在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧2018-12-12Spring覆蓋容器中Bean的注解如何實現@OverrideBean
文章介紹了在項目開發(fā)中如何通過偷梁換柱的方式重寫Spring容器中的內置Bean,并指出了需要注意的兩點:1. 對應的Bean應基于接口注入;2. 如果不是基于接口注入,可以使用同包名同類名的方式重寫(可能存在潛在問題,不推薦),文章還強調了“基于接口編程”的好處2025-01-01mybatis執(zhí)行批量更新batch update 的方法(oracle,mysql兩種)
這篇文章主要介紹了mybatis執(zhí)行批量更新batch update 的方法,提供oracle和mysql兩種方法,非常不錯,需要的朋友參考下2017-01-01