Java学习者论坛

 找回密码
 立即注册

QQ登录

只需一步,快速开始

手机号码,快捷登录

恭喜Java学习者论坛(https://www.javaxxz.com)已经为数万Java学习者服务超过8年了!积累会员资料超过10000G+
成为本站VIP会员,下载本站10000G+会员资源,购买链接:点击进入购买VIP会员
JAVA高级面试进阶视频教程Java架构师系统进阶VIP课程

分布式高可用全栈开发微服务教程

Go语言视频零基础入门到精通

Java架构师3期(课件+源码)

Java开发全终端实战租房项目视频教程

SpringBoot2.X入门到高级使用教程

大数据培训第六期全套视频教程

深度学习(CNN RNN GAN)算法原理

Java亿级流量电商系统视频教程

互联网架构师视频教程

年薪50万Spark2.0从入门到精通

年薪50万!人工智能学习路线教程

年薪50万!大数据从入门到精通学习路线年薪50万!机器学习入门到精通视频教程
仿小米商城类app和小程序视频教程深度学习数据分析基础到实战最新黑马javaEE2.1就业课程从 0到JVM实战高手教程 MySQL入门到精通教程
查看: 927|回复: 0

Java双缓冲队列

[复制链接]

该用户从未签到

发表于 2011-9-13 20:14:16 | 显示全部楼层 |阅读模式
统队列是生产者线程和消费者线程从同一个队列中存取数据,必然需要互斥访问,在互相同步等待中浪费了宝贵的时间,使队列吞吐量受影响。双缓冲队使用两个队列,将读写分离,一个队列专门用来读,另一个专门用来写,当读队列空或写队列满时将两个队列互换。这里为了保证队列的读写顺序,当读队列为空且写队列不为空时候才允许两个队列互换。
经过测试性能较JDK自带的queue的确有不小提高。
测试是和JDK6中性能最高的阻塞Queue:java.util.concurrent.ArrayBlockingQueue做比较,这个队列是环形队列的实现方式,性能还算不错,不过我们的目标是没有最好,只有更好。

测试场景:
   起若干个生产者线程,往Queue中放数据,起若干个消费者线程从queue中取数据,统计每个消费者线程取N个数据的平均时间。
数据如下:
场景1
生产者线程数:1
消费者线程数:1
Queue容量:5w
取元素个数:1000w
JDK ArrayBlockingQueue用时平均为:  5,302,938,177纳秒
双缓冲队列用时平均为:                      5,146,302,116纳秒
相差大概160毫秒

场景2:
生产者线程数:5
消费者线程数:4
Queue容量:5w
取元素个数:1000w
JDK ArrayBlockingQueue用时平均为:  32,824,744,868纳秒
双缓冲队列用时平均为:                      20,508,495,221纳秒
相差大概12.3秒

可见在生产者消费者都只有一个的时候存和取的同步冲突比较小,双缓冲队列优势不是很大,当存取线程比较多的时候优势就很明显了。


队列主要方法如下:
[pre]/** *  * CircularDoubleBufferedQueue.java * 囧囧有神 * @param <E>2010-6-12 */public class CircularDoubleBufferedQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable{    private static final long serialVersionUID = 1L;    private Logger logger =   Logger.getLogger(CircularDoubleBufferedQueue.class.getName());    /** The queued items  */    private final E[] itemsA;    private final E[] itemsB;        private ReentrantLock readLock, writeLock;    private Condition notEmpty;    private Condition notFull;    private Condition awake;            private E[] writeArray, readArray;    private volatile int writeCount, readCount;    private int writeArrayHP, writeArrayTP, readArrayHP, readArrayTP;            public CircularDoubleBufferedQueue(int capacity)    {        if(capacity<=0)        {            throw new IllegalArgumentException("Queue initial capacity can't less than 0!");        }                itemsA = (E[])new Object[capacity];        itemsB = (E[])new Object[capacity];        readLock = new ReentrantLock();        writeLock = new ReentrantLock();                notEmpty = readLock.newCondition();        notFull = writeLock.newCondition();        awake = writeLock.newCondition();                readArray = itemsA;        writeArray = itemsB;    }        private void insert(E e)    {        writeArray[writeArrayTP] = e;        ++writeArrayTP;        ++writeCount;    }        private E extract()    {        E e = readArray[readArrayHP];        readArray[readArrayHP] = null;        ++readArrayHP;        --readCount;        return e;    }        /**     *switch condition:      *read queue is empty && write queue is not empty     *      *Notice:This function can only be invoked after readLock is          * grabbed,or may cause dead lock     * @param timeout     * @param isInfinite: whether need to wait forever until some other     * thread awake it     * @return     * @throws InterruptedException     */    private long queueSwitch(long timeout, boolean isInfinite) throws InterruptedException    {    writeLock.lock();    try    {        if (writeCount <= 0)        {    logger.debug("Write Count:" + writeCount + ", Write Queue is empty, do not switch!");            try            {                logger.debug("Queue is empty, need wait....");                if(isInfinite && timeout<=0)                {                    awake.await();                    return -1;                }                else                {                    return awake.awaitNanos(timeout);                }            }            catch (InterruptedException ie)            {                awake.signal();                throw ie;            }        }        else        {            E[] tmpArray = readArray;            readArray = writeArray;            writeArray = tmpArray;            readCount = writeCount;            readArrayHP = 0;            readArrayTP = writeArrayTP;            writeCount = 0;            writeArrayHP = readArrayHP;            writeArrayTP = 0;                            notFull.signal();            logger.debug("Queue switch successfully!");            return -1;        }    }    finally    {        writeLock.unlock();    }}    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException    {        if(e == null)        {            throw new NullPointerException();        }                long nanoTime = unit.toNanos(timeout);        writeLock.lockInterruptibly();        try        {            for (;;)            {                if(writeCount < writeArray.length)                {                    insert(e);                    if (writeCount == 1)                    {                        awake.signal();                    }                    return true;                }                                //Time out                if(nanoTime<=0)                {                    logger.debug("offer wait time out!");                    return false;                }                //keep waiting                try                {                    logger.debug("Queue is full, need wait....");                    nanoTime = notFull.awaitNanos(nanoTime);                }                catch(InterruptedException ie)                {                    notFull.signal();                    throw ie;                }            }        }        finally        {            writeLock.unlock();        }    }    public E poll(long timeout, TimeUnit unit) throws InterruptedException    {        long nanoTime = unit.toNanos(timeout);        readLock.lockInterruptibly();                try        {            for(;;)            {                if(readCount>0)                {                    return extract();                }                                if(nanoTime<=0)                {                    logger.debug("poll time out!");                    return null;                }                nanoTime = queueSwitch(nanoTime, false);            }        }        finally        {            readLock.unlock();        }    }}                  [/pre]Ps:测试时候要把queue类中debug关掉,否则打印debug日志会对queue性能有不小的影响。
[/td][/tr][/table]
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

QQ|手机版|Java学习者论坛 ( 声明:本站资料整理自互联网,用于Java学习者交流学习使用,对资料版权不负任何法律责任,若有侵权请及时联系客服屏蔽删除 )

GMT+8, 2024-4-20 02:15 , Processed in 0.386693 second(s), 48 queries .

Powered by Discuz! X3.4

© 2001-2017 Comsenz Inc.

快速回复 返回顶部 返回列表