NIo开发利器ByteBuffer

源码学徒

共 13895字,需浏览 28分钟

 · 2021-06-28

有道无术,术尚可求也!有术无道,止于术!

想要使用NIO开发Socket分服务端和客户端,必须掌握的一个知识点就是ByteBuffer的使用,他是NIO再数据传输中的利器!相比于BIO传输过程中的字节流,ByteBuffer更能体现出服务端/客户端对于数据的操作效率,ByteBuffer内存维护一个指针,使得传输的数据真正的能够达到重复使用,重复读写的能力!

主要API和属性

ByteBuffer类图

他是对于Buffer的一个默认实现,具体主要的属性和方法我们需要看Buffer类:

主要属性

//指针标记
private int mark = -1;
//指针的当前位置
private int position = 0;
//翻转后界限
private int limit;
//最大容量
private int capacity;
//当为堆外内存的时候,内存的地址
long address;

主要方法

//返回当前缓冲区的最大容量
public final int capacity() {return capacity;}
//返回当前的指针位置
public final int position() {return position;}
//返回当前的读写界限
public final int limit() {return limit;}
//标记当前指针位置
public final Buffer mark() {
mark = position;
return this;
}
//恢复当前指针位置
public final Buffer reset() {
int m = mark;
if (m < 0)
throw new InvalidMarkException();
position = m;
return this;
}
//清空缓冲区,注意这里并不会清空数据,只是将各项指标初始化,后续再写入数据就直接覆盖
public final Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}
//切换读写模式
public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}
//重新从头进行读写,初始化指针和标记位置
public final Buffer rewind() {
position = 0;
mark = -1;
return this;
}
//剩余可读可写的数量
public final int remaining() {return limit - position;}
//当前是否可读/可写
public final boolean hasRemaining() {return position < limit;}
//是不是只读的
public abstract boolean isReadOnly();
//是不是支持数组访问
public abstract boolean hasArray();
//获取当前缓存的字节数组(当hasArray返回为true的时候)
public abstract Object array();
//是不是堆外缓冲区也就是直接缓冲区
public abstract boolean isDirect();
//取消缓冲区
final void discardMark() {mark = -1;}

堆内缓冲

什么是堆内缓冲区?所谓的堆内缓冲区,顾名思义就是再JVM对上分配的缓冲区,一般由**byte[]**实现,它有一个好处,就是它的内存的分配与回收由JVM自动完成,用户不必自己再操心内存释放的问题,但是缺点也很明显,就是它再数据传输的时候,需要将数据从JVM复制到本地物理内存上,多了一次复制操作!

创建堆内缓冲区

java堆内缓冲区的默认实现是 HeapByteBuffer,但是这个对象是一个 default权限的类,你是无法直接创建的,只能通过JDK底层暴露的api来创建:

//1. 分配一个最大能够存储128个字节的堆内存
ByteBuffer heapRam = ByteBuffer.allocate(128);
//2. 或者直接初始化数据创建
ByteBuffer wrapBuffer = ByteBuffer.wrap("欢迎关注公众号:【源码学徒】 学习更多源码知识!".getBytes());

堆内缓冲区API源码解析

构造方法

以上两种方案创建的都是一个堆内缓冲区,他们创建的逻辑大致相同,我们以 ByteBuffer.allocate为例进行分析:

public static ByteBuffer allocate(int capacity) {
if (capacity < 0) {
throw new IllegalArgumentException();
}
//创建一个堆内缓冲区
return new HeapByteBuffer(capacity, capacity);
}

我们可以看到,通过 ByteBuffer.allocate创建的缓冲区是一个 HeapByteBuffer,他是堆内缓冲区!我们继续往下分析:

HeapByteBuffer(int cap, int lim) {
super(-1, 0, lim, cap, new byte[cap], 0);
}

注意此时,cap和lim都是我们传递的大小,内部还创建了一个cap大小的字节数组传递下去!他就是堆内最终存储数据的数组!

// mark = -1      pos = 0      lim = 128    cap = 128   hb = 字节数组对象    offset = 0
ByteBuffer(int mark, int pos, int lim, int cap, byte[] hb, int offset) {
super(mark, pos, lim, cap);
//前面创建的字节数组对象
this.hb = hb;
//保存一个偏移量 默认为0
this.offset = offset;
}

然后再调用父类的构造参数:

Buffer(int mark, int pos, int lim, int cap) {
if (cap < 0){
throw new IllegalArgumentException("Negative capacity: " + cap);
}
//保存最大容量
this.capacity = cap;
//保存limit
limit(lim);
//保存pos指针位置
position(pos);
//........ 忽略无必要代码.........
}

此时我们的一个堆内缓冲区就创建完成了,它的内部结构如下:

init 堆内缓冲区的结构

put方法

现在我们的容器创建好了,我们就需要往里面怼数据了呀,我们需要往里面写入一段字节数组:

heapRam.put("A".getBytes());

我们调用put方法往ByteBuffer里面写入一段数据会发生什么呢?

public ByteBuffer put(byte[] src, int offset, int length) {
//先判断当前数据的长度是否超过可写长度了
// remaining() = limit - position = 128 - 0
if (length > remaining()) {
throw new BufferOverflowException();
}
//hb还记得吗,就是我们再创建堆内缓冲区所创建的字节数组
//这里就是将我们的数据拷贝到从当前的指针位置开始的堆内缓存(hb字节数组)
System.arraycopy(src, offset, hb, ix(position()), length);
//将当前的指针位置 + 数据长度,我们本次写入的数据长度是1 那么当前的指针索引就是 1
position(position() + length);
return this;
}

当调用put方法后,内部数据结构如下:

调用put方法后的数据结构

为了方便后续的讲解我们再次写入几个数据的时候,逻辑和上方一样:

heapRam.put("B".getBytes());
heapRam.put("C".getBytes());
heapRam.put("D".getBytes());
heapRam.put("E".getBytes());
再次调用put方法

get方法

我们现在再缓冲区里面写入了 ABCDE五个数据,此时我们如果想从缓冲区取数据,就应该调用另外一个api:get()方法

byte b = heapRam.get();
System.out.println(new String(new byte[]{b}));

但是,很奇怪的是,我们打印了一个空,并没有想象中的打印一个A,这是为什么呢?我们由上面的分析可以知道,每次缓冲区对于数据的操作都是基于指针来做的,我们每一次操作数据,指针都会后移一位,当我们发生一个get()请求后,指针依旧会后移,将下标为5的数据返回同时自身自增变为6.但是下标为5的并没有数据,只能返回一个空数据,所以我们如果想从头读数据,就必须想办法将指针复位,重新变为0,我们此时往里面写数据,我们称之为写模式,想要切换到读模式就必须调用 **heapRam.flip();**方法来切换读写模式,复位读写指针!

heapRam.flip();

那这个api具体做了什么呢?仅仅是将读写指针复位吗?  那我提出一个问题,不妨读者读到这里思考一下,如果仅仅是指针复位的话,我们如何控制不让用户读超呢? **我们只写入了5个数,如何避免用户读第六个数据呢?**我们带着疑问,看下 flip方法究竟做了什么:

public final Buffer flip() {
//将当前的指针位置赋值给limit
limit = position;
//读写指针复位
position = 0;
mark = -1;
return this;
}

filp方法

我们可以看到,filp方法再复位读写指针之前,记录了一个位置 limit,具体他是干嘛的,我么稍后再说       到现在为止,我们的数据结构如下:

filp方法
byte b = heapRam.get();
System.out.println(new String(new byte[]{b}));

此时我们再次调用get方法,指针后移,同时返回当前指针位置代表的数据:注意 ix方法不用管,他是计算偏移量的,这里始终是0

get方法
public byte get() {
return hb[ix(nextGetIndex())];
}

nextGetIndex:主要是判断当前指针是否超过了 limit的限制,同时自增指针位置

final int nextGetIndex() {
//limit的作用在这里被体现,判断你的读指针是不是读超了数据范围
if (position >= limit){
throw new BufferUnderflowException();
}
//返回读指针的位置,并自增1
return position++;
}

get方法主要是直接返回字节数组某个下标的位置的字节数据!

我们多读一些数据:

byte[] bytes = new byte[4];
heapRam.get(bytes);
System.out.println(new String(bytes));

当我们传递了一个字节数组去读取的时候,它的内部是如何做的呢?

public ByteBuffer get(byte[] dst) {
return get(dst, 0, dst.length);
}
public ByteBuffer get(byte[] dst, int offset, int length) {
checkBounds(offset, length, dst.length);
if (length > remaining())
throw new BufferUnderflowException();
//将数据拷贝至我们传递的字节数组中
System.arraycopy(hb, ix(position()), dst, offset, length);
//读指针位置+我们要读取的长度
position(position() + length);
return this;
}

此时缓冲区的内部数据结构如下:

image-20210321132547345

我们把数据读完了,下面,我又想往里面写数据了,假设直接写是否能写呢? put方法向下表为5的地方写一个数据,同时指针后移,似乎可行,我们分析一下put方法,具体我们前面已经分析过了,再put方法的源码中,有这么一段逻辑:

if (length > remaining())
throw new BufferOverflowException();

假设写入数据的长度,大于剩余可写长度,就会报错,我们具体看下这个方法的逻辑:

public final int remaining() {
return limit - position;
}

我们看上图的数据结构数据可知,该结果为0,就必定会报错,所以说,当我们向再次切换成写模式的话,就一定要初始化 pos,还是调用filp方法吗?重新调用filp方法固然可行,但是,调用filp方法并不会初始化limit的大小,造成明明我们分配了128个字节的大小,但是可用的永远都只有5个,所以,我们如果想让数据重新能够初始化,就必须让limit = capacity,JDK也为我们提供了接口:clear

clear方法

heapRam.clear();
 public final Buffer clear() {
//读写指针归零
position = 0;
//limit初始化为初始状态
limit = capacity;
//标记初始化为初始状态
mark = -1;
return this;
}

可以看到,clear方法将我们缓冲区中的所有指标全部的进行初始化了,指针重新归0,但是JDK考虑到性能影响byte数组中的数据并没有被清除,只会被新数据覆盖调!

由同学会问,你不是说ByteBuffer可以进行重复的读取吗? 这明明只能读一遍,读完就得初始化指针位置,你骗人!

别着急,想要进行重复的读写操作,我们必须还要掌握另外一组API:mark() 、reset();

我们假设我们此时处于读模式,数据结构如下:

image-20210321140847866

mark方法

我们此时想,一会读完数据了,还想再次回到当前的位置进行数据的二次读取,我们此时就应该调用mark()方法,打个标记,它的底层会记录当前指针的位置:

heapRam.mark();
public final Buffer mark() {
//记录当前读指针的位置
mark = position;
return this;
}

调用mark方法之后,我们的数据结构如下:

mark方法

然后我们将数据读完:

image-20210321143705395

reset方法

我们将数据读完之后,想再从标记位置开始读取的时候:

heapRam.reset();
public final Buffer reset() {
//获取当前标记的位置
int m = mark;
if (m < 0)
//如果标记位为负数,就证明没有进行标记过直接报错
throw new InvalidMarkException();
//然后将标记位置赋值给当前的指针位置
position = m;
return this;
}

当前的数据结构如下:

reset方法

rewind方法

如此,我们就可以进行复读了,相类似的方法还有:rewind

heapRam.rewind();
public final Buffer rewind() {
//回复读写指针为0
position = 0;
//废弃标记位置
mark = -1;
return this;
}

rewind方法是直接返回的 缓冲区的头部,同时废弃标记的位置 !

rewind

堆外缓冲区

创建堆外缓冲区

//1. 分配一个最大能够存储128个字节的堆外内存
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(128);

jvm如何操作堆外内存

public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException {
//获取JDK底层的操作物理内存的工具类
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
Unsafe o = (Unsafe) theUnsafe.get(null);
//从物理内存分配一块128的内存
long address = o.allocateMemory(128);
//获取字节数组的一个基本偏移 数组基本偏移
long arrayBaseOffset = (long)o.arrayBaseOffset(byte[].class);
byte[] bytes = "欢迎关注公众号:【源码学徒】 学习更多源码知识!".getBytes();

//向物理内存复制一段数据
// 数据源 数据的基本偏移 目标数据源 要复制到的内存地址 复制数据的长度
o.copyMemory(bytes, arrayBaseOffset, null, address, bytes.length);
//从物理机将数据拷贝回JVM内存中
byte[] copy = new byte[bytes.length];
// 数据源 物理地址 目标数据源 数组基本偏移量 复制数据的长度
o.copyMemory(null, address, copy, arrayBaseOffset, bytes.length);
//释放内存
o.freeMemory(address);
System.out.println(new String(copy));
}

上述的操作是分配一个物理内存将一段数据写进物理内存然后将数据从物理内存读进JVM数组释放物理内存

有了基本的知识,我们一起分下下堆外内存的源码把!

堆外缓冲区Api源码解析

构造方法

public static ByteBuffer allocateDirect(int capacity) {
return new DirectByteBuffer(capacity);
}

我们可以看到,堆外缓冲区是由DirectByteBuffer来代表的!

DirectByteBuffer(int cap) {

super(-1, 0, cap, cap);
//......忽略其他代码..........

long base = 0;
try {
//从物理内存分配一块指定大小的内存 并返回当前分配内存的地址
base = unsafe.allocateMemory(size);
} catch (OutOfMemoryError x) {
Bits.unreserveMemory(size, cap);
throw x;
}
//初始化内存
unsafe.setMemory(base, size, (byte) 0);
//判断是否对其的页面
if (pa && (base % ps != 0)) {
// 向上对其页面 并保存地址
address = base + ps - (base & (ps - 1));
} else {
//保存地址
address = base;
}
//这个极其重要,是JVM管理堆外内存的重要方法
cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
att = null;
}

我们来逐行进行分析,首先是  super(-1, 0, cap, cap);

//      -1   0   128    128
MappedByteBuffer(int mark, int pos, int lim, int cap) {
//继续调用父类
super(mark, pos, lim, cap);
this.fd = null;
}
// -1   0   128   128
ByteBuffer(int mark, int pos, int lim, int cap) {
//在往上
this(mark, pos, lim, cap, null, 0);
}
//   -1   0   128   128   null    0
ByteBuffer(int mark, int pos, int lim, int cap, byte[] hb, int offset) {
super(mark, pos, lim, cap);
this.hb = hb;
this.offset = offset;
}

到这里就不往上分析了,它和创建堆内缓冲是一样的,保存一些基本的变量,但是注意  这里传递的hb是一个null,因为它是堆外缓冲区,不依赖与JVM内部的内存分配!

image-20210321150824205

此时基本数据保存完毕,开始分配一块堆外内存:

base = unsafe.allocateMemory(size);

这里是调用的 native方法分配的缓冲区,是C来实现的,unsafe是JDK内部使用的一个操作物理内存的工具类,一般不对外开放,如果想要使用可以通过反射的方式获取,获取方式上面已经写出来了,同学们没事可以玩一下!

unsafe.setMemory(base, size, (byte) 0);

初始化内存区域,将所分配的内存里面的数据默认设置为字节0

address = base;

保存物理内存的地址,方面后面进行数据的读写

cleaner = Cleaner.create(this, new Deallocator(base, size, cap));

这个方法极其重要,主要负责改堆外内存的释放,他是一个虚引用,具体的讲解上一篇文章  深入分析NIO的零拷贝述的很详细,看一下JVM是如何释放一个不由JVM控制的堆外内存的!这里就不做具体的讲解了!

image-20210321160738660

put方法

现在我们向堆外内存写入一段数据:

byteBuffer.put("ABCDE".getBytes());

我们看下源码是如何来操作堆外内存的

public final ByteBuffer put(byte[] src) {
return put(src, 0, src.length);
}
public ByteBuffer put(byte[] src, int offset, int length) {

if (((long)length << 0) > Bits.JNI_COPY_FROM_ARRAY_THRESHOLD) {
//检查越界
checkBounds(offset, length, src.length);
//获取当前的读写指针
int pos = position();
//获取当前的学些界限
int lim = limit();
//判断是否超过读写界限
assert (pos <= lim);
//计算剩余空间
int rem = (pos <= lim ? lim - pos : 0);
//判断写入数据是否大于剩余空间
if (length > rem) {
throw new BufferOverflowException();
}
//向物理内存拷贝数据
Bits.copyFromArray(src, arrayBaseOffset, (long)offset << 0, ix(pos), (long)length << 0);
//重新计算当前的读写指针
position(pos + length);
} else {
super.put(src, offset, length);
}
return this;



}

我们发现,里面最关键的一段代码是  Bits.copyFromArray(src, arrayBaseOffset, (long)offset << 0, ix(pos), (long)length << 0);

它传递的是:要写入的数据的字节数组、字节基准偏移、偏移量(0)、地址+读写指针的位置(addres + pos)、要写入的数据的长度

static void copyFromArray(Object src, long srcBaseOffset, long srcPos, long dstAddr, long length) {
//计算 偏移量 = 字节数组基准偏移量 + offset(0)
long offset = srcBaseOffset + srcPos;
//如果存在数据
while (length > 0) {
//判断数据是否大于1MB 如果大于1MB就默认只传递1MB,剩余数据交给下一次循环
long size = (length > UNSAFE_COPY_THRESHOLD) ? UNSAFE_COPY_THRESHOLD : length;
//拷贝数据
unsafe.copyMemory(src, offset, null, dstAddr, size);
//判断本次拷贝后剩余未拷贝的数据
length -= size;
//计算偏移量 本次应该偏移的数量
offset += size;
//计算地址 下次读取的起始位置
dstAddr += size;
}
}

我们会发现,里面的代码有一部分我们极其熟悉,正是上面我演示unsafe如何使用的代码,这里就是将数据拷贝至堆外内存的!现在它的内存结构如下:

image-20210321161014909

filp方法

byteBuffer.flip();
image-20210321193732857

现在我们要获取数据了就也必须调用filp方法切换读写模式,直接缓冲区的切换方式和堆内内存的切换方式 一致,不做讲述,忘记的小伙伴请翻到上面看下!

get方法

byte[] bytes = new byte[5];
byteBuffer.get(bytes);
public ByteBuffer get(byte[] dst) {
return get(dst, 0, dst.length);
}
// bytes     0     5
public ByteBuffer get(byte[] dst, int offset, int length) {
if (((long)length << 0) > Bits.JNI_COPY_TO_ARRAY_THRESHOLD) {
checkBounds(offset, length, dst.length);
//获取当前的读指针 0
int pos = position();
//获取当前的limit
int lim = limit();
//判断是否超过界限
assert (pos <= lim);
int rem = (pos <= lim ? lim - pos : 0);
if (length > rem) {
throw new BufferUnderflowException();
}
//关键方法 将物理内存中的数据复制到JVM内存中来
Bits.copyToArray(ix(pos), dst, arrayBaseOffset, (long)offset << 0, (long)length << 0);
//将读写指针切换至对应位置 5
position(pos + length);
} else {
super.get(dst, offset, length);
}
return this;
}

可以看出,当前的关键代码是 Bits.copyToArray(ix(pos), dst, arrayBaseOffset, (long)offset << 0, (long)length << 0);   我们分析一下

static void copyToArray(long srcAddr, Object dst, long dstBaseOffset, long dstPos, long length) {
//计算当前的偏移量
long offset = dstBaseOffset + dstPos;
while (length > 0) {
//最大拷贝长度是 1MB 高于1MB的下次循环再次拷贝
long size = (length > UNSAFE_COPY_THRESHOLD) ? UNSAFE_COPY_THRESHOLD : length;
// 将数据拷贝回指定的数组中
// 源数据 数据所在的内存地址 目标位置 偏移量 拷贝的长度
unsafe.copyMemory(null, srcAddr, dst, offset, size);
//计算剩余的数据长度
length -= size;
//计算下次拷贝的地址的偏移量
srcAddr += size;
//计算下次复制的偏移量
offset += size;
}
}

当前数据的数据结构为:

image-20210321200004469

堆外缓冲区比较重要的几个点:

  1. 缓冲区的创建(构造函数)
  2. 数据的存储(put方法)
  3. 数据的获取(get方法)
  4. 堆外内存的释放

都已经介绍完毕,其他类似的API譬如 clearmarkresetrewind 再上面的外内内存的介绍中都已经介绍完毕了,逻辑都一样,感兴趣的小伙伴可以自己追一下源码!

对于NIO的学习,这个缓冲区是必不可少的一节课!务必要搞明白呀!

才疏学浅,如果文章中理解有误,欢迎大佬们私聊指正!欢迎关注作者的公众号,一起进步,一起学习!



❤️「转发」「在看」,是对我最大的支持❤️



浏览 34
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报