利用ace的ACE_Task等类实现线程池的方法详解

本代码应该是ace自带的例子了,但是我觉得是非常好的,于是给大家分享一下。
注释非常详细啊。
头文件


代码如下:

#ifndef THREAD_POOL_H
#define THREAD_POOL_H
/* In order to implement a thread pool, we have to have an object that
   can create a thread.  The ACE_Task<> is the basis for doing just
   such a thing.  */
#include "ace/Task.h"
//add by ychen 20070714 below
#include "ace/Mutex.h"
//add by ychen 20070714 above
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
/* We need a forward reference for ACE_Event_Handler so that our
   enqueue() method can accept a pointer to one.  */

class ACE_Event_Handler;
/* Although we modified the rest of our program to make use of the
   thread pool implementation, if you look closely you'll see that the
   changes were rather minor.  The "ACE way" is generally to create a
   helper object that abstracts away the details not relevant to your
   application.  That's what I'm trying to do here by creating the
   Thread_Pool object.  */
class Thread_Pool : public ACE_Task<ACE_MT_SYNCH>
{
public:
  typedef ACE_Task<ACE_MT_SYNCH> inherited;
  /* Provide an enumeration for the default pool size.  By doing this,
    other objects can use the value when they want a default.  */
  enum size_t
  {
    default_pool_size_ = 1
  };
  // Basic constructor
  Thread_Pool (void);
  /* Opening the thread pool causes one or more threads to be
    activated.  When activated, they all execute the svc() method
    declared below.  */
  int open (int pool_size = default_pool_size_);
  /* Some compilers will complain that our open() above attempts to
    override a virtual function in the baseclass.  We have no
    intention of overriding that method but in order to keep the
    compiler quiet we have to add this method as a pass-thru to the
    baseclass method.  */
  virtual int open (void *void_data)
  {
    return inherited::open (void_data);
  }
  /*
   */
  virtual int close (u_long flags = 0);
  /* To use the thread pool, you have to put some unit of work into
    it.  Since we're dealing with event handlers (or at least their
    derivatives), I've chosen to provide an enqueue() method that
    takes a pointer to an ACE_Event_Handler.  The handler's
    handle_input() method will be called, so your object has to know
    when it is being called by the thread pool.  */
  int enqueue (ACE_Event_Handler *handler);
  /* Another handy ACE template is ACE_Atomic_Op<>.  When
    parameterized, this allows is to have a thread-safe counting
    object.  The typical arithmetic operators are all internally
    thread-safe so that you can share it across threads without
    worrying about any contention issues.  */
    typedef ACE_Atomic_Op<ACE_Mutex, int> counter_t;

protected:
  /* Our svc() method will dequeue the enqueued event handler objects
    and invoke the handle_input() method on each.  Since we're likely
    running in more than one thread, idle threads can take work from
    the queue while other threads are busy executing handle_input() on
    some object.  */
  int svc (void);
  /* We use the atomic op to keep a count of the number of threads in
    which our svc() method is running.  This is particularly important
    when we want to close() it down!  */
  counter_t active_threads_;
};
#endif /* THREAD_POOL_H */

实现文件


代码如下:

// thread_pool.cpp,v 1.9 1999/09/22 03:13:42 jcej Exp
#include "thread_pool.h"
/* We need this header so that we can invoke handle_input() on the
   objects we dequeue.  */
#include "ace/Event_Handler.h"
/* All we do here is initialize our active thread counter.  */
Thread_Pool::Thread_Pool (void)
  : active_threads_ (0)
{
}
/* Our open() method is a thin disguise around the ACE_Task<>
   activate() method.  By hiding activate() in this way, the users of
   Thread_Pool don't have to worry about the thread configuration
   flags.  */
int
Thread_Pool::open (int pool_size)
{
  return this->activate (THR_NEW_LWP|THR_DETACHED, pool_size);
}
/* Closing the thread pool can be a tricky exercise.  I've decided to
   take an easy approach and simply enqueue a secret message for each
   thread we have active.  */
int
Thread_Pool::close (u_long flags)
{
  ACE_UNUSED_ARG(flags);
  /* Find out how many threads are currently active */
  int counter = active_threads_.value ();
  /* For each one of the active threads, enqueue a "null" event
    handler.  Below, we'll teach our svc() method that "null" means
    "shutdown".  */
  while (counter--)
    this->enqueue (0);
  /* As each svc() method exits, it will decrement the active thread
    counter.  We just wait here for it to reach zero.  Since we don't
    know how long it will take, we sleep for a quarter of a second
    between tries.  */
  while (active_threads_.value ())
    ACE_OS::sleep (ACE_Time_Value (0, 250000));
  return(0);
}
/* When an object wants to do work in the pool, it should call the
   enqueue() method.  We introduce the ACE_Message_Block here but,
   unfortunately, we seriously misuse it.  */
int
Thread_Pool::enqueue (ACE_Event_Handler *handler)
{
  /* An ACE_Message_Block is a chunk of data.  You put them into an
    ACE_Message_Queue.  ACE_Task<> has an ACE_Message_Queue built in.
    In fact, the parameter to ACE_Task<> is passed directly to
    ACE_Message_Queue.  If you look back at our header file you'll see
    that we used ACE_MT_SYNCH as the parameter indicating that we want
    MultiThread Synch safety.  This allows us to safely put
    ACE_Message_Block objects into the message queue in one thread and
    take them out in another.  */
  /* An ACE_Message_Block wants to have char* data.  We don't have
    that.  We could cast our ACE_Event_Handler* directly to a char*
    but I wanted to be more explicit.  Since casting pointers around
    is a dangerous thing, I've gone out of my way here to be very
    clear about what we're doing.
    First: Cast the handler pointer to a void pointer.  You can't do
    any useful work on a void pointer, so this is a clear message that
    we're making the pointer unusable.
    Next: Cast the void pointer to a char pointer that the ACE_Message_Block will accept.  */
  void *v_data = (void *) handler;
  char *c_data = (char *) v_data;
  ACE_Message_Block *mb;
  /* Construct a new ACE_Message_Block.  For efficiency, you might
    want to preallocate a stack of these and reuse them.  For
    simplicity, I'll just create what I need as I need it.  */
  ACE_NEW_RETURN (mb,
                  ACE_Message_Block (c_data),
                  -1);
  /* Our putq() method is a wrapper around one of the enqueue methods
    of the ACE_Message_Queue that we own.  Like all good methods, it
    returns -1 if it fails for some reason.  */
  if (this->putq (mb) == -1)
    {
      /* Another trait of the ACE_Message_Block objects is that they
        are reference counted.  Since they're designed to be passed
        around between various objects in several threads we can't
        just delete them whenever we feel like it.  The release()
        method is similar to the destroy() method we've used
        elsewhere.  It watches the reference count and will delete the
        object when possible.  */
      mb->release ();
      return -1;
    }
  return 0;
}
/* The "guard" concept is very powerful and used throughout
   multi-threaded applications.  A guard normally does some operation
   on an object at construction and the "opposite" operation at
   destruction.  For instance, when you guard a mutex (lock) object,
   the guard will acquire the lock on construction and release it on
   destruction.  In this way, your method can simply let the guard go
   out of scope and know that the lock is released.
   Guards aren't only useful for locks however.  In this application
   I've created two guard objects for quite a different purpose.  */
/* The Counter_Guard is constructed with a reference to the thread
   pool's active thread counter.  The guard increments the counter
   when it is created and decrements it at destruction.  By creating
   one of these in svc(), I know that the counter will be decremented
   no matter how or where svc() returns.  */
class Counter_Guard
{
public:
  Counter_Guard (Thread_Pool::counter_t &counter)
    : counter_ (counter)
  {
    ++counter_;
  }
  ~Counter_Guard (void)
  {
    --counter_;
  }
protected:
  Thread_Pool::counter_t &counter_;
};
/* My Message_Block_Guard is also a little non-traditional.  It
   doesn't do anything in the constructor but it's destructor ensures
   that the message block's release() method is called.  This is a
   cheap way to prevent a memory leak if I need an additional exit
   point in svc().  */
class Message_Block_Guard
{
public:
  Message_Block_Guard (ACE_Message_Block *&mb)
    : mb_ (mb)
  {
  }
  ~Message_Block_Guard (void)
  {
    mb_->release ();
  }
protected:
  ACE_Message_Block *&mb_;
};
/* Now we come to the svc() method.  As I said, this is being executed
   in each thread of the Thread_Pool.  Here, we pull messages off of
   our built-in ACE_Message_Queue and cause them to do work.  */
int
Thread_Pool::svc (void)
{
  /* The getq() method takes a reference to a pointer.  So... we need
    a pointer to give it a reference to.  */
  ACE_Message_Block *mb;
  /* Create the guard for our active thread counter object.  No matter
    where we choose to return() from svc(), we now know that the
    counter will be decremented.  */
  Counter_Guard counter_guard (active_threads_);
  /* Get messages from the queue until we have a failure.  There's no
    real good reason for failure so if it happens, we leave
    immediately.  */
  while (this->getq (mb) != -1)
    {
      /* A successful getq() will cause "mb" to point to a valid
        refernce-counted ACE_Message_Block.  We use our guard object
        here so that we're sure to call the release() method of that
        message block and reduce it's reference count.  Once the count
        reaches zero, it will be deleted.  */
      Message_Block_Guard message_block_guard (mb);
      /* As noted before, the ACE_Message_Block stores it's data as a
        char*.  We pull that out here and later turn it into an
        ACE_Event_Handler* */
      char *c_data = mb->base ();
      /* We've chosen to use a "null" value as an indication to leave.
        If the data we got from the queue is not null then we have
        some work to do.  */
      if (c_data)
        {
          /* Once again, we go to great lengths to emphasize the fact
            that we're casting pointers around in rather impolite
            ways.  We could have cast the char* directly to an
            ACE_Event_Handler* but then folks might think that's an OK
            thing to do.
            (Note: The correct way to use an ACE_Message_Block is to
            write data into it.  What I should have done was create a
            message block big enough to hold an event handler pointer
            and then written the pointer value into the block.  When
            we got here, I would have to read that data back into a
            pointer.  While politically correct, it is also a lot of
            work.  If you're careful you can get away with casting
            pointers around.)  */
          void *v_data = (void *) c_data;
          ACE_Event_Handler *handler = (ACE_Event_Handler *) v_data;
          /* Now that we finally have an event handler pointer, invoke
            it's handle_input() method.  Since we don't know it's
            handle, we just give it a default.  That's OK because we
            know that we're not using the handle in the method anyway.  */
          if (handler->handle_input (ACE_INVALID_HANDLE) == -1)
            {
              /* Tell the handler that it's time to go home.  The
                "normal" method for shutting down a handler whose
                handler failed is to invoke handle_close().  This will
                take care of cleaning it up for us.  Notice how we use
                the handler's get_handle() method to populate it's
                "handle" parameter.  Convenient isn't it?  */
              handler->handle_close (handler->get_handle (), 0);
              /* Also notice that we don't exit the svc() method here!
                The first time I did this, I was exiting.  After a few
                clients disconnect you have an empty thread pool.
                Hard to do any more work after that...  */
            }
        }
      else
        /* If we get here, we were given a message block with "null"
           data.  That is our signal to leave, so we return(0) to
           leave gracefully.  */
          return 0;  // Ok, shutdown request
      // message_block_guard goes out of scope here and releases the
      // message_block instance.
    }
  return 0;
}

其中,对其中类中的两个变量使用了管理的思想。Counter_Guard类和Message_Block_Guard 类分别对其进行了管理。
因为ACE_Task类是使用了ACE_message_block 进行对消息的封装。因此使用类,防止了内存的泄漏。
ACE_Event_Handler  是事件句柄,类似于操作符。当我们处理的时候,对其进行处理。

(0)

相关推荐

  • c++线程池实现方法

    本文实例讲述了c++线程池实现方法.分享给大家供大家参考.具体分析如下: 下面这个线程池是我在工作中用到过的,原理还是建立一个任务队列,让多个线程互斥的在队列中取出任务,然后执行,显然,队列是要加锁的 环境:ubuntu linux 文件名:locker.h #ifndef LOCKER_H_ #define LOCKER_H_ #include "pthread.h" class locker { public: locker(); virtual ~locker(); bool l

  • 深入解析C++编程中线程池的使用

    为什么需要线程池 目前的大多数网络服务器,包括Web服务器.Email服务器以及数据库服务器等都具有一个共同点,就是单位时间内必须处理数目巨大的连接请求,但处理时间却相对较短. 传 统多线程方案中我们采用的服务器模型则是一旦接受到请求之后,即创建一个新的线程,由该线程执行任务.任务执行完毕后,线程退出,这就是是"即时创建,即 时销毁"的策略.尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器将处于 不停的创建线程,

  • c++版线程池和任务池示例

    commondef.h 复制代码 代码如下: //单位秒,监测空闲列表时间间隔,在空闲队列中超过TASK_DESTROY_INTERVAL时间的任务将被自动销毁const int CHECK_IDLE_TASK_INTERVAL = 300;//单位秒,任务自动销毁时间间隔const int TASK_DESTROY_INTERVAL = 60; //监控线程池是否为空时间间隔,微秒const int IDLE_CHECK_POLL_EMPTY = 500; //线程池线程空闲自动退出时间间隔

  • c++实现简单的线程池

    这是对pthread线程的一个简单应用 1.      实现了线程池的概念,线程可以重复使用. 2.      对信号量,互斥锁等进行封装,业务处理函数中只需写和业务相关的代码. 3.      移植性好.如果想把这个线程池代码应用到自己的实现中去,只要写自己的业务处理函数和改写工作队列数据的处理方法就可以了. Sample代码主要包括一个主程序和两个线程实现类 ThreadTest.cpp:主程序 CThreadManager:线程管理Class,线程池的实现类 CThread:线程Class

  • C++线程池的简单实现方法

    本文以实例形式较为详细的讲述了C++线程池的简单实现方法.分享给大家供大家参考之用.具体方法如下: 一.几个基本的线程函数: 1.线程操纵函数: int pthread_create(pthread_t *tidp, const pthread_attr_t *attr, (void*)(*start_rtn)(void *), void *arg); //创建 void pthread_exit(void *retval); //终止自身 int pthread_cancel(pthread_

  • C语言实现支持动态拓展和销毁的线程池

    本文实例介绍了C 语言实现线程池,支持动态拓展和销毁,分享给大家供大家参考,具体内容如下 实现功能 1.初始化指定个数的线程 2.使用链表来管理任务队列 3.支持拓展动态线程 4.如果闲置线程过多,动态销毁部分线程 #include <stdio.h> #include <pthread.h> #include <stdlib.h> #include <signal.h> /*线程的任务队列由,函数和参数组成,任务由链表来进行管理*/ typedef str

  • 利用ace的ACE_Task等类实现线程池的方法详解

    本代码应该是ace自带的例子了,但是我觉得是非常好的,于是给大家分享一下.注释非常详细啊.头文件 复制代码 代码如下: #ifndef THREAD_POOL_H#define THREAD_POOL_H/* In order to implement a thread pool, we have to have an object that   can create a thread.  The ACE_Task<> is the basis for doing just   such a

  • 模拟简单Java线程池的方法详解

    目录 一. 前言 二.线程池是什么? 三.线程池构造方法ThreadPoolExecutor的构造方法的参数都是啥意思? 四.模拟实现一个线程池 总结 一. 前言 为了实现并发编程,于是就引入了进程这个概念.进程就相当于操作系统的一个任务.多个进程同时执行任务,就实现了并发编程,能够更快的执行. 但是由于进程还不够轻量,创建一个进程,销毁一个进程消耗的资源不可忽视.如果进程数量不多的情况下,这些资源消耗是可以接受的,但是如果频繁的创建.销毁进程.就是一笔很大的开销了. 那要怎么办呢? 为了解决这

  • Java查看线程运行状态的方法详解

    目录 一.查看线程的运行状态 二.解题思路 三.代码详解 一.查看线程的运行状态 题目 线程有以下6种状态:新建.运行.阻塞.等待.计时等待和终止. new新线程时,线程处于新建 状态. 调用start()方法时,线程处于运行状态. 当线程需要获得对象的内置锁,而该锁正被其他线程拥有,线程处于阻塞状态. 线程等待其他线程通知调度表可以运行时,该线程处于等待状态. 对于一些含有时间参数的方法,如 Thread 类的 sleep()方法,可以使线程处于计时等待状态. 当run()方法运行完毕或出现异

  • java 打造阻塞式线程池的实例详解

    java 打造阻塞式线程池的实例详解 原来以为tiger已经自带了这种线程池,就是在任务数量超出时能够阻塞住投放任务的线程,主要想用在JMS消息监听. 开始做法: 在ThreadPoolExcecutor中代入new ArrayBlockingQueue(MAX_TASK). 在任务超出时报错:RejectedExecutionException. 后来不用execute方法加入任务,直接getQueue().add(task), 利用其阻塞特性.但是发现阻塞好用了,但是任务没有被处理.一看Qu

  • Java线程池Executor用法详解

    目录 线程池类图 线程池的好处 new Thread的弊端 线程池核心类-ThreadPoolExecutor 使用Executors创建线程池 Executors.newCachedThreadPool Executors.newSingleThreadExecutor Executors.newFixedThreadPool Executors.newScheduledThreadPool 总结 如何定义线程池参数 线程池类图 我们最常使用的Executors实现创建线程池使用线程主要是用上

  • Java为实体类动态添加属性的方法详解

    目录 添加依赖 代码 测试 可以给已有实体类动态的添加字段并返回新的实体对象,不影响原来的实体对象结构. 添加依赖 <dependency> <groupId>cglib</groupId> <artifactId>cglib</artifactId> <version>2.2.2</version> </dependency> <dependency> <groupId>commons

  • Java中线程池自定义实现详解

    目录 前言 线程为什么不能多次调用start方法 线程池到底是如何复用的 前言 最初使用线程池的时候,网上的文章告诉我说线程池可以线程复用,提高线程的创建效率.从此我的脑海中便为线程池打上了一个标签——线程池可以做到线程的复用.但是我总以为线程的复用是指在创建出来的线程可以多次的更换run()方法的内容,来达到线程复用的目的,于是我尝试了一下.同一个线程调用多次,然后使run的内容不一样,但是我发现我错了,一个线程第一次运行是没问题的,当再次调用start方法是会抛出异常(java.lang.I

  • 如何利用反射批量修改java类某一属性的代码详解

    下面看下代码,具体代码如下所示: package utils.copyProperty; import java.beans.Introspector; import java.beans.PropertyDescriptor; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Collection; public class CopyProperty { public static Pro

  • 深入java线程池的使用详解

    在Java 5.0之前启动一个任务是通过调用Thread类的start()方法来实现的,任务的提于交和执行是同时进行的,如果你想对任务的执行进行调度或是控制 同时执行的线程数量就需要额外编写代码来完成.5.0里提供了一个新的任务执行架构使你可以轻松地调度和控制任务的执行,并且可以建立一个类似数据库连接 池的线程池来执行任务.这个架构主要有三个接口和其相应的具体类组成.这三个接口是Executor, ExecutorService.ScheduledExecutorService,让我们先用一个图

  • JAVA线程池原理实例详解

    本文实例讲述了JAVA线程池原理.分享给大家供大家参考,具体如下: 线程池的优点 1.线程是稀缺资源,使用线程池可以减少创建和销毁线程的次数,每个工作线程都可以重复使用. 2.可以根据系统的承受能力,调整线程池中工作线程的数量,防止因为消耗过多内存导致服务器崩溃. 线程池的创建 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQu

随机推荐