從匯編代碼開始全面解析synchronized還原最真實的偏向鎖
前言
我們都知道java之所以跨平臺能力強,是因為java在編譯期沒有被編譯成機器碼,而是被編譯成字節(jié)碼。早期的jvm會將編譯好的字節(jié)碼翻譯成機器碼解釋執(zhí)行,我們在jvm的源碼中還可以看到早期的解釋器——bytecodeInterpreter.cpp(雖然已經(jīng)不再使用)。對于字節(jié)碼這種總數(shù)固定,解釋邏輯固定的命令,現(xiàn)代jvm將其執(zhí)行進(jìn)行了優(yōu)化,在jvm初始化的時候,直接將每個字節(jié)碼指令將要執(zhí)行的匯編代碼加載到內(nèi)存中,在運行時執(zhí)行某段字節(jié)碼時直接調(diào)用內(nèi)存中對應(yīng)的匯編代碼即可,這樣的解釋器就時模板解釋器——templateTable.cpp。而synchronized修飾代碼塊時,其編譯成字節(jié)碼后就是monitorenter和monitorexit(關(guān)于如何查看編譯后的字節(jié)碼可以查看筆者往期的博客)。
所以要想看現(xiàn)代jvm的synchronized實現(xiàn)還要從模板解釋器(templateTable)的monitorenter方法看起(網(wǎng)上許多文章都是從bytecodeInterpreter開始分析,雖然大致邏輯一樣,更有甚者將偏向鎖撤銷邏輯硬是理解成偏向鎖加鎖邏輯,非?;靵y),本文筆者就從模板解釋器匯編源碼開始分析還原最真實的偏向鎖實現(xiàn),解釋monitorenter字節(jié)碼命令的方法開始,從匯編代碼開始全面解析synchronized。
一.TemplateTable::monitorenter()
關(guān)于這個monitorenter()方法,主要包括在方法棧幀中獲取lockRecord以及若lockRecord不夠則擴容的邏輯,由于這部分代碼是將字節(jié)碼直接解釋成機器碼,所以以方法名的形式將機器碼封裝成了對應(yīng)的匯編命令,我們碰到的匯編方法將其當(dāng)成對應(yīng)的匯編命令即可(值得注意的是里面又很多jmp,jcc,jccb等跳轉(zhuǎn)指令,由于篇幅有限本文就不過多介紹,有興趣的讀者可以自行了解,本文就將其當(dāng)成跳轉(zhuǎn)指令),其他匯編命令也比較簡單,這里就不過多介紹,讀者如果碰到相關(guān)不熟悉的命令可以自行搜索下相關(guān)概念,好了話不多說我們直接看源碼:
void TemplateTable::monitorenter() { transition(atos, vtos); // 檢查對象是否為null,此時對象存在rax寄存器中 __ null_check(rax); // rbp是堆棧寄存器,通常指向棧底 // 棧幀中存在一個monitor數(shù)組用于保存鎖相關(guān)信息,又叫l(wèi)ockRecord(后面都統(tǒng)稱為lockRecord) // frame::interpreter_frame_monitor_block_top_offset和frame::interpreter_frame_initial_sp_offset // 表示monitor top 和monitor bot偏移量 // Address(x, j)表示距離x地址j偏移量的地址 // 所以這里聲明的兩個變量我們可以簡單理解為棧幀中的monitor top 和monitor bot地址 const Address monitor_block_top( rbp, frame::interpreter_frame_monitor_block_top_offset * wordSize); const Address monitor_block_bot( rbp, frame::interpreter_frame_initial_sp_offset * wordSize); const int entry_size = frame::interpreter_frame_monitor_size() * wordSize; Label allocated; // 初始化c_rarg1寄存器中的值(這里本質(zhì)是一個異或運算) __ xorl(c_rarg1, c_rarg1); // points to free slot or NULL // 這部分代碼邏輯是循環(huán)從lockRecord數(shù)組中找到一個空的槽位,并將其放入c_rarg1寄存器中 { Label entry, loop, exit; __ movptr(c_rarg3, monitor_block_top); __ lea(c_rarg2, monitor_block_bot); // 直接跳到entry標(biāo)簽位 __ jmpb(entry); // 綁定loop標(biāo)簽開始循環(huán) __ bind(loop); // 檢查當(dāng)前LockRecord是否被使用 __ cmpptr(Address(c_rarg3, BasicObjectLock::obj_offset_in_bytes()), (int32_t) NULL_WORD); // 沒有被使用則將其放到c_rarg1 __ cmov(Assembler::equal, c_rarg1, c_rarg3); // 檢查和當(dāng)前對象是否一樣 __ cmpptr(rax, Address(c_rarg3, BasicObjectLock::obj_offset_in_bytes())); // 如果一樣則表示重入,跳出循環(huán) __ jccb(Assembler::equal, exit); // 否則則跳到下一個entry __ addptr(c_rarg3, entry_size); // 綁定entry標(biāo)簽 __ bind(entry); // 比較c_rarg3與c_rarg2寄存器中的值,即是否相等 __ cmpptr(c_rarg3, c_rarg2); // 若不等則跳到loop繼續(xù)循環(huán) __ jcc(Assembler::notEqual, loop); __ bind(exit); } //檢測一個空槽位是否被找到(如果是重入則不會跳轉(zhuǎn)會去新申請) __ testptr(c_rarg1, c_rarg1); //找到則跳到 allocated標(biāo)簽 __ jcc(Assembler::notZero, allocated); // 如果沒有空的slot則申請一個,這里還包括了申請后調(diào)整位置的邏輯 { Label entry, loop; // 將lockrecord底部的指針放到c_rarg1寄存器中 __ movptr(c_rarg1, monitor_block_bot); // 計算并移動棧頂和棧底到新位置,均移動entry_size(rsp寄存器指向棧頂) __ subptr(rsp, entry_size); __ subptr(c_rarg1, entry_size); // 設(shè)置新的棧頂位置和棧底位置分別到c_rarg3寄存器和monitor_block_bot地址上 __ mov(c_rarg3, rsp); __ movptr(monitor_block_bot, c_rarg1); // 跳到entry標(biāo)簽——為了先比較下然后開始循環(huán) // c_rarg1則是新的空slot __ jmp(entry); __ bind(loop); // 這兩行是將老棧頂位置的值存到新棧頂位置 __ movptr(c_rarg2, Address(c_rarg3, entry_size)); __ movptr(Address(c_rarg3, 0), c_rarg2); // 推進(jìn)到下一個位置 __ addptr(c_rarg3, wordSize); __ bind(entry); __ cmpptr(c_rarg3, c_rarg1); __ jcc(Assembler::notEqual, loop); } // 綁定allocated標(biāo)簽 __ bind(allocated); __ increment(r13); // 保存對象到lockRecord中,locrRecord對象有兩個屬性分別是對象指針和鎖 // BasicObjectLock::obj_offset_in_bytes()也表示偏移量 __ movptr(Address(c_rarg1, BasicObjectLock::obj_offset_in_bytes()), rax); // 加鎖方法 __ lock_object(c_rarg1); // 檢查以確保該監(jiān)視器在鎖定后不會導(dǎo)致堆棧溢出 __ save_bcp(); __ generate_stack_overflow_check(0); // 調(diào)用下一個指令 __ dispatch_next(vtos); }
我們看到下一個方法是_lock_object()方法,這個方法我們等下在分析,在這之前筆者先介紹下我們源碼中看到的lockRecord,其實時basicLock.cpp中的BasicObjectLock類:
//只有兩個屬性 class BasicObjectLock VALUE_OBJ_CLASS_SPEC { private: //鎖對象 BasicLock _lock; //表示持有鎖的對象 oop _obj; } //再來看看鎖對象——只有一個屬性 class BasicLock VALUE_OBJ_CLASS_SPEC { private: //markword一般保存的是持有鎖對象的markword volatile markOop _displaced_header; }
可以看到lockRecord是用于關(guān)聯(lián)對象和鎖的關(guān)系的,如果在當(dāng)前方法中有加鎖的對象,就會在解釋棧幀中添加一個lockRecord用于記錄相應(yīng)的對象和鎖的關(guān)系,不僅如此lockRecord還會隱式的鎖重入的計數(shù)器,當(dāng)發(fā)生重入時,就會為同一個對象創(chuàng)建多個lockRecord。從源碼中我們也可以看到在解釋的方法執(zhí)行期間,lockRecord的數(shù)組會根據(jù)持有的鎖數(shù)量增長或縮小。
二.lock_object():
接下來我們來一起看看lock_object()方法:
//在interp_masm_x86_64.cpp文件中 void InterpreterMacroAssembler::lock_object(Register lock_reg) { assert(lock_reg == c_rarg1, "The argument is only for looks. It must be c_rarg1"); //判斷是否強制使用重鎖,默認(rèn)是false if (UseHeavyMonitors) { call_VM(noreg, CAST_FROM_FN_PTR(address, InterpreterRuntime::monitorenter), lock_reg); } else { //定義完成標(biāo)簽 Label done; const Register swap_reg = rax; const Register obj_reg = c_rarg3; //聲明一些偏移量 const int obj_offset = BasicObjectLock::obj_offset_in_bytes(); const int lock_offset = BasicObjectLock::lock_offset_in_bytes (); const int mark_offset = lock_offset + BasicLock::displaced_header_offset_in_bytes(); Label slow_case; // 傳入的basicObjectLock中的對象地址存到obj_reg中,即c_rarg3寄存器中 movptr(obj_reg, Address(lock_reg, obj_offset)); //使用偏向鎖 if (UseBiasedLocking) { //偏向鎖加鎖方法 biased_locking_enter(lock_reg, obj_reg, swap_reg, rscratch1, false, done, &slow_case); } //后面的方法是關(guān)于偏向鎖撤銷和升級的,不是本文重點,本文先略過 movl(swap_reg, 1); orptr(swap_reg, Address(obj_reg, 0)); movptr(Address(lock_reg, mark_offset), swap_reg); assert(lock_offset == 0, "displached header must be first word in BasicObjectLock"); if (os::is_MP()) lock(); cmpxchgptr(lock_reg, Address(obj_reg, 0)); if (PrintBiasedLockingStatistics) { cond_inc32(Assembler::zero, ExternalAddress((address) BiasedLocking::fast_path_entry_count_addr())); } jcc(Assembler::zero, done); subptr(swap_reg, rsp); andptr(swap_reg, 7 - os::vm_page_size()); movptr(Address(lock_reg, mark_offset), swap_reg); if (PrintBiasedLockingStatistics) { cond_inc32(Assembler::zero, ExternalAddress((address) BiasedLocking::fast_path_entry_count_addr())); } jcc(Assembler::zero, done); bind(slow_case); call_VM(noreg, CAST_FROM_FN_PTR(address, InterpreterRuntime::monitorenter), lock_reg); bind(done); } }
三.biased_locking_enter()
1).參數(shù)
lock_object()調(diào)用的biased_locking_enter()方法中才是真正偏向鎖邏輯,我們這里介紹下傳入的幾個參數(shù)(便于我們后續(xù)分析):
lock_reg
:lock_object方法傳入的空的lockRecord,其內(nèi)部已經(jīng)保存了當(dāng)前要加鎖的對象
obj_reg
:持有對象的寄存器,其內(nèi)部保存了將要加鎖的對象
swap_reg
:目前是一個空的寄存器,會用于保存中間值
rscratch1
: 臨時寄存器,用于保存中間值
done
:done標(biāo)簽方便直接跳到方法結(jié)束
&slow_case
:slow_case標(biāo)簽方便直接跳到鎖升級邏輯
比較重要的是前兩個參數(shù),分別保存我們要判斷的lockRecord和對象,后面兩個參數(shù)其實是方便我們直接跳轉(zhuǎn)到對應(yīng)邏輯的標(biāo)簽。
2).概念
這個方法中還會涉及到一些概念,網(wǎng)上也有一些介紹,筆者先簡單介紹下,方便大家閱讀:
markword
:一般用二進(jìn)制表示,對象頭中的markword,主要用來表示對象的線程鎖狀態(tài),另外還可以用來配合GC、存放該對象的hashCode
這張圖就表示markword的幾個狀態(tài)。
klassword
:一個指向方法區(qū)中Class信息的指針一般用二進(jìn)制表示,通過指針可以獲取其相應(yīng)的klass對象(即方法區(qū)中表示class信息的對象)
偏向模式:表示對象是否當(dāng)前是偏向狀態(tài),即markword最后三位是否是101,這里需要注意的是不光普通對象具有偏向模式,klass對象也有偏向模式,具體可以在systemDictionary.cpp 的update_dictionary方法中可以看到,所有創(chuàng)建的klass鎖狀態(tài)起始是001,然后會被更新為101。創(chuàng)建普通對象時會將klass中的markword填充到oop對象中。Klass對象除了再剛開始創(chuàng)建時鎖狀態(tài)時001,再進(jìn)行批量偏向鎖撤銷的時候也會恢復(fù)成001(這部分不是本文重點,具體細(xì)節(jié)就先不分析)。所以當(dāng)一個對象是偏向模式時,其不一定是持有偏向鎖的,因為對象剛創(chuàng)建出來其markword后三位即101,需要通過線程ID,epoch來判斷其是否持有偏向鎖。
3).源碼
讓我們繼續(xù)看biased_locking_enter()方法:
//調(diào)用的是macroAssembler_x86.cpp中的方法 int MacroAssembler::biased_locking_enter(Register lock_reg, Register obj_reg, Register swap_reg, Register tmp_reg, bool swap_reg_contains_mark, Label& done, Label* slow_case, BiasedLockingCounters* counters) { ...... bool need_tmp_reg = false; //noreg是一個宏,表示空的寄存器 if (tmp_reg == noreg) { need_tmp_reg = true; tmp_reg = lock_reg; } else { assert_different_registers(lock_reg, obj_reg, swap_reg, tmp_reg); } //定義一些地址,分別是markword,klass和lockRecord中的鎖對象地址 Address mark_addr (obj_reg, oopDesc::mark_offset_in_bytes()); Address klass_addr (obj_reg, oopDesc::klass_offset_in_bytes()); Address saved_mark_addr(lock_reg, 0); // 偏向鎖邏輯開始 // 分支1:查看當(dāng)前對象是否開啟偏向模式 Label cas_label; int null_check_offset = -1; //swap_reg_contains_mark傳入的是false,表示swap_reg不包括markword地址 if (!swap_reg_contains_mark) { null_check_offset = offset(); //將對象的markword放入swap_reg movl(swap_reg, mark_addr); } if (need_tmp_reg) { push(tmp_reg); } //將對象的markword放入tmp_reg movl(tmp_reg, swap_reg); //取其鎖標(biāo)記位(與指令) //markOopDesc::biased_lock_mask_in_place=111 這里是取markword的后三位到tmp_reg寄存器中 andl(tmp_reg, markOopDesc::biased_lock_mask_in_place); //判斷是否有鎖(比較指令) //markOopDesc::biased_lock_pattern=101 cmpl(tmp_reg, markOopDesc::biased_lock_pattern); if (need_tmp_reg) { pop(tmp_reg); } //如果不相等則表示沒有開啟對象偏向模式(即已經(jīng)是輕量級鎖)則跳到cas_label標(biāo)簽到方法末尾 jcc(Assembler::notEqual, cas_label); // 分支2:相等則表示對象markword后三位是101即現(xiàn)在對象是偏向鎖模式(但不一定持有偏向鎖) // 這部分的邏輯是將線程id和epoch信息做比對,判斷是否已經(jīng)持有偏向鎖 movl(saved_mark_addr, swap_reg); if (need_tmp_reg) { push(tmp_reg); } //獲取線程id get_thread(tmp_reg); //對象的markword與線程id異或,若線程id部分一樣則線程id部分會變成0 xorl(swap_reg, tmp_reg); if (swap_reg_contains_mark) { null_check_offset = offset(); } //將klass放入tmp_reg寄存器 movl(tmp_reg, klass_addr); //與klass的markword異或,若兩者同位部分一樣則同位會變成0,這里是為了判斷epoch和鎖標(biāo)志位是否與klass一樣 xorl(swap_reg, Address(tmp_reg, Klass::prototype_header_offset())); //設(shè)置分代年齡掩碼即年齡為0 andl(swap_reg, ~((int) markOopDesc::age_mask_in_place)); if (need_tmp_reg) { pop(tmp_reg); } if (counters != NULL) { cond_inc32(Assembler::zero, ExternalAddress((address)counters->biased_lock_entry_count_addr())); } //前面已經(jīng)處理過markword,將其關(guān)鍵信息已經(jīng)存入swap_reg中,后面只使用swap_reg進(jìn)行判斷 //如果swap等于0,則表明線程id是本線程id,且epoch和鎖標(biāo)志位都與klass中的一樣,即已經(jīng)偏向本線程,跳到加鎖結(jié)束 jcc(Assembler::equal, done); //定義撤銷偏向鎖標(biāo)簽 Label try_revoke_bias; //定義重偏向鎖標(biāo)簽 Label try_rebias; //若不等則證明線程id,epoch和鎖標(biāo)志位有不一樣的 //分支3:先判斷鎖標(biāo)志位,即判斷類的偏向模式是否是關(guān)閉 //test可以理解為與運算 //因為之前已經(jīng)判斷過對象的是偏向模式,而klass與對象的鎖標(biāo)記位不等,則證明klass對象不是偏向模式 //如果類偏向模式是關(guān)閉,表明正在進(jìn)行批量撤銷偏向鎖的行為,即正在進(jìn)行鎖升級 //所以需要cas替換修復(fù)對象的markword,修復(fù)成類的markword,跳到撤銷標(biāo)簽 testl(swap_reg, markOopDesc::biased_lock_mask_in_place); jcc(Assembler::notZero, try_revoke_bias); //分支4:再判斷是否是epoch過期,過期則跳到重偏向標(biāo)簽 testl(swap_reg, markOopDesc::epoch_mask_in_place); jcc(Assembler::notZero, try_rebias); //分支5:到這里只剩線程id并不是本線程,進(jìn)行一次cas替換嘗試加偏向鎖 //將對象的markword讀到swap_reg中 movl(swap_reg, saved_mark_addr); //進(jìn)行與運算,獲取對象markword的鎖標(biāo)志位和age,epoch用來構(gòu)造一個新的帶鎖的markword andl(swap_reg, markOopDesc::biased_lock_mask_in_place | markOopDesc::age_mask_in_place | markOopDesc::epoch_mask_in_place); if (need_tmp_reg) { push(tmp_reg); } get_thread(tmp_reg); //將線程id也加入到構(gòu)造的markword中 orl(tmp_reg, swap_reg); //判斷是否是多核cpu如果是則加鎖——執(zhí)行Lock命令 if (os::is_MP()) { lock(); } //cas替換對象的對象的markword為剛剛構(gòu)造的持有鎖信息的markword //Address(obj_reg, 0)表示對象的markword位置 cmpxchgptr(tmp_reg, Address(obj_reg, 0)); if (need_tmp_reg) { pop(tmp_reg); } if (counters != NULL) { cond_inc32(Assembler::zero, ExternalAddress((address)counters->anonymously_biased_lock_entry_count_addr())); } //cas不為0則證明偏向我們失敗,意味著有另一個線程成功偏向,有競爭 //則進(jìn)入slow邏輯,跳轉(zhuǎn)到slow_case標(biāo)簽,執(zhí)行撤銷升級邏輯 if (slow_case != NULL) { jcc(Assembler::notZero, *slow_case); } //成功證明已經(jīng)偏向成功,跳轉(zhuǎn)到done標(biāo)簽 jmp(done); //epoch過期,重新偏向標(biāo)簽 bind(try_rebias); if (need_tmp_reg) { push(tmp_reg); } //獲取當(dāng)前線程ID get_thread(tmp_reg); movl(swap_reg, klass_addr); //或運算,以klass的markword為基礎(chǔ)和線程id組合構(gòu)成新的markword orl(tmp_reg, Address(swap_reg, Klass::prototype_header_offset())); movl(swap_reg, saved_mark_addr); if (os::is_MP()) { lock(); } //將新構(gòu)造的markword cas替換 對象的markword cmpxchgptr(tmp_reg, Address(obj_reg, 0)); if (need_tmp_reg) { pop(tmp_reg); } if (counters != NULL) { cond_inc32(Assembler::zero, ExternalAddress((address)counters->rebiased_lock_entry_count_addr())); } //偏向失敗則證明有另外的線程偏向成功,需要撤銷偏向 if (slow_case != NULL) { jcc(Assembler::notZero, *slow_case); } //跳到結(jié)束 jmp(done); //撤銷偏向,將對象markword重置為klass(類)的markword //這里只有判斷類的markword不是偏向標(biāo)記才會進(jìn)入,所以會將對象的markword重置為非偏向標(biāo)記 bind(try_revoke_bias); movl(swap_reg, saved_mark_addr); if (need_tmp_reg) { push(tmp_reg); } //獲取對象klass的markword movl(tmp_reg, klass_addr); movl(tmp_reg, Address(tmp_reg, Klass::prototype_header_offset())); if (os::is_MP()) { lock(); } //用對象klass的markword cas替換對象的markword cmpxchgptr(tmp_reg, Address(obj_reg, 0)); if (need_tmp_reg) { pop(tmp_reg); } //無論cas的結(jié)果成功與否,都證明有線程撤銷成功,所以繼續(xù)執(zhí)行 if (counters != NULL) { cond_inc32(Assembler::zero, ExternalAddress((address)counters->revoked_lock_entry_count_addr())); } bind(cas_label); return null_check_offset; }
看完了源碼我們可以這樣理解偏向鎖,添加偏向鎖的過程即是在對象處于可偏向模式時,在對象的markword中cas替換對應(yīng)的線程id標(biāo)記位,即表示當(dāng)前線程持有了對象的偏向鎖。完整的偏向鎖處理邏輯已經(jīng)分析完了,這里面分支比較多,我們來畫圖幫助理解下:
從圖中我們可以看到若對象持有偏向鎖且鎖不是偏向本線程,則會最少會進(jìn)行一次cas替換,若cas替換失敗則會進(jìn)入偏向鎖的撤銷升級邏輯。因為偏向鎖cas替換后會進(jìn)入撤銷升級的邏輯,所以從效率上看偏向鎖更適合一個線程不斷的獲取鎖的場景,而事實上偏向鎖正是設(shè)計用于應(yīng)對一個線程獲取鎖的場景。
當(dāng)然synchronized的執(zhí)行邏輯還沒有結(jié)束,本篇博客我們只著重分析偏向鎖相關(guān)邏輯。筆者后續(xù)還會繼續(xù)分析synchronized的輕量級鎖和重量級鎖的邏輯,盡量還原最原汁原味的synchronized。
以上就是從匯編代碼開始全面解析synchronized還原最真實的偏向鎖的詳細(xì)內(nèi)容,更多關(guān)于匯編代碼解析synchronized還原偏向鎖的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
淺析shellcode 反匯編模擬運行及調(diào)試方法
這篇文章主要介紹了shellcode 反匯編,模擬運行以及調(diào)試方法,本文給大介紹的非常詳細(xì),具有一定的參考借鑒價值,需要的朋友可以參考下2020-02-02