OGRE  2.0
Object-Oriented Graphics Rendering Engine
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
OgreWorkQueue.h
Go to the documentation of this file.
1 /*
2 -----------------------------------------------------------------------------
3 This source file is part of OGRE
4 (Object-oriented Graphics Rendering Engine)
5 For the latest info, see http://www.ogre3d.org/
6 
7 Copyright (c) 2000-2014 Torus Knot Software Ltd
8 
9 Permission is hereby granted, free of charge, to any person obtaining a copy
10 of this software and associated documentation files (the "Software"), to deal
11 in the Software without restriction, including without limitation the rights
12 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13 copies of the Software, and to permit persons to whom the Software is
14 furnished to do so, subject to the following conditions:
15 
16 The above copyright notice and this permission notice shall be included in
17 all copies or substantial portions of the Software.
18 
19 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
25 THE SOFTWARE.
26 -----------------------------------------------------------------------------
27 */
28 #ifndef __OgreWorkQueue_H__
29 #define __OgreWorkQueue_H__
30 
31 #include "OgrePrerequisites.h"
32 #include "OgreAny.h"
33 #include "OgreSharedPtr.h"
34 #include "OgreCommon.h"
36 #include "OgreHeaderPrefix.h"
37 
38 namespace Ogre
39 {
72  {
73  protected:
77  OGRE_MUTEX(mChannelMapMutex);
78  public:
80  typedef unsigned long long int RequestID;
81 
85  {
86  friend class WorkQueue;
87  protected:
99  mutable bool mAborted;
100 
101  public:
103  Request(uint16 channel, uint16 rtype, const Any& rData, uint8 retry, RequestID rid);
104  ~Request();
106  void abortRequest() const { mAborted = true; }
108  uint16 getChannel() const { return mChannel; }
110  uint16 getType() const { return mType; }
112  const Any& getData() const { return mData; }
114  uint8 getRetryCount() const { return mRetryCount; }
116  RequestID getID() const { return mID; }
118  bool getAborted() const { return mAborted; }
119  };
120 
124  {
128  bool mSuccess;
133 
134  public:
135  Response(const Request* rq, bool success, const Any& data, const String& msg = BLANKSTRING);
136  ~Response();
138  const Request* getRequest() const { return mRequest; }
140  bool succeeded() const { return mSuccess; }
142  const String& getMessages() const { return mMessages; }
144  const Any& getData() const { return mData; }
146  void abortRequest() { mRequest->abortRequest(); mData.destroy(); } // flag the originating request as aborted, then call destroy() on this response's data
147  };
148 
163  {
164  public:
166  virtual ~RequestHandler() {}
167 
174  virtual bool canHandleRequest(const Request* req, const WorkQueue* srcQ) // default policy: accept every request whose abort flag is not set
175  { (void)srcQ; return !req->getAborted(); } // the (void) cast only marks srcQ as intentionally unused here
176 
187  virtual Response* handleRequest(const Request* req, const WorkQueue* srcQ) = 0;
188  };
189 
198  {
199  public:
201  virtual ~ResponseHandler() {}
202 
209  virtual bool canHandleResponse(const Response* res, const WorkQueue* srcQ) // default policy: accept every response whose originating request was not aborted
210  { (void)srcQ; return !res->getRequest()->getAborted(); } // the (void) cast only marks srcQ as intentionally unused here
211 
219  virtual void handleResponse(const Response* res, const WorkQueue* srcQ) = 0;
220  };
221 
222  WorkQueue() : mNextChannel(0) {}
223  virtual ~WorkQueue() {}
224 
229  virtual void startup(bool forceRestart = true) = 0;
239  virtual void addRequestHandler(uint16 channel, RequestHandler* rh) = 0;
241  virtual void removeRequestHandler(uint16 channel, RequestHandler* rh) = 0;
242 
252  virtual void addResponseHandler(uint16 channel, ResponseHandler* rh) = 0;
254  virtual void removeResponseHandler(uint16 channel, ResponseHandler* rh) = 0;
255 
273  virtual RequestID addRequest(uint16 channel, uint16 requestType, const Any& rData, uint8 retryCount = 0,
274  bool forceSynchronous = false, bool idleThread = false) = 0;
275 
281  virtual void abortRequest(RequestID id) = 0;
282 
289  virtual void abortRequestsByChannel(uint16 channel) = 0;
290 
297  virtual void abortPendingRequestsByChannel(uint16 channel) = 0;
298 
303  virtual void abortAllRequests() = 0;
304 
310  virtual void setPaused(bool pause) = 0;
312  virtual bool isPaused() const = 0;
313 
318  virtual void setRequestsAccepted(bool accept) = 0;
320  virtual bool getRequestsAccepted() const = 0;
321 
330  virtual void processResponses() = 0;
331 
335  virtual unsigned long getResponseProcessingTimeLimit() const = 0;
336 
342  virtual void setResponseProcessingTimeLimit(unsigned long ms) = 0;
343 
346  virtual void shutdown() = 0;
347 
355  virtual uint16 getChannel(const String& channelName);
356 
357  };
358 
362  {
363  public:
364 
369  DefaultWorkQueueBase(const String& name = BLANKSTRING);
370  virtual ~DefaultWorkQueueBase();
372  const String& getName() const;
376  virtual size_t getWorkerThreadCount() const;
377 
383  virtual void setWorkerThreadCount(size_t c);
384 
394  virtual bool getWorkersCanAccessRenderSystem() const;
395 
396 
408  virtual void setWorkersCanAccessRenderSystem(bool access);
409 
417  virtual void _processNextRequest();
418 
420  virtual void _threadMain() = 0;
421 
423  virtual bool isShuttingDown() const { return mShuttingDown; }
424 
426  virtual void addRequestHandler(uint16 channel, RequestHandler* rh);
428  virtual void removeRequestHandler(uint16 channel, RequestHandler* rh);
430  virtual void addResponseHandler(uint16 channel, ResponseHandler* rh);
432  virtual void removeResponseHandler(uint16 channel, ResponseHandler* rh);
433 
435  virtual RequestID addRequest(uint16 channel, uint16 requestType, const Any& rData, uint8 retryCount = 0,
436  bool forceSynchronous = false, bool idleThread = false);
438  virtual void abortRequest(RequestID id);
440  virtual void abortRequestsByChannel(uint16 channel);
442  virtual void abortPendingRequestsByChannel(uint16 channel);
444  virtual void abortAllRequests();
446  virtual void setPaused(bool pause);
448  virtual bool isPaused() const;
450  virtual void setRequestsAccepted(bool accept);
452  virtual bool getRequestsAccepted() const;
454  virtual void processResponses();
456  virtual unsigned long getResponseProcessingTimeLimit() const { return mResposeTimeLimitMS; } // returns the stored limit, in milliseconds
458  virtual void setResponseProcessingTimeLimit(unsigned long ms) { mResposeTimeLimitMS = ms; } // stores the limit as given, in milliseconds; no validation is performed here
459  protected:
464  unsigned long mResposeTimeLimitMS;
465 
468  RequestQueue mRequestQueue; // Guarded by mRequestMutex
469  RequestQueue mProcessQueue; // Guarded by mProcessMutex
470  ResponseQueue mResponseQueue; // Guarded by mResponseMutex
471 
474  {
476 
478  : mQueue(q) {}
479 
480  void operator()();
481 
482  void operator()() const;
483 
484  void run();
485  };
486  WorkerFunc* mWorkerFunc;
487 
493  {
494  protected:
495  OGRE_RW_MUTEX(mRWMutex);
496  RequestHandler* mHandler;
497  public:
498  RequestHandlerHolder(RequestHandler* handler)
499  : mHandler(handler) {}
500 
501  // Disconnect the handler to allow it to be destroyed
503  {
504  // write lock - must wait for all requests to finish
505  OGRE_LOCK_RW_MUTEX_WRITE(mRWMutex);
506  mHandler = 0;
507  }
508 
512  RequestHandler* getHandler() { return mHandler; }
513 
517  Response* handleRequest(const Request* req, const WorkQueue* srcQ)
518  {
519  // Take the read side of the RW mutex: several requests may run through the
520  // same handler in parallel, while a writer (which clears mHandler) must wait
521  OGRE_LOCK_RW_MUTEX_READ(mRWMutex);
522  Response* response = 0; // stays null if no handler is attached or the handler declines
523  if (mHandler)
524  {
525  if (mHandler->canHandleRequest(req, srcQ))
526  {
527  response = mHandler->handleRequest(req, srcQ);
528  }
529  }
530  return response;
531  }
532 
533  };
534  // Hold these by shared pointer so they can be copied keeping same instance
536 
541 
544  RequestID mRequestCount; // Guarded by mRequestMutex
545  bool mPaused;
548 
549  //NOTE: If you lock multiple mutexes at the same time, the order is important!
550  // For example, if threadA locks mIdleMutex first and then tries to lock mProcessMutex,
551  // while threadB locks mProcessMutex first and then mIdleMutex, the two threads can deadlock and the system is dead!
552  //RULE: Lock mProcessMutex before any other mutex, to prevent deadlocks
553  OGRE_MUTEX(mIdleMutex);
554  OGRE_MUTEX(mRequestMutex);
555  OGRE_MUTEX(mProcessMutex);
556  OGRE_MUTEX(mResponseMutex);
557  OGRE_RW_MUTEX(mRequestHandlerMutex);
558 
559 
560  void processRequestResponse(Request* r, bool synchronous);
561  Response* processRequest(Request* r);
562  void processResponse(Response* r);
564  virtual void notifyWorkers() = 0;
566  void addRequestWithRID(RequestID rid, uint16 channel, uint16 requestType, const Any& rData, uint8 retryCount);
567 
568  RequestQueue mIdleRequestQueue; // Guarded by mIdleMutex
569  bool mIdleThreadRunning; // Guarded by mIdleMutex
570  Request* mIdleProcessed; // Guarded by mProcessMutex
571 
572 
573  bool processIdleRequests();
574  };
575 
576 
577 
578 
579 
583 }
584 
585 #include "OgreHeaderSuffix.h"
586 
587 #endif
588 
ResponseHandlerListByChannel mResponseHandlers
unsigned char uint8
Definition: OgrePlatform.h:422
uint16 getChannel() const
Get the request channel (top level categorisation)
const String & getMessages() const
Get any diagnostic messages about the process.
#define _OgreExport
Definition: OgrePlatform.h:255
Any mData
The details of the request (user defined)
Definition: OgreWorkQueue.h:93
Variant type that can hold Any other type.
Definition: OgreAny.h:54
uint16 mType
The request type, as an integer within the channel (user can define enumerations on this) ...
Definition: OgreWorkQueue.h:91
virtual void setResponseProcessingTimeLimit(unsigned long ms)
Set the time limit imposed on the processing of responses in a single frame, in milliseconds (0 indic...
#define OGRE_MUTEX(name)
General purpose request structure.
Definition: OgreWorkQueue.h:84
bool mSuccess
Whether the work item succeeded or not.
map< uint16, ResponseHandlerList >::type ResponseHandlerListByChannel
RequestID mID
Identifier (assigned by the system)
Definition: OgreWorkQueue.h:97
General purpose response structure.
unsigned long mResposeTimeLimitMS
map< uint16, RequestHandlerList >::type RequestHandlerListByChannel
list< RequestHandlerHolderPtr >::type RequestHandlerList
bool mAborted
Abort Flag.
Definition: OgreWorkQueue.h:99
Base for a general purpose request / response style background work queue.
const Any & getData() const
Return the response data (user defined, only valid on success)
virtual bool canHandleResponse(const Response *res, const WorkQueue *srcQ)
Return whether this handler can process a given response.
SharedPtr< RequestHandlerHolder > RequestHandlerHolderPtr
virtual unsigned long getResponseProcessingTimeLimit() const
Get the time limit imposed on the processing of responses in a single frame, in milliseconds (0 indic...
Any mData
Data associated with the result of the process.
std::list< T, A > type
ChannelMap mChannelMap
Definition: OgreWorkQueue.h:75
const String BLANKSTRING
Constant blank string, useful for returning by ref where local does not exist.
Definition: OgreCommon.h:574
RequestHandler * getHandler()
Get handler pointer - note, only use this for == comparison or similar, do not attempt to call it as ...
const Request * mRequest
Pointer to the request that this response is in relation to.
uint16 getType() const
Get the type of this request within the given channel.
virtual bool canHandleRequest(const Request *req, const WorkQueue *srcQ)
Return whether this handler can process a given request.
RequestID getID() const
Get the identifier of this request.
void abortRequest()
Abort the request.
Interface definition for a handler of requests.
Intermediate structure to hold a pointer to a request handler which provides insurance against the ha...
virtual bool isShuttingDown() const
Returns whether the queue is trying to shut down.
Superclass for all objects that wish to use custom memory allocators when their new / delete operator...
uint8 mRetryCount
Retry count - set this to non-zero to have the request try again on failure.
Definition: OgreWorkQueue.h:95
uint8 getRetryCount() const
Get the remaining retry count.
map< String, uint16 >::type ChannelMap
Definition: OgreWorkQueue.h:74
Interface definition for a handler of responses.
deque< Request * >::type RequestQueue
bool getAborted() const
Get the abort flag.
RequestHandlerListByChannel mRequestHandlers
#define OGRE_LOCK_RW_MUTEX_READ(name)
virtual ~WorkQueue()
#define OGRE_LOCK_RW_MUTEX_WRITE(name)
void abortRequest() const
Set the abort flag.
list< ResponseHandler * >::type ResponseHandlerList
unsigned long long int RequestID
Numeric identifier for a request.
Definition: OgreWorkQueue.h:80
Response * handleRequest(const Request *req, const WorkQueue *srcQ)
Process a request if possible.
Reference-counted shared pointer, used for objects where implicit destruction is required.
#define OGRE_RW_MUTEX(name)
const Request * getRequest() const
Get the request that this is a response to (NB destruction destroys this)
_StringBase String
Definition: OgreCommon.h:53
const Any & getData() const
Get the user details of this request.
unsigned short uint16
Definition: OgrePlatform.h:421
String mMessages
Any diagnostic messages.
Interface to a general purpose request / response style background work queue.
Definition: OgreWorkQueue.h:71
deque< Response * >::type ResponseQueue
uint16 mChannel
The request channel, as an integer.
Definition: OgreWorkQueue.h:89
bool succeeded() const
Return whether this is a successful response.