HDK
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
NET_PacketSocket.h
Go to the documentation of this file.
1 /*
2  * PROPRIETARY INFORMATION. This software is proprietary to
3  * Side Effects Software Inc., and is not to be reproduced,
4  * transmitted, or disclosed in any way without written permission.
5  *
6  * NAME: NET_PacketSocket.h
7  *
8  * COMMENTS:
9  *
10  *
11  */
12 
13 #ifndef __NET_PACKETSOCKET_H__
14 #define __NET_PACKETSOCKET_H__
15 
16 #include "NET_API.h"
17 
18 #include <UT/UT_BoostAsio.h>
19 #include <UT/UT_Debug.h>
20 #include <UT/UT_NonCopyable.h>
21 #include <UT/UT_SharedPtr.h>
22 #include <UT/UT_StringHolder.h>
23 #include <UT/UT_Endian.h>
24 #include <UT/UT_WorkBuffer.h>
25 #include <UT/UT_StringArray.h>
26 
27 NET_API bool
28 net_uncompressBuffer(UT_WorkBuffer& wbuf, exint comp_size, exint orig_size);
29 
31 {
32 private:
/// Connection state shared between the socket wrapper and its in-flight
/// operations: the TCP stream plus a timer used to close the connection after
/// a period without activity. Instances are expected to be owned through a
/// shared pointer, since the timer callback relies on weak_from_this().
33  class Impl : public UTenable_shared_from_this<Impl>
34  {
35  public:
    /// Take ownership of \p socket. myTimer is declared before myStream, so
    /// it is bound to the socket's executor before the socket is moved from.
36  explicit Impl(ASIO_TCPSocket&& socket)
37  : myTimer(socket.get_executor()), myStream(std::move(socket))
38  {
39  }
40 
    /// The underlying TCP socket.
41  ASIO_TCPSocket& stream() { return myStream; }
    /// (Re)arm the inactivity timer to expire 10 minutes from now. If the
    /// wait completes without being aborted, the connection is closed. Only a
    /// weak reference is captured, so a pending wait does not keep this
    /// object alive and the callback does nothing once it has been destroyed.
42  void resetTimer()
43  {
45  myTimer.expires_from_now(hboost::posix_time::minutes(10));
46  myTimer.async_wait(
47  [me = weak_from_this()](
48  const hboost::system::error_code& ec)
49  {
    // The wait was cancelled (timer reset or cancelled): nothing to do.
50  if (ec == hboost::asio::error::operation_aborted)
51  return;
52  if (auto s = me.lock(); s)
53  {
54  s->close();
55  }
56  });
57  }
    /// Cancel any pending wait on the inactivity timer.
58  void cancelTimer()
59  {
60  myTimer.cancel();
61  }
62 
    /// Shut down both directions of the socket, close it and cancel the
    /// inactivity timer. Errors from shutdown/close are deliberately ignored.
63  void close()
64  {
65  // Closing the socket is expected to propagate to any outstanding
    // operations on the stream, so no error is reported from here.
66  hboost::system::error_code ignored_ec;
67  myStream.shutdown(ASIO_TCPSocket::shutdown_both, ignored_ec);
68  myStream.close(ignored_ec);
69  myTimer.cancel();
70  }
71 
72  private:
73  /// @{
74  /// Used to kill the socket if no activity has occurred within the specified
75  /// amount of time.
76  ASIO_DeadlineTimer myTimer;
    // NOTE(review): myTimeout is never read or written in this class; the
    // timeout used by resetTimer() is the hard-coded 10 minutes.
77  hboost::posix_time::time_duration myTimeout;
78  /// @}
79  ASIO_TCPSocket myStream;
80  };
81 
82 public:
83  explicit NET_PacketSocket(ASIO_TCPSocket&& socket);
84  explicit NET_PacketSocket(ASIO_IOContext& ctx);
85  virtual ~NET_PacketSocket() = default;
87 
88  /// Shut down the socket. After this is called the socket becomes dead
89  /// and no other calls should be made.
    /// The close itself is dispatched on the socket's executor rather than
    /// being performed directly by the caller.
91  {
92  hboost::asio::dispatch(
93  myImpl->stream().get_executor(),
    // Capture only a weak reference: if the implementation has already been
    // destroyed by the time the dispatched function runs, it does nothing.
94  [me = myImpl->weak_from_this()]()
95  {
96  if (auto s = me.lock(); s)
97  s->close();
98  });
99  }
100  /// Access to the underlying socket. This is only meant to be used in cases
101  /// where information about the connection is required (i.e. remote endpoint).
102  /// NB: this method should only ever be called prior to the first read/write
103  /// of the socket for thread safety.
104  ASIO_TCPSocket& nextLayer() { return myImpl->stream(); }
105 
106  template <typename DynamicBuffer, typename Handler>
108  Handler,
109  void(hboost::system::error_code, std::size_t))
110  asyncRead(DynamicBuffer& buffer, Handler&& handler);
111  template <typename Handler>
113  Handler,
114  void(hboost::system::error_code, std::size_t))
115  asyncWrite(UT_StringArray&& buffers, Handler&& handler);
/// Start an asynchronous connect of the underlying socket to \p ep.
/// \p handler is forwarded unchanged to the socket's async_connect and is
/// invoked with the resulting error code; the return value is whatever
/// async_connect returns for that handler/completion token.
116  template <typename Handler>
117  HBOOST_ASIO_INITFN_RESULT_TYPE(Handler, void(hboost::system::error_code))
118  asyncConnect(const ASIO_TCPSocket::endpoint_type& ep, Handler&& handler)
119  {
120  return myImpl->stream().async_connect(
121  ep, std::forward<Handler>(handler));
122  }
123 
124 private:
125  template <class AsyncStream, class DynamicBuffer>
126  class AsyncReadImpl;
127  template <class AsyncStream>
128  class AsyncWriteImpl;
129 
130  UT_SharedPtr<Impl> myImpl;
131 };
132 
133 // -----------------------------------------------------------------------------
134 // Packet Read Op
135 // -----------------------------------------------------------------------------
136 
/// Composed asynchronous operation that reads one packet from a stream into a
/// dynamic buffer. As parsed below, a packet starts with a 5 byte header: a
/// 4 byte payload length (passed through UTtomips) followed by a 1 byte flag
/// that equals 1 when the payload is compressed. A compressed packet carries a
/// further 4 byte compressed length before the payload. The operation
/// completes with an error code and the total number of bytes transferred.
///
/// NOTE(review): some lines of this class (the constructor's second
/// parameter, the declaration of `data`, the decompression call and the
/// myImpl member) are not present in this listing.
137 template <typename AsyncStream, typename DynamicBuffer>
138 class NET_PacketSocket::AsyncReadImpl
139 {
140 public:
     /// Store references to the stream and buffer; both must outlive the
     /// operation since they are held by reference.
141  AsyncReadImpl(
142  AsyncStream& stream,
144  DynamicBuffer& buffer)
     // NOTE(review): initializer order differs from declaration order
     // (myBuffer is declared before myStream); harmless for references but
     // may trigger a reorder warning.
145  : myStream(stream), myImpl(backend), myBuffer(buffer)
146  {
147  }
148 
     /// Invoked once to start the operation (with the default arguments) and
     /// then again after every async_read_some completes.
     /// \param self              composed-operation handle; moved into the next
     ///                          read or used to complete the operation
     /// \param ec                result of the previous read, if any
     /// \param bytes_transfered  bytes produced by the previous read, if any
149  template <typename Self>
150  void operator()(
151  Self &self,
152  hboost::system::error_code ec = {},
153  std::size_t bytes_transfered = 0)
154  {
     // Running total reported to the completion handler. Despite the name,
     // this counts bytes read by this operation.
155  myBytesWritten += bytes_transfered;
156 
     // The socket implementation is only weakly referenced; if it is gone
     // the operation is reported as aborted.
157  UT_SharedPtr<NET_PacketSocket::Impl> impl = myImpl.lock();
158  if (!impl)
159  {
160  return self.complete(
161  hboost::asio::error::operation_aborted, myBytesWritten);
162  }
163 
164  if (ec)
165  {
166  return self.complete(ec, myBytesWritten);
167  }
168 
169  if (myIsFirst)
170  {
171  // If this is our first request then arm the idle timer and kick off a
     // read of up to 512 bytes.
172  myIsFirst = false;
173  impl->resetTimer();
174  myStream.async_read_some(myBuffer.prepare(512), std::move(self));
175  return;
176  }
177 
178  // We received more data. Cancel the connection dead timer while parsing;
     // it is re-armed below before the next read is issued.
179  impl->cancelTimer();
180 
     // Make the bytes just read visible in the buffer's readable area.
181  myBuffer.commit(bytes_transfered);
182 
183  // If the packet is not compressed and no read size has been read then
184  // check the header
185  if (!myIsCompressed && myReadPacketSize < 0)
186  {
187  // Parse the original payload length and compressed flag.
188  static constexpr int header_size = 5;
189  if (myBuffer.size() >= header_size)
190  {
191  uint32 len = 0;
192  static_assert(sizeof(len) == 4);
193  ASIO_MutableBuffer tmp(&len, sizeof(len));
194  ASIO_ConstBuffer read_buffer = myBuffer.data();
195  // Retrieve the payload length (first 4 bytes of the header)
196  hboost::asio::buffer_copy(tmp, read_buffer);
     // Byte-order conversion from UT_Endian.h — presumably wire order to
     // host order; confirm against UTtomips.
197  UTtomips(len);
198  myReadPacketSize = len;
199  myFinalPacketSize = len;
200  // We have a max size of 4096; larger declared payloads are rejected.
201  if (myReadPacketSize > 4096)
202  {
203  return self.complete(
204  hboost::asio::error::message_size, myBytesWritten);
205  }
206 
207  // Check if the payload is compressed or not (5th header byte == 1).
208  myIsCompressed
209  = *((const char*)read_buffer.data() + sizeof(unsigned))
210  == 1;
     // For a compressed packet the number of bytes to read is not known
     // yet; it is taken from the compressed-size field parsed below.
211  if (myIsCompressed)
212  myReadPacketSize = -1;
213  myBuffer.consume(header_size);
214  }
215  }
216  // If the packet is compressed read the compressed size
217  if (myIsCompressed && myReadPacketSize < 0)
218  {
219  static constexpr int header_size = 4;
220  if (myBuffer.size() >= header_size)
221  {
222  uint32 len = 0;
223  static_assert(sizeof(len) == header_size);
224  ASIO_MutableBuffer tmp(&len, sizeof(len));
225  ASIO_ConstBuffer read_buffer = myBuffer.data();
226  // Retrieve the compressed payload length
227  hboost::asio::buffer_copy(tmp, read_buffer);
228  UTtomips(len);
229  myReadPacketSize = len;
230  myBuffer.consume(header_size);
231 
232  /// Something went wrong. The uncompressed size should never
233  /// be smaller than the compressed size.
234  if (myFinalPacketSize < myReadPacketSize)
235  {
236  return self.complete(
237  hboost::asio::error::connection_aborted,
238  myBytesWritten);
239  }
240  }
241  }
242  // If there is at least enough read data for this packet then handle the
243  // request
244  if (myReadPacketSize >= 0 && myBuffer.size() >= myReadPacketSize)
245  {
246  ASIO_ConstBuffer read_buffer = myBuffer.data();
248  (const char*)read_buffer.data(), myReadPacketSize);
249 
     // Compressed payloads are expanded from myReadPacketSize to
     // myFinalPacketSize bytes; failure completes with bad_message.
250  if (myIsCompressed)
251  {
253  data, myReadPacketSize, myFinalPacketSize))
254  {
255  return self.complete(
257  hboost::system::errc::bad_message),
258  myBytesWritten);
259  }
260  }
261 
262  // The whole packet is available: complete successfully.
263  self.complete({}, myBytesWritten);
264  return;
265  }
266 
267  // Guess our next read size: the packet size when known, otherwise 512,
     // bounded by the space the buffer may still grow by.
268  std::size_t read_size = 512;
269  if (myReadPacketSize > 0)
270  read_size = static_cast<std::size_t>(myReadPacketSize);
271  const auto size = myBuffer.size();
272  const auto limit = myBuffer.max_size() - size;
273  std::size_t actual_read_size = std::min(
274  std::max(std::size_t(512), myBuffer.capacity() - size),
275  std::min(read_size, limit));
     // Re-arm the idle timer for the read we are about to start.
276  impl->resetTimer();
277  // We need more data so read more and then continue parsing.
278  myStream.async_read_some(
279  myBuffer.prepare(actual_read_size), std::move(self));
280  }
281 
282 private:
     // Destination buffer supplied by the caller.
283  DynamicBuffer& myBuffer;
     // True until the first invocation has issued the initial read.
284  bool myIsFirst = true;
     // Set from the header flag byte.
285  bool myIsCompressed = false;
     // Number of payload bytes to read from the stream; -1 while unknown.
286  exint myReadPacketSize = -1;
     // Payload length declared in the 5 byte header; -1 while unknown.
287  exint myFinalPacketSize = -1;
     // Total bytes transferred so far by this operation.
288  std::size_t myBytesWritten = 0;
289  AsyncStream& myStream;
291 };
292 
293 // -----------------------------------------------------------------------------
294 // NET_PacketSocket::AsyncWriteImpl
295 // -----------------------------------------------------------------------------
296 
297 template <typename AsyncStream>
298 class NET_PacketSocket::AsyncWriteImpl
299 {
300 public:
301  AsyncWriteImpl(
302  AsyncStream& stream,
304  UT_StringArray&& write_queue)
305  : myStream(stream), myImpl(impl), myWriteQueue(write_queue)
306  {
307  }
308 
309  template <typename Self>
310  void operator()(
311  Self& self,
312  hboost::system::error_code ec = {},
313  std::size_t bytes_transfered = 0)
314  {
315  myBytesWritten += bytes_transfered;
316 
317  UT_SharedPtr<NET_PacketSocket::Impl> impl = myImpl.lock();
318  if (!impl)
319  {
320  return self.complete(
321  hboost::asio::error::operation_aborted, myBytesWritten);
322  }
323 
324  if (ec)
325  {
326  return self.complete(ec, myBytesWritten);
327  }
328 
329  exint total = bytes_transfered;
330  while (total > 0 && myWriteQueue.size() > 0)
331  {
332  exint i = myIndex + total;
333  if (i >= myWriteQueue[0].length())
334  {
335  exint remain = myWriteQueue[0].length() - myIndex;
336  total -= remain;
337  myIndex = 0;
338  myWriteQueue.removeIndex(0);
339  }
340  else
341  {
342  myIndex += total;
343  break;
344  }
345  }
346 
347  if (myWriteQueue.size() == 0)
348  {
349  return self.complete({}, myBytesWritten);
350  }
351 
352  // To improve performance dont use scatter/gather if the messages sent
353  // are only 1.
354  if (myWriteQueue.size() == 1)
355  {
357  myWriteQueue[0].buffer() + myIndex,
358  myWriteQueue[0].length() - myIndex);
359  myStream.async_write_some(buffer, std::move(self));
360  }
361  else
362  {
363  std::vector<ASIO_ConstBuffer> buffers;
364  buffers.reserve(myWriteQueue.size());
365  buffers.emplace_back(
366  myWriteQueue[0].buffer() + myIndex,
367  myWriteQueue[0].length() - myIndex);
368  for (exint i = 1; i < myWriteQueue.size(); i++)
369  buffers.emplace_back(
370  myWriteQueue[i].buffer(), myWriteQueue[i].length());
371  myStream.async_write_some(buffers, std::move(self));
372  }
373  }
374 
375 private:
376  UT_StringArray myWriteQueue;
377  exint myIndex = 0;
378  std::size_t myBytesWritten = 0;
379  AsyncStream& myStream;
381 };
382 
/// Start an asynchronous read of one packet into \p buffer. The work is done
/// by AsyncReadImpl, run as a composed operation associated with the
/// underlying socket; \p handler is invoked with the resulting error code and
/// the number of bytes transferred. \p buffer is held by reference by the
/// operation, so it must stay alive until the handler runs.
/// NOTE(review): the return-type macro line and one line of the body are not
/// present in this listing.
383 template <typename DynamicBuffer, typename Handler>
385  Handler,
386  void(hboost::system::error_code, std::size_t))
387 NET_PacketSocket::asyncRead(DynamicBuffer& buffer, Handler&& handler)
388 {
390  return hboost::asio::async_compose<
391  Handler,
392  void(hboost::system::error_code, std::size_t)>(
393  AsyncReadImpl{myImpl->stream(), myImpl, buffer}, handler,
394  myImpl->stream());
395 }
396 
/// Frame every message in \p queue as a packet and start an asynchronous
/// write of the whole queue. Each message is rewritten in place as:
/// 4 byte length (message length + 1, passed through UTtomips), one '\0'
/// byte in the position the read operation in this file treats as the
/// compressed flag, the message bytes, and a trailing '\0'. \p queue is then
/// handed to AsyncWriteImpl, and \p handler is invoked with the resulting
/// error code and the number of bytes written.
/// NOTE(review): the return-type macro line is not present in this listing.
397 template <typename Handler>
399  Handler,
400  void(hboost::system::error_code, std::size_t))
401 NET_PacketSocket::asyncWrite(UT_StringArray&& queue, Handler&& handler)
402 {
403  // Convert each of the messages to the packet format so that we can
404  // send all of them in one go instead of having to send each packet
405  // one at a time.
406  for (UT_StringHolder& msg : queue)
407  {
408  UT_WorkBuffer wbuf;
409  // Always include the null terminator. I have no clue why this is needed
410  // but there are some checks that specifically include the null
411  // terminator.
412  uint32 orig_len = static_cast<uint32>(msg.length() + 1);
413  uint32 len = orig_len;
     // Byte-order conversion of the length field before it is serialised.
414  UTtomips(len);
415  // TODO: add support for compressed responses
416  wbuf.append((char*)&len, sizeof(uint32));
     // Flag byte: 0, i.e. not the value 1 that marks a compressed payload.
417  wbuf.append('\0');
418  wbuf.append(msg);
419  // Make sure to always include the null character.
420  wbuf.append('\0');
     // Replace the queued message with its framed form.
421  msg = std::move(wbuf);
422  }
423 
424  return hboost::asio::async_compose<
425  Handler, void(hboost::system::error_code, std::size_t)>(
426  AsyncWriteImpl{myImpl->stream(), myImpl, std::move(queue)},
427  handler, myImpl->stream()
428  );
429 }
430 
431 #endif // __NET_PACKETSOCKET_H__
GLuint GLuint stream
Definition: glcorearb.h:1832
hboost::asio::ip::tcp::socket ASIO_TCPSocket
Definition: UT_BoostAsio.h:44
void
Definition: png.h:1083
hboost::asio::const_buffer ASIO_ConstBuffer
Definition: UT_BoostAsio.h:62
GLboolean * data
Definition: glcorearb.h:131
GLsizei const GLfloat * value
Definition: glcorearb.h:824
hboost::asio::mutable_buffer ASIO_MutableBuffer
Definition: UT_BoostAsio.h:63
int64 exint
Definition: SYS_Types.h:125
GLdouble s
Definition: glad.h:3009
GLuint GLsizei GLsizei * length
Definition: glcorearb.h:795
NET_API bool net_uncompressBuffer(UT_WorkBuffer &wbuf, exint comp_size, exint orig_size)
ImageBuf OIIO_API min(Image_or_Const A, Image_or_Const B, ROI roi={}, int nthreads=0)
hboost::asio::deadline_timer ASIO_DeadlineTimer
Definition: UT_BoostAsio.h:69
void close() override
GLuint buffer
Definition: glcorearb.h:660
UT_ErrorCode make_error_code(NET::CurlError e)
#define NET_API
Definition: NET_API.h:9
void UTtomips(int16 *, int64)
Definition: UT_Endian.h:145
std::enable_shared_from_this< T > UTenable_shared_from_this
Definition: UT_SharedPtr.h:39
Definition: core.h:760
std::shared_ptr< T > UT_SharedPtr
Wrapper around std::shared_ptr.
Definition: UT_SharedPtr.h:36
#define UT_NON_COPYABLE(CLASS)
Define deleted copy constructor and assignment operator inside a class.
const GLuint * buffers
Definition: glcorearb.h:661
*get result *(waiting if necessary)*A common idiom is to fire a bunch of sub tasks at the queue
Definition: thread.h:623
hboost::asio::io_context ASIO_IOContext
Definition: UT_BoostAsio.h:74
HBOOST_ASIO_INITFN_RESULT_TYPE(Handler, void(hboost::system::error_code, std::size_t)) NET_PacketSocket
GLsizeiptr size
Definition: glcorearb.h:664
ASIO_TCPSocket & nextLayer()
SYS_FORCE_INLINE void append(char character)
ImageBuf OIIO_API max(Image_or_Const A, Image_or_Const B, ROI roi={}, int nthreads=0)
unsigned int uint32
Definition: SYS_Types.h:40
std::weak_ptr< T > UT_WeakPtr
Definition: UT_SharedPtr.h:49
Definition: format.h:895