00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef _Dispatcher_
00022 #define _Dispatcher_
00023
00024 #include <map>
00025 #include <memory>
00026 #include <string>
00027 #include <boost/shared_ptr.hpp>
00028 #include "qpid/client/Session.h"
00029 #include "qpid/sys/Mutex.h"
00030 #include "qpid/sys/Runnable.h"
00031 #include "qpid/sys/Thread.h"
00032 #include "MessageListener.h"
00033 #include "SubscriptionImpl.h"
00034
00035 namespace qpid {
00036 namespace client {
00037
00038 class SubscriptionImpl;
00039
00041 typedef framing::Handler<framing::FrameSet> FrameSetHandler;
00042
00044 class Dispatcher : public sys::Runnable
00045 {
00046 typedef std::map<std::string, boost::intrusive_ptr<SubscriptionImpl> >Listeners;
00047 sys::Mutex lock;
00048 sys::Thread worker;
00049 Session session;
00050 Demux::QueuePtr queue;
00051 bool running;
00052 bool autoStop;
00053 Listeners listeners;
00054 boost::intrusive_ptr<SubscriptionImpl> defaultListener;
00055 std::auto_ptr<FrameSetHandler> handler;
00056
00057 boost::intrusive_ptr<SubscriptionImpl> find(const std::string& name);
00058 bool isStopped();
00059
00060 boost::function<void ()> failoverHandler;
00061
00062 public:
00063 Dispatcher(const Session& session, const std::string& queue = "");
00064
00065 void start();
00066 void wait();
00067 void run();
00068 void stop();
00069 void setAutoStop(bool b);
00070
00071 void registerFailoverHandler ( boost::function<void ()> fh )
00072 {
00073 failoverHandler = fh;
00074 }
00075
00076 void listen(const boost::intrusive_ptr<SubscriptionImpl>& subscription);
00077 void cancel(const std::string& destination);
00078 };
00079
00080 }}
00081
00082 #endif