一文了解Netty的使用ls
helloworld
使用前需要导入netty依赖
1 2 3 4 5
| <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.79.Final</version> </dependency>
|
hello-server
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public static void main(String[] args) { new ServerBootstrap() .group(new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .childHandler( new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println(msg); } }); } }).bind(8080); }
|
hello-client
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public static void main(String[] args) throws InterruptedException { Channel channel = new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new StringEncoder()); } }) .connect(new InetSocketAddress("localhost", 8080)) .sync() .channel(); channel.writeAndFlush("hello world"); }
|
组件
EventLoop
EventLoop
所谓的EventLoop就是一个事件循环对象,是一个的单线程执行器(同时维护了一个Selector),里面有run方法处理Channel上源源不断的io事件
它的继承关系比较复杂
- 一条线是继承值juc.ScheduledExcecutorService,因此包含了线程池中所有方法
- 另一条线是继承自netty自己实现的OrderedEventExecutor
EventLoopGroup
EventLoopGroup是一组EventLoop,channel一般会调用EventLoopGroup的register方法绑定一个EventLoop,后续这个Channel上的io事件都由此EventLoop来处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| EventLoopGroup group = new NioEventLoopGroup(2);
System.out.println(group.next());
group.next().submit(() -> { System.out.println("执行任务"); });
group.next().scheduleAtFixedRate(() -> { System.out.println("执行"); },0, 1, TimeUnit.SECONDS);
EventLoopGroup group2 = new DefaultEventLoop(); new ServerBootstrap() .group(new NioEventLoopGroup(), new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) { ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf buf = (ByteBuf) msg; System.out.println(buf.toString(Charset.defaultCharset())); ctx.fireChannelRead(msg); } }); ch.pipeline().addLast(group2, new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { System.out.println("给group2执行的handler"); } }); } }).bind(8080);
|
Channel
channel主要作用
1 2 3 4 5
| close() closeFutrue() pipeline() write() writeAndFlush()
|
关于ChannelFutrue
连接是非阻塞的,会返回一个ChannelFutrue,通过ChannelFuture来获取channel。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| ChannelFuture channelFuture = (ChannelFuture) new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new StringEncoder()); } }) .connect(new InetSocketAddress("localhost", 8080));
channelFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Channel channel = future.channel(); channel.writeAndFlush("hello world"); ChannelFuture closeFuture = channel.closeFuture(); closeFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { System.out.println("连接关闭"); } }); } });
|
handler & pipeline
ChannelHanderl用于处理Channel上各种事件,分为入站、出站。所有ChannelHandler被连成一串就是Pipeline
- 入站ChannelInboundHanderAdapter,用于读取客户端数据,写回结果
- 出站ChannelOutboundHandlerAdapter,用于写回结果进行加工
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| new ServerBootstrap() .group(new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) { ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("1"); super.channelRead(ctx, msg); } }); ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("2"); ch.writeAndFlush(ctx.alloc().buffer().writeBytes("hello".getBytes())); } }); ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println("3"); super.write(ctx, msg, promise); } }); ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){ @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println("4"); super.write(ctx, msg, promise); } }); } }).bind(8080);
|
- head -> h1 -> h2 -> h3 -> h4 -> tail,当调用channerl.write会从tail向前去找channelOutboundHandlerAdapter,如果调用context.write则会从当前handler向前找。
byteBuf
基本使用
1 2 3 4 5 6 7 8
| ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(); StringBuilder stringBuilder = new StringBuilder(); for (int i = 0; i < 300; i++) { stringBuilder.append("a"); } buf.writeBytes(stringBuilder.toString().getBytes()); System.out.println(buf);
|
直接内存和堆内存
可以申请直接内存:
1
| ByteBuf buf = ByteBufAllocator.DEFAULT.directBuffer();
|
也可以申请堆内存
1
| ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
|
- 直接内存创建和销毁的代价高,但是读写性能高,配合池化功能一起用
- 直接内存对于GC压力小,因为这部分内存不受JVM垃圾回收管理,但也要注意及时主动释放
池化和非池化
池化的可以重用ByteBuf
- 不需要每次都创建新ByteBuf实例,可以重用ByteBuf实例,并且采用与jemalloc类似内存分配算法提升分配效率
- 高并发时,池化功能更节约内存,减少内存溢出可能
组成
一共有四个部分
回收
byteBuf有许多不同的实现,不同的实现需要使用不同的方法进行回收
- UnpooledHeapByteBuf 使用的是JVM内存,只需要等待GC回收即可
- UnpooledDirectByteBuf 使用的是直接内存,需要特殊的方法来回收
- PooledByteBuf 和它的子类使用了池化机制,回收更复杂
Nettt 采用了引用计数来控制回收内存,每个ByteBuf都实现了ReferenceCounted接口
- 每个ByteBuf对象初始计数为1
- 调用release方法计数减1,如果计数为0,ByteBuf内存被回收
- 调用retain方法计数加1,表示调用者没用完之前,其他handler即使调用了release也不会造成回收
- 当计数为0时,底层内存会被回收,即使ByteBuf对象还在,其各个方法均无法使用
Slice
零拷贝的体现之一,对于原始的ByteBuf进行切片成多个ByteBuf,切片后的ByteBuf并没有发送内存的复制,而是使用原始ByteBuf的内存,切片后的ByteBuf维护独立的read,write指针
1 2 3 4
| ByteBuf buf = ByteBufAllocator.DEFAULT.directBuffer(); buf.writeBytes("abcdefg".getBytes()); ByteBuf buf1 = buf.slice(0,2); ByteBuf buf2 = buf.slice(2, 5);
|
注:
CompositeBuffer
可以将多个小的ByteBuf合并成一个大的ByteBuf,而不发生数据的拷贝
1 2 3 4 5 6 7
| ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(); buf1.writeBytes("123456".getBytes()); ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(); buf2.writeBytes("6789".getBytes()); CompositeByteBuf buf = ByteBufAllocator.DEFAULT.compositeBuffer();
buf.addComponents(true, buf1, buf2);
|