HDK
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
PDG_EventQueue.h
Go to the documentation of this file.
1 /*
2  * PROPRIETARY INFORMATION. This software is proprietary to
3  * Side Effects Software Inc., and is not to be reproduced,
4  * transmitted, or disclosed in any way without written permission.
5  *
6  * COMMENTS:
7  */
8 
9 #ifndef __PDG_EventQueue_h__
10 #define __PDG_EventQueue_h__
11 
12 #include "PDG_API.h"
13 #include "PDG_Event.h"
14 #include "PDG_EventTypes.h"
15 
16 #include <UT/UT_Array.h>
18 #include <UT/UT_ConcurrentQueue.h>
19 #include <UT/UT_ConcurrentVector.h>
21 #include <UT/UT_UniquePtr.h>
22 
23 #include <SYS/SYS_AtomicInt.h>
24 
25 #define PDG_EVENT_QUEUE_STATS 0
26 
27 class PDG_EventEmitter;
28 class PDG_EventHandler;
29 
30 class UT_Thread;
31 
32 /* Event queue that dispatches PDG_Events to the handlers registered for
33  * the PDG_EventEmitter that produced them.
34  */
36 {
37 public:
38  /// Scoped object that pauses event handling. Pausing is implemented with
39  /// an atomic counter on the queue, so nested ScopedPause objects work as
40  /// expected. Each scoped pause increments the pause request counter, so
41  /// event processing will only proceed once all ScopedPause objects are
42  /// unwound.
44  {
45  public:
  // NOTE(review): listing line 46 (the constructor signature) is absent
  // from this extract; the member initializer below belongs to it.
  // The constructor stores the queue pointer and calls pause() on it.
47  : myQueue(queue)
48  {
49  queue->pause();
50  }
51 
  // NOTE(review): listing line 52 (presumably the destructor signature)
  // is absent from this extract. This body calls resume() on the stored
  // queue, balancing the pause() issued at construction.
53  {
54  myQueue->resume();
55  }
56 
57  private:
  // Queue on which pause()/resume() are invoked. Stored as a raw
  // pointer; the queue must stay alive while this object exists.
58  PDG_EventQueue* myQueue;
59  };
60 
61 public:
62  /// Constructs a new event queue
  /// NOTE(review): `background` presumably selects whether events are
  /// dispatched from a background thread (see myThread / runThread) --
  /// confirm against the implementation.
63  PDG_EventQueue(bool background);
64  ~PDG_EventQueue();
65 
66  /// Queues an event that will eventually be dispatched by the background
67  /// thread running in this event queue instance.
  /// @param emitter  Emitter the event is associated with.
  /// @param event    Event to queue, passed by const reference.
68  void queueEvent(
69  const PDG_EventEmitter* emitter,
70  const PDG_Event& event);
71 
72  /// Emits an event immediately on the calling thread
  /// @param emitter  Emitter the event is associated with.
  /// @param event    Event to emit, passed by const reference.
73  void emitEvent(
74  const PDG_EventEmitter* emitter,
75  const PDG_Event& event) const;
76 
77  /// Waits for all queued events to finish, and prevents new events from
78  /// being added until that condition is reached. Blocks the caller. If
79  /// an event emitter is passed into this function, it gets deregistered
80  /// before unblocking the event queue.
  /// @param emitter  Optional emitter to deregister; defaults to nullptr.
81  void waitAllEvents(PDG_EventEmitter* emitter=nullptr);
82 
83  /// Waits until the specified emitter is registered, to ensure that all
84  /// events for that emitter have been flushed
  /// @param emitter  Emitter to wait on.
85  void waitEmitter(const PDG_EventEmitter* emitter);
86 
87  /// Returns whether or not the event queue attempts to consolidate
88  /// consecutive events of the same type
  /// Returns the current value of myIsConsolidateEvents.
89  inline bool isConsolidateEvents() const
90  { return myIsConsolidateEvents; }
91 
92  /// Configures the event consolidation flag
  /// Stores `consolidate` into myIsConsolidateEvents.
  /// NOTE(review): the flag is a plain bool and no synchronization is
  /// visible in this header -- confirm whether it may be changed while
  /// events are being processed.
93  inline void setConsolidateEvents(bool consolidate)
94  { myIsConsolidateEvents = consolidate; }
95 
96 private:
97  friend class PDG_EventEmitter;
98  friend class PDG_GraphContext;
99 
/// Handlers and filters held for an emitter. This is the mapped value
/// type of EmitterMap, which is keyed by PDG_EventEmitter*.
100  struct EmitterInfo
101  {
  // Handler and filter storage.
  // NOTE(review): presumably myFilters[i] is the filter that applies to
  // myHandlers[i] -- confirm in the implementation.
102  PDG_EventHandlerArray myHandlers;
103  PDG_EventFilterArray myFilters;
104 
  // Adds a handler with its filter. `filter_map` is passed by non-const
  // reference, so it may be modified by the call.
105  void add(
106  PDG_EventHandler* handler,
107  const PDG_EventFilter& filter,
108  PDG_EventFilterMap& filter_map);
  // Removes a handler. NOTE(review): the bool result presumably reports
  // whether the handler was found -- confirm.
109  bool remove(
110  PDG_EventHandler* handler,
111  PDG_EventFilterMap& filter_map);
  // Removes handlers, writing into the `removed` output array.
  // NOTE(review): `user_only` presumably restricts removal to
  // user-registered handlers -- confirm.
112  void removeAll(
113  PDG_EventHandlerArray& removed,
114  PDG_EventFilterMap& filter_map,
115  bool user_only);
  // Emits `event`; const, so it does not modify this EmitterInfo.
116  void emitEvent(const PDG_Event& event) const;
117  };
118 
/// Kind of handler change recorded in DeferredEmitterUpdate::myType.
/// NOTE(review): the enumerators presumably correspond to
/// addEventHandler(), removeEventHandler() and removeAllEventHandlers()
/// respectively -- confirm in the implementation.
119  enum EmitterUpdate
120  {
121  eHandlerAdd,
122  eHandlerRemove,
123  eHandlerRemoveAll
124  };
125 
/// Record describing one handler change for an emitter. Instances are
/// stored in EmitterUpdateArray (a UT_ConcurrentVector of this type).
/// NOTE(review): presumably these are recorded when a change cannot be
/// applied immediately and are applied later -- confirm.
126  struct DeferredEmitterUpdate
127  {
  // Emitter and handler the update refers to (raw pointers).
128  PDG_EventEmitter* myEmitter;
129  PDG_EventHandler* myHandler;
130 
  // Filter associated with the update.
131  PDG_EventFilter myFilter;
132 
  // Which kind of update this is, plus a user-only flag.
  // NOTE(review): myUserOnly presumably only matters for
  // eHandlerRemoveAll, mirroring removeAllEventHandlers(user_only).
133  EmitterUpdate myType;
134  bool myUserOnly;
135  };
136 
/// Container aliases used by the queue.
/// EventInfo pairs an emitter pointer with a PDG_Event held by value.
137  using EventInfo = std::pair<
138  const PDG_EventEmitter*, PDG_Event>;
  // Array and concurrent-queue containers of EventInfo.
139  using EventArray = UT_Array<EventInfo>;
140  using EventQueue = UT_ConcurrentQueue<EventInfo>;
  // Concurrent hash map from emitter pointer to its EmitterInfo.
141  using EmitterMap = UT_ConcurrentHashMap<
142  PDG_EventEmitter*, EmitterInfo>;
  // Uniquely-owned array of EmitterMap instances.
  // NOTE(review): presumably myEmitterMapSize entries indexed via
  // emitterIndex() -- confirm.
143  using EmitterArrayMap = UT_UniquePtr<EmitterMap[]>;
  // Concurrent vector of handler-update records.
144  using EmitterUpdateArray = UT_ConcurrentVector<DeferredEmitterUpdate>;
  // Thread-specific emitter pointer (one value per thread).
145  using ActiveEmitter = UT_ThreadSpecificValue<
146  const PDG_EventEmitter*>;
147 
/// Function object invoked over a UT_BlockedRange<int64>; it holds a
/// queue pointer and a reference to an array of events.
/// NOTE(review): presumably the range indexes into myEvents and the
/// functor is run through a UT parallel loop -- confirm.
148  class EventFunctor
149  {
150  public:
  // NOTE(review): listing line 152 is absent from this extract, so a
  // constructor parameter (likely the PDG_EventQueue* that initializes
  // myQueue) may be missing from the declaration as shown here.
151  EventFunctor(
153  const EventArray& events);
154  ~EventFunctor();
155 
  // Call operator taking the sub-range to handle; const-qualified.
156  void operator()(
157  const UT_BlockedRange<int64> &range) const;
158 
159  private:
  // myEvents is a reference member: the referenced array must outlive
  // this functor.
160  PDG_EventQueue* myQueue;
161  const EventArray& myEvents;
162  };
163 
164 private:
// Private operations. They are reachable from the friend classes
// PDG_EventEmitter and PDG_GraphContext declared in this class.
// NOTE(review): presumably stops event processing (see myIsTerminating)
// -- confirm.
165  void terminate();
166 
  // Returns an index for `emitter`.
  // NOTE(review): presumably selects an entry of myEmitters in the range
  // [0, myEmitterMapSize) -- confirm.
167  exint emitterIndex(const PDG_EventEmitter* emitter) const;
168 
  // Registration / deregistration of an emitter with this queue.
169  void registerEmitter(PDG_EventEmitter* emitter);
170  void deregisterEmitter(PDG_EventEmitter* emitter);
171 
  // Handler management for a given emitter. findEventHandlers writes its
  // result into the `handlers` output array.
  // NOTE(review): the meaning of addEventHandler's bool result is not
  // visible in this header -- confirm.
172  bool addEventHandler(
173  PDG_EventEmitter* emitter,
174  PDG_EventHandler* handler,
175  const PDG_EventFilter& filter);
176  void removeEventHandler(
177  PDG_EventEmitter* emitter,
178  PDG_EventHandler* handler);
179  void removeAllEventHandlers(
180  PDG_EventEmitter* emitter,
181  bool user_only);
182  void findEventHandlers(
183  const PDG_EventEmitter* emitter,
184  PDG_EventHandlerArray& handlers) const;
185 
  // Event processing. processQueueStats is declared unconditionally even
  // though the stats counters only exist when PDG_EVENT_QUEUE_STATS is
  // nonzero. processEvent receives `active_emitter` by reference, so it
  // may update the caller's pointer.
186  void processQueueStats(int print_limit);
187  void processEvent(
188  const EventInfo& info,
189  const PDG_EventEmitter*& active_emitter);
190  void processEvents();
191 
  // pause() adds 1 to the atomic pause request counter and resume() adds
  // -1; calls are expected to be balanced (ScopedPause pairs them).
192  inline void pause()
193  { myPauseRequests.add(1); }
194  inline void resume()
195  { myPauseRequests.add(-1); }
196 
  // Static function with a void* in / void* out signature.
  // NOTE(review): presumably the entry point run by myThread, with
  // `param` being the PDG_EventQueue instance -- confirm.
197  static void* runThread(void* param);
198 
199 private:
// Queue state.
// myEvents: concurrent queue of (emitter, event) pairs.
// myEmitters: owned array of emitter -> EmitterInfo maps.
// myEmitterUpdates: concurrent vector of handler-update records.
// myActiveEmitter: per-thread emitter pointer.
200  EventQueue myEvents;
201  EmitterArrayMap myEmitters;
202  EmitterUpdateArray myEmitterUpdates;
203  ActiveEmitter myActiveEmitter;
204 
  // Raw pointer to a thread object.
  // NOTE(review): ownership and lifetime are not visible in this header;
  // presumably created by the constructor when `background` is set and
  // released via terminate() or the destructor -- confirm.
205  UT_Thread* myThread;
206 
  // Statistics counters; compiled in only when PDG_EVENT_QUEUE_STATS is
  // nonzero (this header defines it as 0).
207 #if PDG_EVENT_QUEUE_STATS
208  exint myDropEventCount;
209  exint myQueuedEventCount;
210  exint myPoppedEventCount;
211  exint myProcessedEventCount;
212  exint myQueueLogTime;
213 #endif
214 
  // NOTE(review): myEmitterMapSize is presumably the number of entries
  // in myEmitters; the units of myProcessSleepTime are not visible here
  // -- confirm both.
215  exint myEmitterMapSize;
216  exint myProcessSleepTime;
  // Atomic counter modified by pause() (+1) and resume() (-1).
217  SYS_AtomicInt32 myPauseRequests;
218 
  // State flags. myIsConsolidateEvents is read and written by
  // isConsolidateEvents() / setConsolidateEvents().
  // NOTE(review): these are plain bools; if they are shared between the
  // caller and a processing thread, confirm how access is synchronized.
219  bool myIsTerminating;
220  bool myIsWaiting;
221  bool myIsProcessing;
222  bool myIsConsolidateEvents;
223 };
224 
225 #endif
GLenum GLint * range
Definition: glcorearb.h:1925
#define PDG_API
Definition: PDG_API.h:23
int64 exint
Definition: SYS_Types.h:125
bool isConsolidateEvents() const
std::unique_ptr< T, Deleter > UT_UniquePtr
A smart pointer for unique ownership of dynamically allocated objects.
Definition: UT_UniquePtr.h:39
struct _cl_event * event
Definition: glcorearb.h:2961
*get result *(waiting if necessary)*A common idiom is to fire a bunch of sub tasks at the queue
Definition: thread.h:623
#define UT_ConcurrentHashMap
GLenum GLfloat param
Definition: glcorearb.h:104
void setConsolidateEvents(bool consolidate)
Configures the event consolidation flag.
void pause(int delay) noexcept
Definition: thread.h:102
ScopedPause(PDG_EventQueue *queue)
ImageBuf OIIO_API add(Image_or_Const A, Image_or_Const B, ROI roi={}, int nthreads=0)
Declare prior to use.
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
Definition: glcorearb.h:1297