HDK
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
PDG_EventQueue.h
Go to the documentation of this file.
1 /*
2  * PROPRIETARY INFORMATION. This software is proprietary to
3  * Side Effects Software Inc., and is not to be reproduced,
4  * transmitted, or disclosed in any way without written permission.
5  *
6  * COMMENTS:
7  */
8 
9 #ifndef __PDG_EventQueue_h__
10 #define __PDG_EventQueue_h__
11 
12 #include "PDG_API.h"
13 #include "PDG_Event.h"
14 #include "PDG_EventTypes.h"
15 
16 #include <UT/UT_Array.h>
18 #include <UT/UT_ConcurrentQueue.h>
19 #include <UT/UT_ConcurrentVector.h>
21 #include <UT/UT_UniquePtr.h>
22 
23 #include <SYS/SYS_AtomicInt.h>
24 
25 #define PDG_EVENT_QUEUE_STATS 0
26 
27 class PDG_EventEmitter;
28 class PDG_EventHandler;
29 
30 class UT_Thread;
31 
32 /* Event queue that dispatches PDG_Events to the handlers registered for
33  * the PDG_EventEmitter that produced them.
34  */
36 {
37 public:
38  /// Scoped object that pauses event handling. Pausing is implemented with
39  /// an atomic counter on the queue, so nested ScopedPause objects work as
40  /// expected. Each scoped pause increments the pause request counter, so
41  /// event processing will only proceed once all ScopedPause objects are
42  /// unwound.
44  {
45  public:
47  : myQueue(queue) // remember the queue so the destructor can resume it
48  {
49  queue->pause(); // adds one pause request to the queue's counter
50  }
51 
53  {
54  myQueue->resume(); // removes the pause request added at construction
55  }
56 
57  private:
58  PDG_EventQueue* myQueue; // queue being paused; dereferenced without a null check and never deleted here
59  }; // NOTE(review): no copy operations are disabled in the visible code, so a copied object would call resume() a second time -- confirm whether copying should be deleted
60 
61 public:
62  /// Constructs a new event queue
63  PDG_EventQueue(bool background);
64  ~PDG_EventQueue();
65 
66  /// Terminates the queue, waits for the event thread to finish, and
67  /// unsets the queue from all event emitters registered with this event
68  /// queue instance.
69  void terminate();
70 
71  /// Queues an event that will eventually be dispatched by the background
72  /// thread running in this event queue instance.
73  void queueEvent(
74  const PDG_EventEmitter* emitter,
75  const PDG_Event& event);
76 
77  /// Emits an event immediately on the calling thread
78  void emitEvent(
79  const PDG_EventEmitter* emitter,
80  const PDG_Event& event) const;
81 
82  /// Waits for all queued events to finish, and prevents new events from
83  /// being added until that condition is reached. Blocks the caller. If
84  /// an event emitter is passed into this function, it gets deregistered
85  /// before unblocking the event queue.
86  void waitAllEvents(PDG_EventEmitter* emitter=nullptr);
87 
88  /// Waits until the specified emitter is registered, to ensure that all
89  /// events for that emitter have been flushed
90  void waitEmitter(const PDG_EventEmitter* emitter);
91 
92  /// Returns the consolidation flag: true if the queue is configured to try
93  /// to consolidate consecutive events of the same type, false otherwise
94  inline bool isConsolidateEvents() const
95  { return myIsConsolidateEvents; } // plain read of the flag written by setConsolidateEvents()
96 
97  /// Sets the event consolidation flag to the given value
98  inline void setConsolidateEvents(bool consolidate)
99  { myIsConsolidateEvents = consolidate; } // only stores the flag; it is read back through isConsolidateEvents()
100 
101 private:
102  friend class PDG_EventEmitter;
103 
104  struct EmitterInfo // per-emitter record; this is the mapped type of EmitterMap
105  {
106  PDG_EventHandlerArray myHandlers; // handlers held for the emitter
107  PDG_EventFilterArray myFilters; // presumably one filter per entry in myHandlers -- TODO confirm against the implementation
108 
109  void add(
110  PDG_EventHandler* handler,
111  const PDG_EventFilter& filter,
112  PDG_EventFilterMap& filter_map); // filter_map is taken by non-const reference, so it may be modified; definition is not in this header
113  bool remove(
114  PDG_EventHandler* handler,
115  PDG_EventFilterMap& filter_map); // NOTE(review): bool result presumably reports whether the handler was found -- verify in the implementation
116  void removeAll(
117  PDG_EventHandlerArray& removed,
118  PDG_EventFilterMap& filter_map,
119  bool user_only); // 'removed' is an output array by signature; presumably receives the handlers that were dropped -- TODO confirm
120  void emitEvent(const PDG_Event& event) const; // const: does not modify this record's own members
121  };
122 
123  enum EmitterUpdate // kind of change recorded in DeferredEmitterUpdate::myType
124  {
125  eHandlerAdd, // presumably mirrors addEventHandler() -- TODO confirm
126  eHandlerRemove, // presumably mirrors removeEventHandler() -- TODO confirm
127  eHandlerRemoveAll // presumably mirrors removeAllEventHandlers() -- TODO confirm
128  };
129 
130  struct DeferredEmitterUpdate // element type of EmitterUpdateArray (see myEmitterUpdates)
131  {
132  PDG_EventEmitter* myEmitter; // emitter the update refers to; raw pointer, lifetime is not managed by this struct
133  PDG_EventHandler* myHandler; // handler the update refers to; raw pointer, lifetime is not managed by this struct
134 
135  PDG_EventFilter myFilter; // stored by value; presumably only meaningful for eHandlerAdd -- TODO confirm
136 
137  EmitterUpdate myType; // which kind of update this entry describes
138  bool myUserOnly; // presumably the user_only argument of removeAllEventHandlers() -- TODO confirm
139  }; // NOTE(review): members have no default initializers, so every field must be set by whoever creates an entry
140 
141  using EventInfo = std::pair<
142  const PDG_EventEmitter*, PDG_Event>;
143  using EventArray = UT_Array<EventInfo>;
144  using EventQueue = UT_ConcurrentQueue<EventInfo>;
145  using EmitterMap = UT_ConcurrentHashMap<
146  PDG_EventEmitter*, EmitterInfo>;
147  using EmitterArrayMap = UT_UniquePtr<EmitterMap[]>;
148  using EmitterUpdateArray = UT_ConcurrentVector<DeferredEmitterUpdate>;
149  using ActiveEmitter = UT_ThreadSpecificValue<
150  const PDG_EventEmitter*>;
151 
152  class EventFunctor // callable applied to an index range over an EventArray
153  {
154  public:
155  EventFunctor(
157  const EventArray& events); // 'events' is kept by reference in myEvents, not copied
158  ~EventFunctor();
159 
160  void operator()(
161  const UT_BlockedRange<int64> &range) const; // presumably invoked by a UT parallel-for style algorithm with sub-ranges of myEvents -- TODO confirm
162 
163  private:
164  PDG_EventQueue* myQueue; // queue the functor works on behalf of; raw pointer
165  const EventArray& myEvents; // reference member: the array must outlive the functor, and the functor is not assignable
166  };
167 
168 private:
169 
170  exint emitterIndex(const PDG_EventEmitter* emitter) const;
171 
172  void registerEmitter(PDG_EventEmitter* emitter);
173  void deregisterEmitter(PDG_EventEmitter* emitter);
174 
175  bool addEventHandler(
176  PDG_EventEmitter* emitter,
177  PDG_EventHandler* handler,
178  const PDG_EventFilter& filter);
179  void removeEventHandler(
180  PDG_EventEmitter* emitter,
181  PDG_EventHandler* handler);
182  void removeAllEventHandlers(
183  PDG_EventEmitter* emitter,
184  bool user_only);
185  void findEventHandlers(
186  const PDG_EventEmitter* emitter,
187  PDG_EventHandlerArray& handlers) const;
188 
189  void processQueueStats(int print_limit);
190  void processEvent(
191  const EventInfo& info,
192  const PDG_EventEmitter*& active_emitter);
193  void processEvents();
194 
195  inline void pause() // registers one pause request; called from the ScopedPause constructor
196  { myPauseRequests.add(1); } // myPauseRequests is a SYS_AtomicInt32, so nested requests accumulate in the counter
197  inline void resume() // withdraws one pause request; called from the ScopedPause destructor
198  { myPauseRequests.add(-1); } // NOTE(review): no check against going below zero; callers must pair each resume() with a prior pause()
199 
200  static void* runThread(void* param);
201 
202 private:
203  EventQueue myEvents;
204  EmitterArrayMap myEmitters;
205  EmitterUpdateArray myEmitterUpdates;
206  ActiveEmitter myActiveEmitter;
207 
208  UT_Thread* myThread;
209 
210 #if PDG_EVENT_QUEUE_STATS
211  exint myDropEventCount;
212  exint myQueuedEventCount;
213  exint myPoppedEventCount;
214  exint myProcessedEventCount;
215  exint myQueueLogTime;
216 #endif
217 
218  exint myEmitterMapSize;
219  SYS_AtomicInt32 myPauseRequests;
220 
221  bool myIsTerminating;
222  bool myIsWaiting;
223  bool myIsProcessing;
224  bool myIsConsolidateEvents;
225 };
226 
227 #endif
GLenum GLint * range
Definition: glcorearb.h:1925
#define PDG_API
Definition: PDG_API.h:23
int64 exint
Definition: SYS_Types.h:125
bool isConsolidateEvents() const
std::unique_ptr< T, Deleter > UT_UniquePtr
A smart pointer for unique ownership of dynamically allocated objects.
Definition: UT_UniquePtr.h:39
struct _cl_event * event
Definition: glcorearb.h:2961
*get result *(waiting if necessary)*A common idiom is to fire a bunch of sub tasks at the queue
Definition: thread.h:623
#define UT_ConcurrentHashMap
GLenum GLfloat param
Definition: glcorearb.h:104
void setConsolidateEvents(bool consolidate)
Configures the event consolidation flag.
void pause(int delay) noexcept
Definition: thread.h:102
ScopedPause(PDG_EventQueue *queue)
ImageBuf OIIO_API add(Image_or_Const A, Image_or_Const B, ROI roi={}, int nthreads=0)
Declare prior to use.
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
Definition: glcorearb.h:1297