`
拖拖鞋
  • 浏览: 90479 次
  • 性别: Icon_minigender_1
社区版块
存档分类
最新评论

实现一个自己的线程池

阅读更多
        原创!转载请注明出处http://ycde2009.iteye.com/blog/2032605
        一直想写线程池,但由于工作时间紧迫,一直没能腾出时间来写,这几天跳槽,正好把它完结掉。
        因为在java这种面向对象的语言中,创建和销毁对象是很费时间的,因为每创建一个对象不仅要在堆内存中占据掉一个句柄,而且还有可能消耗掉其他的资源,虚拟机也会将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收。所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁。如何利用已有对象来服务就是一个需要解决的关键问题,其实这就是一些"池化资源"技术(或者也叫做缓存资源技术)产生的原因,线程池也就是在这种情况下被建造出来。我的想法是在一个线程池中,里面先放一些已经创建好了的线程,池子里面有一个任务队列,当有任务的时候,就让线程去执行,所有任务执行完了以后,线程就睡眠,这里用到了多线程的生产者消费者模式,而且还有一个线程池的管家,来管理线程池里面的线程。这样一想,一个能基本运行的线程池也就想好了。但是我们发现,这样想的话显得我们的线程池不够灵活,因为,它缺少了伸缩性;比如,他不能根据任务的多少来灵活的增加和减少线程数,某个任务失败后也不能重发,最关键的是任务没有优先级。这样一想的话,我们还得在原来的基础上加点东西。
        主要附加功能,可以维护线程池,在指定的空闲时间后维护,目的是:
1、可以根据任务的多少来相应的增减线程数量
2、处理的任务可以有优先级,优先级高的,最先执行
3、重发的功能
        下面我把主要的线程池和任务池的代码贴出来,废话也不多说了。代码里都有注释。需要完整的文件的话,附件里有,可以下载!
package myThreadPoolV3;

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;

import myThreadPoolV3.inter.ITask;
import myThreadPoolV3.util.DateUtil;
import myThreadPoolV3.util.ThreadPoolException;

/**  
 *   
 * MyThreadPool  
 * 线程池 
 * @author yangchuang  
 * @since 2014-3-15 上午09:45:39    
 * @version 1.0.0  
 * 修改记录
 * 修改日期    修改人    修改后版本    修改内容
 */
public class MyThreadPool {
    /**  
     * maxNumThreadSize:最多的线程数量
     * @since 1.0.0  
     */  
    private int maxNumThreadSize;
    
    /**  
     * minNumThreadSize:最少的线程数量
     * @since 1.0.0  
     */  
    private int minNumThreadSize;
    
    /**  
     * keepAliveTime:线程池维护线程所允许的空闲时间 单位(秒)
     * @since 1.0.0  
     */  
    private int keepAliveTime;
    
    /**  
     * taskPool:任务池
     * 按照优先级来进行存储
     * @since 1.0.0  
     */  
    private TaskPool<ITask> taskPool=new TaskPool<ITask>();
    
    /**  
     * workThreadQuery:线程安全的线程队列
     */  
    private BlockingQueue<WorkThread> workThreadQuery=new LinkedBlockingQueue<WorkThread>();
    
    /**  
     * EXECUTE_TASK_LOCK:执行任务队列的时候,wait()与notiyfAll()的监视对象
     * @since 1.0.0  
     */  
    private static final String EXECUTE_TASK_LOCK=new String("executeTaskLock");
    
    /**  
     * threadFreeSet:空闲线程的ID存放在此
     * @since 1.0.0  
     */  
    private ConcurrentMap<String,String> threadFreeMap=new ConcurrentHashMap<String,String>();
    
    /**  
     * maintainTimer:定时维护线程池的
     * @since 1.0.0  
     */  
    private Timer maintainTimer;
    
      
    /**  
     * 创建一个新的实例 MyThreadPool.  
     *      最多的线程数量=7
     *      最少的线程数量=3
     *      线程池维护线程所允许的空闲时间 单位(秒)=30(秒)
     */
    public MyThreadPool(){
        this.maxNumThreadSize=7;
        this.minNumThreadSize=3;
        this.keepAliveTime=30*DateUtil.DATE_SECOND;
        loadWorkThreadQuery();
        maintainTimer=new Timer();
    }
    
      
    /**  
     * 创建一个新的实例 MyThreadPool.  
     *  
     * @param maxNumThreadSize        最多的线程数量
     * @param minNumThreadSize        最少的线程数量
     * @param keepAliveTime         线程池维护线程所允许的空闲时间 单位(秒)
     * @throws ThreadPoolException  
     */
    public MyThreadPool(int maxNumThreadSize,int minNumThreadSize,int keepAliveTime) throws ThreadPoolException{
        if(minNumThreadSize<1){
            throw new ThreadPoolException("线程池中不能没有线程");
        }
        if(maxNumThreadSize<minNumThreadSize){
            throw new ThreadPoolException("最多的线程数不能少于最少的线程数");
        }
        if(keepAliveTime<1){
            throw new ThreadPoolException("线程池维护线程所允许的空闲时间不能少于1秒");
        }
        this.maxNumThreadSize=maxNumThreadSize;
        this.minNumThreadSize=minNumThreadSize;
        this.keepAliveTime=keepAliveTime*DateUtil.DATE_SECOND;
        loadWorkThreadQuery();
        maintainTimer=new Timer();
    }
    
    private void loadWorkThreadQuery(){
        for(int i=0;i<minNumThreadSize;i++){
            new WorkThread();
        }
    }
    
    private class WorkThread extends Thread{
        private String ID;
        private boolean isWorking;
        private boolean isDied;
        public WorkThread(){
            ID=""+hashCode();
            workThreadQuery.add(this);
            addThreadFreeNum(this.getID());
            System.out.println("新线程被添加++++空闲线程数"+threadFreeMap.size()+"  总的线程数"+workThreadQuery.size()+"  未处理的任务数:"+taskPool.taskSize());
        }
        @Override
        public void run() {
            while(!isDied){
                while(taskPool.taskSize()<1&&!isDied){
                    System.out.println(Thread.currentThread().getName()+"  无任务!睡眠");
                    setStopWorking();
                    try {
                        synchronized(EXECUTE_TASK_LOCK){
                            EXECUTE_TASK_LOCK.wait();
                            System.out.println(Thread.currentThread().getName()+"  被唤醒");
                        }
                    }
                    catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                while(taskPool.taskSize()>0&&!isDied){
                    System.out.println(Thread.currentThread().getName()+"  有任务执行;任务总数:"+taskPool.taskSize());
                    setStartWorking();
                    ITask task = taskPool.removeTask();
                    try {
                        if(task!=null){
                            task.stratWork();
                        }
                    }
                    catch (Exception e) {
                        try {
                            taskPool.rejoinTask(task);
                        }
                        catch (Exception e1) {
                            e1.printStackTrace();
                        }
                        e.printStackTrace();
                    }
                }
            }
            System.out.println(Thread.currentThread().getName()+"  死亡");
        }
        public void setStartWorking() {
            this.isWorking = true;
            subtractThreadFreeNum(this.getID());
        }
        public void setStopWorking() {
            this.isWorking = false;
            addThreadFreeNum(this.getID());
        }
        public void setDied(boolean isDied) {
            this.isDied = isDied;
            workThreadQuery.remove(this);
            subtractThreadFreeNum(this.getID());
            synchronized(EXECUTE_TASK_LOCK){
                EXECUTE_TASK_LOCK.notifyAll();
            }
        }
        public String getID() {
            return ID;
        }
        public void setID(String iD) {
            ID = iD;
        }
        
    }
    
    public void execute(){
        for(WorkThread workThread:workThreadQuery){
            workThread.start();
        }
        maintainTimer.schedule(new TimerTask() {
            @Override
            public void run() {
                maintainPool();
            }
        }, 3000, keepAliveTime);
    }
    
    private void maintainPool(){
        System.out.println("开始维护线程池++++++++++++++++");
        System.out.println(this);
        // 任务数量大于空闲线程数量,并且总线程数量小于线程池的规定的最大值,就新增线程
        if(taskPool.taskSize()>this.threadFreeMap.size()&&workThreadQuery.size()<maxNumThreadSize){
            // 最多再允许创建的线程数
            int maxCreatThreadNum=maxNumThreadSize-workThreadQuery.size();
            // 需要创建的线程数
            int needCreatThreadNum=taskPool.taskSize()-this.threadFreeMap.size();
            // 实际的创建数目
            int creatThreadNum=needCreatThreadNum>maxCreatThreadNum?maxCreatThreadNum:needCreatThreadNum;
            for(int i=0;i<creatThreadNum;i++){
                WorkThread workThread=new WorkThread();
                workThread.start();
            }
        }
        // 空闲线程多于任务队列的数量,并且总的线程多余最小的线程个数,就要减少线程
        if(taskPool.taskSize()<this.threadFreeMap.size()&&workThreadQuery.size()>minNumThreadSize){
            // 允许销毁的最大线程数
            int maxDestroyThreadNum=workThreadQuery.size()-minNumThreadSize;
            // 需要销毁的线程数
            int needDestroyThreadNum=this.threadFreeMap.size()-taskPool.taskSize();
            // 实际的销毁数目
            int destroyThreadNum=needDestroyThreadNum>maxDestroyThreadNum?maxDestroyThreadNum:needDestroyThreadNum;
            for(int i=0;i<destroyThreadNum;i++){
                for(WorkThread workThread:workThreadQuery){
                    if(!workThread.isWorking){
                        workThread.setDied(true);
                        break;
                    }
                }
            }
        }
        System.out.println("维护后的信息");
        System.out.println(this);
        System.out.println("结束维护线程池++++++++++++++++");
    }
    
    public void addTask(ITask task) throws Exception{
        taskPool.addTask(task);
        synchronized(EXECUTE_TASK_LOCK){
            EXECUTE_TASK_LOCK.notifyAll();
        }
    }
    
    public int getMaxNumThreadSize() {
        return maxNumThreadSize;
    }
    public void setMaxNumThreadSize(int maxNumThreadSize) {
        this.maxNumThreadSize = maxNumThreadSize;
    }
    public int getMinNumThreadSize() {
        return minNumThreadSize;
    }
    public void setMinNumThreadSize(int minNumThreadSize) {
        this.minNumThreadSize = minNumThreadSize;
    }
    public int getKeepAliveTime() {
        return keepAliveTime;
    }
    public void setKeepAliveTime(int keepAliveTime) {
        this.keepAliveTime = keepAliveTime;
    }

    public synchronized void addThreadFreeNum(String id) {
        this.threadFreeMap.put(id,id);
    }
    public synchronized void subtractThreadFreeNum(String id) {
        this.threadFreeMap.remove(id,id);
    }
    @Override
    public String toString(){
        String showInfo="%%%%%%%%空闲的线程数"+threadFreeMap.size()+"  总的线程数"+workThreadQuery.size()+"  未处理的任务数:"+taskPool.taskSize();
        showInfo+="\n%%%%%%%%threadFreeSet:"+threadFreeMap;
        showInfo+="\n%%%%%%%%workThreadQuery:"+workThreadQuery;
        return showInfo;
    }
}

package myThreadPoolV3;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;

import myThreadPoolV3.inter.ITask;
import myThreadPoolV3.util.ThreadPoolException;

/**  
 *   
 * TaskPool  
 * 任务池
 * @author yangchuang  
 * @since 2014-3-15 上午09:53:55    
 * @version 1.0.0  
 * 修改记录
 * 修改日期    修改人    修改后版本    修改内容
 */
public class TaskPool<T extends ITask> {
    /**  
     * taskMap:线程安全的任务Map集合
     * 按照优先级来进行存储
     * @since 1.0.0  
     */  
    private ConcurrentMap<Integer,BlockingQueue<T>> taskMap;
    
    /**  
     * rejoinTaskMap:重发记录表,任务执行失败后,再次加入任务队列的时候,现在这里记录
     * @since 1.0.0  
     */  
    private ConcurrentMap<T,Integer> rejoinTaskMap;
      
    /**  
     * 创建一个新的实例 TaskPool.  
     */
    public TaskPool(){
        this.taskMap=new ConcurrentHashMap<Integer,BlockingQueue<T>>();
        this.taskMap.put(ITask.MAX_PRIORITY, new LinkedBlockingQueue<T>());
        this.taskMap.put(ITask.NORM_PRIORITY, new LinkedBlockingQueue<T>());
        this.taskMap.put(ITask.MIN_PRIORITY, new LinkedBlockingQueue<T>());
        this.rejoinTaskMap=new ConcurrentHashMap<T,Integer>();
    }
    
    /**  
     * taskSize 获取当前任务池中的任务数量
     * @return  当前任务池中的任务数量
     * @since  1.0.0  
    */
    public int taskSize(){
        return this.taskMap.get(T.MAX_PRIORITY).size()
                +this.taskMap.get(T.NORM_PRIORITY).size()
                +this.taskMap.get(T.MIN_PRIORITY).size();
    }
    
    /**  
     * addTask      添加任务
     * 任务必须设置优先级,并且优先级只能在ITask接口内的三个优先级的值之一
     *      MAX_PRIORITY:最高优先级
     *      NORM_PRIORITY:默认优先级
     *      MIN_PRIORITY:最低优先级
     * @param task  
     * @throws Exception 当该task任务的优先级不在规定的范围,抛出此异常
     * @since  1.0.0  
    */
    public void addTask(T task) throws Exception{
        if(this.taskMap.containsKey(task.getPriority())){
            this.taskMap.get(task.getPriority()).add(task);
            return;
        }
        throw new ThreadPoolException("优先级的值设置有误");
    }
    
    /**  
     * romoveTask      获取任务从优先级高的开始
     * @param task  
     * @throws Exception 
     * @since  1.0.0  
    */
    public synchronized ITask removeTask(){
        if(this.taskMap.get(T.MAX_PRIORITY).size()>0){
            return this.taskMap.get(T.MAX_PRIORITY).remove();
        }
        if(this.taskMap.get(T.NORM_PRIORITY).size()>0){
            return this.taskMap.get(T.NORM_PRIORITY).remove();
        }
        if(this.taskMap.get(T.MIN_PRIORITY).size()>0){
            return this.taskMap.get(T.MIN_PRIORITY).remove();
        }
        return null;
    }
    
    /**  
     * rejoin   当任务执行失败后,再次将他加入任务池
     * @param task
     * @throws Exception 
     * @since  1.0.0  
    */
    public synchronized void rejoinTask(T task) throws Exception{
        // 如果再次执行的Map集合没有该任务那么就添加进去
        if(!rejoinTaskMap.containsKey(task)){
            rejoinTaskMap.put(task, 1);
        }
        // 如果该任务的再次执行的次数大于他设置的值,那么就将其移除出再次执行的Map集合
        if(rejoinTaskMap.get(task)>task.getMaxAgainExecuteNum()){
            rejoinTaskMap.remove(task);
            return;
        }
        // 否则的话,将其加一,再放入任务队列
        else{
            rejoinTaskMap.put(task, rejoinTaskMap.get(task)+1);
        }
        this.taskMap.get(task.getPriority()).add(task);
    }
    
}


需要完整的文件的话,附件里有,可以下载!
欢迎大家多多交流!谢谢!
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics