Apache Qpid C++ API
Apache Qpid - AMQP Messaging for Java JMS, C++, Python, Ruby, and .NET Apache Qpid Documentation

qpid/client/SessionImpl.h

Go to the documentation of this file.
00001 /*
00002  *
00003  * Licensed to the Apache Software Foundation (ASF) under one
00004  * or more contributor license agreements.  See the NOTICE file
00005  * distributed with this work for additional information
00006  * regarding copyright ownership.  The ASF licenses this file
00007  * to you under the Apache License, Version 2.0 (the
00008  * "License"); you may not use this file except in compliance
00009  * with the License.  You may obtain a copy of the License at
00010  * 
00011  *   http://www.apache.org/licenses/LICENSE-2.0
00012  * 
00013  * Unless required by applicable law or agreed to in writing,
00014  * software distributed under the License is distributed on an
00015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
00016  * KIND, either express or implied.  See the License for the
00017  * specific language governing permissions and limitations
00018  * under the License.
00019  *
00020  */
00021 
00022 #ifndef _SessionImpl_
00023 #define _SessionImpl_
00024 
00025 #include "Demux.h"
00026 #include "Execution.h"
00027 #include "Results.h"
00028 
00029 #include "qpid/SessionId.h"
00030 #include "qpid/SessionState.h"
00031 #include "boost/shared_ptr.hpp"
00032 #include "boost/weak_ptr.hpp"
00033 #include "qpid/framing/FrameHandler.h"
00034 #include "qpid/framing/ChannelHandler.h"
00035 #include "qpid/framing/SequenceNumber.h"
00036 #include "qpid/framing/AMQP_ClientOperations.h"
00037 #include "qpid/framing/AMQP_ServerProxy.h"
00038 #include "qpid/sys/Semaphore.h"
00039 #include "qpid/sys/StateMonitor.h"
00040 #include "qpid/sys/ExceptionHolder.h"
00041 
00042 #include <boost/optional.hpp>
00043 
00044 namespace qpid {
00045 
// Forward declarations of framing types named by the declarations below.
// NOTE(review): SessionImpl uses FrameSet::shared_ptr and holds SequenceSet
// members by value, both of which need complete types, so those definitions
// must already arrive through the includes above. Only MethodContent is used
// purely by reference/pointer in this header.
00046 namespace framing {
00047 
00048 class FrameSet;
00049 class MethodContent;
00050 class SequenceSet;
00051 
00052 }
00053 
00054 namespace client {
00055 
// Forward declarations of client-namespace classes that this header only
// names in declarations: Future as a function return type, ConnectionImpl
// inside shared_ptr/weak_ptr, and SessionHandler in a friend declaration.
00056 class Future;
00057 class ConnectionImpl;
00058 class SessionHandler;
00059 
// Client-side session object. Per its base-class list it is a two-way frame
// handler (see handleIn()/handleOut() below), exposes the Execution interface
// publicly, and privately implements the Session, Execution and Message
// operation callbacks declared in framing::AMQP_ClientOperations.
// NOTE(review): this is a doxygen source listing. The 5-digit number at the
// start of each line is the listing gutter; gaps in the numbering are
// presumably where doxygen stripped the original documentation comments.
00061 class SessionImpl : public framing::FrameHandler::InOutHandler,
00062                     public Execution,
00063                     private framing::AMQP_ClientOperations::SessionHandler,
00064                     private framing::AMQP_ClientOperations::ExecutionHandler,
00065                     private framing::AMQP_ClientOperations::MessageHandler
00066 {
00067 public:
          // Takes a session name and a shared_ptr to a ConnectionImpl.
00068     SessionImpl(const std::string& name, shared_ptr<ConnectionImpl>);
00069     ~SessionImpl();
00070 
00071 
00072     //NOTE: Public functions called in user thread.
          // Returns a FrameSet::shared_ptr; presumably the next received
          // frame set -- TODO confirm against the implementation.
00073     framing::FrameSet::shared_ptr get();
00074 
          // Returns the session id by value (the return type is a const copy).
00075     const SessionId getId() const;
00076 
          // Channel number accessors; presumably backed by the `channel`
          // member declared below -- TODO confirm.
00077     uint16_t getChannel() const;
00078     void setChannel(uint16_t channel);
00079 
          // Session lifecycle operations. resume() takes the ConnectionImpl
          // to use; it is overloaded with the private message-resume callback
          // further down, which has a different signature.
00080     void open(uint32_t detachedLifetime);
00081     void close();
00082     void resume(shared_ptr<ConnectionImpl>);
00083     void suspend();
00084 
00085     void assertOpen() const;
00086 
          // Three ways to send a command: on its own, with MethodContent, or
          // with a FrameSet as content. Each returns a Future.
00087     Future send(const framing::AMQBody& command);
00088     Future send(const framing::AMQBody& command, const framing::MethodContent& content);
00089     Future send(const framing::AMQBody& command, const framing::FrameSet& content);
          // Sends a single caller-built frame (takes a non-const reference).
00090     void sendRawFrame(framing::AMQFrame& frame);
00091 
00092     Demux& getDemux();
          // Completion tracking, keyed by command sequence number or by a set
          // of sequence numbers. NOTE(review): the exact meaning of
          // `cumulative` and `notifyPeer` is not visible in this header.
00093     void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer);
00094     void markCompleted(const framing::SequenceSet& ids, bool notifyPeer);
00095     bool isComplete(const framing::SequenceNumber& id);
00096     bool isCompleteUpTo(const framing::SequenceNumber& id);
00097     void waitForCompletion(const framing::SequenceNumber& id);
00098     void sendCompletion();
00099     void sendFlush();
00100 
00101     void setException(const sys::ExceptionHolder&);
00102     
00103     //NOTE: these are called by the network thread when the connection is closed or dies
00104     void connectionClosed(uint16_t code, const std::string& text);
00105     void connectionBroke(const std::string& text);
00106 
          // Takes a requested timeout in seconds and returns a uint32_t;
          // presumably the timeout actually in effect -- TODO confirm.
00108     uint32_t setTimeout(uint32_t requestedSeconds);
00109 
00111     uint32_t getTimeout() const;
00112 
          // `weak` defaults to true. Presumably selects whether the connection
          // is referenced through connectionWeak instead of connectionShared
          // (see the weakPtr member) -- TODO confirm.
00116     void setWeakPtr(bool weak=true);
00117 
00121     shared_ptr<ConnectionImpl> getConnection();
00122 
          // Presumably clears the autoDetach member -- TODO confirm.
00124     void disableAutoDetach();
00125 
00126 private:
          // Session attachment states tracked by the `state` monitor below.
00127     enum State {
00128         INACTIVE,
00129         ATTACHING,
00130         ATTACHED,
00131         DETACHING,
00132         DETACHED
00133     };
          // Short names for the private base classes. Inside this class the
          // name SessionHandler now means the framing operations interface,
          // which is why the friend declaration at the end of the class has to
          // spell out client::SessionHandler.
00134     typedef framing::AMQP_ClientOperations::SessionHandler SessionHandler;
00135     typedef framing::AMQP_ClientOperations::ExecutionHandler ExecutionHandler;
00136     typedef framing::AMQP_ClientOperations::MessageHandler MessageHandler;
          // NOTE(review): DETACHED is passed as the second template argument;
          // its meaning is defined in qpid/sys/StateMonitor.h, not here.
00137     typedef sys::StateMonitor<State, DETACHED> StateMonitor;
00138     typedef StateMonitor::Set States;
00139 
          // Declared inline but not defined in this header, so the definitions
          // must be visible in every translation unit that calls them.
00140     inline void setState(State s);
00141     inline void waitFor(State);
00142 
00143     void setExceptionLH(const sys::ExceptionHolder&);      // LH = lock held when called.
          // No-argument detach; overloaded with the detach(name) session
          // control callback declared further down.
00144     void detach();
00145     
00146     void check() const;
00147     void checkOpen() const;
00148     void handleClosed();
00149 
          // Frame entry points for the two directions of the InOutHandler
          // base, followed by the internal frame-forwarding helpers.
00150     void handleIn(framing::AMQFrame& frame);
00151     void handleOut(framing::AMQFrame& frame);
          // proxyOut is the member function bound into ioHandler below.
00158     void proxyOut(framing::AMQFrame& frame);
00159     void sendFrame(framing::AMQFrame& frame, bool canBlock);
00160     void deliver(framing::AMQFrame& frame);
00161 
          // The MethodContent pointer defaults to 0, i.e. a command without
          // content.
00162     Future sendCommand(const framing::AMQBody&, const framing::MethodContent* = 0);
00163     void sendContent(const framing::MethodContent&);
00164     void waitForCompletionImpl(const framing::SequenceNumber& id);
00165     
00166     void sendCompletionImpl();
00167 
00168     // Note: Following methods are called by network thread in
00169     // response to session controls from the broker
00170     void attach(const std::string& name, bool force);    
00171     void attached(const std::string& name);    
00172     void detach(const std::string& name);    
00173     void detached(const std::string& name, uint8_t detachCode);
00174     void requestTimeout(uint32_t timeout);    
00175     void timeout(uint32_t timeout);    
00176     void commandPoint(const framing::SequenceNumber& commandId, uint64_t commandOffset);    
00177     void expected(const framing::SequenceSet& commands, const framing::Array& fragments);    
00178     void confirmed(const framing::SequenceSet& commands, const framing::Array& fragments);    
00179     void completed(const framing::SequenceSet& commands, bool timelyReply);    
00180     void knownCompleted(const framing::SequenceSet& commands);    
00181     void flush(bool expected, bool confirmed, bool completed);    
00182     void gap(const framing::SequenceSet& commands);
00183 
00184     // Note: Following methods are called by network thread in
00185     // response to execution commands from the broker
00186     void sync();    
00187     void result(const framing::SequenceNumber& commandId, const std::string& value);    
00188     void exception(uint16_t errorCode,
00189                    const framing::SequenceNumber& commandId,
00190                    uint8_t classCode,
00191                    uint8_t commandCode,
00192                    uint8_t fieldIndex,
00193                    const std::string& description,
00194                    const framing::FieldTable& errorInfo);
00195                    
00196     // Note: Following methods are called by network thread in
00197     // response to message commands from the broker
00198     // EXCEPT Message.Transfer
00199     void accept(const qpid::framing::SequenceSet&);
00200     void reject(const qpid::framing::SequenceSet&, uint16_t, const std::string&);
00201     void release(const qpid::framing::SequenceSet&, bool);
00202     qpid::framing::MessageResumeResult resume(const std::string&, const std::string&);
00203     void setFlowMode(const std::string&, uint8_t);
00204     void flow(const std::string&, uint8_t, uint32_t);
00205     void stop(const std::string&);
00206 
00207 
          // Data members. C++ constructs non-static members in declaration
          // order, so do not reorder them without checking the constructor's
          // initializer list.
00208     sys::ExceptionHolder exceptionHolder;
          // `mutable` lets const member functions use the monitor and the
          // semaphore.
00209     mutable StateMonitor state;
00210     mutable sys::Semaphore sendLock;
00211     uint32_t detachedLifetime;
00212     const uint64_t maxFrameSize;
00213     const SessionId id;
00214 
          // Two ways of referring to the connection, with a flag alongside;
          // presumably weakPtr selects which one is in use (see setWeakPtr())
          // -- TODO confirm.
00215     shared_ptr<ConnectionImpl> connectionShared;
00216     boost::weak_ptr<ConnectionImpl> connectionWeak;
00217     bool weakPtr;
00218 
          // Handler object bound to SessionImpl::proxyOut.
00219     framing::FrameHandler::MemFunRef<SessionImpl, &SessionImpl::proxyOut> ioHandler;
00220     framing::ChannelHandler channel;
00221     framing::AMQP_ServerProxy::Session proxy;
00222 
00223     Results results;
00224     Demux demux;
00225     framing::FrameSet::shared_ptr arriving;
00226 
00227     framing::SequenceSet incompleteIn;//incoming commands that are as yet incomplete
00228     framing::SequenceSet completedIn;//incoming commands that have completed
00229     framing::SequenceSet incompleteOut;//outgoing commands not yet known to be complete
00230     framing::SequenceSet completedOut;//outgoing commands that we know to be completed
00231     framing::SequenceNumber nextIn;
00232     framing::SequenceNumber nextOut;
00233 
00234     SessionState sessionState;
00235 
00236     // Only keep track of message credit 
          // NOTE(review): raw pointer; who allocates and frees it is not
          // visible in this header -- confirm in the implementation.
00237     sys::Semaphore* sendMsgCredit;
00238 
00239     bool autoDetach;
00240     
          // Fully qualified because the unqualified name SessionHandler refers
          // to the private typedef above inside this class.
00241   friend class client::SessionHandler;
00242 };
00243 
00244 }} // namespace qpid::client
00245 
00246 #endif

Qpid C++ API Reference
Generated on Tue Dec 8 15:39:48 2009 for Qpid C++ Client API by doxygen 1.4.7