HDK
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
msgpipe.C
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2024
3  * Side Effects Software Inc. All rights reserved.
4  *
5  * Redistribution and use of Houdini Development Kit samples in source and
6  * binary forms, with or without modification, are permitted provided that the
7  * following conditions are met:
8  * 1. Redistributions of source code must retain the above copyright notice,
9  * this list of conditions and the following disclaimer.
10  * 2. The name of Side Effects Software may not be used to endorse or
11  * promote products derived from this software without specific prior
12  * written permission.
13  *
14  * THIS SOFTWARE IS PROVIDED BY SIDE EFFECTS SOFTWARE `AS IS' AND ANY EXPRESS
15  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
16  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN
17  * NO EVENT SHALL SIDE EFFECTS SOFTWARE BE LIABLE FOR ANY DIRECT, INDIRECT,
18  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
19  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
20  * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
21  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
22  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
23  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
24  *
25  *----------------------------------------------------------------------------
26  */
27 
28 
29 #include <CMD/CMD_Args.h>
30 #include <UT/UT_Assert.h>
31 #include <UT/UT_Exit.h>
32 #include <UT/UT_IStream.h>
33 #include <UT/UT_Main.h>
34 #include <UT/UT_OFStream.h>
35 #include <UT/UT_NetMessage.h>
36 
37 #include <ostream>
38 #include <iostream>
39 #include <stdio.h>
40 
41 
/// Prints the one-line command synopsis to standard error.
///
/// @param program  Name the tool was invoked as (normally argv[0]). A null
///                 pointer is tolerated: argv[0] is allowed to be null when
///                 argc is 0, and inserting a null `const char *` into a
///                 stream is undefined behaviour, so the tool's usual name is
///                 printed instead.
static void
usage(const char *program)
{
    const char *shown_name = program ? program : "msgpipe";

    std::cerr << "Usage: " << shown_name
              << " tracker port thisslice numslice\n";
}
47 
// Number of int32 values written into (and checked in) every message.
static const int DATA_LEN = 80000;
// Number of send/verify passes made over the message pipes.
static const int PIPE_ATTEMPTS = 10;
50 
51 
52 // Transfer data between peers using a UT_NetMessagePipe
53 //
54 // Build using:
55 // hcustom -s msgpipe.C
56 //
57 // Example usage:
58 // python simtracker.py 8000 9000
59 // msgpipe localhost 8000 0 2
60 // msgpipe localhost 8000 1 2
61 //
62 int
63 theMain(int argc, char *argv[])
64 {
65  CMD_Args args;
66 
67  args.initialize(argc, argv);
68 
69  if (args.argc() != 5)
70  {
71  usage(argv[0]);
72  return 1;
73  }
74 
75  const char *tracker = args.argv(1);
76  int port = SYSatoi(args.argv(2));
77  int thisslice = SYSatoi(args.argv(3));
78  int numslice = SYSatoi(args.argv(4));
79 
80  std::cerr << "Building connection to tracker " << tracker << ":" << port << std::endl;
81  std::cerr << "I am slice " << thisslice << " out of " << numslice << std::endl;
82 
83  // Use a net exchange to send some data between the slices.
84  {
85  std::cerr << "Simple Exchange Test" << std::endl;
86  UT_NetExchange netxchg(tracker, port, thisslice, numslice, "normalexchange");
87 
88  for (int peer = 0; peer < numslice; peer++)
89  {
90  UT_NetMessage *msg = new UT_NetMessage();
91  msg->setWriteDataLength(8 + DATA_LEN * sizeof(int32));
92 
93  for (int i = 0; i < DATA_LEN; i++)
94  {
95  msg->overwriteInt32(8 + i * sizeof(int32), i + peer);
96  }
97  netxchg.sendData(peer, msg);
98  }
99 
101  if (!netxchg.receiveDataLoop(completed, numslice))
102  {
103  std::cerr << "Failure to do traditional data exchange, got " << completed.entries() << std::endl;
104  return -1;
105  }
106 
107  // Now verify our messages are legit. They should all be encoded
108  // with our peer infor.
109  for (int i = 0; i < completed.entries(); i++)
110  {
111  UT_NetMessage *msg = completed(i);
112 
113  for (int i = 0; i < DATA_LEN; i++)
114  {
115  int32 val;
116  int32 goalval;
117  goalval = i + thisslice;
118 
119  val = msg->extractInt32(8 + i * sizeof(int32));
120 
121  if (val != goalval)
122  {
123  std::cerr << "Error, got " << val << " at integer index " << i << " rather than expected " << goalval << ", source slice was " << msg->extractInt16(6) << std::endl;
124  return -1;
125  }
126  }
127 
128  delete msg;
129  }
130  }
131 
132  // Use a net message pipe to repeatedly pump data along the same
133  // socket.
134  {
135  std::cerr << "Message Pipe Tests" << std::endl;
136  UT_NetMessagePipe pipe(tracker, port, thisslice, numslice, "messagepipe");
137 
138  // pipe.setCompressionThreshold(-1);
139 
140  if (!pipe.openPipes())
141  {
142  std::cerr << "Failed to open message pipes" << std::endl;
143  return -1;
144  }
145 
146  for (int j = 0; j < PIPE_ATTEMPTS; j++)
147  {
148  exint datalen = DATA_LEN;
149  std::cerr << "Starting pass " << j << std::endl;
150  for (int peer = 0; peer < numslice; peer++)
151  {
152  UT_NetMessage *msg = pipe.readPipe(peer);
153 
154  msg->resetLength(UT_NetMessage::STATE_READPIPE, datalen * sizeof(int32));
155 
156  msg = pipe.writePipe(peer);
157  msg->resetLength(UT_NetMessage::STATE_WRITEPIPE, datalen * sizeof(int32));
158 
159  for (int i = 0; i < datalen; i++)
160  {
161  // NOTE: No header here!
162  msg->overwriteInt32(i * sizeof(int32), i + peer + j);
163  }
164  }
165 
166  // Now do the transfer.
167  if (!pipe.transferData())
168  {
169  std::cerr << "Failed to push data across pipes!" << std::endl;
170  return -1;
171  }
172 
173  // Now see if it came through alright!
174  // Again, all the data sent to ourselves should be our own
175  // peer number.
176  for (int peer = 0; peer < numslice; peer++)
177  {
178  UT_NetMessage *msg = pipe.readPipe(peer);
179 
180  for (int i = 0; i < datalen; i++)
181  {
182  int32 goalval = i + thisslice + j;
183  int32 val;
184  val = msg->extractInt32(i * sizeof(int32));
185  if (val != goalval)
186  {
187  std::cerr << "Error, got " << val << " at integer index " << i << " rather than expected " << goalval << ", source slice was " << peer << " and this was pipe pass " << j << std::endl;
188  return -1;
189  }
190  }
191  }
192  }
193 
194  for (int peer = 0; peer < numslice; peer++)
195  {
196  UT_NetMessage *msg = pipe.readPipe(peer);
198 
199  msg = pipe.writePipe(peer);
201  }
202 
203  if (!pipe.closePipes())
204  {
205  std::cerr << "Failed to close message pipes." << std::endl;
206  return -1;
207  }
208 
209  }
210 
211  std::cerr << "All transferred successfully!" << std::endl;
212 
213  return 0;
214 }
215 UT_MAIN(theMain);// exit with proper tear down
#define PIPE_ATTEMPTS
Definition: msgpipe.C:49
int int32
Definition: SYS_Types.h:39
const char * argv(unsigned i) const
Definition: UT_Args.h:50
int32 extractInt32(exint offset)
int64 exint
Definition: SYS_Types.h:125
void overwriteInt32(exint offset, int32 val)
bool closePipes(int timeoutms=100)
Shuts down the pipes, returns true if successful.
void setWriteDataLength(exint bufsize)
UT_NetMessage * readPipe(int peer)
bool receiveDataLoop(UT_Array< UT_NetMessage * > &completed, int expectedmessages, int timeoutms=100)
int argc() const
Definition: UT_Args.h:49
UT_MAIN(theMain)
exint entries() const
Alias of size(). size() is preferred.
Definition: UT_Array.h:648
int theMain(int argc, char *argv[])
Definition: msgpipe.C:63
GLint j
Definition: glad.h:2733
GLsizeiptr const void GLenum usage
Definition: glcorearb.h:664
bool transferData(int timeoutms=100)
UT_NetMessage * writePipe(int peer)
GLuint GLfloat * val
Definition: glcorearb.h:1608
void resetLength(TransmitState state, exint newlen)
Resizes, useful for pipe messages.
**But if you need a or simply need to know when the task has * completed
Definition: thread.h:613
**If you just want to fire and args
Definition: thread.h:609
Wait for connection ack.
Definition: UT_NetMessage.h:88
#define DATA_LEN
Definition: msgpipe.C:48
void initialize(int argc, const char *const argv[])
void sendData(int destpeer, const char *data, exint len)
GLbitfield GLuint program
Definition: glcorearb.h:1931
int16 extractInt16(exint offset)
bool openPipes(int timeoutms=100)
Prepares the pipes, returns true if successful.