import com.google.common.util.concurrent.AbstractFuture;
public class ResponseFuture<JsonPRotocol>extends AbstractFuture<JsonProtocol>
{
private final Executorexecutor;
public ResponseFuture()
{
if (ThreadLocalUtil.get("isServer") ==null)
{
//TODO 這里是錯誤的,把這行代碼移到一個單例的全局共享中取,避免每次new。如果是服務(wù)端,那么所有的服務(wù)端都共享一個線程池
executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
new RpcThreadFacotry("CallBack"));
}
else
{
//單線程執(zhí)行器
executor = MoreExecutors.directExecutor();
}
}
/**
* 當(dāng)響應(yīng)回來的時候,結(jié)果被設(shè)置到future中,因此從future中可以獲得一個異步的響應(yīng)結(jié)果
* @param responseProtocol
*/
public void onResponse(JsonProtocolresponseProtocol)
{
//向future中設(shè)置值
super.set(responseProtocol);
}
/**
* 當(dāng)響應(yīng)有結(jié)果時候可以直接runnable的方法
* @param runnable
*/
public void addCallBack(Runnablerunnable)
{
super.addListener(runnable,executor);
}
}
2、等待線程獲得響應(yīng)結(jié)果(使用future.get阻塞等待異步線程的響應(yīng))
ResponseFuture<JsonProtocol>reponseFuture = client.futureInvoke(JsonProtocolReqeust);
JsonProtocol JsonProtocolResponse =null;
try
{
JsonProtocolResponse =reponseFuture.get(JsonProtocolReqeust.getRpcMetadata().getTimeOut(),
TimeUnit.MILLISECONDS);
}
catch (ExecutionException e){}
catch (TimeoutException e)
{
throw new RuntimeException("調(diào)用遠(yuǎn)程服務(wù)響應(yīng)超時",e);
}
新聞熱點(diǎn)
疑難解答