Netty的介绍与使用--第三天(Netty的介绍与简单的使用)
Netty的介绍与使用--第三天(Netty的介绍与简单的使用)
1.Netty说明
- 1.Netty是由JBOSS提供的一个java开源框架,Netty提供异步的、基于事件驱动的网络应用程序框架,用以快速开发高性能、高可靠的网络IO程序;
- 2.Netty是目前最流行的NIO框架,Netty在互联网领域中,大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用,Dubbo框架内部都采用了Netty;
- Netty 可以帮助你快速、简单的开发出一个网络应用,相当于简化和流程化了 NIO 的 开发过程。
2.Netty的优点
- 设计优雅:适用于各种传输类型的统一 API 阻塞和非阻塞 Socket;基于灵活且可扩展 的事件模型,可以清晰地分离关注点;高度可定制的线程模型 - 单线程,一个或多个 线程池;
- 使用方便:详细记录的 Javadoc,用户指南和示例;没有其他依赖项,JDK 5(Netty 3.x)或 6(Netty 4.x)就足够了。
- 高性能、吞吐量更高:延迟更低;减少资源消耗;最小化不必要的内存复制。
- 安全:完整的 SSL/TLS 和 StartTLS 支持。
- 社区活跃、不断更新:社区活跃,版本迭代周期短,发现的 Bug 可以被及时修复, 同时,更多的新功能会被加入
3.Netty版本说明
- netty版本分为 netty3.x 和 netty4.x、netty5.x;
- 因为Netty5出现重大bug,已经被官网废弃了,目前推荐使用的是Netty4.x的稳定版本;
- 目前在官网可下载的版本 netty3.x netty4.0.x 和 netty4.1.x;
- netty 下载地址: https://bintray.com/netty/downloads/netty/。
4.Netty的工作原理
Netty工作原理示意图:
对该图进行解释:
(1)Netty抽象出两组线程池BossGroup专门负责接收客户端的连接,workerGroup专门负责网络的读写;
(2)BossGroup和workGroup类型都是NioEventLoopGroup;
(3)NioEventLoopGroup 相当于一个事件循环组, 这个组中含有多个事件循 环 ,每一个事件循环是 NioEventLoop
(4)NioEventLoop表示一个不断循环的执行处理任务的线程,每个NioEventLoop都有一个selector,用户监听绑定在其上的socket的网络通讯;
(5)NioEventLoopGroup 可以有多个线程, 即可以含有多个NioEventLoop;
(6)每个Boss NioEventLoop 循环执行的步骤有3步:
a:轮询accept 事件;
b:处理accept 事件 , 与client建立连接 , 生成NioScocketChannel , 并将其 注册到某个worker NIOEventLoop 上的 selector;
c:处理任务队列的任务 , 即 runAllTasks.
(7) 每个 Worker NIOEventLoop 循环执行的步骤
a:轮询read, write 事件;
b:处理i/o事件, 即read , write 事件,在对应NioScocketChannel 处理;
c:处理任务队列的任务 , 即 runAllTasks。
(8)每个Worker NIOEventLoop 处理业务时,会使用pipeline(管道), pipeline。
5.代码实现
功能说明:
使用Netty的技术创建客户端和服务端,并且两者进行通讯:
在创建的项目的时候想要先添加Netty的相关的包
可以通过这种方式添加相关的包
服务端代码
package netty.NettySimple;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.util.Date;
/**
* @author linjiazeng
* @version 1.0
* @date 2020/10/22 20:32
**/
public class NettyServer {
public static void main(String[] args)throws Exception {
//说明
//1,创建两个线程组bossGroup和wokerGroup
//2,bossGroup只是处理连接请求,真正的和客户端业务处理,会交给workerGroup完成
//3.两个都是无限循环
EventLoopGroup boosGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
//创建服务器端的启动对象,配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
//使用链式编程进行设置
try {
bootstrap.group(boosGroup,workerGroup)//设置两个线程
.channel(NioServerSocketChannel.class)//NioServerSocketChannel
.option(ChannelOption.SO_BACKLOG,128)//设置线程队列的连接个数
.childOption(ChannelOption.SO_KEEPALIVE,true)//设置保持活动连接状态
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("客户端socketchannel hashcode="+ch.hashCode());
//可以使用一个集合管理,SokcetChannel,再推送消息时,可以将业务到各个channel对应的NIOEventLpp的taskQueen或者scheduleTaskQueue
ch.pipeline().addLast(new NettyServerHandler());
}
}) ; //给我们的workerGroup的eventloop对应的管道设置处理器
System.out.println(".......服务器is really!!");
//绑定一个端口并同步,生成一个ChannelFuture对象
//启动服务器(并绑定端口)
ChannelFuture cf = bootstrap.bind(6668).sync();
//Future-Listener机制
/*
* 1.当future对象刚刚建立时,处于非完成状态,调用者可以通过返回的ChannelFuture来获取操作执行的状态,注册监听函数来执行完成后的操作
* 2.常见有如下操作
* (1)通过isDone方法来判断当前操作是否完成
* (2)通过isSuccess方法来判断已完成的当前操作是否成功
* (3)通过getCause方法来获取已完成的当前操作失败的原因
* (4)通过isCancelled方法来判断已完成的当前操作是否被取消
* (5)通过addListener方法来注册监听器,当操作已完成(isDone方法返回完成),将会通知指定的监听器;如果Future对象已完成,则通知指定的监听器
*
* */
//给cf注册监听器,监控我们关心的事件
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (cf.isSuccess()){
System.out.println(new Date()+":端口6668成功");
} else {
System.out.println("监听端口 6668 失败!");
}
if (cf.isDone()){
} if (cf.isCancelled()){
}
}
});
//对关闭通道进行监听
cf.channel().closeFuture().sync();
}finally {
boosGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
package netty.NettySimple;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* @author linjiazeng
* @version 1.0
* @date 2020/10/22 20:45
**/
/**
* 1.我们之定义一个handler,需要继承netty规定号的某个HandlerAdapter
* 2,这时我们定义的一个handler,才是真正的handler
*
* */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
//读取实际数据
//1.ChannelHandlerContext ctx:上下文对象,含有管道pipline,通道channel,地址
//2.Obeject msg :就是客户端发送的数据 默认 Object
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("server cxt="+ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端 喵2" ,CharsetUtil.UTF_8));
//比如这里我们有一个非常耗时的业务-》异步执行-》提交该channel对应的
//NIOEvenLoop的TaskQueue中
//解决方案1 用户程序自定义的普通任务
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10*1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端 " ,CharsetUtil.UTF_8));
} catch (InterruptedException e) {
System.out.println("发生异常"+e.getMessage());
}
}
});
//将msg转成一个bytebuf
//ByteBuf是Netty‘提供 的,不是NIO的ByteBuff
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("客户端发送消息是:"+byteBuf.toString(CharsetUtil.UTF_8));
}
//数据读取完毕
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//writeAndFlush是write + flush
//将数据写入缓存,并刷新
//一般讲。我们对这个发送的数据进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("hello ,客户端",CharsetUtil.UTF_8));
}
//处理异常,一般是需要关闭通道
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
客户端:
package netty.NettySimple;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* @author linjiazeng
* @version 1.0
* @date 2020/10/22 21:07
**/
public class NettyClient {
public static void main(String[] args) throws Exception{
//客户端需要一个事件循环组
EventLoopGroup eventExecutors = new NioEventLoopGroup();
//创建客户端启动对象
Bootstrap bootstrap = new Bootstrap();
try{
//设置相关参数
bootstrap.group(eventExecutors)//设置线程组
.channel(NioSocketChannel.class)//设置客户端通道的实现类
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyClientHandler());
}
});
System.out.println("客户端-----ok");
// 启动客户端去连接服务器端
//关于channelFuture要分析,涉及到netty的异步模型
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
channelFuture.channel().closeFuture().sync();
}finally {
eventExecutors.shutdownGracefully();
}
}
}
package netty.NettySimple;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* @author linjiazeng
* @version 1.0
* @date 2020/10/22 21:25
**/
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
//当通道就绪就会触发该方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client"+ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客户端", CharsetUtil.UTF_8));
}
//当同懂啊有读取事件时,会触发
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("服务器地址:"+ctx.channel().remoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
}
}
运行后的效果:
加油吧!打工人!!!!!
作者:一个还没入门的程序员
来源链接:https://blog.csdn.net/weixin_37541878/article/details/109339466