背景
之前在實(shí)現(xiàn)熔斷降級(jí)組件時(shí),需要實(shí)現(xiàn)一個(gè)接口的超時(shí)中斷,意思是,業(yè)務(wù)在使用熔斷降級(jí)功能時(shí),在平臺(tái)上設(shè)置了一個(gè)超時(shí)時(shí)間,如果在請(qǐng)求進(jìn)入熔斷器開(kāi)始計(jì)時(shí),并且接口在超時(shí)時(shí)間內(nèi)沒(méi)有響應(yīng),則需要提早中斷該請(qǐng)求并返回。
比如正常下游接口的超時(shí)時(shí)間為800ms,但是因?yàn)樽陨順I(yè)務(wù)的特殊需求,最多只能等200ms,如果200ms之內(nèi)沒(méi)有數(shù)據(jù)返回,則返回降級(jí)數(shù)據(jù)。這里處理請(qǐng)求的線(xiàn)程可以看成是tomcat線(xiàn)程池中的一個(gè)線(xiàn)程,如果通過(guò)線(xiàn)程池返回的Future,可以很輕松的實(shí)現(xiàn)超時(shí)返回,但是這種情況下,并不能拿到Futrue,需要換一種思路。
思路
中斷一個(gè)線(xiàn)程的思路有哪些?
除了已經(jīng)廢棄的Thread.stop, Thread.suspend, Thread.resume 方法,剩下的貌似只有一種方案了,就是調(diào)用當(dāng)前線(xiàn)程的 interrupt() ,但是這個(gè)方法的作用并不是中斷線(xiàn)程,而是設(shè)置一個(gè)標(biāo)識(shí),通知該線(xiàn)程可以被中斷了,到底是繼續(xù)執(zhí)行,還是中斷返回,由線(xiàn)程本身自己決定。
具體來(lái)說(shuō),當(dāng)對(duì)一個(gè)線(xiàn)程調(diào)用了 interrupt() 之后,如果該線(xiàn)程處于被阻塞狀態(tài)(比如執(zhí)行了wait、sleep或join等方法),那么會(huì)立即退出阻塞狀態(tài),并拋出一個(gè) InterruptedException 異常,在代碼中catch這個(gè)異常進(jìn)行后續(xù)處理。如果線(xiàn)程一直處于運(yùn)行狀態(tài),那么只會(huì)把該線(xiàn)程的中斷標(biāo)志設(shè)置為 true,僅此而已,所以 interrupt() 并不能真正的中斷線(xiàn)程,不過(guò)在rpc調(diào)用的場(chǎng)景中,請(qǐng)求線(xiàn)程一般都處于阻塞狀態(tài),等待數(shù)據(jù)返回,這時(shí) interrupt() 方法是可以派上用場(chǎng)的。
那么,要實(shí)現(xiàn)指定超時(shí)時(shí)間內(nèi)中斷請(qǐng)求線(xiàn)程,還有最后一個(gè)問(wèn)題需要解決:什么時(shí)候,由誰(shuí)去執(zhí)行 interrupt() 方法?
必然這個(gè)方法只能由其它線(xiàn)程來(lái)執(zhí)行了(自己都阻塞了,執(zhí)行個(gè)鬼),而且是在請(qǐng)求進(jìn)入熔斷器時(shí),并在超時(shí)時(shí)間之后執(zhí)行,有點(diǎn)繞,比如超時(shí)時(shí)間是200ms,那么請(qǐng)求進(jìn)入熔斷器之后,再過(guò)200ms,就執(zhí)行 interrupt() ,但是在200ms之內(nèi)有數(shù)據(jù)返回,那么就不執(zhí)行 interrupt() 了。
實(shí)現(xiàn)
需求已經(jīng)很明確了,相當(dāng)于延遲執(zhí)行一個(gè)task,其內(nèi)部邏輯就是執(zhí)行請(qǐng)求線(xiàn)程的 interrupt() ,當(dāng)然還有其它的邏輯。
Runnable task = new Runnable() { @Override public void run() { try { thread.interrupt(); // 取消定時(shí)器任務(wù) f.cancel(); } catch (Exception e) { logger.error("Failed while ticking TimerListener", e); } }};
Doug Lea大神提供的 ScheduledThreadPoolExecutor 可以很好的滿(mǎn)足這個(gè)需求,通過(guò) scheduleAtFixedRate 方法可以很方便的實(shí)現(xiàn)在延遲指定時(shí)間之后執(zhí)行提交的任務(wù)。
ScheduledFuture<?> f = executor.scheduleAtFixedRate(task, timeout, timeout, TimeUnit.MILLISECONDS);
在請(qǐng)求進(jìn)入熔斷器時(shí),順便提交一個(gè)任務(wù)到線(xiàn)程池中等待執(zhí)行,如果接口在超時(shí)時(shí)間內(nèi)沒(méi)有返回,那么該任務(wù)會(huì)被觸發(fā),并執(zhí)行請(qǐng)求線(xiàn)程的 interrupt 方法,這樣就實(shí)現(xiàn)了請(qǐng)求線(xiàn)程的中斷(因?yàn)檫@時(shí)請(qǐng)求線(xiàn)程正在被阻塞,等待數(shù)據(jù)返回),另外需要清空定時(shí)任務(wù),不然這個(gè)任務(wù)會(huì)一直執(zhí)行。
如果接口正常返回了,也要記得清空定時(shí)任務(wù),并且在請(qǐng)求退出熔斷器的時(shí)候,記得恢復(fù)請(qǐng)求線(xiàn)程的中斷標(biāo)識(shí),如何恢復(fù)?在請(qǐng)求線(xiàn)程中執(zhí)行下面代碼即可。
Thread.interrupted();// 內(nèi)部邏輯public static boolean interrupted() { return currentThread().isInterrupted(true);}// 參數(shù)為true,可以清除中斷標(biāo)識(shí)private native boolean isInterrupted(boolean ClearInterrupted);
執(zhí)行當(dāng)前線(xiàn)程(即請(qǐng)求線(xiàn)程)的isInterrupted方法。
使用這種方式實(shí)現(xiàn)請(qǐng)求的超時(shí)中斷,在QPS很高的情況下,會(huì)有額外的性能損失,因?yàn)槊看握?qǐng)求都要提交一個(gè)任務(wù)到線(xiàn)程池中等待執(zhí)行。
總結(jié)
以上就是這篇文章的全部?jī)?nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,如果有疑問(wèn)大家可以留言交流,謝謝大家對(duì)VeVb武林網(wǎng)的支持。
新聞熱點(diǎn)
疑難解答
圖片精選