前置知识
nio
- nio 一般指 同步非阻塞 io,同样用于**描述程序访问数据方式 **的还有bio(同步阻塞)、aio(异步非阻塞)
- 同步异步指获取结果的方式,同步为主动去获取结果,不管结果是否准备好,异步为等待结果准备好的通知
- 阻塞非阻塞是线程在结果没有到来之前,是否进行等待,阻塞为进行等待,非阻塞则不进行等待
- nio 主动地去获取结果,但是在结果没有准备好之前,不会进行等待。而是通过一个 多路复用器 管理多个通道,由一个线程轮训地去检查是否准备好即可。在网络编程中,多路复用器通常由操作系统提供,linux中主要有 select、poll、epoll。同步非阻塞指线程不等待数据的传输,而是完成后由多路复用器通知,线程再将数据从内核缓冲区拷贝到用户空间内存进行处理。
java nio
基于 nio 实现的网络框架,可以用少量的线程,处理大量的连接,更适用于高并发场景。于是,java提供了nio包提供相关组件,用于实现同步非阻塞io
核心三个类channel、buffer、selector。channel代表一个数据传输通道,但不进行数据存取,有buffer类进行数据管理,selector为一个复用器,管理多个通道
bytebuffer
- 该类为nio 包中用于操作内存的抽象类,具体实现由heapbytebuffer、directbytebuffer两种
- heapbytebuffer为堆内内存,底层通过 byte[ ] 存取数据
- directbytebuffer 为堆外内存,通过jdk提供的 unsafe类去存取;同时创建对象会关联的一个cleaner对象,当对象被gc时,通过cleaner对象去释放堆外内存
各核心组件介绍
nioserver
为启动程序类,监听端口,初始化channel
下面为nio模式下简单服务端处理代码
// 1、创建服务端channel,绑定端口并配置非阻塞
serversocketchannel serversocketchannel = serversocketchannel.open();
serversocketchannel.socket().bind(new inetsocketaddress(6666));
serversocketchannel.configureblocking(false);
// 2、创建多路复用器selector,并将channel注册到多路复用器上
// 不能直接调用channel的accept方法,因为属于非阻塞,直接调用没有新连接会直接返回
selector selector = selector.open();
serversocketchannel.register(selector, selectionkey.op_accept);
// 3、循环处理多路复用器的io事件
while(true){
// 3.1、select属于阻塞的方法,这里阻塞等待1秒
// 如果返回0,说明没有事件处理
if (selector.select(1000) == 0){
system.out.println("服务器等待了1秒,无io事件");
continue;
}
// 3.2、遍历事件进行处理
set<selectionkey> selectionkeys = selector.selectedkeys();
iterator<selectionkey> iterator = selectionkeys.iterator();
while(iterator.hasnext()){
selectionkey key = iterator.next();
// accept事件,说明有新的客户端连接
if (key.isacceptable()){
// 新建一个socketchannel,注册到selector,并关联buffer
socketchannel socketchannel = serversocketchannel.accept();
socketchannel.configureblocking(false);
socketchannel.register(selector,selectionkey.op_read, bytebuffer.allocate(1024));
system.out.println("客户端连接:"+socketchannel.getremoteaddress());
}
// read事件 (内核缓冲区的数据准备好了)
if(key.isreadable()){
socketchannel channel = (socketchannel)key.channel();
bytebuffer bytebuffer = (bytebuffer)key.attachment();
try {
// 将数据写进buffer
int readnum = channel.read(bytebuffer);
if (readnum == -1){
system.out.println("读取-1时,表示io流已结束");
channel.close();
break;
}
// 打印buffer
bytebuffer.flip();
byte[] bytes = new byte[readnum];
bytebuffer.get(bytes, 0, readnum);
system.out.println("读取到数据:" + new string(bytes));
} catch (ioexception e) {
system.out.println("读取发生异常,广播socket");
channel.close();
}
}
// write事件 (操作系统有内存写出了)
if (key.iswritable()){
socketchannel channel = (socketchannel)key.channel();
// 读取read时暂存数据
byte[] bytes = (byte[])key.attachment();
if (bytes != null){
system.out.println("可写事件发生,写入数据: " + new string(bytes));
channel.write(bytebuffer.wrap(bytes));
}
// 清空暂存数据,并切换成关注读事件
key.attach(null);
key.interestops(selectionkey.op_read);
}
iterator.remove();
}
}
eventloop
处理 channel 中数据的读写
- 在上面的server中,大量并发时单线程地处理读写事件会导致延迟,因此将读写处理抽取出来,可利用多线程实现高并发
- 一个eventloop会关联一个selector,只会处理这个selector上的channel
public class eventloop2 implements runnable{
private final thread thread;
/**
* 复用器,当前线程只处理这个复用器上的channel
*/
public selector selector;
/**
* 待处理的注册任务
*/
private final queue<runnable> queue = new linkedblockingqueue<>();
/**
* 初始化复用器,线程启动
* @throws ioexception
*/
public eventloop2() throws ioexception {
this.selector = selectorprovider.provider().openselector();
this.thread = new thread(this);
thread.start();
}
/**
* 将通道注册给当前的线程处理
* @param socketchannel
* @param keyops
*/
public void register(socketchannel socketchannel,int keyops){
// 将注册新的socketchannel到当前selector封装成一个任务
queue.add(()->{
try {
mychannel mychannel = new mychannel(socketchannel, this);
selectionkey key = socketchannel.register(selector, keyops);
key.attach(mychannel);
} catch (exception e){
e.printstacktrace();
}
});
// 唤醒阻塞等待的selector线程
selector.wakeup();
}
/**
* 循环地处理 注册事件、读写事件
*/
@override
public void run() {
while (!thread.isinterrupted()){
try {
int select = selector.select(1000);
// 处理注册到当前selector的事件
if (select == 0){
runnable task;
while ((task = queue.poll()) != null){
task.run();
}
continue;
}
// 处理读写事件
system.out.println("服务器收到读写事件,select:" + select);
processreadwrite();
}catch (exception e){
e.printstacktrace();
}
}
}
/**
* 处理读写事件
* @throws exception
*/
private void processreadwrite() throws exception{
system.out.println(thread.currentthread() + "开始监听读写事件");
// 3.2、遍历事件进行处理
set<selectionkey> selectionkeys = selector.selectedkeys();
iterator<selectionkey> iterator = selectionkeys.iterator();
while(iterator.hasnext()){
selectionkey key = iterator.next();
mychannel mychannel = (mychannel)key.attachment();
if(key.isreadable()){
// 将数据读进buffer
mychannel.doread(key);
}
if (key.iswritable()){
mychannel.dowrite(key);
}
iterator.remove();
}
}
}eventloopgroup
一组eventloop,轮训地为eventloop分配channel
public class eventloopgroup {
private eventloop2[] children = new eventloop2[1];
private atomicinteger idx = new atomicinteger(0);
public eventloopgroup() throws ioexception {
for (int i = 0; i < children.length; i++){
children[i] = new eventloop2();
}
}
public eventloop2 next(){
// 轮训每一个children
return children[idx.getandincrement() & (children.length - 1)];
}
public void register(socketchannel channel,int ops){
next().register(channel,ops);
}
}
channel
封装了socketchannel 和 pipline,将从channel读写的消息,沿着pipline上的节点进行处理
- 在上面eventloop中,注册channel到对应的selector前,会进行封装,将自定义的channel放在读写事件触发时会返回的selectionkey里面
- 同时提供了数据读写处理方法,读写事件触发时调用该方法,数据会沿着pipline上去处理
public class mychannel {
private socketchannel channel;
private eventloop2 eventloop;
private queue<bytebuffer> writequeue;
private pipline pipline;
/**
* 一个channel关联一个eventloop、一个pipline、一个socketchannel、一个writequeue
* @param channel
* @param eventloop
*/
public mychannel(socketchannel channel, eventloop2 eventloop) {
this.channel = channel;
this.eventloop = eventloop;
this.writequeue = new arraydeque<>();
this.pipline = new pipline(this,eventloop);
this.pipline.addlast(new myhandler1());
this.pipline.addlast(new myhandler2());
}
/**
* 读事件处理
* @param key
* @throws exception
*/
public void doread(selectionkey key) throws exception{
try {
bytebuffer buffer = bytebuffer.allocate(1024);
int readnum = channel.read(buffer);
if (readnum == -1){
system.out.println("读取-1时,表示io流已结束");
channel.close();
return;
}
// 转成可读状态
buffer.flip();
// 消息放入pipline,交给头节点, 头节点开始传递
pipline.headcontext.firechannelread(buffer);
} catch (ioexception e) {
system.out.println("读取发生异常,广播socket");
channel.close();
}
}
/**
* 真正地写出数据,关注写事件后,会触发
* @param key
* @throws ioexception
*/
public void dowrite(selectionkey key) throws ioexception{
bytebuffer buffer;
while ((buffer =writequeue.poll()) != null){
channel.write(buffer);
}
// 回复读取状态
key.interestops(selectionkey.op_read);
}
/**
* 写出到队列
* @param msg
*/
public void dowritequeue(bytebuffer msg){
writequeue.add(msg);
}
/**
* 从最后一个节点进行写出,写出到头节点是调用dowritequeue
* @param msg
*/
public void write(object msg){
this.pipline.tailcontext.write(msg);
}
/**
* 从最后一个节点进行flush,写出到头节点时调用doflush
*/
public void flush(){
this.pipline.tailcontext.flush();
}
/**
* 关注写事件,才能进行真正地写出
*/
public void doflush(){
this.channel.keyfor(eventloop.selector).interestops(selectionkey.op_write);
}
}handler 和 handlercontext
handler 接口定义了可以扩展处理的消息,由开发人员实现具体的处理
handlercontext 类封装了handler的实现类,将handler的上一个节点和下一个节点,让消息可以延者链表传递
public interface handler {
/**
* 读取数据处理
* @param ctx
* @param msg
*/
void channelread(handlercontext ctx,object msg);
/**
* 写出数据
* @param ctx
* @param msg
*/
void write(handlercontext ctx,object msg);
/**
* 刷下数据
* @param ctx
*/
void flush(handlercontext ctx);
}
public class handlercontext {
private handler handler;
mychannel channel;
handlercontext prev;
handlercontext next;
public handlercontext(handler handler, mychannel channel) {
this.handler = handler;
this.channel = channel;
}
/**
* 读消息的传递,从头节点开始往后传
* @param msg
*/
public void firechannelread(object msg){
handlercontext next = this.next;
if (next != null){
next.handler.channelread(next,msg);
}
}
/**
* 从尾节点开始往前传
* @param msg
*/
public void write(object msg){
handlercontext prev = this.prev;
if (prev != null){
prev.handler.write(prev,msg);
}
}
/**
* 从尾节点开始往前传
*/
public void flush(){
handlercontext prev = this.prev;
if (prev != null){
prev.handler.flush(prev);
}
}
}pipline
本质是链表,包含了头尾节点的handlercontext,提供方法给开发人员加节点
public class pipline {
private mychannel channel;
private eventloop2 eventloop;
public handlercontext headcontext;
public handlercontext tailcontext;
public pipline(mychannel channel, eventloop2 eventloop) {
this.channel = channel;
this.eventloop = eventloop;
piphandler headhandler = new piphandler();
this.headcontext = new handlercontext(headhandler,channel);
piphandler tailhandler = new piphandler();
this.tailcontext = new handlercontext(tailhandler,channel);
// 构建链表
this.headcontext.next = this.tailcontext;
this.tailcontext.prev = this.headcontext;
}
public void addlast(handler handler){
handlercontext curr = new handlercontext(handler, channel);
// 连接在倒数第二个后面
handlercontext lastbutone = this.tailcontext.prev;
lastbutone.next = curr;
curr.prev = lastbutone;
// 连接在最后一个前面
curr.next = tailcontext;
tailcontext.prev = curr;
}
public static class piphandler implements handler{
@override
public void channelread(handlercontext ctx, object msg) {
system.out.println("接收"+(string) msg +"进行资源释放");
}
@override
public void write(handlercontext ctx, object msg) {
system.out.println("写出"+msg.tostring());
}
@override
public void flush(handlercontext ctx) {
system.out.println("flush");
}
}
}到此这篇关于基于java nio编写一个简单版netty服务端的文章就介绍到这了,更多相关java nio编写netty服务端内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论