13 #ifndef __NET_PACKETSOCKET_H__
14 #define __NET_PACKETSOCKET_H__
37 : myTimer(socket.get_executor()), myStream(std::move(socket))
45 myTimer.expires_from_now(hboost::posix_time::minutes(10));
47 [me = weak_from_this()](
48 const hboost::system::error_code& ec)
50 if (ec == hboost::asio::error::operation_aborted)
52 if (
auto s = me.lock();
s)
66 hboost::system::error_code ignored_ec;
67 myStream.shutdown(ASIO_TCPSocket::shutdown_both, ignored_ec);
68 myStream.close(ignored_ec);
77 hboost::posix_time::time_duration myTimeout;
92 hboost::asio::dispatch(
93 myImpl->stream().get_executor(),
94 [me = myImpl->weak_from_this()]()
96 if (
auto s = me.lock();
s)
106 template <
typename DynamicBuffer,
typename Handler>
109 void(hboost::system::error_code, std::size_t))
110 asyncRead(DynamicBuffer&
buffer, Handler&& handler);
111 template <typename Handler>
114 void(hboost::system::error_code, std::
size_t))
116 template <typename Handler>
120 return myImpl->stream().async_connect(
121 ep, std::forward<Handler>(handler));
125 template <
class AsyncStream,
class DynamicBuffer>
127 template <
class AsyncStream>
128 class AsyncWriteImpl;
137 template <
typename AsyncStream,
typename DynamicBuffer>
138 class NET_PacketSocket::AsyncReadImpl
145 : myStream(stream), myImpl(backend), myBuffer(buffer)
149 template <
typename Self>
152 hboost::system::error_code ec = {},
153 std::size_t bytes_transfered = 0)
155 myBytesWritten += bytes_transfered;
160 return self.complete(
161 hboost::asio::error::operation_aborted, myBytesWritten);
166 return self.complete(ec, myBytesWritten);
174 myStream.async_read_some(myBuffer.prepare(512), std::move(
self));
181 myBuffer.commit(bytes_transfered);
185 if (!myIsCompressed && myReadPacketSize < 0)
188 static constexpr
int header_size = 5;
189 if (myBuffer.size() >= header_size)
192 static_assert(
sizeof(len) == 4);
196 hboost::asio::buffer_copy(tmp, read_buffer);
198 myReadPacketSize = len;
199 myFinalPacketSize = len;
201 if (myReadPacketSize > 4096)
203 return self.complete(
204 hboost::asio::error::message_size, myBytesWritten);
209 = *((
const char*)read_buffer.data() +
sizeof(unsigned))
212 myReadPacketSize = -1;
213 myBuffer.consume(header_size);
217 if (myIsCompressed && myReadPacketSize < 0)
219 static constexpr
int header_size = 4;
220 if (myBuffer.size() >= header_size)
223 static_assert(
sizeof(len) == header_size);
227 hboost::asio::buffer_copy(tmp, read_buffer);
229 myReadPacketSize = len;
230 myBuffer.consume(header_size);
234 if (myFinalPacketSize < myReadPacketSize)
236 return self.complete(
237 hboost::asio::error::connection_aborted,
244 if (myReadPacketSize >= 0 && myBuffer.size() >= myReadPacketSize)
248 (
const char*)read_buffer.data(), myReadPacketSize);
253 data, myReadPacketSize, myFinalPacketSize))
255 return self.complete(
257 hboost::system::errc::bad_message),
263 self.complete({}, myBytesWritten);
268 std::size_t read_size = 512;
269 if (myReadPacketSize > 0)
270 read_size =
static_cast<std::size_t
>(myReadPacketSize);
271 const auto size = myBuffer.size();
272 const auto limit = myBuffer.max_size() -
size;
273 std::size_t actual_read_size =
std::min(
278 myStream.async_read_some(
279 myBuffer.prepare(actual_read_size), std::move(
self));
283 DynamicBuffer& myBuffer;
284 bool myIsFirst =
true;
285 bool myIsCompressed =
false;
286 exint myReadPacketSize = -1;
287 exint myFinalPacketSize = -1;
288 std::size_t myBytesWritten = 0;
289 AsyncStream& myStream;
297 template <
typename AsyncStream>
298 class NET_PacketSocket::AsyncWriteImpl
305 : myStream(stream), myImpl(impl), myWriteQueue(write_queue)
309 template <
typename Self>
312 hboost::system::error_code ec = {},
313 std::size_t bytes_transfered = 0)
315 myBytesWritten += bytes_transfered;
320 return self.complete(
321 hboost::asio::error::operation_aborted, myBytesWritten);
326 return self.complete(ec, myBytesWritten);
329 exint total = bytes_transfered;
330 while (total > 0 && myWriteQueue.size() > 0)
332 exint i = myIndex + total;
333 if (i >= myWriteQueue[0].
length())
335 exint remain = myWriteQueue[0].length() - myIndex;
338 myWriteQueue.removeIndex(0);
347 if (myWriteQueue.size() == 0)
349 return self.complete({}, myBytesWritten);
354 if (myWriteQueue.size() == 1)
357 myWriteQueue[0].
buffer() + myIndex,
358 myWriteQueue[0].
length() - myIndex);
359 myStream.async_write_some(
buffer, std::move(
self));
363 std::vector<ASIO_ConstBuffer>
buffers;
364 buffers.reserve(myWriteQueue.size());
365 buffers.emplace_back(
366 myWriteQueue[0].
buffer() + myIndex,
367 myWriteQueue[0].
length() - myIndex);
368 for (
exint i = 1; i < myWriteQueue.size(); i++)
369 buffers.emplace_back(
370 myWriteQueue[i].buffer(), myWriteQueue[i].length());
371 myStream.async_write_some(buffers, std::move(
self));
378 std::size_t myBytesWritten = 0;
379 AsyncStream& myStream;
383 template <
typename DynamicBuffer,
typename Handler>
386 void(hboost::system::error_code, std::size_t))
390 return hboost::asio::async_compose<
392 void(hboost::system::error_code, std::size_t)>(
393 AsyncReadImpl{myImpl->stream(), myImpl, buffer}, handler,
397 template <
typename Handler>
400 void(hboost::system::error_code, std::size_t))
412 uint32 orig_len =
static_cast<uint32>(msg.length() + 1);
421 msg = std::move(wbuf);
424 return hboost::asio::async_compose<
425 Handler,
void(hboost::system::error_code, std::size_t)>(
426 AsyncWriteImpl{myImpl->stream(), myImpl, std::move(queue)},
427 handler, myImpl->stream()
431 #endif // __NET_PACKETSOCKET_H__
hboost::asio::ip::tcp::socket ASIO_TCPSocket
hboost::asio::const_buffer ASIO_ConstBuffer
GLsizei const GLfloat * value
hboost::asio::mutable_buffer ASIO_MutableBuffer
GLuint GLsizei GLsizei * length
NET_API bool net_uncompressBuffer(UT_WorkBuffer &wbuf, exint comp_size, exint orig_size)
ImageBuf OIIO_API min(Image_or_Const A, Image_or_Const B, ROI roi={}, int nthreads=0)
hboost::asio::deadline_timer ASIO_DeadlineTimer
UT_ErrorCode make_error_code(NET::CurlError e)
void UTtomips(int16 *, int64)
std::enable_shared_from_this< T > UTenable_shared_from_this
std::shared_ptr< T > UT_SharedPtr
Wrapper around std::shared_ptr.
#define UT_NON_COPYABLE(CLASS)
Define deleted copy constructor and assignment operator inside a class.
*get result *(waiting if necessary)*A common idiom is to fire a bunch of sub tasks at the queue
hboost::asio::io_context ASIO_IOContext
HBOOST_ASIO_INITFN_RESULT_TYPE(Handler, void(hboost::system::error_code, std::size_t)) NET_PacketSocket
ASIO_TCPSocket & nextLayer()
SYS_FORCE_INLINE void append(char character)
ImageBuf OIIO_API max(Image_or_Const A, Image_or_Const B, ROI roi={}, int nthreads=0)
std::weak_ptr< T > UT_WeakPtr