java容器中的几种计数方法浅谈
本文讨论java集合容器中的几种元素数量获取的方式,命题很小,但是也足以让我们思考一些东西。
所谓计数:即是给出所在容器的元素总数的方式。一般能想到的就是两种方式:一是使用某个字段直接存储该计数值,二是在请求计数值时临时去计算所有元素数量。貌似本文的答案已经出来了。好吧,那我们还是从源码的角度来验证下想法吧:
一般在java的集合容器中,可以分为普通容器和并发容器,我们就姑且以这种方式来划分下,验证下其实现计数的方式吧!
1:普通容器 --HashMap
一般非并发容器在进行增删改时,都会同时维护一个count值,如hashmap中的实现:
// HashMap 增加和修改都在此实现/*** Implements Map.put and related methods** @param hash hash for key* @param key the key* @param value the value to put* @param onlyIfAbsent if true, don't change existing value* @param evict if false, the table is in creation mode.* @return previous value, or null if none*/final V putVal(int hash, K key, V value, boolean onlyIfAbsent,boolean evict) {Node<K,V>[] tab; Node<K,V> p; int n, i;if ((tab = table) == null || (n = tab.length) == 0)n = (tab = resize()).length;if ((p = tab[i = (n - 1) & hash]) == null)tab[i] = newNode(hash, key, value, null);else {Node<K,V> e; K k;if (p.hash == hash &&((k = p.key) == key || (key != null && key.equals(k))))e = p;else if (p instanceof TreeNode)e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);else {for (int binCount = 0; ; ++binCount) {if ((e = p.next) == null) {p.next = newNode(hash, key, value, null);if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1sttreeifyBin(tab, hash);break;}if (e.hash == hash &&((k = e.key) == key || (key != null && key.equals(k))))break;p = e;}}if (e != null) { // existing mapping for keyV oldValue = e.value;if (!onlyIfAbsent || oldValue == null)e.value = value;afterNodeAccess(e);return oldValue;}}++modCount;// 直接对size进行增加1即可, 如果是更新key的值,则不会运行到此处,即不会进行相加if (++size > threshold)resize();afterNodeInsertion(evict);return null;}// 删除元素的实现,同时维护 size 大小/*** Implements Map.remove and related methods** @param hash hash for key* @param key the key* @param value the value to match if matchValue, else ignored* @param matchValue if true only remove if value is equal* @param movable if false do not move other nodes while removing* @return the node, or null if none*/final Node<K,V> removeNode(int hash, Object key, Object value,boolean matchValue, boolean movable) {Node<K,V>[] tab; Node<K,V> p; int n, index;if ((tab = table) != null && (n = tab.length) > 0 &&(p = tab[index = (n - 1) & hash]) != null) {Node<K,V> node = null, e; K k; V v;// 先查找node所在的位置if (p.hash == hash &&((k = p.key) == key || (key != null && key.equals(k))))node = p;else if ((e = p.next) != null) {if (p instanceof TreeNode)node = ((TreeNode<K,V>)p).getTreeNode(hash, key);else {do {if (e.hash == hash &&((k = e.key) == key ||(key != null && key.equals(k)))) {node = e;break;}p = e;} while ((e = e.next) != null);}}if (node != null && (!matchValue || (v = node.value) == value ||(value != null && value.equals(v)))) {if (node instanceof TreeNode)((TreeNode<K,V>)node).removeTreeNode(this, tab, movable);else if (node == p)tab[index] = node.next;elsep.next = node.next;++modCount;// 直接减小size即可--size;afterNodeRemoval(node);return node;}}return null;}
因为有了增删改时对计数器的维护,所以在想要获取总数时,就容易许多了。只需把size字段返回即可。
// HashMap.size()/*** Returns the number of key-value mappings in this map.** @return the number of key-value mappings in this map*/public int size() {return size;}
所以,在这种情况下,获取计数值的方式非常简单。但是不管怎么样,size字段对外部是不可见的,因为它是容器内部的一个实现逻辑,它完全在将来的某个时刻改变掉。即 size() != size
2. 普通容器 --LinkedList
看完hash类的计数实现,咱们再来看另外一个类型的容器LinkedList:
// LinkedList.add(E) 添加一个元素public boolean add(E e) {linkLast(e);return true;}/*** Links e as last element.*/void linkLast(E e) {final Node<E> l = last;final Node<E> newNode = new Node<>(l, e, null);last = newNode;if (l == null)first = newNode;elsel.next = newNode;// 同样,直接使用一个 size 计数器统计即可size++;modCount++;}// 删除一个元素, 同时维护 size 字段public E removeFirst() {final Node<E> f = first;if (f == null)throw new NoSuchElementException();return unlinkFirst(f);}/*** Unlinks non-null first node f.*/private E unlinkFirst(Node<E> f) {// assert f == first && f != null;final E element = f.item;final Node<E> next = f.next;f.item = null;f.next = null; // help GCfirst = next;if (next == null)last = null;elsenext.prev = null;// 元素计数减1size--;modCount++;return element;}// 同样,统计元素数量时,直接返回size即可public int size() {return size;}
可见,LinkedList 也同样是简单地维护一个计数器字段,从而实现了高效地计数方法。而这简单地实现,则是基于单线程的访问的,它同时维护一个计数字段,基本没有多少开销,却给取值时带来了便利。
总结: 普通容器直接维护一个计数器字段,可以很方便地进行大小统计操作。
3. 并发容器 --ConcurrentHashMap
而对于并发容器,则可能会不一样些,但也有一些情况是一样的。比较,HashTable, 直接使用 synchronized 来保证线程安全,则它也同样可以直接使用一个size即可完成元素大小的统计。事实上,有些版本的HashTable仅仅是在HashMap的上面加上了synchronizd锁而已(有些版本则是 不一样的哦),细节咱们无需再看。
而稍微有点不一样的如: ConcurrentHashMap.size(), 早期的 ConcurrentHashMap 使用分段锁,则需要统计各segement的元素,相加起来然后得到整体元素大小. 而jdk1.8中,已经放弃使用分段锁来实现高性能安全的hash容器了,而是直接使用 synchronized + CAS + 红黑树 实现. 那么,我们来看看其实现元素统计这一功能的实现有何不同吧!
// ConcurrentHashMap.putVal() 新增或修改一个元素/** Implementation for put and putIfAbsent */final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();int hash = spread(key.hashCode());int binCount = 0;for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;if (tab == null || (n = tab.length) == 0)tab = initTable();else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))break; // no lock when adding to empty bin}else if ((fh = f.hash) == MOVED)tab = helpTransfer(tab, f);else {V oldVal = null;synchronized (f) {if (tabAt(tab, i) == f) {if (fh >= 0) {binCount = 1;for (Node<K,V> e = f;; ++binCount) {K ek;if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}Node<K,V> pred = e;if ((e = e.next) == null) {pred.next = new Node<K,V>(hash, key,value, null);break;}}}else if (f instanceof TreeBin) {Node<K,V> p;binCount = 2;if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}if (binCount != 0) {if (binCount >= TREEIFY_THRESHOLD)treeifyBin(tab, i);if (oldVal != null)return oldVal;break;}}}// 主要是在进行新增成功时,再进行计数器的操作, 看起来不是 ++size 这么简单了addCount(1L, binCount);return null;}// 这个计数的相加看起来相当复杂/*** Adds to count, and if table is too small and not already* resizing, initiates transfer. If already resizing, helps* perform transfer if work is available. Rechecks occupancy* after a transfer to see if another resize is already needed* because resizings are lagging additions.** @param x the count to add* @param check if <0, don't check resize, if <= 1 only check if uncontended*/private final void addCount(long x, int check) {CounterCell[] as; long b, s;// 使用 CounterCell 来实现计数操作// 使用 CAS 保证更新计数时只会有一个线程成功if ((as = counterCells) != null ||!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {CounterCell a; long v; int m;boolean uncontended = true;if (as == null || (m = as.length - 1) < 0 ||// 使用一个类似随机负载均衡的方式,将计数值随机添加到 CounterCell 的某个值下面,减少多线程竞争的可能性(a = as[ThreadLocalRandom.getProbe() & m]) == null ||// 通过cas将计数值x添加到 CounterCell 的 value 字段中!(uncontended =U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {// 如果上面添加失败,则使用 fullAddCount 进行重新添加该计数fullAddCount(x, uncontended);return;}if (check <= 1)return;// 基于 CounterCell 做一此汇总操作s = sumCount();}// 在进行put值时, check的值都是大于等于0的if (check >= 0) {Node<K,V>[] tab, nt; int n, sc;// rehash 处理while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&(n = tab.length) < MAXIMUM_CAPACITY) {int rs = resizeStamp(n);if (sc < 0) {if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||transferIndex <= 0)break;if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))transfer(tab, nt);}else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))// 辅助进行hash扩容transfer(tab, null);s = sumCount();}}}// fullAddCount 比较复杂, 它的目的是为了保证多线程可以快速进行添加完成, 目标很简单, 即向数组 CounterCell 中添加一个值 x// See LongAdder version for explanationprivate final void fullAddCount(long x, boolean wasUncontended) {int h;if ((h = ThreadLocalRandom.getProbe()) == 0) {ThreadLocalRandom.localInit(); // force initializationh = ThreadLocalRandom.getProbe();wasUncontended = true;}boolean collide = false; // True if last slot nonemptyfor (;;) {CounterCell[] as; CounterCell a; int n; long v;if ((as = counterCells) != null && (n = as.length) > 0) {if ((a = as[(n - 1) & h]) == null) {if (cellsBusy == 0) { // Try to attach new CellCounterCell r = new CounterCell(x); // Optimistic createif (cellsBusy == 0 &&U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {boolean created = false;try { // Recheck under lockCounterCell[] rs; int m, j;if ((rs = counterCells) != null &&(m = rs.length) > 0 &&rs[j = (m - 1) & h] == null) {rs[j] = r;created = true;}} finally {cellsBusy = 0;}if (created)break;continue; // Slot is now non-empty}}collide = false;}else if (!wasUncontended) // CAS already known to failwasUncontended = true; // Continue after rehashelse if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))break;else if (counterCells != as || n >= NCPU)collide = false; // At max size or staleelse if (!collide)collide = true;else if (cellsBusy == 0 &&U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {try {if (counterCells == as) {// Expand table unless staleCounterCell[] rs = new CounterCell[n << 1];for (int i = 0; i < n; ++i)rs[i] = as[i];counterCells = rs;}} finally {cellsBusy = 0;}collide = false;continue; // Retry with expanded table}h = ThreadLocalRandom.advanceProbe(h);}else if (cellsBusy == 0 && counterCells == as &&U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {boolean init = false;try { // Initialize tableif (counterCells == as) {CounterCell[] rs = new CounterCell[2];rs[h & 1] = new CounterCell(x);counterCells = rs;init = true;}} finally {cellsBusy = 0;}if (init)break;}else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))break; // Fall back on using base}}// ConcurrentHashMap.remove 删除元素/*** Implementation for the four public remove/replace methods:* Replaces node value with v, conditional upon match of cv if* non-null. If resulting value is null, delete.*/final V replaceNode(Object key, V value, Object cv) {int hash = spread(key.hashCode());for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;if (tab == null || (n = tab.length) == 0 ||(f = tabAt(tab, i = (n - 1) & hash)) == null)break;else if ((fh = f.hash) == MOVED)tab = helpTransfer(tab, f);else {V oldVal = null;boolean validated = false;synchronized (f) {if (tabAt(tab, i) == f) {if (fh >= 0) {validated = true;for (Node<K,V> e = f, pred = null;;) {K ek;if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {V ev = e.val;if (cv == null || cv == ev ||(ev != null && cv.equals(ev))) {oldVal = ev;if (value != null)e.val = value;// 删除元素else if (pred != null)pred.next = e.next;elsesetTabAt(tab, i, e.next);}break;}pred = e;if ((e = e.next) == null)break;}}else if (f instanceof TreeBin) {validated = true;TreeBin<K,V> t = (TreeBin<K,V>)f;TreeNode<K,V> r, p;if ((r = t.root) != null &&(p = r.findTreeNode(hash, key, null)) != null) {V pv = p.val;if (cv == null || cv == pv ||(pv != null && cv.equals(pv))) {oldVal = pv;if (value != null)p.val = value;// 删除元素else if (t.removeTreeNode(p))setTabAt(tab, i, untreeify(t.first));}}}}}if (validated) {if (oldVal != null) {// value = null, 代表需要将元素删除,所以需要对计数器做减1操作if (value == null)addCount(-1L, -1);return oldVal;}break;}}}return null;}
同样是由于在增删时,维护一个计数器(CounterCell数组), 所以对于返回计数值操作则会比较简单化:
// ConcurrentHashMap.size()public int size() {long n = sumCount();return ((n < 0L) ? 0 :(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :(int)n);}// 直接将 CounterCell 中的值相加起来即可final long sumCount() {CounterCell[] as = counterCells; CounterCell a;long sum = baseCount;if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)sum += a.value;}}return sum;}
虽然ConcurrentHash的元素本身没有使用分段式存储了,但是其计数值还是存在了多个 CounterCell 中,目的自然是为了减少多线程竞争对计数器的更新成性能瓶颈。在进行 size() 计数时,并未有上锁操作,整个 CounterCell 使用 volatile 修饰,保证其可见性,但是整个size 却是不保证绝对准确的哦。
4. 并发容器 --ArrayBlockingQueue
下面我们再来看看另一各类型的并发容器: ArrayBlockingQueue
// ArrayBlockingQueue.offer()/*** Inserts the specified element at the tail of this queue if it is* possible to do so immediately without exceeding the queue's capacity,* returning {@code true} upon success and {@code false} if this queue* is full. This method is generally preferable to method {@link #add},* which can fail to insert an element only by throwing an exception.** @throws NullPointerException if the specified element is null*/public boolean offer(E e) {checkNotNull(e);final ReentrantLock lock = this.lock;// 直接上锁操作lock.lock();try {if (count == items.length)return false;else {// 进行入队操作enqueue(e);return true;}} finally {lock.unlock();}}/*** Inserts element at current put position, advances, and signals.* Call only when holding lock.*/private void enqueue(E x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;final Object[] items = this.items;items[putIndex] = x;if (++putIndex == items.length)putIndex = 0;// 同样,它还是通过一个 count 的计数器完成统计工作count++;notEmpty.signal();}// 移除动作时,也需要维护 count 的值/*** Deletes item at array index removeIndex.* Utility for remove(Object) and iterator.remove.* Call only when holding lock.*/void removeAt(final int removeIndex) {// assert lock.getHoldCount() == 1;// assert items[removeIndex] != null;// assert removeIndex >= 0 && removeIndex < items.length;final Object[] items = this.items;if (removeIndex == takeIndex) {// removing front item; just advanceitems[takeIndex] = null;if (++takeIndex == items.length)takeIndex = 0;// 移除成功, 将计数器减1count--;if (itrs != null)itrs.elementDequeued();} else {// an "interior" remove// slide over all others up through putIndex.// 通过轮询的方式, 必然有一个元素被删除final int putIndex = this.putIndex;for (int i = removeIndex;;) {int next = i + 1;if (next == items.length)next = 0;if (next != putIndex) {items[i] = items[next];i = next;} else {items[i] = null;this.putIndex = i;break;}// 计数器相减count--;if (itrs != null)itrs.removedAt(removeIndex);}notFull.signal();}
同样是维护了一个计数器,但是因为有上锁机制的保证,整个过程看起来就简单了许多。在获取元素大小时,自然也就简单了.
// ArrayBlockingQueue.size()/*** Returns the number of elements in this queue.** @return the number of elements in this queue*/public int size() {final ReentrantLock lock = this.lock;lock.lock();try {return count;} finally {lock.unlock();}}
但是它为了保证结果的准确性,在计数时,同样进行了上锁操作。可见,并发容器的实现思路也基本一致.并无太多奇淫技巧. 咱们再来看一下并发容器的实现: CopyOnWriteArrayList
5. 并发容器 --CopyOnWriteArrayList
顾名思义,是在写操作的时候,使用复制方式进行实现。
// CopyOnWriteArrayList.add()/*** Appends the specified element to the end of this list.** @param e element to be appended to this list* @return {@code true} (as specified by {@link Collection#add})*/public boolean add(E e) {final ReentrantLock lock = this.lock;// 同样上锁保证线程安全lock.lock();try {Object[] elements = getArray();int len = elements.length;// 将元素copy出来, 但其并非维护一个len字段Object[] newElements = Arrays.copyOf(elements, len + 1);newElements[len] = e;setArray(newElements);return true;} finally {lock.unlock();}}// CopyOnWriteArrayList, 删除一个字段, 同其名称一样, 还是使用写时复制实现public E remove(int index) {final ReentrantLock lock = this.lock;lock.lock();try {Object[] elements = getArray();int len = elements.length;E oldValue = get(elements, index);int numMoved = len - index - 1;if (numMoved == 0)setArray(Arrays.copyOf(elements, len - 1));else {// 找到移除的字段位置, 依次复制其前后元素到新数组中,完成功能Object[] newElements = new Object[len - 1];System.arraycopy(elements, 0, newElements, 0, index);System.arraycopy(elements, index + 1, newElements, index,numMoved);setArray(newElements);}return oldValue;} finally {lock.unlock();}}// CopyOnWriteArrayList.size(), 直接使用数组长度字段/*** Returns the number of elements in this list.** @return the number of elements in this list*/public int size() {// 获取元素大小时,直接获取所有元素,取数组的长度即可. 借用jvm提供的数组长度元信息实现return getArray().length;}/*** Gets the array. Non-private so as to also be accessible* from CopyOnWriteArraySet class.*/final Object[] getArray() {// 该array字段一定是要保证可见性的, 即至少得是 volatile 修饰的数据return array;}
CopyOnWriteArrayList, 因为其语义决定,其在一定程度上是线程安全的,所以,在读操作时,就不需要上锁,从而性能在某些场景会比较好。
根据功能特性的不同, CopyOnWriteArrayList 采用了一个不同实现方式, 实现了元素的统计功能. 另外像 SynchronousQueue#size, 则永久返回0, 因为它的定义是当被放一个元素后,必须等到有线程消费之后才可返回,而其本身并不存储元素. 所以, 虽然元素计数道理比较简单通用, 但是还是要按照具体的场景进行相应的实现, 才能满足具体的需求. 即不可脱离场景谈技术.
6. 更多计数
类似数据库类的产品,同样的这样的计数刚性需求,各自实现方式也有不同,但大体思路也差不多。比如 redis 的计数使用在计数时临时遍历元素实现,mysql myisam 引擎使用一个表级的计数器等等。

腾讯、阿里、滴滴后台面试题汇总总结 — (含答案)
面试:史上最全多线程面试题 !
最新阿里内推Java后端面试题
JVM难学?那是因为你没认真看完这篇文章

关注作者微信公众号 —《JAVA烂猪皮》
了解更多java后端架构知识以及最新面试宝典


看完本文记得给作者点赞+在看哦~~~大家的支持,是作者源源不断出文的动力
作者:等你归去来
出处:https://www.cnblogs.com/yougewe/p/13238124.html
