OGRE  1.7
Object-Oriented Graphics Rendering Engine
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
OgreWorkQueue.h
Go to the documentation of this file.
1 /*
2 -----------------------------------------------------------------------------
3 This source file is part of OGRE
4 (Object-oriented Graphics Rendering Engine)
5 For the latest info, see http://www.ogre3d.org/
6 
7 Copyright (c) 2000-2011 Torus Knot Software Ltd
8 
9 Permission is hereby granted, free of charge, to any person obtaining a copy
10 of this software and associated documentation files (the "Software"), to deal
11 in the Software without restriction, including without limitation the rights
12 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13 copies of the Software, and to permit persons to whom the Software is
14 furnished to do so, subject to the following conditions:
15 
16 The above copyright notice and this permission notice shall be included in
17 all copies or substantial portions of the Software.
18 
19 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
25 THE SOFTWARE.
26 -----------------------------------------------------------------------------
27 */
28 #ifndef __OgreWorkQueue_H__
29 #define __OgreWorkQueue_H__
30 
31 #include "OgrePrerequisites.h"
32 #include "OgreAtomicWrappers.h"
33 #include "OgreAny.h"
34 #include "OgreSharedPtr.h"
35 
36 namespace Ogre
37 {
70  {
71  protected:
72  typedef std::map<String, uint16> ChannelMap;
75  OGRE_MUTEX(mChannelMapMutex)
76  public:
78  typedef unsigned long long int RequestID;
79 
83  {
84  friend class WorkQueue;
85  protected:
95  RequestID mID;
97  mutable bool mAborted;
98 
99  public:
101  Request(uint16 channel, uint16 rtype, const Any& rData, uint8 retry, RequestID rid);
102  ~Request();
104  void abortRequest() const { mAborted = true; }
106  uint16 getChannel() const { return mChannel; }
108  uint16 getType() const { return mType; }
110  const Any& getData() const { return mData; }
112  uint8 getRetryCount() const { return mRetryCount; }
114  RequestID getID() const { return mID; }
116  bool getAborted() const { return mAborted; }
117  };
118 
122  {
126  bool mSuccess;
131 
132  public:
133  Response(const Request* rq, bool success, const Any& data, const String& msg = StringUtil::BLANK);
134  ~Response();
136  const Request* getRequest() const { return mRequest; }
138  bool succeeded() const { return mSuccess; }
140  const String& getMessages() const { return mMessages; }
142  const Any& getData() const { return mData; }
144  void abortRequest() { mRequest->abortRequest(); mData.destroy(); }
145  };
146 
161  {
162  public:
164  virtual ~RequestHandler() {}
165 
172  virtual bool canHandleRequest(const Request* req, const WorkQueue* srcQ)
173  { (void)srcQ; return !req->getAborted(); }
174 
185  virtual Response* handleRequest(const Request* req, const WorkQueue* srcQ) = 0;
186  };
187 
196  {
197  public:
199  virtual ~ResponseHandler() {}
200 
207  virtual bool canHandleResponse(const Response* res, const WorkQueue* srcQ)
208  { (void)srcQ; return !res->getRequest()->getAborted(); }
209 
217  virtual void handleResponse(const Response* res, const WorkQueue* srcQ) = 0;
218  };
219 
220  WorkQueue() : mNextChannel(0) {}
221  virtual ~WorkQueue() {}
222 
227  virtual void startup(bool forceRestart = true) = 0;
237  virtual void addRequestHandler(uint16 channel, RequestHandler* rh) = 0;
239  virtual void removeRequestHandler(uint16 channel, RequestHandler* rh) = 0;
240 
250  virtual void addResponseHandler(uint16 channel, ResponseHandler* rh) = 0;
252  virtual void removeResponseHandler(uint16 channel, ResponseHandler* rh) = 0;
253 
266  virtual RequestID addRequest(uint16 channel, uint16 requestType, const Any& rData, uint8 retryCount = 0,
267  bool forceSynchronous = false) = 0;
268 
274  virtual void abortRequest(RequestID id) = 0;
275 
281  virtual void abortRequestsByChannel(uint16 channel) = 0;
282 
287  virtual void abortAllRequests() = 0;
288 
294  virtual void setPaused(bool pause) = 0;
296  virtual bool isPaused() const = 0;
297 
302  virtual void setRequestsAccepted(bool accept) = 0;
304  virtual bool getRequestsAccepted() const = 0;
305 
314  virtual void processResponses() = 0;
315 
319  virtual unsigned long getResponseProcessingTimeLimit() const = 0;
320 
326  virtual void setResponseProcessingTimeLimit(unsigned long ms) = 0;
327 
330  virtual void shutdown() = 0;
331 
339  virtual uint16 getChannel(const String& channelName);
340 
341  };
342 
346  {
347  public:
348 
354  virtual ~DefaultWorkQueueBase();
356  const String& getName() const;
360  virtual size_t getWorkerThreadCount() const;
361 
367  virtual void setWorkerThreadCount(size_t c);
368 
378  virtual bool getWorkersCanAccessRenderSystem() const;
379 
380 
392  virtual void setWorkersCanAccessRenderSystem(bool access);
393 
401  virtual void _processNextRequest();
402 
404  virtual void _threadMain() = 0;
405 
407  virtual bool isShuttingDown() const { return mShuttingDown; }
408 
410  virtual void addRequestHandler(uint16 channel, RequestHandler* rh);
412  virtual void removeRequestHandler(uint16 channel, RequestHandler* rh);
414  virtual void addResponseHandler(uint16 channel, ResponseHandler* rh);
416  virtual void removeResponseHandler(uint16 channel, ResponseHandler* rh);
417 
419  virtual RequestID addRequest(uint16 channel, uint16 requestType, const Any& rData, uint8 retryCount = 0,
420  bool forceSynchronous = false);
422  virtual void abortRequest(RequestID id);
424  virtual void abortRequestsByChannel(uint16 channel);
426  virtual void abortAllRequests();
428  virtual void setPaused(bool pause);
430  virtual bool isPaused() const;
432  virtual void setRequestsAccepted(bool accept);
434  virtual bool getRequestsAccepted() const;
436  virtual void processResponses();
438  virtual unsigned long getResponseProcessingTimeLimit() const { return mResposeTimeLimitMS; }
440  virtual void setResponseProcessingTimeLimit(unsigned long ms) { mResposeTimeLimitMS = ms; }
441  protected:
446  unsigned long mResposeTimeLimitMS;
447 
453 
455  struct WorkerFunc OGRE_THREAD_WORKER_INHERIT
456  {
458 
460  : mQueue(q) {}
461 
462  void operator()();
463 
464  void run();
465  };
466  WorkerFunc* mWorkerFunc;
467 
473  {
474  protected:
475  OGRE_RW_MUTEX(mRWMutex);
476  RequestHandler* mHandler;
477  public:
478  RequestHandlerHolder(RequestHandler* handler)
479  : mHandler(handler) {}
480 
481  // Disconnect the handler to allow it to be destroyed
483  {
484  // write lock - must wait for all requests to finish
485  OGRE_LOCK_RW_MUTEX_WRITE(mRWMutex);
486  mHandler = 0;
487  }
488 
492  RequestHandler* getHandler() { return mHandler; }
493 
497  Response* handleRequest(const Request* req, const WorkQueue* srcQ)
498  {
499  // Read mutex so that multiple requests can be processed by the
500  // same handler in parallel if required
501  OGRE_LOCK_RW_MUTEX_READ(mRWMutex);
502  Response* response = 0;
503  if (mHandler)
504  {
505  if (mHandler->canHandleRequest(req, srcQ))
506  {
507  response = mHandler->handleRequest(req, srcQ);
508  }
509  }
510  return response;
511  }
512 
513  };
514  // Hold these by shared pointer so they can be copied keeping same instance
516 
521 
524  RequestID mRequestCount;
525  bool mPaused;
528 
529  OGRE_MUTEX(mRequestMutex)
530  OGRE_MUTEX(mProcessMutex)
531  OGRE_MUTEX(mResponseMutex)
532  OGRE_RW_MUTEX(mRequestHandlerMutex);
533 
534 
535  void processRequestResponse(Request* r, bool synchronous);
536  Response* processRequest(Request* r);
537  void processResponse(Response* r);
539  virtual void notifyWorkers() = 0;
541  void addRequestWithRID(RequestID rid, uint16 channel, uint16 requestType, const Any& rData, uint8 retryCount);
542 
543  };
544 
545 
546 
547 
548 
552 }
553 
554 
555 #endif
556 
ResponseHandlerListByChannel mResponseHandlers
unsigned char uint8
Definition: OgrePlatform.h:248
uint16 getChannel() const
Get the request channel (top level categorisation)
const String & getMessages() const
Get any diagnostic messages about the process.
#define _OgreExport
Definition: OgrePlatform.h:203
Any mData
The details of the request (user defined)
Definition: OgreWorkQueue.h:91
Variant type that can hold Any other type.
Definition: OgreAny.h:56
#define OGRE_RW_MUTEX(name)
uint16 mType
The request type, as an integer within the channel (user can define enumerations on this) ...
Definition: OgreWorkQueue.h:89
virtual void setResponseProcessingTimeLimit(unsigned long ms)
Set the time limit imposed on the processing of responses in a single frame, in milliseconds (0 indic...
General purpose request structure.
Definition: OgreWorkQueue.h:82
bool mSuccess
Whether the work item succeeded or not.
map< uint16, ResponseHandlerList >::type ResponseHandlerListByChannel
RequestID mID
Identifier (assigned by the system)
Definition: OgreWorkQueue.h:95
General purpose response structure.
unsigned long mResposeTimeLimitMS
map< uint16, RequestHandlerList >::type RequestHandlerListByChannel
list< RequestHandlerHolderPtr >::type RequestHandlerList
bool mAborted
Abort Flag.
Definition: OgreWorkQueue.h:97
Base for a general purpose request / response style background work queue.
const Any & getData() const
Return the response data (user defined, only valid on success)
#define OGRE_MUTEX(name)
#define OGRE_LOCK_RW_MUTEX_READ(name)
virtual bool canHandleResponse(const Response *res, const WorkQueue *srcQ)
Return whether this handler can process a given response.
SharedPtr< RequestHandlerHolder > RequestHandlerHolderPtr
virtual unsigned long getResponseProcessingTimeLimit() const
Get the time limit imposed on the processing of responses in a single frame, in milliseconds (0 indic...
Any mData
Data associated with the result of the process.
std::list< T, A > type
ChannelMap mChannelMap
Definition: OgreWorkQueue.h:73
RequestHandler * getHandler()
Get handler pointer - note, only use this for == comparison or similar, do not attempt to call it as ...
const Request * mRequest
Pointer to the request that this response is in relation to.
uint16 getType() const
Get the type of this request within the given channel.
virtual bool canHandleRequest(const Request *req, const WorkQueue *srcQ)
Return whether this handler can process a given request.
RequestID getID() const
Get the identifier of this request.
void abortRequest()
Abort the request.
Interface definition for a handler of requests.
Intermediate structure to hold a pointer to a request handler which provides insurance against the ha...
virtual bool isShuttingDown() const
Returns whether the queue is trying to shut down.
std::map< String, uint16 > ChannelMap
Definition: OgreWorkQueue.h:72
Superclass for all objects that wish to use custom memory allocators when their new / delete operator...
#define OGRE_LOCK_RW_MUTEX_WRITE(name)
uint8 mRetryCount
Retry count - set this to non-zero to have the request try again on failure.
Definition: OgreWorkQueue.h:93
uint8 getRetryCount() const
Get the remaining retry count.
Interface definition for a handler of responses.
deque< Request * >::type RequestQueue
bool getAborted() const
Get the abort flag.
RequestHandlerListByChannel mRequestHandlers
static const String BLANK
Constant blank string, useful for returning by ref where local does not exist.
Definition: OgreString.h:180
virtual ~WorkQueue()
void abortRequest() const
Set the abort flag.
list< ResponseHandler * >::type ResponseHandlerList
unsigned long long int RequestID
Numeric identifier for a request.
Definition: OgreWorkQueue.h:78
Response * handleRequest(const Request *req, const WorkQueue *srcQ)
Process a request if possible.
Reference-counted shared pointer, used for objects where implicit destruction is required.
Definition: OgreSharedPtr.h:60
const Request * getRequest() const
Get the request that this is a response to (NB destruction destroys this)
_StringBase String
const Any & getData() const
Get the user details of this request.
unsigned short uint16
Definition: OgrePlatform.h:247
String mMessages
Any diagnostic messages.
Interface to a general purpose request / response style background work queue.
Definition: OgreWorkQueue.h:69
deque< Response * >::type ResponseQueue
uint16 mChannel
The request channel, as an integer.
Definition: OgreWorkQueue.h:87
bool succeeded() const
Return whether this is a successful response.