简介
由 JBOSS 提供的一个基于 JAVA NIO 类库的开源异步通信框架。
特点:异步非阻塞、基于事件驱动、高性能、高可靠性和高可定制性。
换句话说,Netty是一个NIO框架,使用它可以简单快速地开发网络应用程序,比如客户端和服务端的协议。
Netty大大简化了网络程序的开发过程比如TCP和UDP的 Socket的开发。
Netty 已逐渐成为 Java NIO 编程的首选框架
优点:
API 使用简单,开发门槛低;
功能强大,预置了多种编解码功能,支持多种主流协议;
定制能力强,可以通过 ChannelHandler 对通信框架进行灵活的扩展;
性能高,通过与其它业界主流的 NIO 框架对比,Netty 的综合性能最优;
社区活跃,版本迭代周期短,发现的 BUG 可以被及时修复,同时,更多的新功能会被加入;
经历了大规模的商业应用考验,质量得到验证。在互联网、大数据、网络游戏、企业应用、电信软件等众多行业得到成功商用,证明了它完全满足不同行业的商用标准。
代码示例
pom依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.0.Final</version>
</dependency>
简单示例
SimpleServer(服务端)
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
*
* Netty中,通讯的双方建立连接后,会把数据按照ByteBuf的方式进行传输,
* 例如http协议中,就是通过HttpRequestDecoder对ByteBuf数据流进行处理,转换成http的对象。
*
*/
public class SimpleServer {
private int port;
public SimpleServer(int port) {
this.port = port;
}
public void run() throws Exception {
//EventLoopGroup是用来处理IO操作的多线程事件循环器
//bossGroup 用来接收进来的连接
EventLoopGroup bossGroup = new NioEventLoopGroup();
//workerGroup 用来处理已经被接收的连接
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//启动 NIO 服务的辅助启动类
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
//配置 Channel
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
// 注册handler
ch.pipeline().addLast(new SimpleServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// 绑定端口,开始接收进来的连接
ChannelFuture f = b.bind(port).sync();
// 等待服务器 socket 关闭 。
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new SimpleServer(9999).run();
}
}
SimpleServerHandler(服务端请求处理Handler)
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class SimpleServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("SimpleServerHandler.channelRead");
ByteBuf result = (ByteBuf) msg;
byte[] result1 = new byte[result.readableBytes()];
// msg中存储的是ByteBuf类型的数据,把数据读取到byte[]中
result.readBytes(result1);
String resultStr = new String(result1);
// 接收并打印客户端的信息
System.out.println("Client said:" + resultStr);
// 释放资源,这行很关键
result.release();
// 向客户端发送消息
String response = "hello client!";
// 在当前场景下,发送的数据必须转换成ByteBuf数组
ByteBuf encoded = ctx.alloc().buffer(4 * response.length());
encoded.writeBytes(response.getBytes());
ctx.write(encoded);
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 当出现异常就关闭连接
cause.printStackTrace();
ctx.close();
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
SimpleServer(客户端)
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class SimpleClient {
public void connect(String host, int port) throws Exception {
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new SimpleClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(host, port).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
SimpleClient client=new SimpleClient();
client.connect("127.0.0.1", 9999);
}
}
进阶示例
服务端
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* 1. 双线程组
* 2. Bootstrap配置启动信息
* 3. 注册业务处理Handler
* 4. 绑定服务监听端口并启动服务
*/
public class Server4HelloWorld {
// 监听线程组,监听客户端请求
private EventLoopGroup acceptorGroup = null;
// 处理客户端相关操作线程组,负责处理与客户端的数据通讯
private EventLoopGroup clientGroup = null;
// 服务启动相关配置信息
private ServerBootstrap bootstrap = null;
public Server4HelloWorld(){
init();
}
private void init(){
// 初始化线程组,构建线程组的时候,如果不传递参数,则默认构建的线程组线程数是CPU核心数量。
acceptorGroup = new NioEventLoopGroup();
clientGroup = new NioEventLoopGroup();
// 初始化服务的配置
bootstrap = new ServerBootstrap();
// 绑定线程组
bootstrap.group(acceptorGroup, clientGroup);
// 设定通讯模式为NIO, 同步非阻塞
bootstrap.channel(NioServerSocketChannel.class);
// 设定缓冲区大小, 缓存区的单位是字节。
bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
// SO_SNDBUF发送缓冲区,SO_RCVBUF接收缓冲区,SO_KEEPALIVE开启心跳监测(保证连接有效)
bootstrap.option(ChannelOption.SO_SNDBUF, 16*1024)
.option(ChannelOption.SO_RCVBUF, 16*1024)
.option(ChannelOption.SO_KEEPALIVE, true);
}
/**
* 监听处理逻辑。
* @param port 监听端口。
* @param acceptorHandlers 处理器, 如何处理客户端请求。
* @return
* @throws InterruptedException
*/
public ChannelFuture doAccept(int port, final ChannelHandler... acceptorHandlers) throws InterruptedException{
/*
* childHandler是服务的Bootstrap独有的方法。是用于提供处理对象的。
* 可以一次性增加若干个处理逻辑。是类似责任链模式的处理方式。
* 增加A,B两个处理逻辑,在处理客户端请求数据的时候,根据A-》B顺序依次处理。
*
* ChannelInitializer - 用于提供处理器的一个模型对象。
* 其中定义了一个方法,initChannel方法。
* 方法是用于初始化处理逻辑责任链条的。
* 可以保证服务端的Bootstrap只初始化一次处理器,尽量提供处理逻辑的重用。
* 避免反复的创建处理器对象。节约资源开销。
*/
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(acceptorHandlers);
}
});
// bind方法 - 绑定监听端口的。ServerBootstrap可以绑定多个监听端口。 多次调用bind方法即可
// sync - 开始监听逻辑。 返回一个ChannelFuture。 返回结果代表的是监听成功后的一个对应的未来结果
// 可以使用ChannelFuture实现后续的服务器和客户端的交互。
ChannelFuture future = bootstrap.bind(port).sync();
return future;
}
/**
* shutdownGracefully - 方法是一个安全关闭的方法。可以保证不放弃任何一个已接收的客户端请求。
*/
public void release(){
this.acceptorGroup.shutdownGracefully();
this.clientGroup.shutdownGracefully();
}
public static void main(String[] args){
ChannelFuture future = null;
Server4HelloWorld server = null;
try{
server = new Server4HelloWorld();
future = server.doAccept(9999,new Server4HelloWorldHandler());
System.out.println("server started.");
// 关闭连接的。
future.channel().closeFuture().sync();
}catch(InterruptedException e){
e.printStackTrace();
}finally{
if(null != future){
try {
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(null != server){
server.release();
}
}
}
}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
/**
* @Sharable注解 -
* 代表当前Handler是一个可以分享的处理器。也就意味着,服务器注册此Handler后,可以分享给多个客户端同时使用。
* 如果不使用注解描述类型,则每次客户端请求时,必须为客户端重新创建一个新的Handler对象。
* 如果handler是一个Sharable的,一定避免定义可写的实例变量。
* bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new XxxHandler());
}
});
*/
@Sharable
public class Server4HelloWorldHandler extends ChannelHandlerAdapter {
/**
* 业务处理逻辑
* 用于处理读取数据请求的逻辑。
* ctx - 上下文对象。其中包含于客户端建立连接的所有资源。 如: 对应的Channel
* msg - 读取到的数据。 默认类型是ByteBuf,是Netty自定义的。是对ByteBuffer的封装。 不需要考虑复位问题。
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
/*获取读取的数据, 是一个缓冲。*/
ByteBuf readBuffer = (ByteBuf) msg;
// 创建一个字节数组,用于保存缓存中的数据。
byte[] tempDatas = new byte[readBuffer.readableBytes()];
// 将缓存中的数据读取到字节数组中。/**/
readBuffer.readBytes(tempDatas);
String message = new String(tempDatas, "UTF-8");
System.out.println("from client : " + message);
if("exit".equals(message)){
ctx.close();
return;
}
String line = "server message to client!";
// 写操作自动释放缓存,避免内存溢出问题。
ctx.writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
// 注意,如果调用的是write方法。不会刷新缓存,缓存中的数据不会发送到客户端,必须再次调用flush方法才行。
// ctx.write(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
// ctx.flush();
}
/**
* 异常处理逻辑, 当客户端异常退出的时候,也会运行。
* ChannelHandlerContext关闭,也代表当前与客户端连接的资源关闭。
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("server exceptionCaught method run...");
// cause.printStackTrace();
ctx.close();
}
}
客户端
import java.util.Scanner;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* 1. 单线程组
* 2. Bootstrap配置启动信息
* 3. 注册业务处理Handler
* 4. connect连接服务,并发起请求
*
* 因为客户端是请求的发起者,不需要监听。
* 只需要定义唯一的一个线程组即可。
*/
public class Client4HelloWorld {
// 处理请求和处理服务端响应的线程组
private EventLoopGroup group = null;
// 客户端启动相关配置信息
private Bootstrap bootstrap = null;
public Client4HelloWorld(){
init();
}
private void init(){
group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
// 绑定线程组
bootstrap.group(group);
// 设定通讯模式为NIO
bootstrap.channel(NioSocketChannel.class);
}
public ChannelFuture doRequest(String host, int port, final ChannelHandler... handlers) throws InterruptedException{
/*
* 客户端的Bootstrap没有childHandler方法。只有handler方法。
* 方法含义等同ServerBootstrap中的childHandler
* 在客户端必须绑定处理器,也就是必须调用handler方法。
* 服务器必须绑定处理器,必须调用childHandler方法。
*/
this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(handlers);
}
});
// 建立连接。
ChannelFuture future = this.bootstrap.connect(host, port).sync();
return future;
}
public void release(){
this.group.shutdownGracefully();
}
public static void main(String[] args) {
Client4HelloWorld client = null;
ChannelFuture future = null;
try{
client = new Client4HelloWorld();
future = client.doRequest("localhost", 9999, new Client4HelloWorldHandler());
Scanner s = null;
while(true){
s = new Scanner(System.in);
System.out.print("enter message send to server (enter 'exit' for close client) > ");
String line = s.nextLine();
if("exit".equals(line)){
// addListener - 增加监听,当某条件满足的时候,触发监听器。
// ChannelFutureListener.CLOSE - 关闭监听器,代表ChannelFuture执行返回后,关闭连接。
future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")))
.addListener(ChannelFutureListener.CLOSE);
break;
}
future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
TimeUnit.SECONDS.sleep(1);
}
}catch(Exception e){
e.printStackTrace();
}finally{
if(null != future){
try {
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(null != client){
client.release();
}
}
}
}
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
public class Client4DelimiterHandler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try{
String message = msg.toString();
System.out.println("from server : " + message);
}finally{
// 用于释放缓存。避免内存溢出
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("client exceptionCaught method run...");
// cause.printStackTrace();
ctx.close();
}
}
线程模型
Netty 中支持单线程模型,多线程模型,主从多线程模型。
单线程模型
在 ServerBootstrap 调用方法 group 的时候,传递的参数是同一个线程组,且在构造线程组的时候,构造参数为 1,这种开发方式,就是一个单线程模型。
个人机开发测试使用。不推荐。
多线程模型
在 ServerBootstrap 调用方法 group 的时候,传递的参数是两个不同的线程组。负责监听的 acceptor 线程组,线程数为 1,也就是构造参数为 1。负责处理客户端任务的线程组,线程数大于 1,也就是构造参数大于 1。这种开发方式,就是多线程模型。
长连接,且客户端数量较少,连接持续时间较长情况下使用。如:企业内部交流应用。
主从多线程模型
在 ServerBootstrap 调用方法 group 的时候,传递的参数是两个不同的线程组。负责监听的 acceptor 线程组,线程数大于 1,也就是构造参数大于 1。负责处理客户端任务的线程组,线程数大于 1,也就是构造参数大于 1。这种开发方式,就是主从多线程模型。
长连接,客户端数量相对较多,连接持续时间比较长的情况下使用。如:对外提供服务的相册服务器。
拆包粘包问题解决
netty 使用 tcp/ip 协议传输数据。而 tcp/ip 协议是类似水流一样的数据传输方式。多次访问的时候有可能出现数据粘包的问题,解决这种问题的方式如下:
定长数据流
客户端和服务器,提前协调好,每个消息长度固定。(如:长度 10)。如果客户端或服务器写出的数据不足 10,则使用空白字符补足(如:使用空格)。
特殊结束符
客户端和服务器,协商定义一个特殊的分隔符号,分隔符号长度自定义。如:‘ #’、‘$_$’、‘AA@’。在通讯的时候,只要没有发送分隔符号,则代表一条数据没有结束。
协议
相对最成熟的数据传递方式。有服务器的开发者提供一个固定格式的协议标准。客户端和服务器发送数据和接受数据的时候,都依据协议制定和解析消息。
序列化对象
JBoss Marshalling 序列化
Java 是面向对象的开发语言。传递的数据如果是 Java 对象,应该是最方便且可靠。
定时断线重连
客户端断线重连机制。
客户端数量多,且需要传递的数据量级较大。可以周期性的发送数据的时候,使用。要求对数据的即时性不高的时候,才可使用。
优点: 可以使用数据缓存。不是每条数据进行一次数据交互。可以定时回收资源,对资源利用率高。相对来说,即时性可以通过其他方式保证。如: 120 秒自动断线。数据变化 1000 次请求服务器一次。300 秒中自动发送不足 1000 次的变化数据。
心跳监测
使用定时发送消息的方式,实现硬件检测,达到心态检测的目的。
心跳监测是用于检测电脑硬件和软件信息的一种技术。如:CPU 使用率,磁盘使用率,内存使用率,进程情况,线程情况等。
HTTP 协议处理
使用 Netty 服务开发。实现 HTTP 协议处理逻辑。