'Java 網絡IO編程總結'

Java 通信 進擊的IT程序員 2019-08-12
"

本文會從傳統的BIO到NIO再到AIO自淺至深介紹,並附上完整的代碼講解。

下面代碼中會使用這樣一個例子:客戶端發送一段算式的字符串到服務器,服務器計算後返回結果到客戶端。

代碼的所有說明,都直接作為註釋,嵌入到代碼中,看代碼時就能更容易理解,代碼中會用到一個計算結果的工具類,見文章代碼部分。

1、BIO編程

1.1、傳統的BIO編程

網絡編程的基本模型是C/S模型,即兩個進程間的通信。

服務端提供IP和監聽端口,客戶端通過連接操作想服務端監聽的地址發起連接請求,通過三次握手連接,如果連接成功建立,雙方就可以通過套接字進行通信。

傳統的同步阻塞模型開發中,ServerSocket負責綁定IP地址,啟動監聽端口;Socket負責發起連接操作。連接成功後,雙方通過輸入和輸出流進行同步阻塞式通信。

簡單的描述一下BIO的服務端通信模型:採用BIO通信模型的服務端,通常由一個獨立的Acceptor線程負責監聽客戶端的連接,它接收到客戶端連接請求之後為每個客戶端創建一個新的線程進行鏈路處理沒處理完成後,通過輸出流返回應答給客戶端,線程銷燬。即典型的一請求一應答通宵模型。

傳統BIO通信模型圖:

"

本文會從傳統的BIO到NIO再到AIO自淺至深介紹,並附上完整的代碼講解。

下面代碼中會使用這樣一個例子:客戶端發送一段算式的字符串到服務器,服務器計算後返回結果到客戶端。

代碼的所有說明,都直接作為註釋,嵌入到代碼中,看代碼時就能更容易理解,代碼中會用到一個計算結果的工具類,見文章代碼部分。

1、BIO編程

1.1、傳統的BIO編程

網絡編程的基本模型是C/S模型,即兩個進程間的通信。

服務端提供IP和監聽端口,客戶端通過連接操作想服務端監聽的地址發起連接請求,通過三次握手連接,如果連接成功建立,雙方就可以通過套接字進行通信。

傳統的同步阻塞模型開發中,ServerSocket負責綁定IP地址,啟動監聽端口;Socket負責發起連接操作。連接成功後,雙方通過輸入和輸出流進行同步阻塞式通信。

簡單的描述一下BIO的服務端通信模型:採用BIO通信模型的服務端,通常由一個獨立的Acceptor線程負責監聽客戶端的連接,它接收到客戶端連接請求之後為每個客戶端創建一個新的線程進行鏈路處理沒處理完成後,通過輸出流返回應答給客戶端,線程銷燬。即典型的一請求一應答通宵模型。

傳統BIO通信模型圖:

Java 網絡IO編程總結

該模型最大的問題就是缺乏彈性伸縮能力,當客戶端併發訪問量增加後,服務端的線程個數和客戶端併發訪問數呈1:1的正比關係,Java中的線程也是比較寶貴的系統資源,線程數量快速膨脹後,系統的性能將急劇下降,隨著訪問量的繼續增大,系統最終就死-掉-了。

同步阻塞式I/O創建的Server源碼:

package com.anxpp.io.calculator.bio;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/**
* BIO服務端源碼
* @author yangtao__anxpp.com
* @version 1.0
*/
public final class ServerNormal {
\t//默認的端口號
\tprivate static int DEFAULT_PORT = 12345;
\t//單例的ServerSocket
\tprivate static ServerSocket server;
\t//根據傳入參數設置監聽端口,如果沒有參數調用以下方法並使用默認值
\tpublic static void start() throws IOException{
\t\t//使用默認值
\t\tstart(DEFAULT_PORT);
\t}
\t//這個方法不會被大量併發訪問,不太需要考慮效率,直接進行方法同步就行了
\tpublic synchronized static void start(int port) throws IOException{
\t\tif(server != null) return;
\t\ttry{
\t\t\t//通過構造函數創建ServerSocket
\t\t\t//如果端口合法且空閒,服務端就監聽成功
\t\t\tserver = new ServerSocket(port);
\t\t\tSystem.out.println("服務器已啟動,端口號:" + port);
\t\t\t//通過無線循環監聽客戶端連接
\t\t\t//如果沒有客戶端接入,將阻塞在accept操作上。
\t\t\twhile(true){
\t\t\t\tSocket socket = server.accept();
\t\t\t\t//當有新的客戶端接入時,會執行下面的代碼
\t\t\t\t//然後創建一個新的線程處理這條Socket鏈路
\t\t\t\tnew Thread(new ServerHandler(socket)).start();
\t\t\t}
\t\t}finally{
\t\t\t//一些必要的清理工作
\t\t\tif(server != null){
\t\t\t\tSystem.out.println("服務器已關閉。");
\t\t\t\tserver.close();
\t\t\t\tserver = null;
\t\t\t}
\t\t}
\t}
}

客戶端消息處理線程ServerHandler源碼:

package com.anxpp.io.calculator.bio;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

import com.anxpp.io.utils.Calculator;
/**
* 客戶端線程
* @author yangtao__anxpp.com
* 用於處理一個客戶端的Socket鏈路
*/
public class ServerHandler implements Runnable{
\tprivate Socket socket;
\tpublic ServerHandler(Socket socket) {
\t\tthis.socket = socket;
\t}
\t@Override
\tpublic void run() {
\t\tBufferedReader in = null;
\t\tPrintWriter out = null;
\t\ttry{
\t\t\tin = new BufferedReader(new InputStreamReader(socket.getInputStream()));
\t\t\tout = new PrintWriter(socket.getOutputStream(),true);
\t\t\tString expression;
\t\t\tString result;
\t\t\twhile(true){
\t\t\t\t//通過BufferedReader讀取一行
\t\t\t\t//如果已經讀到輸入流尾部,返回null,退出循環
\t\t\t\t//如果得到非空值,就嘗試計算結果並返回
\t\t\t\tif((expression = in.readLine())==null) break;
\t\t\t\tSystem.out.println("服務器收到消息:" + expression);
\t\t\t\ttry{
\t\t\t\t\tresult = Calculator.cal(expression).toString();
\t\t\t\t}catch(Exception e){
\t\t\t\t\tresult = "計算錯誤:" + e.getMessage();
\t\t\t\t}
\t\t\t\tout.println(result);
\t\t\t}
\t\t}catch(Exception e){
\t\t\te.printStackTrace();
\t\t}finally{
\t\t\t//一些必要的清理工作
\t\t\tif(in != null){
\t\t\t\ttry {
\t\t\t\t\tin.close();
\t\t\t\t} catch (IOException e) {
\t\t\t\t\te.printStackTrace();
\t\t\t\t}
\t\t\t\tin = null;
\t\t\t}
\t\t\tif(out != null){
\t\t\t\tout.close();
\t\t\t\tout = null;
\t\t\t}
\t\t\tif(socket != null){
\t\t\t\ttry {
\t\t\t\t\tsocket.close();
\t\t\t\t} catch (IOException e) {
\t\t\t\t\te.printStackTrace();
\t\t\t\t}
\t\t\t\tsocket = null;
\t\t\t}
\t\t}
\t}
}

同步阻塞式I/O創建的Client源碼:

package com.anxpp.io.calculator.bio;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
/**
* 阻塞式I/O創建的客戶端
* @author yangtao__anxpp.com
* @version 1.0
*/
public class Client {
\t//默認的端口號
\tprivate static int DEFAULT_SERVER_PORT = 12345;
\tprivate static String DEFAULT_SERVER_IP = "127.0.0.1";
\tpublic static void send(String expression){
\t\tsend(DEFAULT_SERVER_PORT,expression);
\t}
\tpublic static void send(int port,String expression){
\t\tSystem.out.println("算術表達式為:" + expression);
\t\tSocket socket = null;
\t\tBufferedReader in = null;
\t\tPrintWriter out = null;
\t\ttry{
\t\t\tsocket = new Socket(DEFAULT_SERVER_IP,port);
\t\t\tin = new BufferedReader(new InputStreamReader(socket.getInputStream()));
\t\t\tout = new PrintWriter(socket.getOutputStream(),true);
\t\t\tout.println(expression);
\t\t\tSystem.out.println("___結果為:" + in.readLine());
\t\t}catch(Exception e){
\t\t\te.printStackTrace();
\t\t}finally{
\t\t\t//一下必要的清理工作
\t\t\tif(in != null){
\t\t\t\ttry {
\t\t\t\t\tin.close();
\t\t\t\t} catch (IOException e) {
\t\t\t\t\te.printStackTrace();
\t\t\t\t}
\t\t\t\tin = null;
\t\t\t}
\t\t\tif(out != null){
\t\t\t\tout.close();
\t\t\t\tout = null;
\t\t\t}
\t\t\tif(socket != null){
\t\t\t\ttry {
\t\t\t\t\tsocket.close();
\t\t\t\t} catch (IOException e) {
\t\t\t\t\te.printStackTrace();
\t\t\t\t}
\t\t\t\tsocket = null;
\t\t\t}
\t\t}
\t}
}

測試代碼,為了方便在控制檯看輸出結果,放到同一個程序(jvm)中運行:

package com.anxpp.io.calculator.bio;
import java.io.IOException;
import java.util.Random;
/**
* 測試方法
* @author yangtao__anxpp.com
* @version 1.0
*/
public class Test {
\t//測試主方法
\tpublic static void main(String[] args) throws InterruptedException {
\t\t//運行服務器
\t\tnew Thread(new Runnable() {
\t\t\t@Override
\t\t\tpublic void run() {
\t\t\t\ttry {
\t\t\t\t\tServerBetter.start();
\t\t\t\t} catch (IOException e) {
\t\t\t\t\te.printStackTrace();
\t\t\t\t}
\t\t\t}
\t\t}).start();
\t\t//避免客戶端先於服務器啟動前執行代碼
\t\tThread.sleep(100);
\t\t//運行客戶端
\t\tchar operators[] = {'+','-','*','/'};
\t\tRandom random = new Random(System.currentTimeMillis());
\t\tnew Thread(new Runnable() {
\t\t\t@SuppressWarnings("static-access")
\t\t\t@Override
\t\t\tpublic void run() {
\t\t\t\twhile(true){
\t\t\t\t\t//隨機產生算術表達式
\t\t\t\t\tString expression = random.nextInt(10)+""+operators[random.nextInt(4)]+(random.nextInt(10)+1);
\t\t\t\t\tClient.send(expression);
\t\t\t\t\ttry {
\t\t\t\t\t\tThread.currentThread().sleep(random.nextInt(1000));
\t\t\t\t\t} catch (InterruptedException e) {
\t\t\t\t\t\te.printStackTrace();
\t\t\t\t\t}
\t\t\t\t}
\t\t\t}
\t\t}).start();
\t}
}

其中一次的運行結果:

服務器已啟動,端口號:12345
算術表達式為:4-2
服務器收到消息:4-2
___結果為:2
算術表達式為:5-10
服務器收到消息:5-10
___結果為:-5
算術表達式為:0-9
服務器收到消息:0-9
___結果為:-9
算術表達式為:0+6
服務器收到消息:0+6
___結果為:6
算術表達式為:1/6
服務器收到消息:1/6
___結果為:0.16666666666666666
...

從以上代碼,很容易看出,BIO主要的問題在於每當有一個新的客戶端請求接入時,服務端必須創建一個新的線程來處理這條鏈路,在需要滿足高性能、高併發的場景是沒法應用的(大量創建新的線程會嚴重影響服務器性能,甚至罷工)。

1.2、偽異步I/O編程

為了改進這種一連接一線程的模型,我們可以使用線程池來管理這些線程(需要了解更多請參考前面提供的文章),實現1個或多個線程處理N個客戶端的模型(但是底層還是使用的同步阻塞I/O),通常被稱為“偽異步I/O模型“。

偽異步I/O模型圖:

"

本文會從傳統的BIO到NIO再到AIO自淺至深介紹,並附上完整的代碼講解。

下面代碼中會使用這樣一個例子:客戶端發送一段算式的字符串到服務器,服務器計算後返回結果到客戶端。

代碼的所有說明,都直接作為註釋,嵌入到代碼中,看代碼時就能更容易理解,代碼中會用到一個計算結果的工具類,見文章代碼部分。

1、BIO編程

1.1、傳統的BIO編程

網絡編程的基本模型是C/S模型,即兩個進程間的通信。

服務端提供IP和監聽端口,客戶端通過連接操作想服務端監聽的地址發起連接請求,通過三次握手連接,如果連接成功建立,雙方就可以通過套接字進行通信。

傳統的同步阻塞模型開發中,ServerSocket負責綁定IP地址,啟動監聽端口;Socket負責發起連接操作。連接成功後,雙方通過輸入和輸出流進行同步阻塞式通信。

簡單的描述一下BIO的服務端通信模型:採用BIO通信模型的服務端,通常由一個獨立的Acceptor線程負責監聽客戶端的連接,它接收到客戶端連接請求之後為每個客戶端創建一個新的線程進行鏈路處理沒處理完成後,通過輸出流返回應答給客戶端,線程銷燬。即典型的一請求一應答通宵模型。

傳統BIO通信模型圖:

Java 網絡IO編程總結

該模型最大的問題就是缺乏彈性伸縮能力,當客戶端併發訪問量增加後,服務端的線程個數和客戶端併發訪問數呈1:1的正比關係,Java中的線程也是比較寶貴的系統資源,線程數量快速膨脹後,系統的性能將急劇下降,隨著訪問量的繼續增大,系統最終就死-掉-了。

同步阻塞式I/O創建的Server源碼:

package com.anxpp.io.calculator.bio;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/**
* BIO服務端源碼
* @author yangtao__anxpp.com
* @version 1.0
*/
public final class ServerNormal {
\t//默認的端口號
\tprivate static int DEFAULT_PORT = 12345;
\t//單例的ServerSocket
\tprivate static ServerSocket server;
\t//根據傳入參數設置監聽端口,如果沒有參數調用以下方法並使用默認值
\tpublic static void start() throws IOException{
\t\t//使用默認值
\t\tstart(DEFAULT_PORT);
\t}
\t//這個方法不會被大量併發訪問,不太需要考慮效率,直接進行方法同步就行了
\tpublic synchronized static void start(int port) throws IOException{
\t\tif(server != null) return;
\t\ttry{
\t\t\t//通過構造函數創建ServerSocket
\t\t\t//如果端口合法且空閒,服務端就監聽成功
\t\t\tserver = new ServerSocket(port);
\t\t\tSystem.out.println("服務器已啟動,端口號:" + port);
\t\t\t//通過無線循環監聽客戶端連接
\t\t\t//如果沒有客戶端接入,將阻塞在accept操作上。
\t\t\twhile(true){
\t\t\t\tSocket socket = server.accept();
\t\t\t\t//當有新的客戶端接入時,會執行下面的代碼
\t\t\t\t//然後創建一個新的線程處理這條Socket鏈路
\t\t\t\tnew Thread(new ServerHandler(socket)).start();
\t\t\t}
\t\t}finally{
\t\t\t//一些必要的清理工作
\t\t\tif(server != null){
\t\t\t\tSystem.out.println("服務器已關閉。");
\t\t\t\tserver.close();
\t\t\t\tserver = null;
\t\t\t}
\t\t}
\t}
}

客戶端消息處理線程ServerHandler源碼:

package com.anxpp.io.calculator.bio;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

import com.anxpp.io.utils.Calculator;
/**
* 客戶端線程
* @author yangtao__anxpp.com
* 用於處理一個客戶端的Socket鏈路
*/
public class ServerHandler implements Runnable{
\tprivate Socket socket;
\tpublic ServerHandler(Socket socket) {
\t\tthis.socket = socket;
\t}
\t@Override
\tpublic void run() {
\t\tBufferedReader in = null;
\t\tPrintWriter out = null;
\t\ttry{
\t\t\tin = new BufferedReader(new InputStreamReader(socket.getInputStream()));
\t\t\tout = new PrintWriter(socket.getOutputStream(),true);
\t\t\tString expression;
\t\t\tString result;
\t\t\twhile(true){
\t\t\t\t//通過BufferedReader讀取一行
\t\t\t\t//如果已經讀到輸入流尾部,返回null,退出循環
\t\t\t\t//如果得到非空值,就嘗試計算結果並返回
\t\t\t\tif((expression = in.readLine())==null) break;
\t\t\t\tSystem.out.println("服務器收到消息:" + expression);
\t\t\t\ttry{
\t\t\t\t\tresult = Calculator.cal(expression).toString();
\t\t\t\t}catch(Exception e){
\t\t\t\t\tresult = "計算錯誤:" + e.getMessage();
\t\t\t\t}
\t\t\t\tout.println(result);
\t\t\t}
\t\t}catch(Exception e){
\t\t\te.printStackTrace();
\t\t}finally{
\t\t\t//一些必要的清理工作
\t\t\tif(in != null){
\t\t\t\ttry {
\t\t\t\t\tin.close();
\t\t\t\t} catch (IOException e) {
\t\t\t\t\te.printStackTrace();
\t\t\t\t}
\t\t\t\tin = null;
\t\t\t}
\t\t\tif(out != null){
\t\t\t\tout.close();
\t\t\t\tout = null;
\t\t\t}
\t\t\tif(socket != null){
\t\t\t\ttry {
\t\t\t\t\tsocket.close();
\t\t\t\t} catch (IOException e) {
\t\t\t\t\te.printStackTrace();
\t\t\t\t}
\t\t\t\tsocket = null;
\t\t\t}
\t\t}
\t}
}

同步阻塞式I/O創建的Client源碼:

package com.anxpp.io.calculator.bio;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
/**
* 阻塞式I/O創建的客戶端
* @author yangtao__anxpp.com
* @version 1.0
*/
public class Client {
\t//默認的端口號
\tprivate static int DEFAULT_SERVER_PORT = 12345;
\tprivate static String DEFAULT_SERVER_IP = "127.0.0.1";
\tpublic static void send(String expression){
\t\tsend(DEFAULT_SERVER_PORT,expression);
\t}
\tpublic static void send(int port,String expression){
\t\tSystem.out.println("算術表達式為:" + expression);
\t\tSocket socket = null;
\t\tBufferedReader in = null;
\t\tPrintWriter out = null;
\t\ttry{
\t\t\tsocket = new Socket(DEFAULT_SERVER_IP,port);
\t\t\tin = new BufferedReader(new InputStreamReader(socket.getInputStream()));
\t\t\tout = new PrintWriter(socket.getOutputStream(),true);
\t\t\tout.println(expression);
\t\t\tSystem.out.println("___結果為:" + in.readLine());
\t\t}catch(Exception e){
\t\t\te.printStackTrace();
\t\t}finally{
\t\t\t//一下必要的清理工作
\t\t\tif(in != null){
\t\t\t\ttry {
\t\t\t\t\tin.close();
\t\t\t\t} catch (IOException e) {
\t\t\t\t\te.printStackTrace();
\t\t\t\t}
\t\t\t\tin = null;
\t\t\t}
\t\t\tif(out != null){
\t\t\t\tout.close();
\t\t\t\tout = null;
\t\t\t}
\t\t\tif(socket != null){
\t\t\t\ttry {
\t\t\t\t\tsocket.close();
\t\t\t\t} catch (IOException e) {
\t\t\t\t\te.printStackTrace();
\t\t\t\t}
\t\t\t\tsocket = null;
\t\t\t}
\t\t}
\t}
}

測試代碼,為了方便在控制檯看輸出結果,放到同一個程序(jvm)中運行:

package com.anxpp.io.calculator.bio;
import java.io.IOException;
import java.util.Random;
/**
* 測試方法
* @author yangtao__anxpp.com
* @version 1.0
*/
public class Test {
\t//測試主方法
\tpublic static void main(String[] args) throws InterruptedException {
\t\t//運行服務器
\t\tnew Thread(new Runnable() {
\t\t\t@Override
\t\t\tpublic void run() {
\t\t\t\ttry {
\t\t\t\t\tServerBetter.start();
\t\t\t\t} catch (IOException e) {
\t\t\t\t\te.printStackTrace();
\t\t\t\t}
\t\t\t}
\t\t}).start();
\t\t//避免客戶端先於服務器啟動前執行代碼
\t\tThread.sleep(100);
\t\t//運行客戶端
\t\tchar operators[] = {'+','-','*','/'};
\t\tRandom random = new Random(System.currentTimeMillis());
\t\tnew Thread(new Runnable() {
\t\t\t@SuppressWarnings("static-access")
\t\t\t@Override
\t\t\tpublic void run() {
\t\t\t\twhile(true){
\t\t\t\t\t//隨機產生算術表達式
\t\t\t\t\tString expression = random.nextInt(10)+""+operators[random.nextInt(4)]+(random.nextInt(10)+1);
\t\t\t\t\tClient.send(expression);
\t\t\t\t\ttry {
\t\t\t\t\t\tThread.currentThread().sleep(random.nextInt(1000));
\t\t\t\t\t} catch (InterruptedException e) {
\t\t\t\t\t\te.printStackTrace();
\t\t\t\t\t}
\t\t\t\t}
\t\t\t}
\t\t}).start();
\t}
}

其中一次的運行結果:

服務器已啟動,端口號:12345
算術表達式為:4-2
服務器收到消息:4-2
___結果為:2
算術表達式為:5-10
服務器收到消息:5-10
___結果為:-5
算術表達式為:0-9
服務器收到消息:0-9
___結果為:-9
算術表達式為:0+6
服務器收到消息:0+6
___結果為:6
算術表達式為:1/6
服務器收到消息:1/6
___結果為:0.16666666666666666
...

從以上代碼,很容易看出,BIO主要的問題在於每當有一個新的客戶端請求接入時,服務端必須創建一個新的線程來處理這條鏈路,在需要滿足高性能、高併發的場景是沒法應用的(大量創建新的線程會嚴重影響服務器性能,甚至罷工)。

1.2、偽異步I/O編程

為了改進這種一連接一線程的模型,我們可以使用線程池來管理這些線程(需要了解更多請參考前面提供的文章),實現1個或多個線程處理N個客戶端的模型(但是底層還是使用的同步阻塞I/O),通常被稱為“偽異步I/O模型“。

偽異步I/O模型圖:

Java 網絡IO編程總結

實現很簡單,我們只需要將新建線程的地方,交給線程池管理即可,只需要改動剛剛的Server代碼即可:

package com.anxpp.io.calculator.bio;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* BIO服務端源碼__偽異步I/O
* @author yangtao__anxpp.com
* @version 1.0
*/
public final class ServerBetter {
\t//默認的端口號
\tprivate static int DEFAULT_PORT = 12345;
\t//單例的ServerSocket
\tprivate static ServerSocket server;
\t//線程池 懶漢式的單例
\tprivate static ExecutorService executorService = Executors.newFixedThreadPool(60);
\t//根據傳入參數設置監聽端口,如果沒有參數調用以下方法並使用默認值
\tpublic static void start() throws IOException{
\t\t//使用默認值
\t\tstart(DEFAULT_PORT);
\t}
\t//這個方法不會被大量併發訪問,不太需要考慮效率,直接進行方法同步就行了
\tpublic synchronized static void start(int port) throws IOException{
\t\tif(server != null) return;
\t\ttry{
\t\t\t//通過構造函數創建ServerSocket
\t\t\t//如果端口合法且空閒,服務端就監聽成功
\t\t\tserver = new ServerSocket(port);
\t\t\tSystem.out.println("服務器已啟動,端口號:" + port);
\t\t\t//通過無線循環監聽客戶端連接
\t\t\t//如果沒有客戶端接入,將阻塞在accept操作上。
\t\t\twhile(true){
\t\t\t\tSocket socket = server.accept();
\t\t\t\t//當有新的客戶端接入時,會執行下面的代碼
\t\t\t\t//然後創建一個新的線程處理這條Socket鏈路
\t\t\t\texecutorService.execute(new ServerHandler(socket));
\t\t\t}
\t\t}finally{
\t\t\t//一些必要的清理工作
\t\t\tif(server != null){
\t\t\t\tSystem.out.println("服務器已關閉。");
\t\t\t\tserver.close();
\t\t\t\tserver = null;
\t\t\t}
\t\t}
\t}
}

測試運行結果是一樣的。

我們知道,如果使用CachedThreadPool線程池(不限制線程數量,如果不清楚請參考文首提供的文章),其實除了能自動幫我們管理線程(複用),看起來也就像是1:1的客戶端:線程數模型,而使用FixedThreadPool我們就有效的控制了線程的最大數量,保證了系統有限的資源的控制,實現了N:M的偽異步I/O模型。

但是,正因為限制了線程數量,如果發生大量併發請求,超過最大數量的線程就只能等待,直到線程池中的有空閒的線程可以被複用。而對Socket的輸入流就行讀取時,會一直阻塞,直到發生:

有數據可讀

可用數據以及讀取完畢

發生空指針或I/O異常

所以在讀取數據較慢時(比如數據量大、網絡傳輸慢等),大量併發的情況下,其他接入的消息,只能一直等待,這就是最大的弊端。

而後面即將介紹的NIO,就能解決這個難題。

2、NIO 編程

JDK 1.4中的java.nio.*包中引入新的Java I/O庫,其目的是提高速度。實際上,“舊”的I/O包已經使用NIO重新實現過,即使我們不顯式的使用NIO編程,也能從中受益。速度的提高在文件I/O和網絡I/O中都可能會發生,但本文只討論後者。

2.1、簡介

NIO我們一般認為是New I/O(也是官方的叫法),因為它是相對於老的I/O類庫新增的(其實在JDK 1.4中就已經被引入了,但這個名詞還會繼續用很久,即使它們在現在看來已經是“舊”的了,所以也提示我們在命名時,需要好好考慮),做了很大的改變。但民間跟多人稱之為Non-block I/O,即非阻塞I/O,因為這樣叫,更能體現它的特點。而下文中的NIO,不是指整個新的I/O庫,而是非阻塞I/O。

NIO提供了與傳統BIO模型中的Socket和ServerSocket相對應的SocketChannel和ServerSocketChannel兩種不同的套接字通道實現。

新增的著兩種通道都支持阻塞和非阻塞兩種模式。

阻塞模式使用就像傳統中的支持一樣,比較簡單,但是性能和可靠性都不好;非阻塞模式正好與之相反。

對於低負載、低併發的應用程序,可以使用同步阻塞I/O來提升開發速率和更好的維護性;對於高負載、高併發的(網絡)應用,應使用NIO的非阻塞模式來開發。

下面會先對基礎知識進行介紹。

2.2、緩衝區 Buffer

Buffer是一個對象,包含一些要寫入或者讀出的數據。

在NIO庫中,所有數據都是用緩衝區處理的。在讀取數據時,它是直接讀到緩衝區中的;在寫入數據時,也是寫入到緩衝區中。任何時候訪問NIO中的數據,都是通過緩衝區進行操作。

緩衝區實際上是一個數組,並提供了對數據結構化訪問以及維護讀寫位置等信息。

具體的緩存區有這些:ByteBuffe、CharBuffer、 ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer。他們實現了相同的接口:Buffer。

2.3、通道 Channel

我們對數據的讀取和寫入要通過Channel,它就像水管一樣,是一個通道。通道不同於流的地方就是通道是雙向的,可以用於讀、寫和同時讀寫操作。

底層的操作系統的通道一般都是全雙工的,所以全雙工的Channel比流能更好的映射底層操作系統的API。

Channel主要分兩大類:

SelectableChannel:用戶網絡讀寫

FileChannel:用於文件操作

後面代碼會涉及的ServerSocketChannel和SocketChannel都是SelectableChannel的子類。

2.4、多路複用器 Selector

Selector是Java NIO 編程的基礎。

Selector提供選擇已經就緒的任務的能力:Selector會不斷輪詢註冊在其上的Channel,如果某個Channel上面發生讀或者寫事件,這個Channel就處於就緒狀態,會被Selector輪詢出來,然後通過SelectionKey可以獲取就緒Channel的集合,進行後續的I/O操作。

一個Selector可以同時輪詢多個Channel,因為JDK使用了epoll()代替傳統的select實現,所以沒有最大連接句柄1024/2048的限制。所以,只需要一個線程負責Selector的輪詢,就可以接入成千上萬的客戶端。

2.5、NIO服務端

代碼比傳統的Socket編程看起來要複雜不少。

直接貼代碼吧,以註釋的形式給出代碼說明。

NIO創建的Server源碼:

package com.anxpp.io.calculator.nio;
public class Server {
\tprivate static int DEFAULT_PORT = 12345;
\tprivate static ServerHandle serverHandle;
\tpublic static void start(){
\t\tstart(DEFAULT_PORT);
\t}
\tpublic static synchronized void start(int port){
\t\tif(serverHandle!=null)
\t\t\tserverHandle.stop();
\t\tserverHandle = new ServerHandle(port);
\t\tnew Thread(serverHandle,"Server").start();
\t}
\tpublic static void main(String[] args){
\t\tstart();
\t}
}

ServerHandle:

package com.anxpp.io.calculator.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

import com.anxpp.io.utils.Calculator;
/**
* NIO服務端
* @author yangtao__anxpp.com
* @version 1.0
*/
public class ServerHandle implements Runnable{
\tprivate Selector selector;
\tprivate ServerSocketChannel serverChannel;
\tprivate volatile boolean started;
\t/**
\t * 構造方法
\t * @param port 指定要監聽的端口號
\t */
\tpublic ServerHandle(int port) {
\t\ttry{
\t\t\t//創建選擇器
\t\t\tselector = Selector.open();
\t\t\t//打開監聽通道
\t\t\tserverChannel = ServerSocketChannel.open();
\t\t\t//如果為 true,則此通道將被置於阻塞模式;如果為 false,則此通道將被置於非阻塞模式
\t\t\tserverChannel.configureBlocking(false);//開啟非阻塞模式
\t\t\t//綁定端口 backlog設為1024
\t\t\tserverChannel.socket().bind(new InetSocketAddress(port),1024);
\t\t\t//監聽客戶端連接請求
\t\t\tserverChannel.register(selector, SelectionKey.OP_ACCEPT);
\t\t\t//標記服務器已開啟
\t\t\tstarted = true;
\t\t\tSystem.out.println("服務器已啟動,端口號:" + port);
\t\t}catch(IOException e){
\t\t\te.printStackTrace();
\t\t\tSystem.exit(1);
\t\t}
\t}
\tpublic void stop(){
\t\tstarted = false;
\t}
\t@Override
\tpublic void run() {
\t\t//循環遍歷selector
\t\twhile(started){
\t\t\ttry{
\t\t\t\t//無論是否有讀寫事件發生,selector每隔1s被喚醒一次
\t\t\t\tselector.select(1000);
\t\t\t\t//阻塞,只有當至少一個註冊的事件發生的時候才會繼續.
//\t\t\t\tselector.select();
\t\t\t\tSet<SelectionKey> keys = selector.selectedKeys();
\t\t\t\tIterator<SelectionKey> it = keys.iterator();
\t\t\t\tSelectionKey key = null;
\t\t\t\twhile(it.hasNext()){
\t\t\t\t\tkey = it.next();
\t\t\t\t\tit.remove();
\t\t\t\t\ttry{
\t\t\t\t\t\thandleInput(key);
\t\t\t\t\t}catch(Exception e){
\t\t\t\t\t\tif(key != null){
\t\t\t\t\t\t\tkey.cancel();
\t\t\t\t\t\t\tif(key.channel() != null){
\t\t\t\t\t\t\t\tkey.channel().close();
\t\t\t\t\t\t\t}
\t\t\t\t\t\t}
\t\t\t\t\t}
\t\t\t\t}
\t\t\t}catch(Throwable t){
\t\t\t\tt.printStackTrace();
\t\t\t}
\t\t}
\t\t//selector關閉後會自動釋放裡面管理的資源
\t\tif(selector != null)
\t\t\ttry{
\t\t\t\tselector.close();
\t\t\t}catch (Exception e) {
\t\t\t\te.printStackTrace();
\t\t\t}
\t}
\tprivate void handleInput(SelectionKey key) throws IOException{
\t\tif(key.isValid()){
\t\t\t//處理新接入的請求消息
\t\t\tif(key.isAcceptable()){
\t\t\t\tServerSocketChannel ssc = (ServerSocketChannel) key.channel();
\t\t\t\t//通過ServerSocketChannel的accept創建SocketChannel實例
\t\t\t\t//完成該操作意味著完成TCP三次握手,TCP物理鏈路正式建立
\t\t\t\tSocketChannel sc = ssc.accept();
\t\t\t\t//設置為非阻塞的
\t\t\t\tsc.configureBlocking(false);
\t\t\t\t//註冊為讀
\t\t\t\tsc.register(selector, SelectionKey.OP_READ);
\t\t\t}
\t\t\t//讀消息
\t\t\tif(key.isReadable()){
\t\t\t\tSocketChannel sc = (SocketChannel) key.channel();
\t\t\t\t//創建ByteBuffer,並開闢一個1M的緩衝區
\t\t\t\tByteBuffer buffer = ByteBuffer.allocate(1024);
\t\t\t\t//讀取請求碼流,返回讀取到的字節數
\t\t\t\tint readBytes = sc.read(buffer);
\t\t\t\t//讀取到字節,對字節進行編解碼
\t\t\t\tif(readBytes>0){
\t\t\t\t\t//將緩衝區當前的limit設置為position=0,用於後續對緩衝區的讀取操作
\t\t\t\t\tbuffer.flip();
\t\t\t\t\t//根據緩衝區可讀字節數創建字節數組
\t\t\t\t\tbyte[] bytes = new byte[buffer.remaining()];
\t\t\t\t\t//將緩衝區可讀字節數組複製到新建的數組中
\t\t\t\t\tbuffer.get(bytes);
\t\t\t\t\tString expression = new String(bytes,"UTF-8");
\t\t\t\t\tSystem.out.println("服務器收到消息:" + expression);
\t\t\t\t\t//處理數據
\t\t\t\t\tString result = null;
\t\t\t\t\ttry{
\t\t\t\t\t\tresult = Calculator.cal(expression).toString();
\t\t\t\t\t}catch(Exception e){
\t\t\t\t\t\tresult = "計算錯誤:" + e.getMessage();
\t\t\t\t\t}
\t\t\t\t\t//發送應答消息
\t\t\t\t\tdoWrite(sc,result);
\t\t\t\t}
\t\t\t\t//沒有讀取到字節 忽略
//\t\t\t\telse if(readBytes==0);
\t\t\t\t//鏈路已經關閉,釋放資源
\t\t\t\telse if(readBytes<0){
\t\t\t\t\tkey.cancel();
\t\t\t\t\tsc.close();
\t\t\t\t}
\t\t\t}
\t\t}
\t}
\t//異步發送應答消息
\tprivate void doWrite(SocketChannel channel,String response) throws IOException{
\t\t//將消息編碼為字節數組
\t\tbyte[] bytes = response.getBytes();
\t\t//根據數組容量創建ByteBuffer
\t\tByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
\t\t//將字節數組複製到緩衝區
\t\twriteBuffer.put(bytes);
\t\t//flip操作
\t\twriteBuffer.flip();
\t\t//發送緩衝區的字節數組
\t\tchannel.write(writeBuffer);
\t\t//****此處不含處理“寫半包”的代碼
\t}
}

可以看到,創建NIO服務端的主要步驟如下:

打開ServerSocketChannel,監聽客戶端連接

綁定監聽端口,設置連接為非阻塞模式

創建Reactor線程,創建多路複用器並啟動線程

將ServerSocketChannel註冊到Reactor線程中的Selector上,監聽ACCEPT事件

Selector輪詢準備就緒的key

Selector監聽到新的客戶端接入,處理新的接入請求,完成TCP三次握手,簡歷物理鏈路

設置客戶端鏈路為非阻塞模式

將新接入的客戶端連接註冊到Reactor線程的Selector上,監聽讀操作,讀取客戶端發送的網絡消息

異步讀取客戶端消息到緩衝區

對Buffer編解碼,處理半包消息,將解碼成功的消息封裝成Task

將應答消息編碼為Buffer,調用SocketChannel的write將消息異步發送給客戶端

因為應答消息的發送,SocketChannel也是異步非阻塞的,所以不能保證一次能吧需要發送的數據發送完,此時就會出現寫半包的問題。我們需要註冊寫操作,不斷輪詢Selector將沒有發送完的消息發送完畢,然後通過Buffer的hasRemain()方法判斷消息是否發送完成。

2.6、NIO客戶端

還是直接上代碼吧,過程也不需要太多解釋了,跟服務端代碼有點類似。

Client:

package com.anxpp.io.calculator.nio;
public class Client {
\tprivate static String DEFAULT_HOST = "127.0.0.1";
\tprivate static int DEFAULT_PORT = 12345;
\tprivate static ClientHandle clientHandle;
\tpublic static void start(){
\t\tstart(DEFAULT_HOST,DEFAULT_PORT);
\t}
\tpublic static synchronized void start(String ip,int port){
\t\tif(clientHandle!=null)
\t\t\tclientHandle.stop();
\t\tclientHandle = new ClientHandle(ip,port);
\t\tnew Thread(clientHandle,"Server").start();
\t}
\t//向服務器發送消息
\tpublic static boolean sendMsg(String msg) throws Exception{
\t\tif(msg.equals("q")) return false;
\t\tclientHandle.sendMsg(msg);
\t\treturn true;
\t}
\tpublic static void main(String[] args){
\t\tstart();
\t}
}

ClientHandle:

package com.anxpp.io.calculator.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* NIO客戶端
* @author yangtao__anxpp.com
* @version 1.0
*/
public class ClientHandle implements Runnable{
\tprivate String host;
\tprivate int port;
\tprivate Selector selector;
\tprivate SocketChannel socketChannel;
\tprivate volatile boolean started;

\tpublic ClientHandle(String ip,int port) {
\t\tthis.host = ip;
\t\tthis.port = port;
\t\ttry{
\t\t\t//創建選擇器
\t\t\tselector = Selector.open();
\t\t\t//打開監聽通道
\t\t\tsocketChannel = SocketChannel.open();
\t\t\t//如果為 true,則此通道將被置於阻塞模式;如果為 false,則此通道將被置於非阻塞模式
\t\t\tsocketChannel.configureBlocking(false);//開啟非阻塞模式
\t\t\tstarted = true;
\t\t}catch(IOException e){
\t\t\te.printStackTrace();
\t\t\tSystem.exit(1);
\t\t}
\t}
\tpublic void stop(){
\t\tstarted = false;
\t}
\t@Override
\tpublic void run() {
\t\ttry{
\t\t\tdoConnect();
\t\t}catch(IOException e){
\t\t\te.printStackTrace();
\t\t\tSystem.exit(1);
\t\t}
\t\t//循環遍歷selector
\t\twhile(started){
\t\t\ttry{
\t\t\t\t//無論是否有讀寫事件發生,selector每隔1s被喚醒一次
\t\t\t\tselector.select(1000);
\t\t\t\t//阻塞,只有當至少一個註冊的事件發生的時候才會繼續.
//\t\t\t\tselector.select();
\t\t\t\tSet<SelectionKey> keys = selector.selectedKeys();
\t\t\t\tIterator<SelectionKey> it = keys.iterator();
\t\t\t\tSelectionKey key = null;
\t\t\t\twhile(it.hasNext()){
\t\t\t\t\tkey = it.next();
\t\t\t\t\tit.remove();
\t\t\t\t\ttry{
\t\t\t\t\t\thandleInput(key);
\t\t\t\t\t}catch(Exception e){
\t\t\t\t\t\tif(key != null){
\t\t\t\t\t\t\tkey.cancel();
\t\t\t\t\t\t\tif(key.channel() != null){
\t\t\t\t\t\t\t\tkey.channel().close();
\t\t\t\t\t\t\t}
\t\t\t\t\t\t}
\t\t\t\t\t}
\t\t\t\t}
\t\t\t}catch(Exception e){
\t\t\t\te.printStackTrace();
\t\t\t\tSystem.exit(1);
\t\t\t}
\t\t}
\t\t//selector關閉後會自動釋放裡面管理的資源
\t\tif(selector != null)
\t\t\ttry{
\t\t\t\tselector.close();
\t\t\t}catch (Exception e) {
\t\t\t\te.printStackTrace();
\t\t\t}
\t}
\tprivate void handleInput(SelectionKey key) throws IOException{
\t\tif(key.isValid()){
\t\t\tSocketChannel sc = (SocketChannel) key.channel();
\t\t\tif(key.isConnectable()){
\t\t\t\tif(sc.finishConnect());
\t\t\t\telse System.exit(1);
\t\t\t}
\t\t\t//讀消息
\t\t\tif(key.isReadable()){
\t\t\t\t//創建ByteBuffer,並開闢一個1M的緩衝區
\t\t\t\tByteBuffer buffer = ByteBuffer.allocate(1024);
\t\t\t\t//讀取請求碼流,返回讀取到的字節數
\t\t\t\tint readBytes = sc.read(buffer);
\t\t\t\t//讀取到字節,對字節進行編解碼
\t\t\t\tif(readBytes>0){
\t\t\t\t\t//將緩衝區當前的limit設置為position=0,用於後續對緩衝區的讀取操作
\t\t\t\t\tbuffer.flip();
\t\t\t\t\t//根據緩衝區可讀字節數創建字節數組
\t\t\t\t\tbyte[] bytes = new byte[buffer.remaining()];
\t\t\t\t\t//將緩衝區可讀字節數組複製到新建的數組中
\t\t\t\t\tbuffer.get(bytes);
\t\t\t\t\tString result = new String(bytes,"UTF-8");
\t\t\t\t\tSystem.out.println("客戶端收到消息:" + result);
\t\t\t\t}
\t\t\t\t//沒有讀取到字節 忽略
//\t\t\t\telse if(readBytes==0);
\t\t\t\t//鏈路已經關閉,釋放資源
\t\t\t\telse if(readBytes<0){
\t\t\t\t\tkey.cancel();
\t\t\t\t\tsc.close();
\t\t\t\t}
\t\t\t}
\t\t}
\t}
\t//異步發送消息
\tprivate void doWrite(SocketChannel channel,String request) throws IOException{
\t\t//將消息編碼為字節數組
\t\tbyte[] bytes = request.getBytes();
\t\t//根據數組容量創建ByteBuffer
\t\tByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
\t\t//將字節數組複製到緩衝區
\t\twriteBuffer.put(bytes);
\t\t//flip操作
\t\twriteBuffer.flip();
\t\t//發送緩衝區的字節數組
\t\tchannel.write(writeBuffer);
\t\t//****此處不含處理“寫半包”的代碼
\t}
\tprivate void doConnect() throws IOException{
\t\tif(socketChannel.connect(new InetSocketAddress(host,port)));
\t\telse socketChannel.register(selector, SelectionKey.OP_CONNECT);
\t}
\tpublic void sendMsg(String msg) throws Exception{
\t\tsocketChannel.register(selector, SelectionKey.OP_READ);
\t\tdoWrite(socketChannel, msg);
\t}
}

2.7、演示結果

首先運行服務器,順便也運行一個客戶端:

package com.anxpp.io.calculator.nio;
import java.util.Scanner;
/**
* 測試方法
* @author yangtao__anxpp.com
* @version 1.0
*/
public class Test {
\t//測試主方法
\t@SuppressWarnings("resource")
\tpublic static void main(String[] args) throws Exception{
\t\t//運行服務器
\t\tServer.start();
\t\t//避免客戶端先於服務器啟動前執行代碼
\t\tThread.sleep(100);
\t\t//運行客戶端
\t\tClient.start();
\t\twhile(Client.sendMsg(new Scanner(System.in).nextLine()));
\t}
}

我們也可以單獨運行客戶端,效果都是一樣的。

一次測試的結果:

服務器已啟動,端口號:12345
1+2+3+4+5+6
服務器收到消息:1+2+3+4+5+6
客戶端收到消息:21
1*2/3-4+5*6/7-8
服務器收到消息:1*2/3-4+5*6/7-8
客戶端收到消息:-7.0476190476190474

運行多個客戶端,都是沒有問題的。


3、AIO編程

NIO 2.0引入了新的異步通道的概念,並提供了異步文件通道和異步套接字通道的實現。

異步的套接字通道時真正的異步非阻塞I/O,對應於UNIX網絡編程中的事件驅動I/O(AIO)。他不需要過多的Selector對註冊的通道進行輪詢即可實現異步讀寫,從而簡化了NIO的編程模型。

直接上代碼吧。

3.1、Server端代碼

Server:

package com.anxpp.io.calculator.aio.server;
/**
* AIO服務端
* @author yangtao__anxpp.com
* @version 1.0
*/
public class Server {
\tprivate static int DEFAULT_PORT = 12345;
\tprivate static AsyncServerHandler serverHandle;
\tpublic volatile static long clientCount = 0;
\tpublic static void start(){
\t\tstart(DEFAULT_PORT);
\t}
\tpublic static synchronized void start(int port){
\t\tif(serverHandle!=null)
\t\t\treturn;
\t\tserverHandle = new AsyncServerHandler(port);
\t\tnew Thread(serverHandle,"Server").start();
\t}
\tpublic static void main(String[] args){
\t\tServer.start();
\t}
}
AsyncServerHandler:
package com.anxpp.io.calculator.aio.server;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.CountDownLatch;
public class AsyncServerHandler implements Runnable {
\tpublic CountDownLatch latch;
\tpublic AsynchronousServerSocketChannel channel;
\tpublic AsyncServerHandler(int port) {
\t\ttry {
\t\t\t//創建服務端通道
\t\t\tchannel = AsynchronousServerSocketChannel.open();
\t\t\t//綁定端口
\t\t\tchannel.bind(new InetSocketAddress(port));
\t\t\tSystem.out.println("服務器已啟動,端口號:" + port);
\t\t} catch (IOException e) {
\t\t\te.printStackTrace();
\t\t}
\t}
\t@Override
\tpublic void run() {
\t\t//CountDownLatch初始化
\t\t//它的作用:在完成一組正在執行的操作之前,允許當前的現場一直阻塞
\t\t//此處,讓現場在此阻塞,防止服務端執行完成後退出
\t\t//也可以使用while(true)+sleep
\t\t//生成環境就不需要擔心這個問題,以為服務端是不會退出的
\t\tlatch = new CountDownLatch(1);
\t\t//用於接收客戶端的連接
\t\tchannel.accept(this,new AcceptHandler());
\t\ttry {
\t\t\tlatch.await();
\t\t} catch (InterruptedException e) {
\t\t\te.printStackTrace();
\t\t}
\t}
}

AcceptHandler:

package com.anxpp.io.calculator.aio.server;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
//作為handler接收客戶端連接
public class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncServerHandler> {
\t@Override
\tpublic void completed(AsynchronousSocketChannel channel,AsyncServerHandler serverHandler) {
\t\t//繼續接受其他客戶端的請求
\t\tServer.clientCount++;
\t\tSystem.out.println("連接的客戶端數:" + Server.clientCount);
\t\tserverHandler.channel.accept(serverHandler, this);
\t\t//創建新的Buffer
\t\tByteBuffer buffer = ByteBuffer.allocate(1024);
\t\t//異步讀 第三個參數為接收消息回調的業務Handler
\t\tchannel.read(buffer, buffer, new ReadHandler(channel));
\t}
\t@Override
\tpublic void failed(Throwable exc, AsyncServerHandler serverHandler) {
\t\texc.printStackTrace();
\t\tserverHandler.latch.countDown();
\t}
}

ReadHandler:

package com.anxpp.io.calculator.aio.server;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import com.anxpp.io.utils.Calculator;
public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
\t//用於讀取半包消息和發送應答
\tprivate AsynchronousSocketChannel channel;
\tpublic ReadHandler(AsynchronousSocketChannel channel) {
\t\t\tthis.channel = channel;
\t}
\t//讀取到消息後的處理
\t@Override
\tpublic void completed(Integer result, ByteBuffer attachment) {
\t\t//flip操作
\t\tattachment.flip();
\t\t//根據
\t\tbyte[] message = new byte[attachment.remaining()];
\t\tattachment.get(message);
\t\ttry {
\t\t\tString expression = new String(message, "UTF-8");
\t\t\tSystem.out.println("服務器收到消息: " + expression);
\t\t\tString calrResult = null;
\t\t\ttry{
\t\t\t\tcalrResult = Calculator.cal(expression).toString();
\t\t\t}catch(Exception e){
\t\t\t\tcalrResult = "計算錯誤:" + e.getMessage();
\t\t\t}
\t\t\t//向客戶端發送消息
\t\t\tdoWrite(calrResult);
\t\t} catch (UnsupportedEncodingException e) {
\t\t\te.printStackTrace();
\t\t}
\t}
\t//發送消息
\tprivate void doWrite(String result) {
\t\tbyte[] bytes = result.getBytes();
\t\tByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
\t\twriteBuffer.put(bytes);
\t\twriteBuffer.flip();
\t\t//異步寫數據 參數與前面的read一樣
\t\tchannel.write(writeBuffer, writeBuffer,new CompletionHandler<Integer, ByteBuffer>() {
\t\t\t@Override
\t\t\tpublic void completed(Integer result, ByteBuffer buffer) {
\t\t\t\t//如果沒有發送完,就繼續發送直到完成
\t\t\t\tif (buffer.hasRemaining())
\t\t\t\t\tchannel.write(buffer, buffer, this);
\t\t\t\telse{
\t\t\t\t\t//創建新的Buffer
\t\t\t\t\tByteBuffer readBuffer = ByteBuffer.allocate(1024);
\t\t\t\t\t//異步讀 第三個參數為接收消息回調的業務Handler
\t\t\t\t\tchannel.read(readBuffer, readBuffer, new ReadHandler(channel));
\t\t\t\t}
\t\t\t}
\t\t\t@Override
\t\t\tpublic void failed(Throwable exc, ByteBuffer attachment) {
\t\t\t\ttry {
\t\t\t\t\tchannel.close();
\t\t\t\t} catch (IOException e) {
\t\t\t\t}
\t\t\t}
\t\t});
\t}
\t@Override
\tpublic void failed(Throwable exc, ByteBuffer attachment) {
\t\ttry {
\t\t\tthis.channel.close();
\t\t} catch (IOException e) {
\t\t\te.printStackTrace();
\t\t}
\t}
}

OK,這樣就已經完成了,其實說起來也簡單,雖然代碼感覺很多,但是API比NIO的使用起來真的簡單多了,主要就是監聽、讀、寫等各種CompletionHandler。此處本應有一個WriteHandler的,確實,我們在ReadHandler中,以一個匿名內部類實現了它。

下面看客戶端代碼。

3.2、Client端代碼

Client:

package com.anxpp.io.calculator.aio.client;
import java.util.Scanner;
public class Client {
\tprivate static String DEFAULT_HOST = "127.0.0.1";
\tprivate static int DEFAULT_PORT = 12345;
\tprivate static AsyncClientHandler clientHandle;
\tpublic static void start(){
\t\tstart(DEFAULT_HOST,DEFAULT_PORT);
\t}
\tpublic static synchronized void start(String ip,int port){
\t\tif(clientHandle!=null)
\t\t\treturn;
\t\tclientHandle = new AsyncClientHandler(ip,port);
\t\tnew Thread(clientHandle,"Client").start();
\t}
\t//向服務器發送消息
\tpublic static boolean sendMsg(String msg) throws Exception{
\t\tif(msg.equals("q")) return false;
\t\tclientHandle.sendMsg(msg);
\t\treturn true;
\t}
\t@SuppressWarnings("resource")
\tpublic static void main(String[] args) throws Exception{
\t\tClient.start();
\t\tSystem.out.println("請輸入請求消息:");
\t\tScanner scanner = new Scanner(System.in);
\t\twhile(Client.sendMsg(scanner.nextLine()));
\t}
}

AsyncClientHandler:

package com.anxpp.io.calculator.aio.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;
public class AsyncClientHandler implements CompletionHandler<Void, AsyncClientHandler>, Runnable {
\tprivate AsynchronousSocketChannel clientChannel;
\tprivate String host;
\tprivate int port;
\tprivate CountDownLatch latch;
\tpublic AsyncClientHandler(String host, int port) {
\t\tthis.host = host;
\t\tthis.port = port;
\t\ttry {
\t\t\t//創建異步的客戶端通道
\t\t\tclientChannel = AsynchronousSocketChannel.open();
\t\t} catch (IOException e) {
\t\t\te.printStackTrace();
\t\t}
\t}
\t@Override
\tpublic void run() {
\t\t//創建CountDownLatch等待
\t\tlatch = new CountDownLatch(1);
\t\t//發起異步連接操作,回調參數就是這個類本身,如果連接成功會回調completed方法
\t\tclientChannel.connect(new InetSocketAddress(host, port), this, this);
\t\ttry {
\t\t\tlatch.await();
\t\t} catch (InterruptedException e1) {
\t\t\te1.printStackTrace();
\t\t}
\t\ttry {
\t\t\tclientChannel.close();
\t\t} catch (IOException e) {
\t\t\te.printStackTrace();
\t\t}
\t}
\t//連接服務器成功
\t//意味著TCP三次握手完成
\t@Override
\tpublic void completed(Void result, AsyncClientHandler attachment) {
\t\tSystem.out.println("客戶端成功連接到服務器...");
\t}
\t//連接服務器失敗
\t@Override
\tpublic void failed(Throwable exc, AsyncClientHandler attachment) {
\t\tSystem.err.println("連接服務器失敗...");
\t\texc.printStackTrace();
\t\ttry {
\t\t\tclientChannel.close();
\t\t\tlatch.countDown();
\t\t} catch (IOException e) {
\t\t\te.printStackTrace();
\t\t}
\t}
\t//向服務器發送消息
\tpublic void sendMsg(String msg){
\t\tbyte[] req = msg.getBytes();
\t\tByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
\t\twriteBuffer.put(req);
\t\twriteBuffer.flip();
\t\t//異步寫
\t\tclientChannel.write(writeBuffer, writeBuffer,new WriteHandler(clientChannel, latch));
\t}
}

WriteHandler:

package com.anxpp.io.calculator.aio.client;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;
public class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
\tprivate AsynchronousSocketChannel clientChannel;
\tprivate CountDownLatch latch;
\tpublic WriteHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) {
\t\tthis.clientChannel = clientChannel;
\t\tthis.latch = latch;
\t}
\t@Override
\tpublic void completed(Integer result, ByteBuffer buffer) {
\t\t//完成全部數據的寫入
\t\tif (buffer.hasRemaining()) {
\t\t\tclientChannel.write(buffer, buffer, this);
\t\t}
\t\telse {
\t\t\t//讀取數據
\t\t\tByteBuffer readBuffer = ByteBuffer.allocate(1024);
\t\t\tclientChannel.read(readBuffer,readBuffer,new ReadHandler(clientChannel, latch));
\t\t}
\t}
\t@Override
\tpublic void failed(Throwable exc, ByteBuffer attachment) {
\t\tSystem.err.println("數據發送失敗...");
\t\ttry {
\t\t\tclientChannel.close();
\t\t\tlatch.countDown();
\t\t} catch (IOException e) {
\t\t}
\t}
}

ReadHandler:

package com.anxpp.io.calculator.aio.client;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;
public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
\tprivate AsynchronousSocketChannel clientChannel;
\tprivate CountDownLatch latch;
\tpublic ReadHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) {
\t\tthis.clientChannel = clientChannel;
\t\tthis.latch = latch;
\t}
\t@Override
\tpublic void completed(Integer result,ByteBuffer buffer) {
\t\tbuffer.flip();
\t\tbyte[] bytes = new byte[buffer.remaining()];
\t\tbuffer.get(bytes);
\t\tString body;
\t\ttry {
\t\t\tbody = new String(bytes,"UTF-8");
\t\t\tSystem.out.println("客戶端收到結果:"+ body);
\t\t} catch (UnsupportedEncodingException e) {
\t\t\te.printStackTrace();
\t\t}
\t}
\t@Override
\tpublic void failed(Throwable exc,ByteBuffer attachment) {
\t\tSystem.err.println("數據讀取失敗...");
\t\ttry {
\t\t\tclientChannel.close();
\t\t\tlatch.countDown();
\t\t} catch (IOException e) {
\t\t}
\t}
}

這個API使用起來真的是很順手。

3.3、測試

Test:

package com.anxpp.io.calculator.aio;
import java.util.Scanner;
import com.anxpp.io.calculator.aio.client.Client;
import com.anxpp.io.calculator.aio.server.Server;
/**
* 測試方法
* @author yangtao__anxpp.com
* @version 1.0
*/
public class Test {
\t//測試主方法
\t@SuppressWarnings("resource")
\tpublic static void main(String[] args) throws Exception{
\t\t//運行服務器
\t\tServer.start();
\t\t//避免客戶端先於服務器啟動前執行代碼
\t\tThread.sleep(100);
\t\t//運行客戶端
\t\tClient.start();
\t\tSystem.out.println("請輸入請求消息:");
\t\tScanner scanner = new Scanner(System.in);
\t\twhile(Client.sendMsg(scanner.nextLine()));
\t}
}

我們可以在控制檯輸入我們需要計算的算數字符串,服務器就會返回結果,當然,我們也可以運行大量的客戶端,都是沒有問題的,以為此處設計為單例客戶端,所以也就沒有演示大量客戶端併發。

讀者可以自己修改Client類,然後開闢大量線程,並使用構造方法創建很多的客戶端測試。

下面是其中一次參數的輸出:

服務器已啟動,端口號:12345
請輸入請求消息:
客戶端成功連接到服務器...
連接的客戶端數:1
123456+789+456
服務器收到消息: 123456+789+456
客戶端收到結果:124701
9526*56
服務器收到消息: 9526*56
客戶端收到結果:533456
...

AIO是真正的異步非阻塞的,所以,在面對超級大量的客戶端,更能得心應手。

下面就比較一下,幾種I/O編程的優缺點。

4、各種I/O的對比

先以一張表來直觀的對比一下:

"

本文會從傳統的BIO到NIO再到AIO自淺至深介紹,並附上完整的代碼講解。

下面代碼中會使用這樣一個例子:客戶端發送一段算式的字符串到服務器,服務器計算後返回結果到客戶端。

代碼的所有說明,都直接作為註釋,嵌入到代碼中,看代碼時就能更容易理解,代碼中會用到一個計算結果的工具類,見文章代碼部分。

1、BIO編程

1.1、傳統的BIO編程

網絡編程的基本模型是C/S模型,即兩個進程間的通信。

服務端提供IP和監聽端口,客戶端通過連接操作想服務端監聽的地址發起連接請求,通過三次握手連接,如果連接成功建立,雙方就可以通過套接字進行通信。

傳統的同步阻塞模型開發中,ServerSocket負責綁定IP地址,啟動監聽端口;Socket負責發起連接操作。連接成功後,雙方通過輸入和輸出流進行同步阻塞式通信。

簡單的描述一下BIO的服務端通信模型:採用BIO通信模型的服務端,通常由一個獨立的Acceptor線程負責監聽客戶端的連接,它接收到客戶端連接請求之後為每個客戶端創建一個新的線程進行鏈路處理沒處理完成後,通過輸出流返回應答給客戶端,線程銷燬。即典型的一請求一應答通宵模型。

傳統BIO通信模型圖:

Java 網絡IO編程總結

該模型最大的問題就是缺乏彈性伸縮能力,當客戶端併發訪問量增加後,服務端的線程個數和客戶端併發訪問數呈1:1的正比關係,Java中的線程也是比較寶貴的系統資源,線程數量快速膨脹後,系統的性能將急劇下降,隨著訪問量的繼續增大,系統最終就死-掉-了。

同步阻塞式I/O創建的Server源碼:

package com.anxpp.io.calculator.bio;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/**
* BIO服務端源碼
* @author yangtao__anxpp.com
* @version 1.0
*/
public final class ServerNormal {
\t//默認的端口號
\tprivate static int DEFAULT_PORT = 12345;
\t//單例的ServerSocket
\tprivate static ServerSocket server;
\t//根據傳入參數設置監聽端口,如果沒有參數調用以下方法並使用默認值
\tpublic static void start() throws IOException{
\t\t//使用默認值
\t\tstart(DEFAULT_PORT);
\t}
\t//這個方法不會被大量併發訪問,不太需要考慮效率,直接進行方法同步就行了
\tpublic synchronized static void start(int port) throws IOException{
\t\tif(server != null) return;
\t\ttry{
\t\t\t//通過構造函數創建ServerSocket
\t\t\t//如果端口合法且空閒,服務端就監聽成功
\t\t\tserver = new ServerSocket(port);
\t\t\tSystem.out.println("服務器已啟動,端口號:" + port);
\t\t\t//通過無線循環監聽客戶端連接
\t\t\t//如果沒有客戶端接入,將阻塞在accept操作上。
\t\t\twhile(true){
\t\t\t\tSocket socket = server.accept();
\t\t\t\t//當有新的客戶端接入時,會執行下面的代碼
\t\t\t\t//然後創建一個新的線程處理這條Socket鏈路
\t\t\t\tnew Thread(new ServerHandler(socket)).start();
\t\t\t}
\t\t}finally{
\t\t\t//一些必要的清理工作
\t\t\tif(server != null){
\t\t\t\tSystem.out.println("服務器已關閉。");
\t\t\t\tserver.close();
\t\t\t\tserver = null;
\t\t\t}
\t\t}
\t}
}

客戶端消息處理線程ServerHandler源碼:

package com.anxpp.io.calculator.bio;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

import com.anxpp.io.utils.Calculator;
/**
* 客戶端線程
* @author yangtao__anxpp.com
* 用於處理一個客戶端的Socket鏈路
*/
public class ServerHandler implements Runnable{
\tprivate Socket socket;
\tpublic ServerHandler(Socket socket) {
\t\tthis.socket = socket;
\t}
\t@Override
\tpublic void run() {
\t\tBufferedReader in = null;
\t\tPrintWriter out = null;
\t\ttry{
\t\t\tin = new BufferedReader(new InputStreamReader(socket.getInputStream()));
\t\t\tout = new PrintWriter(socket.getOutputStream(),true);
\t\t\tString expression;
\t\t\tString result;
\t\t\twhile(true){
\t\t\t\t//通過BufferedReader讀取一行
\t\t\t\t//如果已經讀到輸入流尾部,返回null,退出循環
\t\t\t\t//如果得到非空值,就嘗試計算結果並返回
\t\t\t\tif((expression = in.readLine())==null) break;
\t\t\t\tSystem.out.println("服務器收到消息:" + expression);
\t\t\t\ttry{
\t\t\t\t\tresult = Calculator.cal(expression).toString();
\t\t\t\t}catch(Exception e){
\t\t\t\t\tresult = "計算錯誤:" + e.getMessage();
\t\t\t\t}
\t\t\t\tout.println(result);
\t\t\t}
\t\t}catch(Exception e){
\t\t\te.printStackTrace();
\t\t}finally{
\t\t\t//一些必要的清理工作
\t\t\tif(in != null){
\t\t\t\ttry {
\t\t\t\t\tin.close();
\t\t\t\t} catch (IOException e) {
\t\t\t\t\te.printStackTrace();
\t\t\t\t}
\t\t\t\tin = null;
\t\t\t}
\t\t\tif(out != null){
\t\t\t\tout.close();
\t\t\t\tout = null;
\t\t\t}
\t\t\tif(socket != null){
\t\t\t\ttry {
\t\t\t\t\tsocket.close();
\t\t\t\t} catch (IOException e) {
\t\t\t\t\te.printStackTrace();
\t\t\t\t}
\t\t\t\tsocket = null;
\t\t\t}
\t\t}
\t}
}

同步阻塞式I/O創建的Client源碼:

package com.anxpp.io.calculator.bio;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
/**
* 阻塞式I/O創建的客戶端
* @author yangtao__anxpp.com
* @version 1.0
*/
public class Client {
\t//默認的端口號
\tprivate static int DEFAULT_SERVER_PORT = 12345;
\tprivate static String DEFAULT_SERVER_IP = "127.0.0.1";
\tpublic static void send(String expression){
\t\tsend(DEFAULT_SERVER_PORT,expression);
\t}
\tpublic static void send(int port,String expression){
\t\tSystem.out.println("算術表達式為:" + expression);
\t\tSocket socket = null;
\t\tBufferedReader in = null;
\t\tPrintWriter out = null;
\t\ttry{
\t\t\tsocket = new Socket(DEFAULT_SERVER_IP,port);
\t\t\tin = new BufferedReader(new InputStreamReader(socket.getInputStream()));
\t\t\tout = new PrintWriter(socket.getOutputStream(),true);
\t\t\tout.println(expression);
\t\t\tSystem.out.println("___結果為:" + in.readLine());
\t\t}catch(Exception e){
\t\t\te.printStackTrace();
\t\t}finally{
\t\t\t//一下必要的清理工作
\t\t\tif(in != null){
\t\t\t\ttry {
\t\t\t\t\tin.close();
\t\t\t\t} catch (IOException e) {
\t\t\t\t\te.printStackTrace();
\t\t\t\t}
\t\t\t\tin = null;
\t\t\t}
\t\t\tif(out != null){
\t\t\t\tout.close();
\t\t\t\tout = null;
\t\t\t}
\t\t\tif(socket != null){
\t\t\t\ttry {
\t\t\t\t\tsocket.close();
\t\t\t\t} catch (IOException e) {
\t\t\t\t\te.printStackTrace();
\t\t\t\t}
\t\t\t\tsocket = null;
\t\t\t}
\t\t}
\t}
}

測試代碼,為了方便在控制檯看輸出結果,放到同一個程序(jvm)中運行:

package com.anxpp.io.calculator.bio;
import java.io.IOException;
import java.util.Random;
/**
* 測試方法
* @author yangtao__anxpp.com
* @version 1.0
*/
public class Test {
\t//測試主方法
\tpublic static void main(String[] args) throws InterruptedException {
\t\t//運行服務器
\t\tnew Thread(new Runnable() {
\t\t\t@Override
\t\t\tpublic void run() {
\t\t\t\ttry {
\t\t\t\t\tServerBetter.start();
\t\t\t\t} catch (IOException e) {
\t\t\t\t\te.printStackTrace();
\t\t\t\t}
\t\t\t}
\t\t}).start();
\t\t//避免客戶端先於服務器啟動前執行代碼
\t\tThread.sleep(100);
\t\t//運行客戶端
\t\tchar operators[] = {'+','-','*','/'};
\t\tRandom random = new Random(System.currentTimeMillis());
\t\tnew Thread(new Runnable() {
\t\t\t@SuppressWarnings("static-access")
\t\t\t@Override
\t\t\tpublic void run() {
\t\t\t\twhile(true){
\t\t\t\t\t//隨機產生算術表達式
\t\t\t\t\tString expression = random.nextInt(10)+""+operators[random.nextInt(4)]+(random.nextInt(10)+1);
\t\t\t\t\tClient.send(expression);
\t\t\t\t\ttry {
\t\t\t\t\t\tThread.currentThread().sleep(random.nextInt(1000));
\t\t\t\t\t} catch (InterruptedException e) {
\t\t\t\t\t\te.printStackTrace();
\t\t\t\t\t}
\t\t\t\t}
\t\t\t}
\t\t}).start();
\t}
}

其中一次的運行結果:

服務器已啟動,端口號:12345
算術表達式為:4-2
服務器收到消息:4-2
___結果為:2
算術表達式為:5-10
服務器收到消息:5-10
___結果為:-5
算術表達式為:0-9
服務器收到消息:0-9
___結果為:-9
算術表達式為:0+6
服務器收到消息:0+6
___結果為:6
算術表達式為:1/6
服務器收到消息:1/6
___結果為:0.16666666666666666
...

從以上代碼,很容易看出,BIO主要的問題在於每當有一個新的客戶端請求接入時,服務端必須創建一個新的線程來處理這條鏈路,在需要滿足高性能、高併發的場景是沒法應用的(大量創建新的線程會嚴重影響服務器性能,甚至罷工)。

1.2、偽異步I/O編程

為了改進這種一連接一線程的模型,我們可以使用線程池來管理這些線程(需要了解更多請參考前面提供的文章),實現1個或多個線程處理N個客戶端的模型(但是底層還是使用的同步阻塞I/O),通常被稱為“偽異步I/O模型“。

偽異步I/O模型圖:

Java 網絡IO編程總結

實現很簡單,我們只需要將新建線程的地方,交給線程池管理即可,只需要改動剛剛的Server代碼即可:

package com.anxpp.io.calculator.bio;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* BIO服務端源碼__偽異步I/O
* @author yangtao__anxpp.com
* @version 1.0
*/
public final class ServerBetter {
\t//默認的端口號
\tprivate static int DEFAULT_PORT = 12345;
\t//單例的ServerSocket
\tprivate static ServerSocket server;
\t//線程池 懶漢式的單例
\tprivate static ExecutorService executorService = Executors.newFixedThreadPool(60);
\t//根據傳入參數設置監聽端口,如果沒有參數調用以下方法並使用默認值
\tpublic static void start() throws IOException{
\t\t//使用默認值
\t\tstart(DEFAULT_PORT);
\t}
\t//這個方法不會被大量併發訪問,不太需要考慮效率,直接進行方法同步就行了
\tpublic synchronized static void start(int port) throws IOException{
\t\tif(server != null) return;
\t\ttry{
\t\t\t//通過構造函數創建ServerSocket
\t\t\t//如果端口合法且空閒,服務端就監聽成功
\t\t\tserver = new ServerSocket(port);
\t\t\tSystem.out.println("服務器已啟動,端口號:" + port);
\t\t\t//通過無線循環監聽客戶端連接
\t\t\t//如果沒有客戶端接入,將阻塞在accept操作上。
\t\t\twhile(true){
\t\t\t\tSocket socket = server.accept();
\t\t\t\t//當有新的客戶端接入時,會執行下面的代碼
\t\t\t\t//然後創建一個新的線程處理這條Socket鏈路
\t\t\t\texecutorService.execute(new ServerHandler(socket));
\t\t\t}
\t\t}finally{
\t\t\t//一些必要的清理工作
\t\t\tif(server != null){
\t\t\t\tSystem.out.println("服務器已關閉。");
\t\t\t\tserver.close();
\t\t\t\tserver = null;
\t\t\t}
\t\t}
\t}
}

測試運行結果是一樣的。

我們知道,如果使用CachedThreadPool線程池(不限制線程數量,如果不清楚請參考文首提供的文章),其實除了能自動幫我們管理線程(複用),看起來也就像是1:1的客戶端:線程數模型,而使用FixedThreadPool我們就有效的控制了線程的最大數量,保證了系統有限的資源的控制,實現了N:M的偽異步I/O模型。

但是,正因為限制了線程數量,如果發生大量併發請求,超過最大數量的線程就只能等待,直到線程池中的有空閒的線程可以被複用。而對Socket的輸入流就行讀取時,會一直阻塞,直到發生:

有數據可讀

可用數據以及讀取完畢

發生空指針或I/O異常

所以在讀取數據較慢時(比如數據量大、網絡傳輸慢等),大量併發的情況下,其他接入的消息,只能一直等待,這就是最大的弊端。

而後面即將介紹的NIO,就能解決這個難題。

2、NIO 編程

JDK 1.4中的java.nio.*包中引入新的Java I/O庫,其目的是提高速度。實際上,“舊”的I/O包已經使用NIO重新實現過,即使我們不顯式的使用NIO編程,也能從中受益。速度的提高在文件I/O和網絡I/O中都可能會發生,但本文只討論後者。

2.1、簡介

NIO我們一般認為是New I/O(也是官方的叫法),因為它是相對於老的I/O類庫新增的(其實在JDK 1.4中就已經被引入了,但這個名詞還會繼續用很久,即使它們在現在看來已經是“舊”的了,所以也提示我們在命名時,需要好好考慮),做了很大的改變。但民間跟多人稱之為Non-block I/O,即非阻塞I/O,因為這樣叫,更能體現它的特點。而下文中的NIO,不是指整個新的I/O庫,而是非阻塞I/O。

NIO提供了與傳統BIO模型中的Socket和ServerSocket相對應的SocketChannel和ServerSocketChannel兩種不同的套接字通道實現。

新增的著兩種通道都支持阻塞和非阻塞兩種模式。

阻塞模式使用就像傳統中的支持一樣,比較簡單,但是性能和可靠性都不好;非阻塞模式正好與之相反。

對於低負載、低併發的應用程序,可以使用同步阻塞I/O來提升開發速率和更好的維護性;對於高負載、高併發的(網絡)應用,應使用NIO的非阻塞模式來開發。

下面會先對基礎知識進行介紹。

2.2、緩衝區 Buffer

Buffer是一個對象,包含一些要寫入或者讀出的數據。

在NIO庫中,所有數據都是用緩衝區處理的。在讀取數據時,它是直接讀到緩衝區中的;在寫入數據時,也是寫入到緩衝區中。任何時候訪問NIO中的數據,都是通過緩衝區進行操作。

緩衝區實際上是一個數組,並提供了對數據結構化訪問以及維護讀寫位置等信息。

具體的緩存區有這些:ByteBuffe、CharBuffer、 ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer。他們實現了相同的接口:Buffer。

2.3、通道 Channel

我們對數據的讀取和寫入要通過Channel,它就像水管一樣,是一個通道。通道不同於流的地方就是通道是雙向的,可以用於讀、寫和同時讀寫操作。

底層的操作系統的通道一般都是全雙工的,所以全雙工的Channel比流能更好的映射底層操作系統的API。

Channel主要分兩大類:

SelectableChannel:用戶網絡讀寫

FileChannel:用於文件操作

後面代碼會涉及的ServerSocketChannel和SocketChannel都是SelectableChannel的子類。

2.4、多路複用器 Selector

Selector是Java NIO 編程的基礎。

Selector提供選擇已經就緒的任務的能力:Selector會不斷輪詢註冊在其上的Channel,如果某個Channel上面發生讀或者寫事件,這個Channel就處於就緒狀態,會被Selector輪詢出來,然後通過SelectionKey可以獲取就緒Channel的集合,進行後續的I/O操作。

一個Selector可以同時輪詢多個Channel,因為JDK使用了epoll()代替傳統的select實現,所以沒有最大連接句柄1024/2048的限制。所以,只需要一個線程負責Selector的輪詢,就可以接入成千上萬的客戶端。

2.5、NIO服務端

代碼比傳統的Socket編程看起來要複雜不少。

直接貼代碼吧,以註釋的形式給出代碼說明。

NIO創建的Server源碼:

package com.anxpp.io.calculator.nio;
public class Server {
\tprivate static int DEFAULT_PORT = 12345;
\tprivate static ServerHandle serverHandle;
\tpublic static void start(){
\t\tstart(DEFAULT_PORT);
\t}
\tpublic static synchronized void start(int port){
\t\tif(serverHandle!=null)
\t\t\tserverHandle.stop();
\t\tserverHandle = new ServerHandle(port);
\t\tnew Thread(serverHandle,"Server").start();
\t}
\tpublic static void main(String[] args){
\t\tstart();
\t}
}

ServerHandle:

package com.anxpp.io.calculator.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

import com.anxpp.io.utils.Calculator;
/**
* NIO服務端
* @author yangtao__anxpp.com
* @version 1.0
*/
public class ServerHandle implements Runnable{
\tprivate Selector selector;
\tprivate ServerSocketChannel serverChannel;
\tprivate volatile boolean started;
\t/**
\t * 構造方法
\t * @param port 指定要監聽的端口號
\t */
\tpublic ServerHandle(int port) {
\t\ttry{
\t\t\t//創建選擇器
\t\t\tselector = Selector.open();
\t\t\t//打開監聽通道
\t\t\tserverChannel = ServerSocketChannel.open();
\t\t\t//如果為 true,則此通道將被置於阻塞模式;如果為 false,則此通道將被置於非阻塞模式
\t\t\tserverChannel.configureBlocking(false);//開啟非阻塞模式
\t\t\t//綁定端口 backlog設為1024
\t\t\tserverChannel.socket().bind(new InetSocketAddress(port),1024);
\t\t\t//監聽客戶端連接請求
\t\t\tserverChannel.register(selector, SelectionKey.OP_ACCEPT);
\t\t\t//標記服務器已開啟
\t\t\tstarted = true;
\t\t\tSystem.out.println("服務器已啟動,端口號:" + port);
\t\t}catch(IOException e){
\t\t\te.printStackTrace();
\t\t\tSystem.exit(1);
\t\t}
\t}
\tpublic void stop(){
\t\tstarted = false;
\t}
\t@Override
\tpublic void run() {
\t\t//循環遍歷selector
\t\twhile(started){
\t\t\ttry{
\t\t\t\t//無論是否有讀寫事件發生,selector每隔1s被喚醒一次
\t\t\t\tselector.select(1000);
\t\t\t\t//阻塞,只有當至少一個註冊的事件發生的時候才會繼續.
//\t\t\t\tselector.select();
\t\t\t\tSet<SelectionKey> keys = selector.selectedKeys();
\t\t\t\tIterator<SelectionKey> it = keys.iterator();
\t\t\t\tSelectionKey key = null;
\t\t\t\twhile(it.hasNext()){
\t\t\t\t\tkey = it.next();
\t\t\t\t\tit.remove();
\t\t\t\t\ttry{
\t\t\t\t\t\thandleInput(key);
\t\t\t\t\t}catch(Exception e){
\t\t\t\t\t\tif(key != null){
\t\t\t\t\t\t\tkey.cancel();
\t\t\t\t\t\t\tif(key.channel() != null){
\t\t\t\t\t\t\t\tkey.channel().close();
\t\t\t\t\t\t\t}
\t\t\t\t\t\t}
\t\t\t\t\t}
\t\t\t\t}
\t\t\t}catch(Throwable t){
\t\t\t\tt.printStackTrace();
\t\t\t}
\t\t}
\t\t//selector關閉後會自動釋放裡面管理的資源
\t\tif(selector != null)
\t\t\ttry{
\t\t\t\tselector.close();
\t\t\t}catch (Exception e) {
\t\t\t\te.printStackTrace();
\t\t\t}
\t}
\tprivate void handleInput(SelectionKey key) throws IOException{
\t\tif(key.isValid()){
\t\t\t//處理新接入的請求消息
\t\t\tif(key.isAcceptable()){
\t\t\t\tServerSocketChannel ssc = (ServerSocketChannel) key.channel();
\t\t\t\t//通過ServerSocketChannel的accept創建SocketChannel實例
\t\t\t\t//完成該操作意味著完成TCP三次握手,TCP物理鏈路正式建立
\t\t\t\tSocketChannel sc = ssc.accept();
\t\t\t\t//設置為非阻塞的
\t\t\t\tsc.configureBlocking(false);
\t\t\t\t//註冊為讀
\t\t\t\tsc.register(selector, SelectionKey.OP_READ);
\t\t\t}
\t\t\t//讀消息
\t\t\tif(key.isReadable()){
\t\t\t\tSocketChannel sc = (SocketChannel) key.channel();
\t\t\t\t//創建ByteBuffer,並開闢一個1M的緩衝區
\t\t\t\tByteBuffer buffer = ByteBuffer.allocate(1024);
\t\t\t\t//讀取請求碼流,返回讀取到的字節數
\t\t\t\tint readBytes = sc.read(buffer);
\t\t\t\t//讀取到字節,對字節進行編解碼
\t\t\t\tif(readBytes>0){
\t\t\t\t\t//將緩衝區當前的limit設置為position=0,用於後續對緩衝區的讀取操作
\t\t\t\t\tbuffer.flip();
\t\t\t\t\t//根據緩衝區可讀字節數創建字節數組
\t\t\t\t\tbyte[] bytes = new byte[buffer.remaining()];
\t\t\t\t\t//將緩衝區可讀字節數組複製到新建的數組中
\t\t\t\t\tbuffer.get(bytes);
\t\t\t\t\tString expression = new String(bytes,"UTF-8");
\t\t\t\t\tSystem.out.println("服務器收到消息:" + expression);
\t\t\t\t\t//處理數據
\t\t\t\t\tString result = null;
\t\t\t\t\ttry{
\t\t\t\t\t\tresult = Calculator.cal(expression).toString();
\t\t\t\t\t}catch(Exception e){
\t\t\t\t\t\tresult = "計算錯誤:" + e.getMessage();
\t\t\t\t\t}
\t\t\t\t\t//發送應答消息
\t\t\t\t\tdoWrite(sc,result);
\t\t\t\t}
\t\t\t\t//沒有讀取到字節 忽略
//\t\t\t\telse if(readBytes==0);
\t\t\t\t//鏈路已經關閉,釋放資源
\t\t\t\telse if(readBytes<0){
\t\t\t\t\tkey.cancel();
\t\t\t\t\tsc.close();
\t\t\t\t}
\t\t\t}
\t\t}
\t}
\t//異步發送應答消息
\tprivate void doWrite(SocketChannel channel,String response) throws IOException{
\t\t//將消息編碼為字節數組
\t\tbyte[] bytes = response.getBytes();
\t\t//根據數組容量創建ByteBuffer
\t\tByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
\t\t//將字節數組複製到緩衝區
\t\twriteBuffer.put(bytes);
\t\t//flip操作
\t\twriteBuffer.flip();
\t\t//發送緩衝區的字節數組
\t\tchannel.write(writeBuffer);
\t\t//****此處不含處理“寫半包”的代碼
\t}
}

可以看到,創建NIO服務端的主要步驟如下:

打開ServerSocketChannel,監聽客戶端連接

綁定監聽端口,設置連接為非阻塞模式

創建Reactor線程,創建多路複用器並啟動線程

將ServerSocketChannel註冊到Reactor線程中的Selector上,監聽ACCEPT事件

Selector輪詢準備就緒的key

Selector監聽到新的客戶端接入,處理新的接入請求,完成TCP三次握手,簡歷物理鏈路

設置客戶端鏈路為非阻塞模式

將新接入的客戶端連接註冊到Reactor線程的Selector上,監聽讀操作,讀取客戶端發送的網絡消息

異步讀取客戶端消息到緩衝區

對Buffer編解碼,處理半包消息,將解碼成功的消息封裝成Task

將應答消息編碼為Buffer,調用SocketChannel的write將消息異步發送給客戶端

因為應答消息的發送,SocketChannel也是異步非阻塞的,所以不能保證一次能吧需要發送的數據發送完,此時就會出現寫半包的問題。我們需要註冊寫操作,不斷輪詢Selector將沒有發送完的消息發送完畢,然後通過Buffer的hasRemain()方法判斷消息是否發送完成。

2.6、NIO客戶端

還是直接上代碼吧,過程也不需要太多解釋了,跟服務端代碼有點類似。

Client:

package com.anxpp.io.calculator.nio;
public class Client {
\tprivate static String DEFAULT_HOST = "127.0.0.1";
\tprivate static int DEFAULT_PORT = 12345;
\tprivate static ClientHandle clientHandle;
\tpublic static void start(){
\t\tstart(DEFAULT_HOST,DEFAULT_PORT);
\t}
\tpublic static synchronized void start(String ip,int port){
\t\tif(clientHandle!=null)
\t\t\tclientHandle.stop();
\t\tclientHandle = new ClientHandle(ip,port);
\t\tnew Thread(clientHandle,"Server").start();
\t}
\t//向服務器發送消息
\tpublic static boolean sendMsg(String msg) throws Exception{
\t\tif(msg.equals("q")) return false;
\t\tclientHandle.sendMsg(msg);
\t\treturn true;
\t}
\tpublic static void main(String[] args){
\t\tstart();
\t}
}

ClientHandle:

package com.anxpp.io.calculator.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* NIO客戶端
* @author yangtao__anxpp.com
* @version 1.0
*/
public class ClientHandle implements Runnable{
\tprivate String host;
\tprivate int port;
\tprivate Selector selector;
\tprivate SocketChannel socketChannel;
\tprivate volatile boolean started;

\tpublic ClientHandle(String ip,int port) {
\t\tthis.host = ip;
\t\tthis.port = port;
\t\ttry{
\t\t\t//創建選擇器
\t\t\tselector = Selector.open();
\t\t\t//打開監聽通道
\t\t\tsocketChannel = SocketChannel.open();
\t\t\t//如果為 true,則此通道將被置於阻塞模式;如果為 false,則此通道將被置於非阻塞模式
\t\t\tsocketChannel.configureBlocking(false);//開啟非阻塞模式
\t\t\tstarted = true;
\t\t}catch(IOException e){
\t\t\te.printStackTrace();
\t\t\tSystem.exit(1);
\t\t}
\t}
\tpublic void stop(){
\t\tstarted = false;
\t}
\t@Override
\tpublic void run() {
\t\ttry{
\t\t\tdoConnect();
\t\t}catch(IOException e){
\t\t\te.printStackTrace();
\t\t\tSystem.exit(1);
\t\t}
\t\t//循環遍歷selector
\t\twhile(started){
\t\t\ttry{
\t\t\t\t//無論是否有讀寫事件發生,selector每隔1s被喚醒一次
\t\t\t\tselector.select(1000);
\t\t\t\t//阻塞,只有當至少一個註冊的事件發生的時候才會繼續.
//\t\t\t\tselector.select();
\t\t\t\tSet<SelectionKey> keys = selector.selectedKeys();
\t\t\t\tIterator<SelectionKey> it = keys.iterator();
\t\t\t\tSelectionKey key = null;
\t\t\t\twhile(it.hasNext()){
\t\t\t\t\tkey = it.next();
\t\t\t\t\tit.remove();
\t\t\t\t\ttry{
\t\t\t\t\t\thandleInput(key);
\t\t\t\t\t}catch(Exception e){
\t\t\t\t\t\tif(key != null){
\t\t\t\t\t\t\tkey.cancel();
\t\t\t\t\t\t\tif(key.channel() != null){
\t\t\t\t\t\t\t\tkey.channel().close();
\t\t\t\t\t\t\t}
\t\t\t\t\t\t}
\t\t\t\t\t}
\t\t\t\t}
\t\t\t}catch(Exception e){
\t\t\t\te.printStackTrace();
\t\t\t\tSystem.exit(1);
\t\t\t}
\t\t}
\t\t//selector關閉後會自動釋放裡面管理的資源
\t\tif(selector != null)
\t\t\ttry{
\t\t\t\tselector.close();
\t\t\t}catch (Exception e) {
\t\t\t\te.printStackTrace();
\t\t\t}
\t}
\tprivate void handleInput(SelectionKey key) throws IOException{
\t\tif(key.isValid()){
\t\t\tSocketChannel sc = (SocketChannel) key.channel();
\t\t\tif(key.isConnectable()){
\t\t\t\tif(sc.finishConnect());
\t\t\t\telse System.exit(1);
\t\t\t}
\t\t\t//讀消息
\t\t\tif(key.isReadable()){
\t\t\t\t//創建ByteBuffer,並開闢一個1M的緩衝區
\t\t\t\tByteBuffer buffer = ByteBuffer.allocate(1024);
\t\t\t\t//讀取請求碼流,返回讀取到的字節數
\t\t\t\tint readBytes = sc.read(buffer);
\t\t\t\t//讀取到字節,對字節進行編解碼
\t\t\t\tif(readBytes>0){
\t\t\t\t\t//將緩衝區當前的limit設置為position=0,用於後續對緩衝區的讀取操作
\t\t\t\t\tbuffer.flip();
\t\t\t\t\t//根據緩衝區可讀字節數創建字節數組
\t\t\t\t\tbyte[] bytes = new byte[buffer.remaining()];
\t\t\t\t\t//將緩衝區可讀字節數組複製到新建的數組中
\t\t\t\t\tbuffer.get(bytes);
\t\t\t\t\tString result = new String(bytes,"UTF-8");
\t\t\t\t\tSystem.out.println("客戶端收到消息:" + result);
\t\t\t\t}
\t\t\t\t//沒有讀取到字節 忽略
//\t\t\t\telse if(readBytes==0);
\t\t\t\t//鏈路已經關閉,釋放資源
\t\t\t\telse if(readBytes<0){
\t\t\t\t\tkey.cancel();
\t\t\t\t\tsc.close();
\t\t\t\t}
\t\t\t}
\t\t}
\t}
\t//異步發送消息
\tprivate void doWrite(SocketChannel channel,String request) throws IOException{
\t\t//將消息編碼為字節數組
\t\tbyte[] bytes = request.getBytes();
\t\t//根據數組容量創建ByteBuffer
\t\tByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
\t\t//將字節數組複製到緩衝區
\t\twriteBuffer.put(bytes);
\t\t//flip操作
\t\twriteBuffer.flip();
\t\t//發送緩衝區的字節數組
\t\tchannel.write(writeBuffer);
\t\t//****此處不含處理“寫半包”的代碼
\t}
\tprivate void doConnect() throws IOException{
\t\tif(socketChannel.connect(new InetSocketAddress(host,port)));
\t\telse socketChannel.register(selector, SelectionKey.OP_CONNECT);
\t}
\tpublic void sendMsg(String msg) throws Exception{
\t\tsocketChannel.register(selector, SelectionKey.OP_READ);
\t\tdoWrite(socketChannel, msg);
\t}
}

2.7、演示結果

首先運行服務器,順便也運行一個客戶端:

package com.anxpp.io.calculator.nio;
import java.util.Scanner;
/**
* 測試方法
* @author yangtao__anxpp.com
* @version 1.0
*/
public class Test {
\t//測試主方法
\t@SuppressWarnings("resource")
\tpublic static void main(String[] args) throws Exception{
\t\t//運行服務器
\t\tServer.start();
\t\t//避免客戶端先於服務器啟動前執行代碼
\t\tThread.sleep(100);
\t\t//運行客戶端
\t\tClient.start();
\t\twhile(Client.sendMsg(new Scanner(System.in).nextLine()));
\t}
}

我們也可以單獨運行客戶端,效果都是一樣的。

一次測試的結果:

服務器已啟動,端口號:12345
1+2+3+4+5+6
服務器收到消息:1+2+3+4+5+6
客戶端收到消息:21
1*2/3-4+5*6/7-8
服務器收到消息:1*2/3-4+5*6/7-8
客戶端收到消息:-7.0476190476190474

運行多個客戶端,都是沒有問題的。


3、AIO編程

NIO 2.0引入了新的異步通道的概念,並提供了異步文件通道和異步套接字通道的實現。

異步的套接字通道時真正的異步非阻塞I/O,對應於UNIX網絡編程中的事件驅動I/O(AIO)。他不需要過多的Selector對註冊的通道進行輪詢即可實現異步讀寫,從而簡化了NIO的編程模型。

直接上代碼吧。

3.1、Server端代碼

Server:

package com.anxpp.io.calculator.aio.server;
/**
* AIO服務端
* @author yangtao__anxpp.com
* @version 1.0
*/
public class Server {
\tprivate static int DEFAULT_PORT = 12345;
\tprivate static AsyncServerHandler serverHandle;
\tpublic volatile static long clientCount = 0;
\tpublic static void start(){
\t\tstart(DEFAULT_PORT);
\t}
\tpublic static synchronized void start(int port){
\t\tif(serverHandle!=null)
\t\t\treturn;
\t\tserverHandle = new AsyncServerHandler(port);
\t\tnew Thread(serverHandle,"Server").start();
\t}
\tpublic static void main(String[] args){
\t\tServer.start();
\t}
}
AsyncServerHandler:
package com.anxpp.io.calculator.aio.server;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.CountDownLatch;
public class AsyncServerHandler implements Runnable {
\tpublic CountDownLatch latch;
\tpublic AsynchronousServerSocketChannel channel;
\tpublic AsyncServerHandler(int port) {
\t\ttry {
\t\t\t//創建服務端通道
\t\t\tchannel = AsynchronousServerSocketChannel.open();
\t\t\t//綁定端口
\t\t\tchannel.bind(new InetSocketAddress(port));
\t\t\tSystem.out.println("服務器已啟動,端口號:" + port);
\t\t} catch (IOException e) {
\t\t\te.printStackTrace();
\t\t}
\t}
\t@Override
\tpublic void run() {
\t\t//CountDownLatch初始化
\t\t//它的作用:在完成一組正在執行的操作之前,允許當前的現場一直阻塞
\t\t//此處,讓現場在此阻塞,防止服務端執行完成後退出
\t\t//也可以使用while(true)+sleep
\t\t//生成環境就不需要擔心這個問題,以為服務端是不會退出的
\t\tlatch = new CountDownLatch(1);
\t\t//用於接收客戶端的連接
\t\tchannel.accept(this,new AcceptHandler());
\t\ttry {
\t\t\tlatch.await();
\t\t} catch (InterruptedException e) {
\t\t\te.printStackTrace();
\t\t}
\t}
}

AcceptHandler:

package com.anxpp.io.calculator.aio.server;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
//作為handler接收客戶端連接
public class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncServerHandler> {
\t@Override
\tpublic void completed(AsynchronousSocketChannel channel,AsyncServerHandler serverHandler) {
\t\t//繼續接受其他客戶端的請求
\t\tServer.clientCount++;
\t\tSystem.out.println("連接的客戶端數:" + Server.clientCount);
\t\tserverHandler.channel.accept(serverHandler, this);
\t\t//創建新的Buffer
\t\tByteBuffer buffer = ByteBuffer.allocate(1024);
\t\t//異步讀 第三個參數為接收消息回調的業務Handler
\t\tchannel.read(buffer, buffer, new ReadHandler(channel));
\t}
\t@Override
\tpublic void failed(Throwable exc, AsyncServerHandler serverHandler) {
\t\texc.printStackTrace();
\t\tserverHandler.latch.countDown();
\t}
}

ReadHandler:

package com.anxpp.io.calculator.aio.server;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import com.anxpp.io.utils.Calculator;
public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
\t//用於讀取半包消息和發送應答
\tprivate AsynchronousSocketChannel channel;
\tpublic ReadHandler(AsynchronousSocketChannel channel) {
\t\t\tthis.channel = channel;
\t}
\t//讀取到消息後的處理
\t@Override
\tpublic void completed(Integer result, ByteBuffer attachment) {
\t\t//flip操作
\t\tattachment.flip();
\t\t//根據
\t\tbyte[] message = new byte[attachment.remaining()];
\t\tattachment.get(message);
\t\ttry {
\t\t\tString expression = new String(message, "UTF-8");
\t\t\tSystem.out.println("服務器收到消息: " + expression);
\t\t\tString calrResult = null;
\t\t\ttry{
\t\t\t\tcalrResult = Calculator.cal(expression).toString();
\t\t\t}catch(Exception e){
\t\t\t\tcalrResult = "計算錯誤:" + e.getMessage();
\t\t\t}
\t\t\t//向客戶端發送消息
\t\t\tdoWrite(calrResult);
\t\t} catch (UnsupportedEncodingException e) {
\t\t\te.printStackTrace();
\t\t}
\t}
\t//發送消息
\tprivate void doWrite(String result) {
\t\tbyte[] bytes = result.getBytes();
\t\tByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
\t\twriteBuffer.put(bytes);
\t\twriteBuffer.flip();
\t\t//異步寫數據 參數與前面的read一樣
\t\tchannel.write(writeBuffer, writeBuffer,new CompletionHandler<Integer, ByteBuffer>() {
\t\t\t@Override
\t\t\tpublic void completed(Integer result, ByteBuffer buffer) {
\t\t\t\t//如果沒有發送完,就繼續發送直到完成
\t\t\t\tif (buffer.hasRemaining())
\t\t\t\t\tchannel.write(buffer, buffer, this);
\t\t\t\telse{
\t\t\t\t\t//創建新的Buffer
\t\t\t\t\tByteBuffer readBuffer = ByteBuffer.allocate(1024);
\t\t\t\t\t//異步讀 第三個參數為接收消息回調的業務Handler
\t\t\t\t\tchannel.read(readBuffer, readBuffer, new ReadHandler(channel));
\t\t\t\t}
\t\t\t}
\t\t\t@Override
\t\t\tpublic void failed(Throwable exc, ByteBuffer attachment) {
\t\t\t\ttry {
\t\t\t\t\tchannel.close();
\t\t\t\t} catch (IOException e) {
\t\t\t\t}
\t\t\t}
\t\t});
\t}
\t@Override
\tpublic void failed(Throwable exc, ByteBuffer attachment) {
\t\ttry {
\t\t\tthis.channel.close();
\t\t} catch (IOException e) {
\t\t\te.printStackTrace();
\t\t}
\t}
}

OK,這樣就已經完成了,其實說起來也簡單,雖然代碼感覺很多,但是API比NIO的使用起來真的簡單多了,主要就是監聽、讀、寫等各種CompletionHandler。此處本應有一個WriteHandler的,確實,我們在ReadHandler中,以一個匿名內部類實現了它。

下面看客戶端代碼。

3.2、Client端代碼

Client:

package com.anxpp.io.calculator.aio.client;
import java.util.Scanner;
public class Client {
\tprivate static String DEFAULT_HOST = "127.0.0.1";
\tprivate static int DEFAULT_PORT = 12345;
\tprivate static AsyncClientHandler clientHandle;
\tpublic static void start(){
\t\tstart(DEFAULT_HOST,DEFAULT_PORT);
\t}
\tpublic static synchronized void start(String ip,int port){
\t\tif(clientHandle!=null)
\t\t\treturn;
\t\tclientHandle = new AsyncClientHandler(ip,port);
\t\tnew Thread(clientHandle,"Client").start();
\t}
\t//向服務器發送消息
\tpublic static boolean sendMsg(String msg) throws Exception{
\t\tif(msg.equals("q")) return false;
\t\tclientHandle.sendMsg(msg);
\t\treturn true;
\t}
\t@SuppressWarnings("resource")
\tpublic static void main(String[] args) throws Exception{
\t\tClient.start();
\t\tSystem.out.println("請輸入請求消息:");
\t\tScanner scanner = new Scanner(System.in);
\t\twhile(Client.sendMsg(scanner.nextLine()));
\t}
}

AsyncClientHandler:

package com.anxpp.io.calculator.aio.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;
public class AsyncClientHandler implements CompletionHandler<Void, AsyncClientHandler>, Runnable {
\tprivate AsynchronousSocketChannel clientChannel;
\tprivate String host;
\tprivate int port;
\tprivate CountDownLatch latch;
\tpublic AsyncClientHandler(String host, int port) {
\t\tthis.host = host;
\t\tthis.port = port;
\t\ttry {
\t\t\t//創建異步的客戶端通道
\t\t\tclientChannel = AsynchronousSocketChannel.open();
\t\t} catch (IOException e) {
\t\t\te.printStackTrace();
\t\t}
\t}
\t@Override
\tpublic void run() {
\t\t//創建CountDownLatch等待
\t\tlatch = new CountDownLatch(1);
\t\t//發起異步連接操作,回調參數就是這個類本身,如果連接成功會回調completed方法
\t\tclientChannel.connect(new InetSocketAddress(host, port), this, this);
\t\ttry {
\t\t\tlatch.await();
\t\t} catch (InterruptedException e1) {
\t\t\te1.printStackTrace();
\t\t}
\t\ttry {
\t\t\tclientChannel.close();
\t\t} catch (IOException e) {
\t\t\te.printStackTrace();
\t\t}
\t}
\t//連接服務器成功
\t//意味著TCP三次握手完成
\t@Override
\tpublic void completed(Void result, AsyncClientHandler attachment) {
\t\tSystem.out.println("客戶端成功連接到服務器...");
\t}
\t//連接服務器失敗
\t@Override
\tpublic void failed(Throwable exc, AsyncClientHandler attachment) {
\t\tSystem.err.println("連接服務器失敗...");
\t\texc.printStackTrace();
\t\ttry {
\t\t\tclientChannel.close();
\t\t\tlatch.countDown();
\t\t} catch (IOException e) {
\t\t\te.printStackTrace();
\t\t}
\t}
\t//向服務器發送消息
\tpublic void sendMsg(String msg){
\t\tbyte[] req = msg.getBytes();
\t\tByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
\t\twriteBuffer.put(req);
\t\twriteBuffer.flip();
\t\t//異步寫
\t\tclientChannel.write(writeBuffer, writeBuffer,new WriteHandler(clientChannel, latch));
\t}
}

WriteHandler:

package com.anxpp.io.calculator.aio.client;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;
public class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
\tprivate AsynchronousSocketChannel clientChannel;
\tprivate CountDownLatch latch;
\tpublic WriteHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) {
\t\tthis.clientChannel = clientChannel;
\t\tthis.latch = latch;
\t}
\t@Override
\tpublic void completed(Integer result, ByteBuffer buffer) {
\t\t//完成全部數據的寫入
\t\tif (buffer.hasRemaining()) {
\t\t\tclientChannel.write(buffer, buffer, this);
\t\t}
\t\telse {
\t\t\t//讀取數據
\t\t\tByteBuffer readBuffer = ByteBuffer.allocate(1024);
\t\t\tclientChannel.read(readBuffer,readBuffer,new ReadHandler(clientChannel, latch));
\t\t}
\t}
\t@Override
\tpublic void failed(Throwable exc, ByteBuffer attachment) {
\t\tSystem.err.println("數據發送失敗...");
\t\ttry {
\t\t\tclientChannel.close();
\t\t\tlatch.countDown();
\t\t} catch (IOException e) {
\t\t}
\t}
}

ReadHandler:

package com.anxpp.io.calculator.aio.client;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;
public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
\tprivate AsynchronousSocketChannel clientChannel;
\tprivate CountDownLatch latch;
\tpublic ReadHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) {
\t\tthis.clientChannel = clientChannel;
\t\tthis.latch = latch;
\t}
\t@Override
\tpublic void completed(Integer result,ByteBuffer buffer) {
\t\tbuffer.flip();
\t\tbyte[] bytes = new byte[buffer.remaining()];
\t\tbuffer.get(bytes);
\t\tString body;
\t\ttry {
\t\t\tbody = new String(bytes,"UTF-8");
\t\t\tSystem.out.println("客戶端收到結果:"+ body);
\t\t} catch (UnsupportedEncodingException e) {
\t\t\te.printStackTrace();
\t\t}
\t}
\t@Override
\tpublic void failed(Throwable exc,ByteBuffer attachment) {
\t\tSystem.err.println("數據讀取失敗...");
\t\ttry {
\t\t\tclientChannel.close();
\t\t\tlatch.countDown();
\t\t} catch (IOException e) {
\t\t}
\t}
}

這個API使用起來真的是很順手。

3.3、測試

Test:

package com.anxpp.io.calculator.aio;
import java.util.Scanner;
import com.anxpp.io.calculator.aio.client.Client;
import com.anxpp.io.calculator.aio.server.Server;
/**
* 測試方法
* @author yangtao__anxpp.com
* @version 1.0
*/
public class Test {
\t//測試主方法
\t@SuppressWarnings("resource")
\tpublic static void main(String[] args) throws Exception{
\t\t//運行服務器
\t\tServer.start();
\t\t//避免客戶端先於服務器啟動前執行代碼
\t\tThread.sleep(100);
\t\t//運行客戶端
\t\tClient.start();
\t\tSystem.out.println("請輸入請求消息:");
\t\tScanner scanner = new Scanner(System.in);
\t\twhile(Client.sendMsg(scanner.nextLine()));
\t}
}

我們可以在控制檯輸入我們需要計算的算數字符串,服務器就會返回結果,當然,我們也可以運行大量的客戶端,都是沒有問題的,以為此處設計為單例客戶端,所以也就沒有演示大量客戶端併發。

讀者可以自己修改Client類,然後開闢大量線程,並使用構造方法創建很多的客戶端測試。

下面是其中一次參數的輸出:

服務器已啟動,端口號:12345
請輸入請求消息:
客戶端成功連接到服務器...
連接的客戶端數:1
123456+789+456
服務器收到消息: 123456+789+456
客戶端收到結果:124701
9526*56
服務器收到消息: 9526*56
客戶端收到結果:533456
...

AIO是真正的異步非阻塞的,所以,在面對超級大量的客戶端,更能得心應手。

下面就比較一下,幾種I/O編程的優缺點。

4、各種I/O的對比

先以一張表來直觀的對比一下:

Java 網絡IO編程總結

具體選擇什麼樣的模型或者NIO框架,完全基於業務的實際應用場景和性能需求,如果客戶端很少,服務器負荷不重,就沒有必要選擇開發起來相對不那麼簡單的NIO做服務端;相反,就應考慮使用NIO或者相關的框架了。

5、附錄

上文中服務端使用到的用於計算的工具類:

package com.anxpp.utils;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;
public final class Calculator {
private final static ScriptEngine jse = new ScriptEngineManager().getEngineByName("JavaScript");
public static Object cal(String expression) throws ScriptException{
return jse.eval(expression);
}
}

私信我:“資料”,可免費領取更多學習資料哦

"

本文會從傳統的BIO到NIO再到AIO自淺至深介紹,並附上完整的代碼講解。

下面代碼中會使用這樣一個例子:客戶端發送一段算式的字符串到服務器,服務器計算後返回結果到客戶端。

代碼的所有說明,都直接作為註釋,嵌入到代碼中,看代碼時就能更容易理解,代碼中會用到一個計算結果的工具類,見文章代碼部分。

1、BIO編程

1.1、傳統的BIO編程

網絡編程的基本模型是C/S模型,即兩個進程間的通信。

服務端提供IP和監聽端口,客戶端通過連接操作想服務端監聽的地址發起連接請求,通過三次握手連接,如果連接成功建立,雙方就可以通過套接字進行通信。

傳統的同步阻塞模型開發中,ServerSocket負責綁定IP地址,啟動監聽端口;Socket負責發起連接操作。連接成功後,雙方通過輸入和輸出流進行同步阻塞式通信。

簡單的描述一下BIO的服務端通信模型:採用BIO通信模型的服務端,通常由一個獨立的Acceptor線程負責監聽客戶端的連接,它接收到客戶端連接請求之後為每個客戶端創建一個新的線程進行鏈路處理沒處理完成後,通過輸出流返回應答給客戶端,線程銷燬。即典型的一請求一應答通宵模型。

傳統BIO通信模型圖:

Java 網絡IO編程總結

該模型最大的問題就是缺乏彈性伸縮能力,當客戶端併發訪問量增加後,服務端的線程個數和客戶端併發訪問數呈1:1的正比關係,Java中的線程也是比較寶貴的系統資源,線程數量快速膨脹後,系統的性能將急劇下降,隨著訪問量的繼續增大,系統最終就死-掉-了。

同步阻塞式I/O創建的Server源碼:

package com.anxpp.io.calculator.bio;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/**
* BIO服務端源碼
* @author yangtao__anxpp.com
* @version 1.0
*/
public final class ServerNormal {
\t//默認的端口號
\tprivate static int DEFAULT_PORT = 12345;
\t//單例的ServerSocket
\tprivate static ServerSocket server;
\t//根據傳入參數設置監聽端口,如果沒有參數調用以下方法並使用默認值
\tpublic static void start() throws IOException{
\t\t//使用默認值
\t\tstart(DEFAULT_PORT);
\t}
\t//這個方法不會被大量併發訪問,不太需要考慮效率,直接進行方法同步就行了
\tpublic synchronized static void start(int port) throws IOException{
\t\tif(server != null) return;
\t\ttry{
\t\t\t//通過構造函數創建ServerSocket
\t\t\t//如果端口合法且空閒,服務端就監聽成功
\t\t\tserver = new ServerSocket(port);
\t\t\tSystem.out.println("服務器已啟動,端口號:" + port);
\t\t\t//通過無線循環監聽客戶端連接
\t\t\t//如果沒有客戶端接入,將阻塞在accept操作上。
\t\t\twhile(true){
\t\t\t\tSocket socket = server.accept();
\t\t\t\t//當有新的客戶端接入時,會執行下面的代碼
\t\t\t\t//然後創建一個新的線程處理這條Socket鏈路
\t\t\t\tnew Thread(new ServerHandler(socket)).start();
\t\t\t}
\t\t}finally{
\t\t\t//一些必要的清理工作
\t\t\tif(server != null){
\t\t\t\tSystem.out.println("服務器已關閉。");
\t\t\t\tserver.close();
\t\t\t\tserver = null;
\t\t\t}
\t\t}
\t}
}

客戶端消息處理線程ServerHandler源碼:

package com.anxpp.io.calculator.bio;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

import com.anxpp.io.utils.Calculator;
/**
* 客戶端線程
* @author yangtao__anxpp.com
* 用於處理一個客戶端的Socket鏈路
*/
public class ServerHandler implements Runnable{
\tprivate Socket socket;
\tpublic ServerHandler(Socket socket) {
\t\tthis.socket = socket;
\t}
\t@Override
\tpublic void run() {
\t\tBufferedReader in = null;
\t\tPrintWriter out = null;
\t\ttry{
\t\t\tin = new BufferedReader(new InputStreamReader(socket.getInputStream()));
\t\t\tout = new PrintWriter(socket.getOutputStream(),true);
\t\t\tString expression;
\t\t\tString result;
\t\t\twhile(true){
\t\t\t\t//通過BufferedReader讀取一行
\t\t\t\t//如果已經讀到輸入流尾部,返回null,退出循環
\t\t\t\t//如果得到非空值,就嘗試計算結果並返回
\t\t\t\tif((expression = in.readLine())==null) break;
\t\t\t\tSystem.out.println("服務器收到消息:" + expression);
\t\t\t\ttry{
\t\t\t\t\tresult = Calculator.cal(expression).toString();
\t\t\t\t}catch(Exception e){
\t\t\t\t\tresult = "計算錯誤:" + e.getMessage();
\t\t\t\t}
\t\t\t\tout.println(result);
\t\t\t}
\t\t}catch(Exception e){
\t\t\te.printStackTrace();
\t\t}finally{
\t\t\t//一些必要的清理工作
\t\t\tif(in != null){
\t\t\t\ttry {
\t\t\t\t\tin.close();
\t\t\t\t} catch (IOException e) {
\t\t\t\t\te.printStackTrace();
\t\t\t\t}
\t\t\t\tin = null;
\t\t\t}
\t\t\tif(out != null){
\t\t\t\tout.close();
\t\t\t\tout = null;
\t\t\t}
\t\t\tif(socket != null){
\t\t\t\ttry {
\t\t\t\t\tsocket.close();
\t\t\t\t} catch (IOException e) {
\t\t\t\t\te.printStackTrace();
\t\t\t\t}
\t\t\t\tsocket = null;
\t\t\t}
\t\t}
\t}
}

同步阻塞式I/O創建的Client源碼:

package com.anxpp.io.calculator.bio;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
/**
* 阻塞式I/O創建的客戶端
* @author yangtao__anxpp.com
* @version 1.0
*/
public class Client {
\t//默認的端口號
\tprivate static int DEFAULT_SERVER_PORT = 12345;
\tprivate static String DEFAULT_SERVER_IP = "127.0.0.1";
\tpublic static void send(String expression){
\t\tsend(DEFAULT_SERVER_PORT,expression);
\t}
\tpublic static void send(int port,String expression){
\t\tSystem.out.println("算術表達式為:" + expression);
\t\tSocket socket = null;
\t\tBufferedReader in = null;
\t\tPrintWriter out = null;
\t\ttry{
\t\t\tsocket = new Socket(DEFAULT_SERVER_IP,port);
\t\t\tin = new BufferedReader(new InputStreamReader(socket.getInputStream()));
\t\t\tout = new PrintWriter(socket.getOutputStream(),true);
\t\t\tout.println(expression);
\t\t\tSystem.out.println("___結果為:" + in.readLine());
\t\t}catch(Exception e){
\t\t\te.printStackTrace();
\t\t}finally{
\t\t\t//一下必要的清理工作
\t\t\tif(in != null){
\t\t\t\ttry {
\t\t\t\t\tin.close();
\t\t\t\t} catch (IOException e) {
\t\t\t\t\te.printStackTrace();
\t\t\t\t}
\t\t\t\tin = null;
\t\t\t}
\t\t\tif(out != null){
\t\t\t\tout.close();
\t\t\t\tout = null;
\t\t\t}
\t\t\tif(socket != null){
\t\t\t\ttry {
\t\t\t\t\tsocket.close();
\t\t\t\t} catch (IOException e) {
\t\t\t\t\te.printStackTrace();
\t\t\t\t}
\t\t\t\tsocket = null;
\t\t\t}
\t\t}
\t}
}

測試代碼,為了方便在控制檯看輸出結果,放到同一個程序(jvm)中運行:

package com.anxpp.io.calculator.bio;
import java.io.IOException;
import java.util.Random;
/**
* 測試方法
* @author yangtao__anxpp.com
* @version 1.0
*/
public class Test {
\t//測試主方法
\tpublic static void main(String[] args) throws InterruptedException {
\t\t//運行服務器
\t\tnew Thread(new Runnable() {
\t\t\t@Override
\t\t\tpublic void run() {
\t\t\t\ttry {
\t\t\t\t\tServerBetter.start();
\t\t\t\t} catch (IOException e) {
\t\t\t\t\te.printStackTrace();
\t\t\t\t}
\t\t\t}
\t\t}).start();
\t\t//避免客戶端先於服務器啟動前執行代碼
\t\tThread.sleep(100);
\t\t//運行客戶端
\t\tchar operators[] = {'+','-','*','/'};
\t\tRandom random = new Random(System.currentTimeMillis());
\t\tnew Thread(new Runnable() {
\t\t\t@SuppressWarnings("static-access")
\t\t\t@Override
\t\t\tpublic void run() {
\t\t\t\twhile(true){
\t\t\t\t\t//隨機產生算術表達式
\t\t\t\t\tString expression = random.nextInt(10)+""+operators[random.nextInt(4)]+(random.nextInt(10)+1);
\t\t\t\t\tClient.send(expression);
\t\t\t\t\ttry {
\t\t\t\t\t\tThread.currentThread().sleep(random.nextInt(1000));
\t\t\t\t\t} catch (InterruptedException e) {
\t\t\t\t\t\te.printStackTrace();
\t\t\t\t\t}
\t\t\t\t}
\t\t\t}
\t\t}).start();
\t}
}

其中一次的運行結果:

服務器已啟動,端口號:12345
算術表達式為:4-2
服務器收到消息:4-2
___結果為:2
算術表達式為:5-10
服務器收到消息:5-10
___結果為:-5
算術表達式為:0-9
服務器收到消息:0-9
___結果為:-9
算術表達式為:0+6
服務器收到消息:0+6
___結果為:6
算術表達式為:1/6
服務器收到消息:1/6
___結果為:0.16666666666666666
...

從以上代碼,很容易看出,BIO主要的問題在於每當有一個新的客戶端請求接入時,服務端必須創建一個新的線程來處理這條鏈路,在需要滿足高性能、高併發的場景是沒法應用的(大量創建新的線程會嚴重影響服務器性能,甚至罷工)。

1.2、偽異步I/O編程

為了改進這種一連接一線程的模型,我們可以使用線程池來管理這些線程(需要了解更多請參考前面提供的文章),實現1個或多個線程處理N個客戶端的模型(但是底層還是使用的同步阻塞I/O),通常被稱為“偽異步I/O模型“。

偽異步I/O模型圖:

Java 網絡IO編程總結

實現很簡單,我們只需要將新建線程的地方,交給線程池管理即可,只需要改動剛剛的Server代碼即可:

package com.anxpp.io.calculator.bio;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* BIO服務端源碼__偽異步I/O
* @author yangtao__anxpp.com
* @version 1.0
*/
public final class ServerBetter {
\t//默認的端口號
\tprivate static int DEFAULT_PORT = 12345;
\t//單例的ServerSocket
\tprivate static ServerSocket server;
\t//線程池 懶漢式的單例
\tprivate static ExecutorService executorService = Executors.newFixedThreadPool(60);
\t//根據傳入參數設置監聽端口,如果沒有參數調用以下方法並使用默認值
\tpublic static void start() throws IOException{
\t\t//使用默認值
\t\tstart(DEFAULT_PORT);
\t}
\t//這個方法不會被大量併發訪問,不太需要考慮效率,直接進行方法同步就行了
\tpublic synchronized static void start(int port) throws IOException{
\t\tif(server != null) return;
\t\ttry{
\t\t\t//通過構造函數創建ServerSocket
\t\t\t//如果端口合法且空閒,服務端就監聽成功
\t\t\tserver = new ServerSocket(port);
\t\t\tSystem.out.println("服務器已啟動,端口號:" + port);
\t\t\t//通過無線循環監聽客戶端連接
\t\t\t//如果沒有客戶端接入,將阻塞在accept操作上。
\t\t\twhile(true){
\t\t\t\tSocket socket = server.accept();
\t\t\t\t//當有新的客戶端接入時,會執行下面的代碼
\t\t\t\t//然後創建一個新的線程處理這條Socket鏈路
\t\t\t\texecutorService.execute(new ServerHandler(socket));
\t\t\t}
\t\t}finally{
\t\t\t//一些必要的清理工作
\t\t\tif(server != null){
\t\t\t\tSystem.out.println("服務器已關閉。");
\t\t\t\tserver.close();
\t\t\t\tserver = null;
\t\t\t}
\t\t}
\t}
}

測試運行結果是一樣的。

我們知道,如果使用CachedThreadPool線程池(不限制線程數量,如果不清楚請參考文首提供的文章),其實除了能自動幫我們管理線程(複用),看起來也就像是1:1的客戶端:線程數模型,而使用FixedThreadPool我們就有效的控制了線程的最大數量,保證了系統有限的資源的控制,實現了N:M的偽異步I/O模型。

但是,正因為限制了線程數量,如果發生大量併發請求,超過最大數量的線程就只能等待,直到線程池中的有空閒的線程可以被複用。而對Socket的輸入流就行讀取時,會一直阻塞,直到發生:

有數據可讀

可用數據以及讀取完畢

發生空指針或I/O異常

所以在讀取數據較慢時(比如數據量大、網絡傳輸慢等),大量併發的情況下,其他接入的消息,只能一直等待,這就是最大的弊端。

而後面即將介紹的NIO,就能解決這個難題。

2、NIO 編程

JDK 1.4中的java.nio.*包中引入新的Java I/O庫,其目的是提高速度。實際上,“舊”的I/O包已經使用NIO重新實現過,即使我們不顯式的使用NIO編程,也能從中受益。速度的提高在文件I/O和網絡I/O中都可能會發生,但本文只討論後者。

2.1、簡介

NIO我們一般認為是New I/O(也是官方的叫法),因為它是相對於老的I/O類庫新增的(其實在JDK 1.4中就已經被引入了,但這個名詞還會繼續用很久,即使它們在現在看來已經是“舊”的了,所以也提示我們在命名時,需要好好考慮),做了很大的改變。但民間跟多人稱之為Non-block I/O,即非阻塞I/O,因為這樣叫,更能體現它的特點。而下文中的NIO,不是指整個新的I/O庫,而是非阻塞I/O。

NIO提供了與傳統BIO模型中的Socket和ServerSocket相對應的SocketChannel和ServerSocketChannel兩種不同的套接字通道實現。

新增的著兩種通道都支持阻塞和非阻塞兩種模式。

阻塞模式使用就像傳統中的支持一樣,比較簡單,但是性能和可靠性都不好;非阻塞模式正好與之相反。

對於低負載、低併發的應用程序,可以使用同步阻塞I/O來提升開發速率和更好的維護性;對於高負載、高併發的(網絡)應用,應使用NIO的非阻塞模式來開發。

下面會先對基礎知識進行介紹。

2.2、緩衝區 Buffer

Buffer是一個對象,包含一些要寫入或者讀出的數據。

在NIO庫中,所有數據都是用緩衝區處理的。在讀取數據時,它是直接讀到緩衝區中的;在寫入數據時,也是寫入到緩衝區中。任何時候訪問NIO中的數據,都是通過緩衝區進行操作。

緩衝區實際上是一個數組,並提供了對數據結構化訪問以及維護讀寫位置等信息。

具體的緩存區有這些:ByteBuffe、CharBuffer、 ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer。他們實現了相同的接口:Buffer。

2.3、通道 Channel

我們對數據的讀取和寫入要通過Channel,它就像水管一樣,是一個通道。通道不同於流的地方就是通道是雙向的,可以用於讀、寫和同時讀寫操作。

底層的操作系統的通道一般都是全雙工的,所以全雙工的Channel比流能更好的映射底層操作系統的API。

Channel主要分兩大類:

SelectableChannel:用戶網絡讀寫

FileChannel:用於文件操作

後面代碼會涉及的ServerSocketChannel和SocketChannel都是SelectableChannel的子類。

2.4、多路複用器 Selector

Selector是Java NIO 編程的基礎。

Selector提供選擇已經就緒的任務的能力:Selector會不斷輪詢註冊在其上的Channel,如果某個Channel上面發生讀或者寫事件,這個Channel就處於就緒狀態,會被Selector輪詢出來,然後通過SelectionKey可以獲取就緒Channel的集合,進行後續的I/O操作。

一個Selector可以同時輪詢多個Channel,因為JDK使用了epoll()代替傳統的select實現,所以沒有最大連接句柄1024/2048的限制。所以,只需要一個線程負責Selector的輪詢,就可以接入成千上萬的客戶端。

2.5、NIO服務端

代碼比傳統的Socket編程看起來要複雜不少。

直接貼代碼吧,以註釋的形式給出代碼說明。

NIO創建的Server源碼:

package com.anxpp.io.calculator.nio;
public class Server {
\tprivate static int DEFAULT_PORT = 12345;
\tprivate static ServerHandle serverHandle;
\tpublic static void start(){
\t\tstart(DEFAULT_PORT);
\t}
\tpublic static synchronized void start(int port){
\t\tif(serverHandle!=null)
\t\t\tserverHandle.stop();
\t\tserverHandle = new ServerHandle(port);
\t\tnew Thread(serverHandle,"Server").start();
\t}
\tpublic static void main(String[] args){
\t\tstart();
\t}
}

ServerHandle:

package com.anxpp.io.calculator.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

import com.anxpp.io.utils.Calculator;
/**
* NIO服務端
* @author yangtao__anxpp.com
* @version 1.0
*/
public class ServerHandle implements Runnable{
\tprivate Selector selector;
\tprivate ServerSocketChannel serverChannel;
\tprivate volatile boolean started;
\t/**
\t * 構造方法
\t * @param port 指定要監聽的端口號
\t */
\tpublic ServerHandle(int port) {
\t\ttry{
\t\t\t//創建選擇器
\t\t\tselector = Selector.open();
\t\t\t//打開監聽通道
\t\t\tserverChannel = ServerSocketChannel.open();
\t\t\t//如果為 true,則此通道將被置於阻塞模式;如果為 false,則此通道將被置於非阻塞模式
\t\t\tserverChannel.configureBlocking(false);//開啟非阻塞模式
\t\t\t//綁定端口 backlog設為1024
\t\t\tserverChannel.socket().bind(new InetSocketAddress(port),1024);
\t\t\t//監聽客戶端連接請求
\t\t\tserverChannel.register(selector, SelectionKey.OP_ACCEPT);
\t\t\t//標記服務器已開啟
\t\t\tstarted = true;
\t\t\tSystem.out.println("服務器已啟動,端口號:" + port);
\t\t}catch(IOException e){
\t\t\te.printStackTrace();
\t\t\tSystem.exit(1);
\t\t}
\t}
\tpublic void stop(){
\t\tstarted = false;
\t}
\t@Override
\tpublic void run() {
\t\t//循環遍歷selector
\t\twhile(started){
\t\t\ttry{
\t\t\t\t//無論是否有讀寫事件發生,selector每隔1s被喚醒一次
\t\t\t\tselector.select(1000);
\t\t\t\t//阻塞,只有當至少一個註冊的事件發生的時候才會繼續.
//\t\t\t\tselector.select();
\t\t\t\tSet<SelectionKey> keys = selector.selectedKeys();
\t\t\t\tIterator<SelectionKey> it = keys.iterator();
\t\t\t\tSelectionKey key = null;
\t\t\t\twhile(it.hasNext()){
\t\t\t\t\tkey = it.next();
\t\t\t\t\tit.remove();
\t\t\t\t\ttry{
\t\t\t\t\t\thandleInput(key);
\t\t\t\t\t}catch(Exception e){
\t\t\t\t\t\tif(key != null){
\t\t\t\t\t\t\tkey.cancel();
\t\t\t\t\t\t\tif(key.channel() != null){
\t\t\t\t\t\t\t\tkey.channel().close();
\t\t\t\t\t\t\t}
\t\t\t\t\t\t}
\t\t\t\t\t}
\t\t\t\t}
\t\t\t}catch(Throwable t){
\t\t\t\tt.printStackTrace();
\t\t\t}
\t\t}
\t\t//selector關閉後會自動釋放裡面管理的資源
\t\tif(selector != null)
\t\t\ttry{
\t\t\t\tselector.close();
\t\t\t}catch (Exception e) {
\t\t\t\te.printStackTrace();
\t\t\t}
\t}
\tprivate void handleInput(SelectionKey key) throws IOException{
\t\tif(key.isValid()){
\t\t\t//處理新接入的請求消息
\t\t\tif(key.isAcceptable()){
\t\t\t\tServerSocketChannel ssc = (ServerSocketChannel) key.channel();
\t\t\t\t//通過ServerSocketChannel的accept創建SocketChannel實例
\t\t\t\t//完成該操作意味著完成TCP三次握手,TCP物理鏈路正式建立
\t\t\t\tSocketChannel sc = ssc.accept();
\t\t\t\t//設置為非阻塞的
\t\t\t\tsc.configureBlocking(false);
\t\t\t\t//註冊為讀
\t\t\t\tsc.register(selector, SelectionKey.OP_READ);
\t\t\t}
\t\t\t//讀消息
\t\t\tif(key.isReadable()){
\t\t\t\tSocketChannel sc = (SocketChannel) key.channel();
\t\t\t\t//創建ByteBuffer,並開闢一個1M的緩衝區
\t\t\t\tByteBuffer buffer = ByteBuffer.allocate(1024);
\t\t\t\t//讀取請求碼流,返回讀取到的字節數
\t\t\t\tint readBytes = sc.read(buffer);
\t\t\t\t//讀取到字節,對字節進行編解碼
\t\t\t\tif(readBytes>0){
\t\t\t\t\t//將緩衝區當前的limit設置為position=0,用於後續對緩衝區的讀取操作
\t\t\t\t\tbuffer.flip();
\t\t\t\t\t//根據緩衝區可讀字節數創建字節數組
\t\t\t\t\tbyte[] bytes = new byte[buffer.remaining()];
\t\t\t\t\t//將緩衝區可讀字節數組複製到新建的數組中
\t\t\t\t\tbuffer.get(bytes);
\t\t\t\t\tString expression = new String(bytes,"UTF-8");
\t\t\t\t\tSystem.out.println("服務器收到消息:" + expression);
\t\t\t\t\t//處理數據
\t\t\t\t\tString result = null;
\t\t\t\t\ttry{
\t\t\t\t\t\tresult = Calculator.cal(expression).toString();
\t\t\t\t\t}catch(Exception e){
\t\t\t\t\t\tresult = "計算錯誤:" + e.getMessage();
\t\t\t\t\t}
\t\t\t\t\t//發送應答消息
\t\t\t\t\tdoWrite(sc,result);
\t\t\t\t}
\t\t\t\t//沒有讀取到字節 忽略
//\t\t\t\telse if(readBytes==0);
\t\t\t\t//鏈路已經關閉,釋放資源
\t\t\t\telse if(readBytes<0){
\t\t\t\t\tkey.cancel();
\t\t\t\t\tsc.close();
\t\t\t\t}
\t\t\t}
\t\t}
\t}
\t//異步發送應答消息
\tprivate void doWrite(SocketChannel channel,String response) throws IOException{
\t\t//將消息編碼為字節數組
\t\tbyte[] bytes = response.getBytes();
\t\t//根據數組容量創建ByteBuffer
\t\tByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
\t\t//將字節數組複製到緩衝區
\t\twriteBuffer.put(bytes);
\t\t//flip操作
\t\twriteBuffer.flip();
\t\t//發送緩衝區的字節數組
\t\tchannel.write(writeBuffer);
\t\t//****此處不含處理“寫半包”的代碼
\t}
}

可以看到,創建NIO服務端的主要步驟如下:

打開ServerSocketChannel,監聽客戶端連接

綁定監聽端口,設置連接為非阻塞模式

創建Reactor線程,創建多路複用器並啟動線程

將ServerSocketChannel註冊到Reactor線程中的Selector上,監聽ACCEPT事件

Selector輪詢準備就緒的key

Selector監聽到新的客戶端接入,處理新的接入請求,完成TCP三次握手,簡歷物理鏈路

設置客戶端鏈路為非阻塞模式

將新接入的客戶端連接註冊到Reactor線程的Selector上,監聽讀操作,讀取客戶端發送的網絡消息

異步讀取客戶端消息到緩衝區

對Buffer編解碼,處理半包消息,將解碼成功的消息封裝成Task

將應答消息編碼為Buffer,調用SocketChannel的write將消息異步發送給客戶端

因為應答消息的發送,SocketChannel也是異步非阻塞的,所以不能保證一次能吧需要發送的數據發送完,此時就會出現寫半包的問題。我們需要註冊寫操作,不斷輪詢Selector將沒有發送完的消息發送完畢,然後通過Buffer的hasRemain()方法判斷消息是否發送完成。

2.6、NIO客戶端

還是直接上代碼吧,過程也不需要太多解釋了,跟服務端代碼有點類似。

Client:

package com.anxpp.io.calculator.nio;
public class Client {
\tprivate static String DEFAULT_HOST = "127.0.0.1";
\tprivate static int DEFAULT_PORT = 12345;
\tprivate static ClientHandle clientHandle;
\tpublic static void start(){
\t\tstart(DEFAULT_HOST,DEFAULT_PORT);
\t}
\tpublic static synchronized void start(String ip,int port){
\t\tif(clientHandle!=null)
\t\t\tclientHandle.stop();
\t\tclientHandle = new ClientHandle(ip,port);
\t\tnew Thread(clientHandle,"Server").start();
\t}
\t//向服務器發送消息
\tpublic static boolean sendMsg(String msg) throws Exception{
\t\tif(msg.equals("q")) return false;
\t\tclientHandle.sendMsg(msg);
\t\treturn true;
\t}
\tpublic static void main(String[] args){
\t\tstart();
\t}
}

ClientHandle:

package com.anxpp.io.calculator.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* NIO客戶端
* @author yangtao__anxpp.com
* @version 1.0
*/
public class ClientHandle implements Runnable{
\tprivate String host;
\tprivate int port;
\tprivate Selector selector;
\tprivate SocketChannel socketChannel;
\tprivate volatile boolean started;

\tpublic ClientHandle(String ip,int port) {
\t\tthis.host = ip;
\t\tthis.port = port;
\t\ttry{
\t\t\t//創建選擇器
\t\t\tselector = Selector.open();
\t\t\t//打開監聽通道
\t\t\tsocketChannel = SocketChannel.open();
\t\t\t//如果為 true,則此通道將被置於阻塞模式;如果為 false,則此通道將被置於非阻塞模式
\t\t\tsocketChannel.configureBlocking(false);//開啟非阻塞模式
\t\t\tstarted = true;
\t\t}catch(IOException e){
\t\t\te.printStackTrace();
\t\t\tSystem.exit(1);
\t\t}
\t}
\tpublic void stop(){
\t\tstarted = false;
\t}
\t@Override
\tpublic void run() {
\t\ttry{
\t\t\tdoConnect();
\t\t}catch(IOException e){
\t\t\te.printStackTrace();
\t\t\tSystem.exit(1);
\t\t}
\t\t//循環遍歷selector
\t\twhile(started){
\t\t\ttry{
\t\t\t\t//無論是否有讀寫事件發生,selector每隔1s被喚醒一次
\t\t\t\tselector.select(1000);
\t\t\t\t//阻塞,只有當至少一個註冊的事件發生的時候才會繼續.
//\t\t\t\tselector.select();
\t\t\t\tSet<SelectionKey> keys = selector.selectedKeys();
\t\t\t\tIterator<SelectionKey> it = keys.iterator();
\t\t\t\tSelectionKey key = null;
\t\t\t\twhile(it.hasNext()){
\t\t\t\t\tkey = it.next();
\t\t\t\t\tit.remove();
\t\t\t\t\ttry{
\t\t\t\t\t\thandleInput(key);
\t\t\t\t\t}catch(Exception e){
\t\t\t\t\t\tif(key != null){
\t\t\t\t\t\t\tkey.cancel();
\t\t\t\t\t\t\tif(key.channel() != null){
\t\t\t\t\t\t\t\tkey.channel().close();
\t\t\t\t\t\t\t}
\t\t\t\t\t\t}
\t\t\t\t\t}
\t\t\t\t}
\t\t\t}catch(Exception e){
\t\t\t\te.printStackTrace();
\t\t\t\tSystem.exit(1);
\t\t\t}
\t\t}
\t\t//selector關閉後會自動釋放裡面管理的資源
\t\tif(selector != null)
\t\t\ttry{
\t\t\t\tselector.close();
\t\t\t}catch (Exception e) {
\t\t\t\te.printStackTrace();
\t\t\t}
\t}
\tprivate void handleInput(SelectionKey key) throws IOException{
\t\tif(key.isValid()){
\t\t\tSocketChannel sc = (SocketChannel) key.channel();
\t\t\tif(key.isConnectable()){
\t\t\t\tif(sc.finishConnect());
\t\t\t\telse System.exit(1);
\t\t\t}
\t\t\t//讀消息
\t\t\tif(key.isReadable()){
\t\t\t\t//創建ByteBuffer,並開闢一個1M的緩衝區
\t\t\t\tByteBuffer buffer = ByteBuffer.allocate(1024);
\t\t\t\t//讀取請求碼流,返回讀取到的字節數
\t\t\t\tint readBytes = sc.read(buffer);
\t\t\t\t//讀取到字節,對字節進行編解碼
\t\t\t\tif(readBytes>0){
\t\t\t\t\t//將緩衝區當前的limit設置為position=0,用於後續對緩衝區的讀取操作
\t\t\t\t\tbuffer.flip();
\t\t\t\t\t//根據緩衝區可讀字節數創建字節數組
\t\t\t\t\tbyte[] bytes = new byte[buffer.remaining()];
\t\t\t\t\t//將緩衝區可讀字節數組複製到新建的數組中
\t\t\t\t\tbuffer.get(bytes);
\t\t\t\t\tString result = new String(bytes,"UTF-8");
\t\t\t\t\tSystem.out.println("客戶端收到消息:" + result);
\t\t\t\t}
\t\t\t\t//沒有讀取到字節 忽略
//\t\t\t\telse if(readBytes==0);
\t\t\t\t//鏈路已經關閉,釋放資源
\t\t\t\telse if(readBytes<0){
\t\t\t\t\tkey.cancel();
\t\t\t\t\tsc.close();
\t\t\t\t}
\t\t\t}
\t\t}
\t}
\t//異步發送消息
\tprivate void doWrite(SocketChannel channel,String request) throws IOException{
\t\t//將消息編碼為字節數組
\t\tbyte[] bytes = request.getBytes();
\t\t//根據數組容量創建ByteBuffer
\t\tByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
\t\t//將字節數組複製到緩衝區
\t\twriteBuffer.put(bytes);
\t\t//flip操作
\t\twriteBuffer.flip();
\t\t//發送緩衝區的字節數組
\t\tchannel.write(writeBuffer);
\t\t//****此處不含處理“寫半包”的代碼
\t}
\tprivate void doConnect() throws IOException{
\t\tif(socketChannel.connect(new InetSocketAddress(host,port)));
\t\telse socketChannel.register(selector, SelectionKey.OP_CONNECT);
\t}
\tpublic void sendMsg(String msg) throws Exception{
\t\tsocketChannel.register(selector, SelectionKey.OP_READ);
\t\tdoWrite(socketChannel, msg);
\t}
}

2.7、演示結果

首先運行服務器,順便也運行一個客戶端:

package com.anxpp.io.calculator.nio;
import java.util.Scanner;
/**
* 測試方法
* @author yangtao__anxpp.com
* @version 1.0
*/
public class Test {
\t//測試主方法
\t@SuppressWarnings("resource")
\tpublic static void main(String[] args) throws Exception{
\t\t//運行服務器
\t\tServer.start();
\t\t//避免客戶端先於服務器啟動前執行代碼
\t\tThread.sleep(100);
\t\t//運行客戶端
\t\tClient.start();
\t\twhile(Client.sendMsg(new Scanner(System.in).nextLine()));
\t}
}

我們也可以單獨運行客戶端,效果都是一樣的。

一次測試的結果:

服務器已啟動,端口號:12345
1+2+3+4+5+6
服務器收到消息:1+2+3+4+5+6
客戶端收到消息:21
1*2/3-4+5*6/7-8
服務器收到消息:1*2/3-4+5*6/7-8
客戶端收到消息:-7.0476190476190474

運行多個客戶端,都是沒有問題的。


3、AIO編程

NIO 2.0引入了新的異步通道的概念,並提供了異步文件通道和異步套接字通道的實現。

異步的套接字通道時真正的異步非阻塞I/O,對應於UNIX網絡編程中的事件驅動I/O(AIO)。他不需要過多的Selector對註冊的通道進行輪詢即可實現異步讀寫,從而簡化了NIO的編程模型。

直接上代碼吧。

3.1、Server端代碼

Server:

package com.anxpp.io.calculator.aio.server;
/**
* AIO服務端
* @author yangtao__anxpp.com
* @version 1.0
*/
public class Server {
\tprivate static int DEFAULT_PORT = 12345;
\tprivate static AsyncServerHandler serverHandle;
\tpublic volatile static long clientCount = 0;
\tpublic static void start(){
\t\tstart(DEFAULT_PORT);
\t}
\tpublic static synchronized void start(int port){
\t\tif(serverHandle!=null)
\t\t\treturn;
\t\tserverHandle = new AsyncServerHandler(port);
\t\tnew Thread(serverHandle,"Server").start();
\t}
\tpublic static void main(String[] args){
\t\tServer.start();
\t}
}
AsyncServerHandler:
package com.anxpp.io.calculator.aio.server;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.CountDownLatch;
public class AsyncServerHandler implements Runnable {
\tpublic CountDownLatch latch;
\tpublic AsynchronousServerSocketChannel channel;
\tpublic AsyncServerHandler(int port) {
\t\ttry {
\t\t\t//創建服務端通道
\t\t\tchannel = AsynchronousServerSocketChannel.open();
\t\t\t//綁定端口
\t\t\tchannel.bind(new InetSocketAddress(port));
\t\t\tSystem.out.println("服務器已啟動,端口號:" + port);
\t\t} catch (IOException e) {
\t\t\te.printStackTrace();
\t\t}
\t}
\t@Override
\tpublic void run() {
\t\t//CountDownLatch初始化
\t\t//它的作用:在完成一組正在執行的操作之前,允許當前的現場一直阻塞
\t\t//此處,讓現場在此阻塞,防止服務端執行完成後退出
\t\t//也可以使用while(true)+sleep
\t\t//生成環境就不需要擔心這個問題,以為服務端是不會退出的
\t\tlatch = new CountDownLatch(1);
\t\t//用於接收客戶端的連接
\t\tchannel.accept(this,new AcceptHandler());
\t\ttry {
\t\t\tlatch.await();
\t\t} catch (InterruptedException e) {
\t\t\te.printStackTrace();
\t\t}
\t}
}

AcceptHandler:

package com.anxpp.io.calculator.aio.server;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
//作為handler接收客戶端連接
public class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncServerHandler> {
\t@Override
\tpublic void completed(AsynchronousSocketChannel channel,AsyncServerHandler serverHandler) {
\t\t//繼續接受其他客戶端的請求
\t\tServer.clientCount++;
\t\tSystem.out.println("連接的客戶端數:" + Server.clientCount);
\t\tserverHandler.channel.accept(serverHandler, this);
\t\t//創建新的Buffer
\t\tByteBuffer buffer = ByteBuffer.allocate(1024);
\t\t//異步讀 第三個參數為接收消息回調的業務Handler
\t\tchannel.read(buffer, buffer, new ReadHandler(channel));
\t}
\t@Override
\tpublic void failed(Throwable exc, AsyncServerHandler serverHandler) {
\t\texc.printStackTrace();
\t\tserverHandler.latch.countDown();
\t}
}

ReadHandler:

package com.anxpp.io.calculator.aio.server;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import com.anxpp.io.utils.Calculator;
public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
\t//用於讀取半包消息和發送應答
\tprivate AsynchronousSocketChannel channel;
\tpublic ReadHandler(AsynchronousSocketChannel channel) {
\t\t\tthis.channel = channel;
\t}
\t//讀取到消息後的處理
\t@Override
\tpublic void completed(Integer result, ByteBuffer attachment) {
\t\t//flip操作
\t\tattachment.flip();
\t\t//根據
\t\tbyte[] message = new byte[attachment.remaining()];
\t\tattachment.get(message);
\t\ttry {
\t\t\tString expression = new String(message, "UTF-8");
\t\t\tSystem.out.println("服務器收到消息: " + expression);
\t\t\tString calrResult = null;
\t\t\ttry{
\t\t\t\tcalrResult = Calculator.cal(expression).toString();
\t\t\t}catch(Exception e){
\t\t\t\tcalrResult = "計算錯誤:" + e.getMessage();
\t\t\t}
\t\t\t//向客戶端發送消息
\t\t\tdoWrite(calrResult);
\t\t} catch (UnsupportedEncodingException e) {
\t\t\te.printStackTrace();
\t\t}
\t}
\t//發送消息
\tprivate void doWrite(String result) {
\t\tbyte[] bytes = result.getBytes();
\t\tByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
\t\twriteBuffer.put(bytes);
\t\twriteBuffer.flip();
\t\t//異步寫數據 參數與前面的read一樣
\t\tchannel.write(writeBuffer, writeBuffer,new CompletionHandler<Integer, ByteBuffer>() {
\t\t\t@Override
\t\t\tpublic void completed(Integer result, ByteBuffer buffer) {
\t\t\t\t//如果沒有發送完,就繼續發送直到完成
\t\t\t\tif (buffer.hasRemaining())
\t\t\t\t\tchannel.write(buffer, buffer, this);
\t\t\t\telse{
\t\t\t\t\t//創建新的Buffer
\t\t\t\t\tByteBuffer readBuffer = ByteBuffer.allocate(1024);
\t\t\t\t\t//異步讀 第三個參數為接收消息回調的業務Handler
\t\t\t\t\tchannel.read(readBuffer, readBuffer, new ReadHandler(channel));
\t\t\t\t}
\t\t\t}
\t\t\t@Override
\t\t\tpublic void failed(Throwable exc, ByteBuffer attachment) {
\t\t\t\ttry {
\t\t\t\t\tchannel.close();
\t\t\t\t} catch (IOException e) {
\t\t\t\t}
\t\t\t}
\t\t});
\t}
\t@Override
\tpublic void failed(Throwable exc, ByteBuffer attachment) {
\t\ttry {
\t\t\tthis.channel.close();
\t\t} catch (IOException e) {
\t\t\te.printStackTrace();
\t\t}
\t}
}

OK,這樣就已經完成了,其實說起來也簡單,雖然代碼感覺很多,但是API比NIO的使用起來真的簡單多了,主要就是監聽、讀、寫等各種CompletionHandler。此處本應有一個WriteHandler的,確實,我們在ReadHandler中,以一個匿名內部類實現了它。

下面看客戶端代碼。

3.2、Client端代碼

Client:

package com.anxpp.io.calculator.aio.client;
import java.util.Scanner;
public class Client {
\tprivate static String DEFAULT_HOST = "127.0.0.1";
\tprivate static int DEFAULT_PORT = 12345;
\tprivate static AsyncClientHandler clientHandle;
\tpublic static void start(){
\t\tstart(DEFAULT_HOST,DEFAULT_PORT);
\t}
\tpublic static synchronized void start(String ip,int port){
\t\tif(clientHandle!=null)
\t\t\treturn;
\t\tclientHandle = new AsyncClientHandler(ip,port);
\t\tnew Thread(clientHandle,"Client").start();
\t}
\t//向服務器發送消息
\tpublic static boolean sendMsg(String msg) throws Exception{
\t\tif(msg.equals("q")) return false;
\t\tclientHandle.sendMsg(msg);
\t\treturn true;
\t}
\t@SuppressWarnings("resource")
\tpublic static void main(String[] args) throws Exception{
\t\tClient.start();
\t\tSystem.out.println("請輸入請求消息:");
\t\tScanner scanner = new Scanner(System.in);
\t\twhile(Client.sendMsg(scanner.nextLine()));
\t}
}

AsyncClientHandler:

package com.anxpp.io.calculator.aio.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;
public class AsyncClientHandler implements CompletionHandler<Void, AsyncClientHandler>, Runnable {
\tprivate AsynchronousSocketChannel clientChannel;
\tprivate String host;
\tprivate int port;
\tprivate CountDownLatch latch;
\tpublic AsyncClientHandler(String host, int port) {
\t\tthis.host = host;
\t\tthis.port = port;
\t\ttry {
\t\t\t//創建異步的客戶端通道
\t\t\tclientChannel = AsynchronousSocketChannel.open();
\t\t} catch (IOException e) {
\t\t\te.printStackTrace();
\t\t}
\t}
\t@Override
\tpublic void run() {
\t\t//創建CountDownLatch等待
\t\tlatch = new CountDownLatch(1);
\t\t//發起異步連接操作,回調參數就是這個類本身,如果連接成功會回調completed方法
\t\tclientChannel.connect(new InetSocketAddress(host, port), this, this);
\t\ttry {
\t\t\tlatch.await();
\t\t} catch (InterruptedException e1) {
\t\t\te1.printStackTrace();
\t\t}
\t\ttry {
\t\t\tclientChannel.close();
\t\t} catch (IOException e) {
\t\t\te.printStackTrace();
\t\t}
\t}
\t//連接服務器成功
\t//意味著TCP三次握手完成
\t@Override
\tpublic void completed(Void result, AsyncClientHandler attachment) {
\t\tSystem.out.println("客戶端成功連接到服務器...");
\t}
\t//連接服務器失敗
\t@Override
\tpublic void failed(Throwable exc, AsyncClientHandler attachment) {
\t\tSystem.err.println("連接服務器失敗...");
\t\texc.printStackTrace();
\t\ttry {
\t\t\tclientChannel.close();
\t\t\tlatch.countDown();
\t\t} catch (IOException e) {
\t\t\te.printStackTrace();
\t\t}
\t}
\t//向服務器發送消息
\tpublic void sendMsg(String msg){
\t\tbyte[] req = msg.getBytes();
\t\tByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
\t\twriteBuffer.put(req);
\t\twriteBuffer.flip();
\t\t//異步寫
\t\tclientChannel.write(writeBuffer, writeBuffer,new WriteHandler(clientChannel, latch));
\t}
}

WriteHandler:

package com.anxpp.io.calculator.aio.client;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;
public class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
\tprivate AsynchronousSocketChannel clientChannel;
\tprivate CountDownLatch latch;
\tpublic WriteHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) {
\t\tthis.clientChannel = clientChannel;
\t\tthis.latch = latch;
\t}
\t@Override
\tpublic void completed(Integer result, ByteBuffer buffer) {
\t\t//完成全部數據的寫入
\t\tif (buffer.hasRemaining()) {
\t\t\tclientChannel.write(buffer, buffer, this);
\t\t}
\t\telse {
\t\t\t//讀取數據
\t\t\tByteBuffer readBuffer = ByteBuffer.allocate(1024);
\t\t\tclientChannel.read(readBuffer,readBuffer,new ReadHandler(clientChannel, latch));
\t\t}
\t}
\t@Override
\tpublic void failed(Throwable exc, ByteBuffer attachment) {
\t\tSystem.err.println("數據發送失敗...");
\t\ttry {
\t\t\tclientChannel.close();
\t\t\tlatch.countDown();
\t\t} catch (IOException e) {
\t\t}
\t}
}

ReadHandler:

package com.anxpp.io.calculator.aio.client;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;
public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
\tprivate AsynchronousSocketChannel clientChannel;
\tprivate CountDownLatch latch;
\tpublic ReadHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) {
\t\tthis.clientChannel = clientChannel;
\t\tthis.latch = latch;
\t}
\t@Override
\tpublic void completed(Integer result,ByteBuffer buffer) {
\t\tbuffer.flip();
\t\tbyte[] bytes = new byte[buffer.remaining()];
\t\tbuffer.get(bytes);
\t\tString body;
\t\ttry {
\t\t\tbody = new String(bytes,"UTF-8");
\t\t\tSystem.out.println("客戶端收到結果:"+ body);
\t\t} catch (UnsupportedEncodingException e) {
\t\t\te.printStackTrace();
\t\t}
\t}
\t@Override
\tpublic void failed(Throwable exc,ByteBuffer attachment) {
\t\tSystem.err.println("數據讀取失敗...");
\t\ttry {
\t\t\tclientChannel.close();
\t\t\tlatch.countDown();
\t\t} catch (IOException e) {
\t\t}
\t}
}

這個API使用起來真的是很順手。

3.3、測試

Test:

package com.anxpp.io.calculator.aio;
import java.util.Scanner;
import com.anxpp.io.calculator.aio.client.Client;
import com.anxpp.io.calculator.aio.server.Server;
/**
* 測試方法
* @author yangtao__anxpp.com
* @version 1.0
*/
public class Test {
\t//測試主方法
\t@SuppressWarnings("resource")
\tpublic static void main(String[] args) throws Exception{
\t\t//運行服務器
\t\tServer.start();
\t\t//避免客戶端先於服務器啟動前執行代碼
\t\tThread.sleep(100);
\t\t//運行客戶端
\t\tClient.start();
\t\tSystem.out.println("請輸入請求消息:");
\t\tScanner scanner = new Scanner(System.in);
\t\twhile(Client.sendMsg(scanner.nextLine()));
\t}
}

我們可以在控制檯輸入我們需要計算的算數字符串,服務器就會返回結果,當然,我們也可以運行大量的客戶端,都是沒有問題的,以為此處設計為單例客戶端,所以也就沒有演示大量客戶端併發。

讀者可以自己修改Client類,然後開闢大量線程,並使用構造方法創建很多的客戶端測試。

下面是其中一次參數的輸出:

服務器已啟動,端口號:12345
請輸入請求消息:
客戶端成功連接到服務器...
連接的客戶端數:1
123456+789+456
服務器收到消息: 123456+789+456
客戶端收到結果:124701
9526*56
服務器收到消息: 9526*56
客戶端收到結果:533456
...

AIO是真正的異步非阻塞的,所以,在面對超級大量的客戶端,更能得心應手。

下面就比較一下,幾種I/O編程的優缺點。

4、各種I/O的對比

先以一張表來直觀的對比一下:

Java 網絡IO編程總結

具體選擇什麼樣的模型或者NIO框架,完全基於業務的實際應用場景和性能需求,如果客戶端很少,服務器負荷不重,就沒有必要選擇開發起來相對不那麼簡單的NIO做服務端;相反,就應考慮使用NIO或者相關的框架了。

5、附錄

上文中服務端使用到的用於計算的工具類:

package com.anxpp.utils;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;
public final class Calculator {
private final static ScriptEngine jse = new ScriptEngineManager().getEngineByName("JavaScript");
public static Object cal(String expression) throws ScriptException{
return jse.eval(expression);
}
}

私信我:“資料”,可免費領取更多學習資料哦

Java 網絡IO編程總結

"

本文會從傳統的BIO到NIO再到AIO自淺至深介紹,並附上完整的代碼講解。

下面代碼中會使用這樣一個例子:客戶端發送一段算式的字符串到服務器,服務器計算後返回結果到客戶端。

代碼的所有說明,都直接作為註釋,嵌入到代碼中,看代碼時就能更容易理解,代碼中會用到一個計算結果的工具類,見文章代碼部分。

1、BIO編程

1.1、傳統的BIO編程

網絡編程的基本模型是C/S模型,即兩個進程間的通信。

服務端提供IP和監聽端口,客戶端通過連接操作想服務端監聽的地址發起連接請求,通過三次握手連接,如果連接成功建立,雙方就可以通過套接字進行通信。

傳統的同步阻塞模型開發中,ServerSocket負責綁定IP地址,啟動監聽端口;Socket負責發起連接操作。連接成功後,雙方通過輸入和輸出流進行同步阻塞式通信。

簡單的描述一下BIO的服務端通信模型:採用BIO通信模型的服務端,通常由一個獨立的Acceptor線程負責監聽客戶端的連接,它接收到客戶端連接請求之後為每個客戶端創建一個新的線程進行鏈路處理沒處理完成後,通過輸出流返回應答給客戶端,線程銷燬。即典型的一請求一應答通宵模型。

傳統BIO通信模型圖:

Java 網絡IO編程總結

該模型最大的問題就是缺乏彈性伸縮能力,當客戶端併發訪問量增加後,服務端的線程個數和客戶端併發訪問數呈1:1的正比關係,Java中的線程也是比較寶貴的系統資源,線程數量快速膨脹後,系統的性能將急劇下降,隨著訪問量的繼續增大,系統最終就死-掉-了。

同步阻塞式I/O創建的Server源碼:

package com.anxpp.io.calculator.bio;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/**
* BIO服務端源碼
* @author yangtao__anxpp.com
* @version 1.0
*/
public final class ServerNormal {
\t//默認的端口號
\tprivate static int DEFAULT_PORT = 12345;
\t//單例的ServerSocket
\tprivate static ServerSocket server;
\t//根據傳入參數設置監聽端口,如果沒有參數調用以下方法並使用默認值
\tpublic static void start() throws IOException{
\t\t//使用默認值
\t\tstart(DEFAULT_PORT);
\t}
\t//這個方法不會被大量併發訪問,不太需要考慮效率,直接進行方法同步就行了
\tpublic synchronized static void start(int port) throws IOException{
\t\tif(server != null) return;
\t\ttry{
\t\t\t//通過構造函數創建ServerSocket
\t\t\t//如果端口合法且空閒,服務端就監聽成功
\t\t\tserver = new ServerSocket(port);
\t\t\tSystem.out.println("服務器已啟動,端口號:" + port);
\t\t\t//通過無線循環監聽客戶端連接
\t\t\t//如果沒有客戶端接入,將阻塞在accept操作上。
\t\t\twhile(true){
\t\t\t\tSocket socket = server.accept();
\t\t\t\t//當有新的客戶端接入時,會執行下面的代碼
\t\t\t\t//然後創建一個新的線程處理這條Socket鏈路
\t\t\t\tnew Thread(new ServerHandler(socket)).start();
\t\t\t}
\t\t}finally{
\t\t\t//一些必要的清理工作
\t\t\tif(server != null){
\t\t\t\tSystem.out.println("服務器已關閉。");
\t\t\t\tserver.close();
\t\t\t\tserver = null;
\t\t\t}
\t\t}
\t}
}

客戶端消息處理線程ServerHandler源碼:

package com.anxpp.io.calculator.bio;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

import com.anxpp.io.utils.Calculator;
/**
* 客戶端線程
* @author yangtao__anxpp.com
* 用於處理一個客戶端的Socket鏈路
*/
public class ServerHandler implements Runnable{
\tprivate Socket socket;
\tpublic ServerHandler(Socket socket) {
\t\tthis.socket = socket;
\t}
\t@Override
\tpublic void run() {
\t\tBufferedReader in = null;
\t\tPrintWriter out = null;
\t\ttry{
\t\t\tin = new BufferedReader(new InputStreamReader(socket.getInputStream()));
\t\t\tout = new PrintWriter(socket.getOutputStream(),true);
\t\t\tString expression;
\t\t\tString result;
\t\t\twhile(true){
\t\t\t\t//通過BufferedReader讀取一行
\t\t\t\t//如果已經讀到輸入流尾部,返回null,退出循環
\t\t\t\t//如果得到非空值,就嘗試計算結果並返回
\t\t\t\tif((expression = in.readLine())==null) break;
\t\t\t\tSystem.out.println("服務器收到消息:" + expression);
\t\t\t\ttry{
\t\t\t\t\tresult = Calculator.cal(expression).toString();
\t\t\t\t}catch(Exception e){
\t\t\t\t\tresult = "計算錯誤:" + e.getMessage();
\t\t\t\t}
\t\t\t\tout.println(result);
\t\t\t}
\t\t}catch(Exception e){
\t\t\te.printStackTrace();
\t\t}finally{
\t\t\t//一些必要的清理工作
\t\t\tif(in != null){
\t\t\t\ttry {
\t\t\t\t\tin.close();
\t\t\t\t} catch (IOException e) {
\t\t\t\t\te.printStackTrace();
\t\t\t\t}
\t\t\t\tin = null;
\t\t\t}
\t\t\tif(out != null){
\t\t\t\tout.close();
\t\t\t\tout = null;
\t\t\t}
\t\t\tif(socket != null){
\t\t\t\ttry {
\t\t\t\t\tsocket.close();
\t\t\t\t} catch (IOException e) {
\t\t\t\t\te.printStackTrace();
\t\t\t\t}
\t\t\t\tsocket = null;
\t\t\t}
\t\t}
\t}
}

同步阻塞式I/O創建的Client源碼:

package com.anxpp.io.calculator.bio;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
/**
* 阻塞式I/O創建的客戶端
* @author yangtao__anxpp.com
* @version 1.0
*/
public class Client {
\t//默認的端口號
\tprivate static int DEFAULT_SERVER_PORT = 12345;
\tprivate static String DEFAULT_SERVER_IP = "127.0.0.1";
\tpublic static void send(String expression){
\t\tsend(DEFAULT_SERVER_PORT,expression);
\t}
\tpublic static void send(int port,String expression){
\t\tSystem.out.println("算術表達式為:" + expression);
\t\tSocket socket = null;
\t\tBufferedReader in = null;
\t\tPrintWriter out = null;
\t\ttry{
\t\t\tsocket = new Socket(DEFAULT_SERVER_IP,port);
\t\t\tin = new BufferedReader(new InputStreamReader(socket.getInputStream()));
\t\t\tout = new PrintWriter(socket.getOutputStream(),true);
\t\t\tout.println(expression);
\t\t\tSystem.out.println("___結果為:" + in.readLine());
\t\t}catch(Exception e){
\t\t\te.printStackTrace();
\t\t}finally{
\t\t\t//一下必要的清理工作
\t\t\tif(in != null){
\t\t\t\ttry {
\t\t\t\t\tin.close();
\t\t\t\t} catch (IOException e) {
\t\t\t\t\te.printStackTrace();
\t\t\t\t}
\t\t\t\tin = null;
\t\t\t}
\t\t\tif(out != null){
\t\t\t\tout.close();
\t\t\t\tout = null;
\t\t\t}
\t\t\tif(socket != null){
\t\t\t\ttry {
\t\t\t\t\tsocket.close();
\t\t\t\t} catch (IOException e) {
\t\t\t\t\te.printStackTrace();
\t\t\t\t}
\t\t\t\tsocket = null;
\t\t\t}
\t\t}
\t}
}

測試代碼,為了方便在控制檯看輸出結果,放到同一個程序(jvm)中運行:

package com.anxpp.io.calculator.bio;
import java.io.IOException;
import java.util.Random;
/**
* 測試方法
* @author yangtao__anxpp.com
* @version 1.0
*/
public class Test {
\t//測試主方法
\tpublic static void main(String[] args) throws InterruptedException {
\t\t//運行服務器
\t\tnew Thread(new Runnable() {
\t\t\t@Override
\t\t\tpublic void run() {
\t\t\t\ttry {
\t\t\t\t\tServerBetter.start();
\t\t\t\t} catch (IOException e) {
\t\t\t\t\te.printStackTrace();
\t\t\t\t}
\t\t\t}
\t\t}).start();
\t\t//避免客戶端先於服務器啟動前執行代碼
\t\tThread.sleep(100);
\t\t//運行客戶端
\t\tchar operators[] = {'+','-','*','/'};
\t\tRandom random = new Random(System.currentTimeMillis());
\t\tnew Thread(new Runnable() {
\t\t\t@SuppressWarnings("static-access")
\t\t\t@Override
\t\t\tpublic void run() {
\t\t\t\twhile(true){
\t\t\t\t\t//隨機產生算術表達式
\t\t\t\t\tString expression = random.nextInt(10)+""+operators[random.nextInt(4)]+(random.nextInt(10)+1);
\t\t\t\t\tClient.send(expression);
\t\t\t\t\ttry {
\t\t\t\t\t\tThread.currentThread().sleep(random.nextInt(1000));
\t\t\t\t\t} catch (InterruptedException e) {
\t\t\t\t\t\te.printStackTrace();
\t\t\t\t\t}
\t\t\t\t}
\t\t\t}
\t\t}).start();
\t}
}

其中一次的運行結果:

服務器已啟動,端口號:12345
算術表達式為:4-2
服務器收到消息:4-2
___結果為:2
算術表達式為:5-10
服務器收到消息:5-10
___結果為:-5
算術表達式為:0-9
服務器收到消息:0-9
___結果為:-9
算術表達式為:0+6
服務器收到消息:0+6
___結果為:6
算術表達式為:1/6
服務器收到消息:1/6
___結果為:0.16666666666666666
...

從以上代碼,很容易看出,BIO主要的問題在於每當有一個新的客戶端請求接入時,服務端必須創建一個新的線程來處理這條鏈路,在需要滿足高性能、高併發的場景是沒法應用的(大量創建新的線程會嚴重影響服務器性能,甚至罷工)。

1.2、偽異步I/O編程

為了改進這種一連接一線程的模型,我們可以使用線程池來管理這些線程(需要了解更多請參考前面提供的文章),實現1個或多個線程處理N個客戶端的模型(但是底層還是使用的同步阻塞I/O),通常被稱為“偽異步I/O模型“。

偽異步I/O模型圖:

Java 網絡IO編程總結

實現很簡單,我們只需要將新建線程的地方,交給線程池管理即可,只需要改動剛剛的Server代碼即可:

package com.anxpp.io.calculator.bio;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* BIO服務端源碼__偽異步I/O
* @author yangtao__anxpp.com
* @version 1.0
*/
public final class ServerBetter {
\t//默認的端口號
\tprivate static int DEFAULT_PORT = 12345;
\t//單例的ServerSocket
\tprivate static ServerSocket server;
\t//線程池 懶漢式的單例
\tprivate static ExecutorService executorService = Executors.newFixedThreadPool(60);
\t//根據傳入參數設置監聽端口,如果沒有參數調用以下方法並使用默認值
\tpublic static void start() throws IOException{
\t\t//使用默認值
\t\tstart(DEFAULT_PORT);
\t}
\t//這個方法不會被大量併發訪問,不太需要考慮效率,直接進行方法同步就行了
\tpublic synchronized static void start(int port) throws IOException{
\t\tif(server != null) return;
\t\ttry{
\t\t\t//通過構造函數創建ServerSocket
\t\t\t//如果端口合法且空閒,服務端就監聽成功
\t\t\tserver = new ServerSocket(port);
\t\t\tSystem.out.println("服務器已啟動,端口號:" + port);
\t\t\t//通過無線循環監聽客戶端連接
\t\t\t//如果沒有客戶端接入,將阻塞在accept操作上。
\t\t\twhile(true){
\t\t\t\tSocket socket = server.accept();
\t\t\t\t//當有新的客戶端接入時,會執行下面的代碼
\t\t\t\t//然後創建一個新的線程處理這條Socket鏈路
\t\t\t\texecutorService.execute(new ServerHandler(socket));
\t\t\t}
\t\t}finally{
\t\t\t//一些必要的清理工作
\t\t\tif(server != null){
\t\t\t\tSystem.out.println("服務器已關閉。");
\t\t\t\tserver.close();
\t\t\t\tserver = null;
\t\t\t}
\t\t}
\t}
}

測試運行結果是一樣的。

我們知道,如果使用CachedThreadPool線程池(不限制線程數量,如果不清楚請參考文首提供的文章),其實除了能自動幫我們管理線程(複用),看起來也就像是1:1的客戶端:線程數模型,而使用FixedThreadPool我們就有效的控制了線程的最大數量,保證了系統有限的資源的控制,實現了N:M的偽異步I/O模型。

但是,正因為限制了線程數量,如果發生大量併發請求,超過最大數量的線程就只能等待,直到線程池中的有空閒的線程可以被複用。而對Socket的輸入流就行讀取時,會一直阻塞,直到發生:

有數據可讀

可用數據以及讀取完畢

發生空指針或I/O異常

所以在讀取數據較慢時(比如數據量大、網絡傳輸慢等),大量併發的情況下,其他接入的消息,只能一直等待,這就是最大的弊端。

而後面即將介紹的NIO,就能解決這個難題。

2、NIO 編程

JDK 1.4中的java.nio.*包中引入新的Java I/O庫,其目的是提高速度。實際上,“舊”的I/O包已經使用NIO重新實現過,即使我們不顯式的使用NIO編程,也能從中受益。速度的提高在文件I/O和網絡I/O中都可能會發生,但本文只討論後者。

2.1、簡介

NIO我們一般認為是New I/O(也是官方的叫法),因為它是相對於老的I/O類庫新增的(其實在JDK 1.4中就已經被引入了,但這個名詞還會繼續用很久,即使它們在現在看來已經是“舊”的了,所以也提示我們在命名時,需要好好考慮),做了很大的改變。但民間跟多人稱之為Non-block I/O,即非阻塞I/O,因為這樣叫,更能體現它的特點。而下文中的NIO,不是指整個新的I/O庫,而是非阻塞I/O。

NIO提供了與傳統BIO模型中的Socket和ServerSocket相對應的SocketChannel和ServerSocketChannel兩種不同的套接字通道實現。

新增的著兩種通道都支持阻塞和非阻塞兩種模式。

阻塞模式使用就像傳統中的支持一樣,比較簡單,但是性能和可靠性都不好;非阻塞模式正好與之相反。

對於低負載、低併發的應用程序,可以使用同步阻塞I/O來提升開發速率和更好的維護性;對於高負載、高併發的(網絡)應用,應使用NIO的非阻塞模式來開發。

下面會先對基礎知識進行介紹。

2.2、緩衝區 Buffer

Buffer是一個對象,包含一些要寫入或者讀出的數據。

在NIO庫中,所有數據都是用緩衝區處理的。在讀取數據時,它是直接讀到緩衝區中的;在寫入數據時,也是寫入到緩衝區中。任何時候訪問NIO中的數據,都是通過緩衝區進行操作。

緩衝區實際上是一個數組,並提供了對數據結構化訪問以及維護讀寫位置等信息。

具體的緩存區有這些:ByteBuffe、CharBuffer、 ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer。他們實現了相同的接口:Buffer。

2.3、通道 Channel

我們對數據的讀取和寫入要通過Channel,它就像水管一樣,是一個通道。通道不同於流的地方就是通道是雙向的,可以用於讀、寫和同時讀寫操作。

底層的操作系統的通道一般都是全雙工的,所以全雙工的Channel比流能更好的映射底層操作系統的API。

Channel主要分兩大類:

SelectableChannel:用戶網絡讀寫

FileChannel:用於文件操作

後面代碼會涉及的ServerSocketChannel和SocketChannel都是SelectableChannel的子類。

2.4、多路複用器 Selector

Selector是Java NIO 編程的基礎。

Selector提供選擇已經就緒的任務的能力:Selector會不斷輪詢註冊在其上的Channel,如果某個Channel上面發生讀或者寫事件,這個Channel就處於就緒狀態,會被Selector輪詢出來,然後通過SelectionKey可以獲取就緒Channel的集合,進行後續的I/O操作。

一個Selector可以同時輪詢多個Channel,因為JDK使用了epoll()代替傳統的select實現,所以沒有最大連接句柄1024/2048的限制。所以,只需要一個線程負責Selector的輪詢,就可以接入成千上萬的客戶端。

2.5、NIO服務端

代碼比傳統的Socket編程看起來要複雜不少。

直接貼代碼吧,以註釋的形式給出代碼說明。

NIO創建的Server源碼:

package com.anxpp.io.calculator.nio;
public class Server {
\tprivate static int DEFAULT_PORT = 12345;
\tprivate static ServerHandle serverHandle;
\tpublic static void start(){
\t\tstart(DEFAULT_PORT);
\t}
\tpublic static synchronized void start(int port){
\t\tif(serverHandle!=null)
\t\t\tserverHandle.stop();
\t\tserverHandle = new ServerHandle(port);
\t\tnew Thread(serverHandle,"Server").start();
\t}
\tpublic static void main(String[] args){
\t\tstart();
\t}
}

ServerHandle:

package com.anxpp.io.calculator.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

import com.anxpp.io.utils.Calculator;
/**
* NIO服務端
* @author yangtao__anxpp.com
* @version 1.0
*/
public class ServerHandle implements Runnable{
\tprivate Selector selector;
\tprivate ServerSocketChannel serverChannel;
\tprivate volatile boolean started;
\t/**
\t * 構造方法
\t * @param port 指定要監聽的端口號
\t */
\tpublic ServerHandle(int port) {
\t\ttry{
\t\t\t//創建選擇器
\t\t\tselector = Selector.open();
\t\t\t//打開監聽通道
\t\t\tserverChannel = ServerSocketChannel.open();
\t\t\t//如果為 true,則此通道將被置於阻塞模式;如果為 false,則此通道將被置於非阻塞模式
\t\t\tserverChannel.configureBlocking(false);//開啟非阻塞模式
\t\t\t//綁定端口 backlog設為1024
\t\t\tserverChannel.socket().bind(new InetSocketAddress(port),1024);
\t\t\t//監聽客戶端連接請求
\t\t\tserverChannel.register(selector, SelectionKey.OP_ACCEPT);
\t\t\t//標記服務器已開啟
\t\t\tstarted = true;
\t\t\tSystem.out.println("服務器已啟動,端口號:" + port);
\t\t}catch(IOException e){
\t\t\te.printStackTrace();
\t\t\tSystem.exit(1);
\t\t}
\t}
\tpublic void stop(){
\t\tstarted = false;
\t}
\t@Override
\tpublic void run() {
\t\t//循環遍歷selector
\t\twhile(started){
\t\t\ttry{
\t\t\t\t//無論是否有讀寫事件發生,selector每隔1s被喚醒一次
\t\t\t\tselector.select(1000);
\t\t\t\t//阻塞,只有當至少一個註冊的事件發生的時候才會繼續.
//\t\t\t\tselector.select();
\t\t\t\tSet<SelectionKey> keys = selector.selectedKeys();
\t\t\t\tIterator<SelectionKey> it = keys.iterator();
\t\t\t\tSelectionKey key = null;
\t\t\t\twhile(it.hasNext()){
\t\t\t\t\tkey = it.next();
\t\t\t\t\tit.remove();
\t\t\t\t\ttry{
\t\t\t\t\t\thandleInput(key);
\t\t\t\t\t}catch(Exception e){
\t\t\t\t\t\tif(key != null){
\t\t\t\t\t\t\tkey.cancel();
\t\t\t\t\t\t\tif(key.channel() != null){
\t\t\t\t\t\t\t\tkey.channel().close();
\t\t\t\t\t\t\t}
\t\t\t\t\t\t}
\t\t\t\t\t}
\t\t\t\t}
\t\t\t}catch(Throwable t){
\t\t\t\tt.printStackTrace();
\t\t\t}
\t\t}
\t\t//selector關閉後會自動釋放裡面管理的資源
\t\tif(selector != null)
\t\t\ttry{
\t\t\t\tselector.close();
\t\t\t}catch (Exception e) {
\t\t\t\te.printStackTrace();
\t\t\t}
\t}
\tprivate void handleInput(SelectionKey key) throws IOException{
\t\tif(key.isValid()){
\t\t\t//處理新接入的請求消息
\t\t\tif(key.isAcceptable()){
\t\t\t\tServerSocketChannel ssc = (ServerSocketChannel) key.channel();
\t\t\t\t//通過ServerSocketChannel的accept創建SocketChannel實例
\t\t\t\t//完成該操作意味著完成TCP三次握手,TCP物理鏈路正式建立
\t\t\t\tSocketChannel sc = ssc.accept();
\t\t\t\t//設置為非阻塞的
\t\t\t\tsc.configureBlocking(false);
\t\t\t\t//註冊為讀
\t\t\t\tsc.register(selector, SelectionKey.OP_READ);
\t\t\t}
\t\t\t//讀消息
\t\t\tif(key.isReadable()){
\t\t\t\tSocketChannel sc = (SocketChannel) key.channel();
\t\t\t\t//創建ByteBuffer,並開闢一個1M的緩衝區
\t\t\t\tByteBuffer buffer = ByteBuffer.allocate(1024);
\t\t\t\t//讀取請求碼流,返回讀取到的字節數
\t\t\t\tint readBytes = sc.read(buffer);
\t\t\t\t//讀取到字節,對字節進行編解碼
\t\t\t\tif(readBytes>0){
\t\t\t\t\t//將緩衝區當前的limit設置為position=0,用於後續對緩衝區的讀取操作
\t\t\t\t\tbuffer.flip();
\t\t\t\t\t//根據緩衝區可讀字節數創建字節數組
\t\t\t\t\tbyte[] bytes = new byte[buffer.remaining()];
\t\t\t\t\t//將緩衝區可讀字節數組複製到新建的數組中
\t\t\t\t\tbuffer.get(bytes);
\t\t\t\t\tString expression = new String(bytes,"UTF-8");
\t\t\t\t\tSystem.out.println("服務器收到消息:" + expression);
\t\t\t\t\t//處理數據
\t\t\t\t\tString result = null;
\t\t\t\t\ttry{
\t\t\t\t\t\tresult = Calculator.cal(expression).toString();
\t\t\t\t\t}catch(Exception e){
\t\t\t\t\t\tresult = "計算錯誤:" + e.getMessage();
\t\t\t\t\t}
\t\t\t\t\t//發送應答消息
\t\t\t\t\tdoWrite(sc,result);
\t\t\t\t}
\t\t\t\t//沒有讀取到字節 忽略
//\t\t\t\telse if(readBytes==0);
\t\t\t\t//鏈路已經關閉,釋放資源
\t\t\t\telse if(readBytes<0){
\t\t\t\t\tkey.cancel();
\t\t\t\t\tsc.close();
\t\t\t\t}
\t\t\t}
\t\t}
\t}
\t//異步發送應答消息
\tprivate void doWrite(SocketChannel channel,String response) throws IOException{
\t\t//將消息編碼為字節數組
\t\tbyte[] bytes = response.getBytes();
\t\t//根據數組容量創建ByteBuffer
\t\tByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
\t\t//將字節數組複製到緩衝區
\t\twriteBuffer.put(bytes);
\t\t//flip操作
\t\twriteBuffer.flip();
\t\t//發送緩衝區的字節數組
\t\tchannel.write(writeBuffer);
\t\t//****此處不含處理“寫半包”的代碼
\t}
}

可以看到,創建NIO服務端的主要步驟如下:

打開ServerSocketChannel,監聽客戶端連接

綁定監聽端口,設置連接為非阻塞模式

創建Reactor線程,創建多路複用器並啟動線程

將ServerSocketChannel註冊到Reactor線程中的Selector上,監聽ACCEPT事件

Selector輪詢準備就緒的key

Selector監聽到新的客戶端接入,處理新的接入請求,完成TCP三次握手,簡歷物理鏈路

設置客戶端鏈路為非阻塞模式

將新接入的客戶端連接註冊到Reactor線程的Selector上,監聽讀操作,讀取客戶端發送的網絡消息

異步讀取客戶端消息到緩衝區

對Buffer編解碼,處理半包消息,將解碼成功的消息封裝成Task

將應答消息編碼為Buffer,調用SocketChannel的write將消息異步發送給客戶端

因為應答消息的發送,SocketChannel也是異步非阻塞的,所以不能保證一次能吧需要發送的數據發送完,此時就會出現寫半包的問題。我們需要註冊寫操作,不斷輪詢Selector將沒有發送完的消息發送完畢,然後通過Buffer的hasRemain()方法判斷消息是否發送完成。

2.6、NIO客戶端

還是直接上代碼吧,過程也不需要太多解釋了,跟服務端代碼有點類似。

Client:

package com.anxpp.io.calculator.nio;
public class Client {
\tprivate static String DEFAULT_HOST = "127.0.0.1";
\tprivate static int DEFAULT_PORT = 12345;
\tprivate static ClientHandle clientHandle;
\tpublic static void start(){
\t\tstart(DEFAULT_HOST,DEFAULT_PORT);
\t}
\tpublic static synchronized void start(String ip,int port){
\t\tif(clientHandle!=null)
\t\t\tclientHandle.stop();
\t\tclientHandle = new ClientHandle(ip,port);
\t\tnew Thread(clientHandle,"Server").start();
\t}
\t//向服務器發送消息
\tpublic static boolean sendMsg(String msg) throws Exception{
\t\tif(msg.equals("q")) return false;
\t\tclientHandle.sendMsg(msg);
\t\treturn true;
\t}
\tpublic static void main(String[] args){
\t\tstart();
\t}
}

ClientHandle:

package com.anxpp.io.calculator.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* NIO客戶端
* @author yangtao__anxpp.com
* @version 1.0
*/
public class ClientHandle implements Runnable{
\tprivate String host;
\tprivate int port;
\tprivate Selector selector;
\tprivate SocketChannel socketChannel;
\tprivate volatile boolean started;

\tpublic ClientHandle(String ip,int port) {
\t\tthis.host = ip;
\t\tthis.port = port;
\t\ttry{
\t\t\t//創建選擇器
\t\t\tselector = Selector.open();
\t\t\t//打開監聽通道
\t\t\tsocketChannel = SocketChannel.open();
\t\t\t//如果為 true,則此通道將被置於阻塞模式;如果為 false,則此通道將被置於非阻塞模式
\t\t\tsocketChannel.configureBlocking(false);//開啟非阻塞模式
\t\t\tstarted = true;
\t\t}catch(IOException e){
\t\t\te.printStackTrace();
\t\t\tSystem.exit(1);
\t\t}
\t}
\tpublic void stop(){
\t\tstarted = false;
\t}
\t@Override
\tpublic void run() {
\t\ttry{
\t\t\tdoConnect();
\t\t}catch(IOException e){
\t\t\te.printStackTrace();
\t\t\tSystem.exit(1);
\t\t}
\t\t//循環遍歷selector
\t\twhile(started){
\t\t\ttry{
\t\t\t\t//無論是否有讀寫事件發生,selector每隔1s被喚醒一次
\t\t\t\tselector.select(1000);
\t\t\t\t//阻塞,只有當至少一個註冊的事件發生的時候才會繼續.
//\t\t\t\tselector.select();
\t\t\t\tSet<SelectionKey> keys = selector.selectedKeys();
\t\t\t\tIterator<SelectionKey> it = keys.iterator();
\t\t\t\tSelectionKey key = null;
\t\t\t\twhile(it.hasNext()){
\t\t\t\t\tkey = it.next();
\t\t\t\t\tit.remove();
\t\t\t\t\ttry{
\t\t\t\t\t\thandleInput(key);
\t\t\t\t\t}catch(Exception e){
\t\t\t\t\t\tif(key != null){
\t\t\t\t\t\t\tkey.cancel();
\t\t\t\t\t\t\tif(key.channel() != null){
\t\t\t\t\t\t\t\tkey.channel().close();
\t\t\t\t\t\t\t}
\t\t\t\t\t\t}
\t\t\t\t\t}
\t\t\t\t}
\t\t\t}catch(Exception e){
\t\t\t\te.printStackTrace();
\t\t\t\tSystem.exit(1);
\t\t\t}
\t\t}
\t\t//selector關閉後會自動釋放裡面管理的資源
\t\tif(selector != null)
\t\t\ttry{
\t\t\t\tselector.close();
\t\t\t}catch (Exception e) {
\t\t\t\te.printStackTrace();
\t\t\t}
\t}
\tprivate void handleInput(SelectionKey key) throws IOException{
\t\tif(key.isValid()){
\t\t\tSocketChannel sc = (SocketChannel) key.channel();
\t\t\tif(key.isConnectable()){
\t\t\t\tif(sc.finishConnect());
\t\t\t\telse System.exit(1);
\t\t\t}
\t\t\t//讀消息
\t\t\tif(key.isReadable()){
\t\t\t\t//創建ByteBuffer,並開闢一個1M的緩衝區
\t\t\t\tByteBuffer buffer = ByteBuffer.allocate(1024);
\t\t\t\t//讀取請求碼流,返回讀取到的字節數
\t\t\t\tint readBytes = sc.read(buffer);
\t\t\t\t//讀取到字節,對字節進行編解碼
\t\t\t\tif(readBytes>0){
\t\t\t\t\t//將緩衝區當前的limit設置為position=0,用於後續對緩衝區的讀取操作
\t\t\t\t\tbuffer.flip();
\t\t\t\t\t//根據緩衝區可讀字節數創建字節數組
\t\t\t\t\tbyte[] bytes = new byte[buffer.remaining()];
\t\t\t\t\t//將緩衝區可讀字節數組複製到新建的數組中
\t\t\t\t\tbuffer.get(bytes);
\t\t\t\t\tString result = new String(bytes,"UTF-8");
\t\t\t\t\tSystem.out.println("客戶端收到消息:" + result);
\t\t\t\t}
\t\t\t\t//沒有讀取到字節 忽略
//\t\t\t\telse if(readBytes==0);
\t\t\t\t//鏈路已經關閉,釋放資源
\t\t\t\telse if(readBytes<0){
\t\t\t\t\tkey.cancel();
\t\t\t\t\tsc.close();
\t\t\t\t}
\t\t\t}
\t\t}
\t}
\t//異步發送消息
\tprivate void doWrite(SocketChannel channel,String request) throws IOException{
\t\t//將消息編碼為字節數組
\t\tbyte[] bytes = request.getBytes();
\t\t//根據數組容量創建ByteBuffer
\t\tByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
\t\t//將字節數組複製到緩衝區
\t\twriteBuffer.put(bytes);
\t\t//flip操作
\t\twriteBuffer.flip();
\t\t//發送緩衝區的字節數組
\t\tchannel.write(writeBuffer);
\t\t//****此處不含處理“寫半包”的代碼
\t}
\tprivate void doConnect() throws IOException{
\t\tif(socketChannel.connect(new InetSocketAddress(host,port)));
\t\telse socketChannel.register(selector, SelectionKey.OP_CONNECT);
\t}
\tpublic void sendMsg(String msg) throws Exception{
\t\tsocketChannel.register(selector, SelectionKey.OP_READ);
\t\tdoWrite(socketChannel, msg);
\t}
}

2.7、演示結果

首先運行服務器,順便也運行一個客戶端:

package com.anxpp.io.calculator.nio;
import java.util.Scanner;
/**
* 測試方法
* @author yangtao__anxpp.com
* @version 1.0
*/
public class Test {
\t//測試主方法
\t@SuppressWarnings("resource")
\tpublic static void main(String[] args) throws Exception{
\t\t//運行服務器
\t\tServer.start();
\t\t//避免客戶端先於服務器啟動前執行代碼
\t\tThread.sleep(100);
\t\t//運行客戶端
\t\tClient.start();
\t\twhile(Client.sendMsg(new Scanner(System.in).nextLine()));
\t}
}

我們也可以單獨運行客戶端,效果都是一樣的。

一次測試的結果:

服務器已啟動,端口號:12345
1+2+3+4+5+6
服務器收到消息:1+2+3+4+5+6
客戶端收到消息:21
1*2/3-4+5*6/7-8
服務器收到消息:1*2/3-4+5*6/7-8
客戶端收到消息:-7.0476190476190474

運行多個客戶端,都是沒有問題的。


3、AIO編程

NIO 2.0引入了新的異步通道的概念,並提供了異步文件通道和異步套接字通道的實現。

異步的套接字通道時真正的異步非阻塞I/O,對應於UNIX網絡編程中的事件驅動I/O(AIO)。他不需要過多的Selector對註冊的通道進行輪詢即可實現異步讀寫,從而簡化了NIO的編程模型。

直接上代碼吧。

3.1、Server端代碼

Server:

package com.anxpp.io.calculator.aio.server;
/**
* AIO服務端
* @author yangtao__anxpp.com
* @version 1.0
*/
public class Server {
\tprivate static int DEFAULT_PORT = 12345;
\tprivate static AsyncServerHandler serverHandle;
\tpublic volatile static long clientCount = 0;
\tpublic static void start(){
\t\tstart(DEFAULT_PORT);
\t}
\tpublic static synchronized void start(int port){
\t\tif(serverHandle!=null)
\t\t\treturn;
\t\tserverHandle = new AsyncServerHandler(port);
\t\tnew Thread(serverHandle,"Server").start();
\t}
\tpublic static void main(String[] args){
\t\tServer.start();
\t}
}
AsyncServerHandler:
package com.anxpp.io.calculator.aio.server;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.CountDownLatch;
public class AsyncServerHandler implements Runnable {
\tpublic CountDownLatch latch;
\tpublic AsynchronousServerSocketChannel channel;
\tpublic AsyncServerHandler(int port) {
\t\ttry {
\t\t\t//創建服務端通道
\t\t\tchannel = AsynchronousServerSocketChannel.open();
\t\t\t//綁定端口
\t\t\tchannel.bind(new InetSocketAddress(port));
\t\t\tSystem.out.println("服務器已啟動,端口號:" + port);
\t\t} catch (IOException e) {
\t\t\te.printStackTrace();
\t\t}
\t}
\t@Override
\tpublic void run() {
\t\t//CountDownLatch初始化
\t\t//它的作用:在完成一組正在執行的操作之前,允許當前的現場一直阻塞
\t\t//此處,讓現場在此阻塞,防止服務端執行完成後退出
\t\t//也可以使用while(true)+sleep
\t\t//生成環境就不需要擔心這個問題,以為服務端是不會退出的
\t\tlatch = new CountDownLatch(1);
\t\t//用於接收客戶端的連接
\t\tchannel.accept(this,new AcceptHandler());
\t\ttry {
\t\t\tlatch.await();
\t\t} catch (InterruptedException e) {
\t\t\te.printStackTrace();
\t\t}
\t}
}

AcceptHandler:

package com.anxpp.io.calculator.aio.server;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
//作為handler接收客戶端連接
public class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncServerHandler> {
\t@Override
\tpublic void completed(AsynchronousSocketChannel channel,AsyncServerHandler serverHandler) {
\t\t//繼續接受其他客戶端的請求
\t\tServer.clientCount++;
\t\tSystem.out.println("連接的客戶端數:" + Server.clientCount);
\t\tserverHandler.channel.accept(serverHandler, this);
\t\t//創建新的Buffer
\t\tByteBuffer buffer = ByteBuffer.allocate(1024);
\t\t//異步讀 第三個參數為接收消息回調的業務Handler
\t\tchannel.read(buffer, buffer, new ReadHandler(channel));
\t}
\t@Override
\tpublic void failed(Throwable exc, AsyncServerHandler serverHandler) {
\t\texc.printStackTrace();
\t\tserverHandler.latch.countDown();
\t}
}

ReadHandler:

package com.anxpp.io.calculator.aio.server;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import com.anxpp.io.utils.Calculator;
public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
\t//用於讀取半包消息和發送應答
\tprivate AsynchronousSocketChannel channel;
\tpublic ReadHandler(AsynchronousSocketChannel channel) {
\t\t\tthis.channel = channel;
\t}
\t//讀取到消息後的處理
\t@Override
\tpublic void completed(Integer result, ByteBuffer attachment) {
\t\t//flip操作
\t\tattachment.flip();
\t\t//根據
\t\tbyte[] message = new byte[attachment.remaining()];
\t\tattachment.get(message);
\t\ttry {
\t\t\tString expression = new String(message, "UTF-8");
\t\t\tSystem.out.println("服務器收到消息: " + expression);
\t\t\tString calrResult = null;
\t\t\ttry{
\t\t\t\tcalrResult = Calculator.cal(expression).toString();
\t\t\t}catch(Exception e){
\t\t\t\tcalrResult = "計算錯誤:" + e.getMessage();
\t\t\t}
\t\t\t//向客戶端發送消息
\t\t\tdoWrite(calrResult);
\t\t} catch (UnsupportedEncodingException e) {
\t\t\te.printStackTrace();
\t\t}
\t}
\t//發送消息
\tprivate void doWrite(String result) {
\t\tbyte[] bytes = result.getBytes();
\t\tByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
\t\twriteBuffer.put(bytes);
\t\twriteBuffer.flip();
\t\t//異步寫數據 參數與前面的read一樣
\t\tchannel.write(writeBuffer, writeBuffer,new CompletionHandler<Integer, ByteBuffer>() {
\t\t\t@Override
\t\t\tpublic void completed(Integer result, ByteBuffer buffer) {
\t\t\t\t//如果沒有發送完,就繼續發送直到完成
\t\t\t\tif (buffer.hasRemaining())
\t\t\t\t\tchannel.write(buffer, buffer, this);
\t\t\t\telse{
\t\t\t\t\t//創建新的Buffer
\t\t\t\t\tByteBuffer readBuffer = ByteBuffer.allocate(1024);
\t\t\t\t\t//異步讀 第三個參數為接收消息回調的業務Handler
\t\t\t\t\tchannel.read(readBuffer, readBuffer, new ReadHandler(channel));
\t\t\t\t}
\t\t\t}
\t\t\t@Override
\t\t\tpublic void failed(Throwable exc, ByteBuffer attachment) {
\t\t\t\ttry {
\t\t\t\t\tchannel.close();
\t\t\t\t} catch (IOException e) {
\t\t\t\t}
\t\t\t}
\t\t});
\t}
\t@Override
\tpublic void failed(Throwable exc, ByteBuffer attachment) {
\t\ttry {
\t\t\tthis.channel.close();
\t\t} catch (IOException e) {
\t\t\te.printStackTrace();
\t\t}
\t}
}

OK,這樣就已經完成了,其實說起來也簡單,雖然代碼感覺很多,但是API比NIO的使用起來真的簡單多了,主要就是監聽、讀、寫等各種CompletionHandler。此處本應有一個WriteHandler的,確實,我們在ReadHandler中,以一個匿名內部類實現了它。

下面看客戶端代碼。

3.2、Client端代碼

Client:

package com.anxpp.io.calculator.aio.client;
import java.util.Scanner;
public class Client {
\tprivate static String DEFAULT_HOST = "127.0.0.1";
\tprivate static int DEFAULT_PORT = 12345;
\tprivate static AsyncClientHandler clientHandle;
\tpublic static void start(){
\t\tstart(DEFAULT_HOST,DEFAULT_PORT);
\t}
\tpublic static synchronized void start(String ip,int port){
\t\tif(clientHandle!=null)
\t\t\treturn;
\t\tclientHandle = new AsyncClientHandler(ip,port);
\t\tnew Thread(clientHandle,"Client").start();
\t}
\t//向服務器發送消息
\tpublic static boolean sendMsg(String msg) throws Exception{
\t\tif(msg.equals("q")) return false;
\t\tclientHandle.sendMsg(msg);
\t\treturn true;
\t}
\t@SuppressWarnings("resource")
\tpublic static void main(String[] args) throws Exception{
\t\tClient.start();
\t\tSystem.out.println("請輸入請求消息:");
\t\tScanner scanner = new Scanner(System.in);
\t\twhile(Client.sendMsg(scanner.nextLine()));
\t}
}

AsyncClientHandler:

package com.anxpp.io.calculator.aio.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;
public class AsyncClientHandler implements CompletionHandler<Void, AsyncClientHandler>, Runnable {
\tprivate AsynchronousSocketChannel clientChannel;
\tprivate String host;
\tprivate int port;
\tprivate CountDownLatch latch;
\tpublic AsyncClientHandler(String host, int port) {
\t\tthis.host = host;
\t\tthis.port = port;
\t\ttry {
\t\t\t//創建異步的客戶端通道
\t\t\tclientChannel = AsynchronousSocketChannel.open();
\t\t} catch (IOException e) {
\t\t\te.printStackTrace();
\t\t}
\t}
\t@Override
\tpublic void run() {
\t\t//創建CountDownLatch等待
\t\tlatch = new CountDownLatch(1);
\t\t//發起異步連接操作,回調參數就是這個類本身,如果連接成功會回調completed方法
\t\tclientChannel.connect(new InetSocketAddress(host, port), this, this);
\t\ttry {
\t\t\tlatch.await();
\t\t} catch (InterruptedException e1) {
\t\t\te1.printStackTrace();
\t\t}
\t\ttry {
\t\t\tclientChannel.close();
\t\t} catch (IOException e) {
\t\t\te.printStackTrace();
\t\t}
\t}
\t//連接服務器成功
\t//意味著TCP三次握手完成
\t@Override
\tpublic void completed(Void result, AsyncClientHandler attachment) {
\t\tSystem.out.println("客戶端成功連接到服務器...");
\t}
\t//連接服務器失敗
\t@Override
\tpublic void failed(Throwable exc, AsyncClientHandler attachment) {
\t\tSystem.err.println("連接服務器失敗...");
\t\texc.printStackTrace();
\t\ttry {
\t\t\tclientChannel.close();
\t\t\tlatch.countDown();
\t\t} catch (IOException e) {
\t\t\te.printStackTrace();
\t\t}
\t}
\t//向服務器發送消息
\tpublic void sendMsg(String msg){
\t\tbyte[] req = msg.getBytes();
\t\tByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
\t\twriteBuffer.put(req);
\t\twriteBuffer.flip();
\t\t//異步寫
\t\tclientChannel.write(writeBuffer, writeBuffer,new WriteHandler(clientChannel, latch));
\t}
}

WriteHandler:

package com.anxpp.io.calculator.aio.client;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;
public class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
\tprivate AsynchronousSocketChannel clientChannel;
\tprivate CountDownLatch latch;
\tpublic WriteHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) {
\t\tthis.clientChannel = clientChannel;
\t\tthis.latch = latch;
\t}
\t@Override
\tpublic void completed(Integer result, ByteBuffer buffer) {
\t\t//完成全部數據的寫入
\t\tif (buffer.hasRemaining()) {
\t\t\tclientChannel.write(buffer, buffer, this);
\t\t}
\t\telse {
\t\t\t//讀取數據
\t\t\tByteBuffer readBuffer = ByteBuffer.allocate(1024);
\t\t\tclientChannel.read(readBuffer,readBuffer,new ReadHandler(clientChannel, latch));
\t\t}
\t}
\t@Override
\tpublic void failed(Throwable exc, ByteBuffer attachment) {
\t\tSystem.err.println("數據發送失敗...");
\t\ttry {
\t\t\tclientChannel.close();
\t\t\tlatch.countDown();
\t\t} catch (IOException e) {
\t\t}
\t}
}

ReadHandler:

package com.anxpp.io.calculator.aio.client;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;
public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
\tprivate AsynchronousSocketChannel clientChannel;
\tprivate CountDownLatch latch;
\tpublic ReadHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) {
\t\tthis.clientChannel = clientChannel;
\t\tthis.latch = latch;
\t}
\t@Override
\tpublic void completed(Integer result,ByteBuffer buffer) {
\t\tbuffer.flip();
\t\tbyte[] bytes = new byte[buffer.remaining()];
\t\tbuffer.get(bytes);
\t\tString body;
\t\ttry {
\t\t\tbody = new String(bytes,"UTF-8");
\t\t\tSystem.out.println("客戶端收到結果:"+ body);
\t\t} catch (UnsupportedEncodingException e) {
\t\t\te.printStackTrace();
\t\t}
\t}
\t@Override
\tpublic void failed(Throwable exc,ByteBuffer attachment) {
\t\tSystem.err.println("數據讀取失敗...");
\t\ttry {
\t\t\tclientChannel.close();
\t\t\tlatch.countDown();
\t\t} catch (IOException e) {
\t\t}
\t}
}

這個API使用起來真的是很順手。

3.3、測試

Test:

package com.anxpp.io.calculator.aio;
import java.util.Scanner;
import com.anxpp.io.calculator.aio.client.Client;
import com.anxpp.io.calculator.aio.server.Server;
/**
* 測試方法
* @author yangtao__anxpp.com
* @version 1.0
*/
public class Test {
\t//測試主方法
\t@SuppressWarnings("resource")
\tpublic static void main(String[] args) throws Exception{
\t\t//運行服務器
\t\tServer.start();
\t\t//避免客戶端先於服務器啟動前執行代碼
\t\tThread.sleep(100);
\t\t//運行客戶端
\t\tClient.start();
\t\tSystem.out.println("請輸入請求消息:");
\t\tScanner scanner = new Scanner(System.in);
\t\twhile(Client.sendMsg(scanner.nextLine()));
\t}
}

我們可以在控制檯輸入我們需要計算的算數字符串,服務器就會返回結果,當然,我們也可以運行大量的客戶端,都是沒有問題的,以為此處設計為單例客戶端,所以也就沒有演示大量客戶端併發。

讀者可以自己修改Client類,然後開闢大量線程,並使用構造方法創建很多的客戶端測試。

下面是其中一次參數的輸出:

服務器已啟動,端口號:12345
請輸入請求消息:
客戶端成功連接到服務器...
連接的客戶端數:1
123456+789+456
服務器收到消息: 123456+789+456
客戶端收到結果:124701
9526*56
服務器收到消息: 9526*56
客戶端收到結果:533456
...

AIO是真正的異步非阻塞的,所以,在面對超級大量的客戶端,更能得心應手。

下面就比較一下,幾種I/O編程的優缺點。

4、各種I/O的對比

先以一張表來直觀的對比一下:

Java 網絡IO編程總結

具體選擇什麼樣的模型或者NIO框架,完全基於業務的實際應用場景和性能需求,如果客戶端很少,服務器負荷不重,就沒有必要選擇開發起來相對不那麼簡單的NIO做服務端;相反,就應考慮使用NIO或者相關的框架了。

5、附錄

上文中服務端使用到的用於計算的工具類:

package com.anxpp.utils;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;
public final class Calculator {
private final static ScriptEngine jse = new ScriptEngineManager().getEngineByName("JavaScript");
public static Object cal(String expression) throws ScriptException{
return jse.eval(expression);
}
}

私信我:“資料”,可免費領取更多學習資料哦

Java 網絡IO編程總結

Java 網絡IO編程總結

"

相關推薦

推薦中...