国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學院 > 開發設計 > 正文

[netty核心類]--Channel和Unsafe類

2019-11-08 03:01:07
字體:
來源:轉載
供稿:網友

主要內容: (1)Channel 功能說明 (2)Unsafe 功能說明 (3)Channel的主要實現子類源碼分析 (4)Unsafe主要實現子類源碼分析

1.Channel功能說明

channel是netty網絡IO操作抽象出來的一個接口,主要功能有:網絡IO的讀寫,客戶端發起連接、主動關閉連接,鏈路關閉,獲取通信雙方的網絡地址等。下面分類進行介紹:

網絡IO操作 (1)Channel read(); 該函數將從當前的 Channel 中讀取數據到第一個inbound緩沖區中,如果讀取成功,就會觸發ChannelHandler.channelRead(ChannelHandlerContext,Object)事件,緊接著觸發ChannelHandler.channelReadComplete(ChannelHandlerContext)事件。

(2)ChannelFuture write(Object msg); 該函數將當前msg通過ChannelPipeline寫入到目標channel中。這里需要注意write()函數只把數據寫入到了消息發送緩沖區中,只有當調用flush()函數的時候,才真正寫入到Channel中被發送出去。這里我們可以直接調用writeAndFlush()函數實現這兩步。

(3)EventLoop eventLoop(); a)Channel 需要被注冊到EventLoop的多路復用器上面的,用于處理IO事件,通過eventLoop()函數我們可以獲取到Channel所注冊的EventLoop。

b)EventLoop本質上就是處理網絡讀寫事件的Reactor線程,不僅用來處理網絡事件,也可以用來執行定時任務和自定義的NioTask等任務。

(4)metadata() 在netty中,每個Channel就對應一個物理連接,每個連接都有自己的TCP參數配置,我們通過這個函數就可以獲得TCP的參數配置信息。

(5)parent() 對于服務端Channel而言,它的父Channel為空,對于客戶端Channel而言,他的父Channel就是創建它的ServerSocketChannel。

Channel的子類非常多,這里我選擇最重要的兩個子類來分析:用于服務端的通道NioServerSocketChannel和用于客戶端的通道NioSocketChannel

2. Channel子類NioServerSocketChannel和NioSocketChannel

NioServerSocketChannel是用于netty服務端的通道,它的繼承類圖如下: 這里寫圖片描述

NioSocketChannel是用于客戶端的Channel,它的繼承圖如下: 這里寫圖片描述

分析一下這兩個類的繼承圖就可以知道,他們都有公共的類,只是接口繼承的不同。它們都繼承于AbstractChannel、AbstractNioChannel類。后面不同的是NioServerSocketChannel繼承于AbstractNioMessageChannel,NioSocketChannel繼承于AbstractNioByteChannel。

下面就一步步來分析一下這些核心類: AbstractChannel (1)成員變量分析 AbstractChannel采用聚合的方式封裝各種功能。這里我列出一些能說明核心聚合功能的成員屬性如下:

//當前channel的父類channelPRivate final Channel parent;//當前channel的唯一idprivate final ChannelId id;//Unsafe實例private final Unsafe unsafe;//當前Channel對應的DefaultChannelPipelineprivate final DefaultChannelPipeline pipeline;//本地和遠端地址private volatile SocketAddress localAddress;private volatile SocketAddress remoteAddress;//當前Channel注冊所在的EventLoopprivate volatile EventLoop eventLoop;//當前Channel是否已被注冊private volatile boolean registered;

從成員變量的定義就可以看出,AbstractChannel聚合了所有Channel使用到的能力對象,由AbstractChannel來初始化和統一封裝,如果某些功能和子類強關聯,那就定義成抽象方法,繼承類自己實現。

(2)核心API源碼分析 我們知道netty的核心就是網路IO操作,所以我們也主要看一下網絡IO相關API的實現。在前面我們說到,當Channel進行網絡IO操作時會觸發ChannelPipeline中對應的事件方法。(這里不得不再次強調,netty是基于事件驅動的,我們可以理解為Channel進行網絡IO操作時會產生對應的IO事件,然后IO驅動事件在ChannelPipeline中傳播,由對應的ChannelHandler對事件進行處理)

netty基于事件驅動的模型,可以輕松通過事件定義來劃分事件攔截切面,方便業務的定制和功能擴展,相比AOP,其性能更加高。

下面給出一些核心的網絡IO相關的函數的源碼:

@Overridepublic ChannelFuture bind(SocketAddress localAddress) { return pipeline.bind(localAddress);}@Overridepublic ChannelFuture connect(SocketAddress remoteAddress) { return pipeline.connect(remoteAddress);}@Overridepublic Channel read() { pipeline.read(); return this;}@Overridepublic ChannelFuture write(Object msg) { return pipeline.write(msg);}@Overridepublic ChannelFuture write(Object msg, ChannelPromise promise) { return pipeline.write(msg, promise);}@Overridepublic ChannelFuture writeAndFlush(Object msg) { return pipeline.writeAndFlush(msg);}

這里只給出了一部分,稍微看一下我們就能看到共性:他們都是通過調用pipeline對象的方式實現的,這個pipeline對象是通過構造器實例化的,我們看看在構造其中實例化的代碼 pipeline = newChannelPipeline(); 繼續深入看newChannelPipeline()函數:

protected DefaultChannelPipeline newChannelPipeline() { return new DefaultChannelPipeline(this);}

可知pipeline其實就是DefaultChannelPipeline類。

此外AbstractChannel也提供了一些公共API的實現,比如localAddress()和 remoteAddress()方法,實現源碼如下:

public SocketAddress localAddress() { SocketAddress localAddress = this.localAddress; if (localAddress == null) { try { this.localAddress = localAddress = unsafe().localAddress(); } catch (Throwable t) { // Sometimes fails on a closed socket in Windows. return null; } } return localAddress;}

首先從緩存的成員變量中獲取,當是第一次調用時就通過unfase對象的localAddress()獲取。

AbstractNioChannel (1)成員變量 同樣只列出核心的成員變量:

//private final SelectableChannel ch;protected final int readInterestOp;volatile SelectionKey selectionKey;

由于AbstractNioChannel是NioSocketChannel和NioServerSocketChannel需要共用,所以定義了一個java.nio.SocketChannel和java.nio.ServerSocketChannel的公共父類SelectableChannel,用于設置SelectableChannel參數和進行IO操作。

第二個參數是readInterestOp,其實含義和JDK中的SelectionKey的OP_READ參數類似,表示讀就緒。

第三個變量是一個volatile修飾的SelectionKey,該SelectionKey是Channel注冊到EventLoop后返回的選擇鍵。 由于一個通道可能會面臨多個線程的并發寫操作,所以當SelectionKey的狀態改變了之后必須保證其他的業務線程能夠感知到變化,所以需要使用volatile變量保證可視性。

(2)核心API的源碼分析 下面看看AbstractNioChannel中的Channel的注冊函數doRegister() :

protected void doRegister() throws Exception { //標識注冊是否成功 boolean selected = false; for (;;) { try { //調用JDK的NIO中的SelectableChannel的register方法, //將當前的channel注冊到EventLoop的多路復用器上 selectionKey = javaChannel().register(eventLoop().selector, 0, this); return; } catch (CancelledKeyException e) { if (!selected) { //當當前注冊的selectionKey已經被取消,則拋出異常并處理, //eventLoop().selectNow()方法:將已經取消的selectionKey從多路復用器中刪除 eventLoop().selectNow(); selected = true; } else { // We forced a select Operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } }}

從上面的源碼中可知,通過局部變量selected來標識注冊操作是否成功,然后調用SelectableChannel的register方法將當前的channel注冊到EventLoop的多路復用器上。我們知道,當我們注冊channel到具體的多路復用選擇器上的時候是需要指定監聽的網絡操作位來表示Channel對哪幾種網絡事件感興趣。具體的定義在java.nio.channels.SelectionKey類中可以看到:

public static final int OP_READ = 1 << 0;//讀操作位public static final int OP_WRITE = 1 << 2;//寫操作位public static final int OP_CONNECT = 1 << 3;//客戶端連接服務器操作位public static final int OP_ACCEPT = 1 << 4;//服務端接收客戶端連接操作位

但是在AbstractNioChannel中注冊的卻是0,表示在注冊時對任何事件都不感興趣,僅僅完成注冊操作。

在注冊的時候可以指定附件,后續Channel接收到網絡事件通知時,可以從SelectionKey中重新獲取之前的附件進行處理,此處通過傳入this指針將AbstractNioChannel的子類自身作為附件注冊。如果當前Channel注冊成功,則返回selectionKey,通過selectionKey可以從多路復用器中獲取Channel對象。

AbstractNioByteChannel AbstractNioByteChannel只有一個成員變量,就是一個Runnable類型的flushTask來負責繼續寫半包消息,在這個Runnable的接口實現類里面,其實就是調用了flush()函數,保證緩沖區數據都寫入到channel中。

private Runnable flushTask;

在這個類里面最重要的方法就是 doWrite(ChannelOutboundBuffer in)方法。這個方法的實現很長,我就把它拆分成幾部分來講:

第一部分

int writeSpinCount = -1;boolean setOpWrite = false;//寫半包標識for (;;) { //彈出一條消息 Object msg = in.current(); if (msg == null) { //所有消息已經寫完 clearOpWrite(); //直接返回,所以incompleteWrite(...) 方法不會被調用 return; }

ChannelOutboundBuffer是一個環形數組,其實也就是一個環形緩沖區,服務端的數據是先寫入到緩沖區,然后再從緩沖區寫入到通道中的。

in.current(); 該方法先從消息環形數組中彈出一條消息,然后判斷該消息是否為空,如果是null說明緩沖區中所有待發送的消息全部都發送完畢了,調用clearOpWrite(); 方法清除半包標識然后直接返回退出循環。

下面看看清除半包標識做了什么:

protected final void clearOpWrite() { //獲取當前通道的SelectionKey final SelectionKey key = selectionKey(); //如果SelectionKey無效就直接退出函數 if (!key.isValid()) { return; } //獲取網絡操作位 final int interestOps = key.interestOps(); //與SelectionKey.OP_WRITE按位與,如果非0就表示當前SelectionKey是可讀的,也就是通道可讀的,然后清除寫操作位。 if ((interestOps & SelectionKey.OP_WRITE) != 0) { key.interestOps(interestOps & ~SelectionKey.OP_WRITE); }}

第二部分 其實也就是當所發送的消息時ByteBuf的時候,具體處理如下:

if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; //獲取可讀字節數 int readableBytes = buf.readableBytes(); //如果可讀字節數為0,直接從環形緩沖數組中刪除該消息,繼續循環處理其余消息 if (readableBytes == 0) { in.remove(); continue; } boolean done = false;//消息是否已經全部發送完畢標識 long flushedAmount = 0;//發送的總消息字節數 //獲取循環發送的次數 if (writeSpinCount == -1) { writeSpinCount = config().getWriteSpinCount(); } for (int i = writeSpinCount - 1; i >= 0; i --) { int localFlushedAmount = doWriteBytes(buf); if (localFlushedAmount == 0) { setOpWrite = true; break; } flushedAmount += localFlushedAmount; if (!buf.isReadable()) { done = true; break; } } in.progress(flushedAmount); if (done) { in.remove(); } else { // Break the loop and so incompleteWrite(...) is called. break; }}

具體業務如下: (1)首先判斷ByteBuf可讀字節數是否為0,如果為0就直接從環形緩沖數組中刪除該消息,繼續循環處理其余消息

(2)創建標識消息是否完全發送標識done和發送的總消息字節數flushedAmount。之后從配置類中獲取循環發送的次數。循環發送的次數指一次發送沒有完成時繼續循環發送的次數。這里設置最大循環次數原因是為了避免IO線程一直嘗試寫操作時,IO寫線程無法處理其他IO操作,如果網絡IO太慢或則對方接收太慢會造成IO線程假死。 (一次發送沒有完成時稱為寫半包。)

調用doWriteBytes(buf);進行消息發送,不同的Channel有不同的實現,所以是抽象方法。如果本次發送的字節數是0,表示TCP緩沖區已滿,此時自旋再發送任然可能是0,所以講半包標識setOpWrite設置為true,退出循環,釋放IO線程,防止Io線程假死。

AbstractNioMessageChannel AbstractNioMessageChannel是服務端的Channel繼承的類,這個類里面主要的實現方法只有一個,就是doWrite(ChannelOutboundBuffer in); 源碼如下:

protected void doWrite(ChannelOutboundBuffer in) throws Exception { final SelectionKey key = selectionKey(); final int interestOps = key.interestOps(); for (;;) { Object msg = in.current(); if (msg == null) { // Wrote all messages. if ((interestOps & SelectionKey.OP_WRITE) != 0) { key.interestOps(interestOps & ~SelectionKey.OP_WRITE); } break; } try { boolean done = false; for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) { if (doWriteMessage(msg, in)) { done = true; break; } } if (done) { in.remove(); } else { // Did not write all messages. if ((interestOps & SelectionKey.OP_WRITE) == 0) { key.interestOps(interestOps | SelectionKey.OP_WRITE); } break; } } catch (IOException e) { if (continueOnWriteError()) { in.remove(e); } else { throw e; } } }}

該方法的實現與AbstractNioByteChannel類似,區別在于: (1)AbstractNioByteChannel調用doWriteBytes( );發送的是ByteBuf或則是FileRegion。

(2)AbstractNioMessageChannel 調用doWriteMessage(msg, in) 發送的是POJO對象。

NioServerSocketChannel NioServerSocketChannel主要是繼承了AbstractNioMessageChannel并且實現了io.netty.channel.socket.ServerSocketChannel接口。

成員屬性 我們首先看成員屬性和靜態函數:

private static ServerSocketChannel newSocket(SelectorProvider provider) { try { return provider.openServerSocketChannel(); } catch (IOException e) { throw new ChannelException( "Failed to open a server socket.", e); }}private final ServerSocketChannelConfig config;

定義了一個ServerSocketChannelConfig用于配置ServerSocketChannel的TCP參數,該對象在NioServerSocketChannel中實例化。

靜態的newSocket()方法用于通SelectorProvider.openServerSocketChannel(); 方法打開新ServerSocketChannel通道。

實現接口的方法實現 直接看源碼:

@Overridepublic boolean isActive() { return javaChannel().socket().isBound();}@Overridepublic InetSocketAddress remoteAddress() { return null;}//該函數獲取的是當前服務器的channel@Overrideprotected ServerSocketChannel javaChannel() { return (ServerSocketChannel) super.javaChannel();}@Overrideprotected SocketAddress localAddress0() { return javaChannel().socket().getLocalSocketAddress();}@Overrideprotected void doBind(SocketAddress localAddress) throws Exception { if (PlatformDependent.javaVersion() >= 7) { javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); }}

java.net.ServerSocket的isBound()方法判斷服務端監聽端口是否處于綁定狀態,它的remoteAddress為空。

javaChannel() 的實現是java.nio.ServerSocketChannel,服務端在進行端口綁定的時候,可以指定backlog,也就是允許客戶端排隊的最大長度。

下面繼續看服務端Channel的doReadMessages()實現。

@Overrideprotected int doReadMessages(List<Object> buf) throws Exception { //1.獲取服務端的ServerSocketChannel,然后調用accept()接受新的客戶端; SocketChannel ch = javaChannel().accept(); try { //2.如果SocketChannel不為空,就利用當前的NioServerSocketChannel、EventLoop和SocketChannel創建新的NioSocketChannel,并加入到List<Object> if (ch != null) { buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } } return 0;}

客戶端-NioSocketChannel

(1)連接操作 重點分析與客戶端連接相關的操作:


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 同仁县| 依安县| 禹城市| 江西省| 绥芬河市| 旅游| 青州市| 秦皇岛市| 四川省| 阜新| 长寿区| 炎陵县| 阿图什市| 宿州市| 利川市| 宜城市| 阿拉善盟| 临汾市| 比如县| 信宜市| 龙里县| 肇源县| 三亚市| 横山县| 海丰县| 叙永县| 邳州市| 弥勒县| 临漳县| 丰县| 贡山| 博湖县| 宁城县| 保靖县| 汝南县| 沈丘县| 类乌齐县| 华亭县| 久治县| 广宗县| 广宗县|