Java学习者论坛

 找回密码
 立即注册

QQ登录

只需一步,快速开始

手机号码,快捷登录

恭喜Java学习者论坛(https://www.javaxxz.com)已经为数万Java学习者服务超过8年了!积累会员资料超过10000G+
成为本站VIP会员,下载本站10000G+会员资源,购买链接:点击进入购买VIP会员
JAVA高级面试进阶视频教程Java架构师系统进阶VIP课程

分布式高可用全栈开发微服务教程

Go语言视频零基础入门到精通

Java架构师3期(课件+源码)

Java开发全终端实战租房项目视频教程

SpringBoot2.X入门到高级使用教程

大数据培训第六期全套视频教程

深度学习(CNN RNN GAN)算法原理

Java亿级流量电商系统视频教程

互联网架构师视频教程

年薪50万Spark2.0从入门到精通

年薪50万!人工智能学习路线教程

年薪50万!大数据从入门到精通学习路线年薪50万!机器学习入门到精通视频教程
仿小米商城类app和小程序视频教程深度学习数据分析基础到实战最新黑马javaEE2.1就业课程从 0到JVM实战高手教程 MySQL入门到精通教程
查看: 771|回复: 0

[默认分类] JAVA多线程与队列

[复制链接]
  • TA的每日心情
    开心
    2021-12-13 21:45
  • 签到天数: 15 天

    [LV.4]偶尔看看III

    发表于 2018-7-7 10:21:23 | 显示全部楼层 |阅读模式

              java 已经给我们提供了比较好的队列实现Queue,继承于Collection。 本次我使用的是BlockingQueue,继承于Queue。   
              在Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。
              首先利用BlockingQueue封装了一个队列类。队列里存放Map对象,这个依项目需求而定,供参考。
             
    1. import java.util.AbstractMap;
    2. import java.util.HashSet;
    3. import java.util.Iterator;
    4. import java.util.Map;
    5. import java.util.Set;
    6. import java.util.concurrent.BlockingQueue;
    7. import java.util.concurrent.LinkedBlockingQueue;
    8. /**
    9. * 单例的缓存map
    10. */
    11. public class CachePool<Key, Value> extends AbstractMap<Key, Value>{
    12.        
    13.                 // 私有化缓存对象实例
    14.                 private static CachePool cachePool = new CachePool();
    15.                 private int maxCount = 1000;
    16.                 private BlockingQueue<Entry> queue = new LinkedBlockingQueue<Entry>();
    17.                 /**
    18.                  * private Constructor.
    19.                  * @return
    20.                  */
    21.                 private CachePool() {
    22.                 }
    23.                 /**
    24.                  * 开放一个公有方法,判断是否已经存在实例,有返回,没有新建一个在返回
    25.                  * @return
    26.                  */
    27.             public static CachePool getInstance(){  
    28.                 return cachePool;  
    29.             }  
    30.                
    31.                 /**
    32.                  * The Entry for this Map.
    33.                  * @author  AnCan
    34.                  *
    35.                  */
    36.                 private class Entry implements Map.Entry<Key, Value>{
    37.                         private Key key;
    38.                         private Value value;
    39.                        
    40.                         public Entry(Key key, Value value){
    41.                                 this.key = key;
    42.                                 this.value = value;
    43.                         }
    44.                         @Override
    45.                         public String toString() {
    46.                                 return key + "=" + value;
    47.                         }
    48.                        
    49.                         public Key getKey() {
    50.                                 return key;
    51.                         }
    52.                         public Value getValue() {
    53.                                 return value;
    54.                         }
    55.                         public Value setValue(Value value) {
    56.                                 return this.value = value;
    57.                         }
    58.                 }
    59.                
    60.                
    61.                
    62.                 /**
    63.                  * Constructor.
    64.                  * @param size the size of the pooled map;
    65.                  */
    66.                 public CachePool(int size) {
    67.                         maxCount = size;
    68.                 }
    69.                 @Override
    70.                 public  Value put(Key key, Value value) {
    71.                         while(queue.size() >= maxCount){
    72.                                 queue.remove();
    73.                         }
    74.                         queue.add(new Entry(key, value));
    75.                         return value;
    76.                 }
    77.                
    78.                 @Override
    79.                 public Value get(Object key){
    80.                         for(Iterator<Entry> iter = queue.iterator();iter.hasNext();){
    81.                                 Entry type = iter.next();
    82.                                 if(type.key.equals(key)){
    83.                                         queue.remove(type);
    84.                                         queue.add(type);
    85.                                         return type.value;
    86.                                 }
    87.                         }
    88.                         return null;
    89.                 }
    90.                
    91.                 @Override
    92.                 public Set<Map.Entry<Key, Value>> entrySet() {
    93.                         Set<Map.Entry<Key, Value>> set = new HashSet<Map.Entry<Key, Value>>();
    94.                         set.addAll(queue);
    95.                         return set;
    96.                 }
    97.                 @Override
    98.                 public void clear() {
    99.                         queue.clear();
    100.                 }
    101.                
    102.                 @Override
    103.                 public Set<Key> keySet() {
    104.                         Set<Key> set = new HashSet<Key>();
    105.                         for(Entry e : queue){
    106.                                 set.add(e.getKey());
    107.                         }
    108.                         return set;
    109.                 }
    110.                 @Override
    111.                 public Value remove(Object obj) {
    112.                         for(Entry e : queue){
    113.                                 if(e.getKey().equals(obj)){
    114.                                         queue.remove(e);
    115.                                         return e.getValue();
    116.                                 }
    117.                         }
    118.                         return null;
    119.                 }
    120.                
    121.                 @Override
    122.                 public int size() {
    123.             return queue.size();
    124.         }
    125.         }
    复制代码
               其中根据项目的需求重写了一些方法。

                 先看下消费者类,使用多线程来处理队列中的内容:
          
    1. import java.util.Date;
    2. import java.util.Iterator;
    3. import java.util.Map;
    4. import java.util.Timer;
    5. import java.util.TimerTask;
    6. import java.util.concurrent.Executor;
    7. import java.util.concurrent.Executors;
    8. import javax.servlet.ServletException;
    9. import javax.servlet.http.HttpServlet;
    10. import org.springframework.context.ApplicationContext;
    11. import org.springframework.context.support.ClassPathXmlApplicationContext;
    12. /**
    13. * 操作业务类,通过参数中的方法参数进行具体的操作
    14. */
    15. public class TicketTradeOper extends  HttpServlet
    16. {
    17.     /**
    18.      * 缓存对象 map
    19.      */
    20.     public static CachePool<String, Object> mapPool = CachePool.getInstance();
    21.    
    22.     private static final int NTHREADS=5;  
    23.     // 使用线程池来避免 为每个请求创建一个线程。  
    24.     private static final Executor threadPool=Executors.newFixedThreadPool(NTHREADS);  
    25.    
    26.     //业务操作
    27.     IETicketTradeOper ticketTradeOper;
    28.     @Override
    29.     public void init() throws ServletException
    30.     {
    31.         Timer timer = new Timer();  
    32.         timer.schedule(new TimerTask(){     
    33.             @Override         
    34.                 public void run() {        
    35.                     startThread();
    36.                 }     
    37.             }, new Date(), 5000);//间隔5秒执行一次定时器任务
    38.         super.init();
    39.     }
    40.    
    41.         
    42.     public void startThread(){
    43.         threadPool.execute(new Runnable(){  
    44.             public void run() {  
    45.                 executeCodeOper();  
    46.              }  
    47.         });
    48.     }
    49.    
    50.     public  void executeCodeOper()
    51.     {
    52.         String key = "";
    53.         Map param = null;
    54.         synchronized (mapPool)
    55.         {
    56.             System.out.println(Thread.currentThread().getName() + "进来了。。。。");
    57.             System.out.println("现在队列中共有----"+mapPool.size()+"---条数据");
    58.             Iterator it = mapPool.keySet().iterator();
    59.             //缓存不为空时,取出一个值
    60.             while (it.hasNext())
    61.             {
    62.                 key = (String) it.next();
    63.                 param = (Map) mapPool.get(key);
    64.             }
    65.             if (null != param)
    66.             {
    67.                   //为防止重复,将其移除
    68.                   mapPool.remove(key);
    69.             }
    70.         }
    71.         
    72.         if (null != param)
    73.         {
    74.             boolean result =ticketTradeOperator(param);
    75.             System.out.println("此条数据处理========"+result);
    76.             if(!result){
    77.                 //若处理失败,重新放回队列
    78.                 mapPool.put(key, param);
    79.             };
    80.         }
    81.     }
    82.    
    83.     public boolean ticketTradeOperator(Map<String, String> params)
    84.     {
    85.         //具体的处理工作
    86.         return resultCode;
    87.     }
    88.     public IETicketTradeOper getTicketTradeOper()
    89.     {
    90.         return ticketTradeOper;
    91.     }
    92.     public void setTicketTradeOper(IETicketTradeOper ticketTradeOper)
    93.     {
    94.         this.ticketTradeOper = ticketTradeOper;
    95.     }
    96. }
    复制代码
               生产者,根据业务需求将接收到的数据放到队列里:


    1.      TicketTradeOper.mapPool.put(newParams.get("order_id"), newParams);
    复制代码



    以上便是整个队列生产消费的过程,有问题的欢迎交流。关于队列类Queue的介绍。下篇博客进行。。








             
       


    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|手机版|Java学习者论坛 ( 声明:本站资料整理自互联网,用于Java学习者交流学习使用,对资料版权不负任何法律责任,若有侵权请及时联系客服屏蔽删除 )

    GMT+8, 2024-3-29 04:30 , Processed in 0.357333 second(s), 38 queries .

    Powered by Discuz! X3.4

    © 2001-2017 Comsenz Inc.

    快速回复 返回顶部 返回列表