PhysicsClientUDP.cpp 13 KB


  1. #include "PhysicsClientUDP.h"
  2. #include <enet/enet.h>
  3. #include <stdio.h>
  4. #include <string.h>
  5. #include "../Utils/b3Clock.h"
  6. #include "PhysicsClient.h"
  7. //#include "LinearMath/btVector3.h"
  8. #include "SharedMemoryCommands.h"
  9. #include <string>
  10. #include "Bullet3Common/b3Logging.h"
  11. #include "../MultiThreading/b3ThreadSupportInterface.h"
  12. void UDPThreadFunc(void* userPtr, void* lsMemory);
  13. void* UDPlsMemoryFunc();
  14. void UDPlsMemoryReleaseFunc(void* ptr);
  15. bool gVerboseNetworkMessagesClient = false;
  16. #ifndef _WIN32
  17. #include "../MultiThreading/b3PosixThreadSupport.h"
  18. b3ThreadSupportInterface* createUDPThreadSupport(int numThreads)
  19. {
  20. b3PosixThreadSupport::ThreadConstructionInfo constructionInfo("UDPThread",
  21. UDPThreadFunc,
  22. UDPlsMemoryFunc,
  23. UDPlsMemoryReleaseFunc,
  24. numThreads);
  25. b3ThreadSupportInterface* threadSupport = new b3PosixThreadSupport(constructionInfo);
  26. return threadSupport;
  27. }
  28. #elif defined(_WIN32)
  29. #include "../MultiThreading/b3Win32ThreadSupport.h"
  30. b3ThreadSupportInterface* createUDPThreadSupport(int numThreads)
  31. {
  32. b3Win32ThreadSupport::Win32ThreadConstructionInfo threadConstructionInfo("UDPThread", UDPThreadFunc, UDPlsMemoryFunc, UDPlsMemoryReleaseFunc, numThreads);
  33. b3Win32ThreadSupport* threadSupport = new b3Win32ThreadSupport(threadConstructionInfo);
  34. return threadSupport;
  35. }
  36. #endif
  37. struct UDPThreadLocalStorage
  38. {
  39. int threadId;
  40. };
  41. unsigned int b3DeserializeInt(const unsigned char* input)
  42. {
  43. unsigned int tmp = (input[3] << 24) + (input[2] << 16) + (input[1] << 8) + input[0];
  44. return tmp;
  45. }
  46. struct UdpNetworkedInternalData
  47. {
  48. ENetHost* m_client;
  49. ENetAddress m_address;
  50. ENetPeer* m_peer;
  51. ENetEvent m_event;
  52. bool m_isConnected;
  53. b3ThreadSupportInterface* m_threadSupport;
  54. b3CriticalSection* m_cs;
  55. UdpNetworkedInternalData* m_udpInternalData;
  56. SharedMemoryCommand m_clientCmd;
  57. bool m_hasCommand;
  58. bool m_hasStatus;
  59. SharedMemoryStatus m_lastStatus;
  60. b3AlignedObjectArray<char> m_stream;
  61. std::string m_hostName;
  62. int m_port;
  63. double m_timeOutInSeconds;
  64. UdpNetworkedInternalData()
  65. : m_client(0),
  66. m_peer(0),
  67. m_isConnected(false),
  68. m_threadSupport(0),
  69. m_hasCommand(false),
  70. m_hasStatus(false),
  71. m_timeOutInSeconds(60)
  72. {
  73. }
  74. bool connectUDP()
  75. {
  76. if (m_isConnected)
  77. return true;
  78. if (enet_initialize() != 0)
  79. {
  80. fprintf(stderr, "Error initialising enet");
  81. exit(EXIT_FAILURE);
  82. }
  83. m_client = enet_host_create(NULL, /* create a client host */
  84. 1, /* number of clients */
  85. 2, /* number of channels */
  86. 57600 / 8, /* incoming bandwith */
  87. 14400 / 8); /* outgoing bandwith */
  88. if (m_client == NULL)
  89. {
  90. fprintf(stderr, "Could not create client host");
  91. return false;
  92. }
  93. enet_address_set_host(&m_address, m_hostName.c_str());
  94. m_address.port = m_port;
  95. m_peer = enet_host_connect(m_client,
  96. &m_address, /* address to connect to */
  97. 2, /* number of channels */
  98. 0); /* user data supplied to
  99. the receiving host */
  100. if (m_peer == NULL)
  101. {
  102. fprintf(stderr,
  103. "No available peers for initiating an ENet "
  104. "connection.\n");
  105. return false;
  106. }
  107. /* Try to connect to server within 5 seconds */
  108. if (enet_host_service(m_client, &m_event, 5000) > 0 &&
  109. m_event.type == ENET_EVENT_TYPE_CONNECT)
  110. {
  111. puts("Connection to server succeeded.");
  112. }
  113. else
  114. {
  115. /* Either the 5 seconds are up or a disconnect event was */
  116. /* received. Reset the peer in the event the 5 seconds */
  117. /* had run out without any significant event. */
  118. enet_peer_reset(m_peer);
  119. fprintf(stderr, "Connection to server failed.");
  120. return false;
  121. }
  122. int serviceResult = enet_host_service(m_client, &m_event, 0);
  123. if (serviceResult > 0)
  124. {
  125. switch (m_event.type)
  126. {
  127. case ENET_EVENT_TYPE_CONNECT:
  128. printf("A new client connected from %x:%u.\n",
  129. m_event.peer->address.host,
  130. m_event.peer->address.port);
  131. m_event.peer->data = (void*)"New User";
  132. break;
  133. case ENET_EVENT_TYPE_RECEIVE:
  134. if (gVerboseNetworkMessagesClient)
  135. {
  136. printf(
  137. "A packet of length %lu containing '%s' was "
  138. "received from %s on channel %u.\n",
  139. m_event.packet->dataLength,
  140. (char*)m_event.packet->data,
  141. (char*)m_event.peer->data,
  142. m_event.channelID);
  143. }
  144. /* Clean up the packet now that we're done using it.
  145. > */
  146. enet_packet_destroy(m_event.packet);
  147. break;
  148. case ENET_EVENT_TYPE_DISCONNECT:
  149. printf("%s disconnected.\n", (char*)m_event.peer->data);
  150. break;
  151. default:
  152. {
  153. printf("unknown event type: %d.\n", m_event.type);
  154. }
  155. }
  156. }
  157. else if (serviceResult > 0)
  158. {
  159. puts("Error with servicing the client");
  160. return false;
  161. }
  162. m_isConnected = true;
  163. return m_isConnected;
  164. }
  165. bool checkData()
  166. {
  167. bool hasStatus = false;
  168. int serviceResult = enet_host_service(m_client, &m_event, 0);
  169. if (serviceResult > 0)
  170. {
  171. switch (m_event.type)
  172. {
  173. case ENET_EVENT_TYPE_CONNECT:
  174. printf("A new client connected from %x:%u.\n",
  175. m_event.peer->address.host,
  176. m_event.peer->address.port);
  177. m_event.peer->data = (void*)"New User";
  178. break;
  179. case ENET_EVENT_TYPE_RECEIVE:
  180. {
  181. if (gVerboseNetworkMessagesClient)
  182. {
  183. printf(
  184. "A packet of length %lu containing '%s' was "
  185. "received from %s on channel %u.\n",
  186. m_event.packet->dataLength,
  187. (char*)m_event.packet->data,
  188. (char*)m_event.peer->data,
  189. m_event.channelID);
  190. }
  191. int packetSizeInBytes = b3DeserializeInt(m_event.packet->data);
  192. if (packetSizeInBytes == m_event.packet->dataLength)
  193. {
  194. SharedMemoryStatus* statPtr = (SharedMemoryStatus*)&m_event.packet->data[4];
  195. if (statPtr->m_type == CMD_STEP_FORWARD_SIMULATION_COMPLETED)
  196. {
  197. SharedMemoryStatus dummy;
  198. dummy.m_type = CMD_STEP_FORWARD_SIMULATION_COMPLETED;
  199. m_lastStatus = dummy;
  200. m_stream.resize(0);
  201. }
  202. else
  203. {
  204. m_lastStatus = *statPtr;
  205. int streamOffsetInBytes = 4 + sizeof(SharedMemoryStatus);
  206. int numStreamBytes = packetSizeInBytes - streamOffsetInBytes;
  207. m_stream.resize(numStreamBytes);
  208. for (int i = 0; i < numStreamBytes; i++)
  209. {
  210. m_stream[i] = m_event.packet->data[i + streamOffsetInBytes];
  211. }
  212. }
  213. }
  214. else
  215. {
  216. printf("unknown status message received\n");
  217. }
  218. enet_packet_destroy(m_event.packet);
  219. hasStatus = true;
  220. break;
  221. }
  222. case ENET_EVENT_TYPE_DISCONNECT:
  223. {
  224. printf("%s disconnected.\n", (char*)m_event.peer->data);
  225. break;
  226. }
  227. default:
  228. {
  229. printf("unknown event type: %d.\n", m_event.type);
  230. }
  231. }
  232. }
  233. else if (serviceResult > 0)
  234. {
  235. puts("Error with servicing the client");
  236. }
  237. return hasStatus;
  238. }
  239. };
  240. enum UDPThreadEnums
  241. {
  242. eUDPRequestTerminate = 13,
  243. eUDPIsUnInitialized,
  244. eUDPIsInitialized,
  245. eUDPInitializationFailed,
  246. eUDPHasTerminated
  247. };
  248. enum UDPCommandEnums
  249. {
  250. eUDPIdle = 13,
  251. eUDP_ConnectRequest,
  252. eUDP_Connected,
  253. eUDP_ConnectionFailed,
  254. eUDP_DisconnectRequest,
  255. eUDP_Disconnected,
  256. };
  257. void UDPThreadFunc(void* userPtr, void* lsMemory)
  258. {
  259. printf("UDPThreadFunc thread started\n");
  260. // UDPThreadLocalStorage* localStorage = (UDPThreadLocalStorage*)lsMemory;
  261. UdpNetworkedInternalData* args = (UdpNetworkedInternalData*)userPtr;
  262. // int workLeft = true;
  263. b3Clock clock;
  264. clock.reset();
  265. bool init = true;
  266. if (init)
  267. {
  268. args->m_cs->lock();
  269. args->m_cs->setSharedParam(0, eUDPIsInitialized);
  270. args->m_cs->unlock();
  271. double deltaTimeInSeconds = 0;
  272. do
  273. {
  274. b3Clock::usleep(0);
  275. deltaTimeInSeconds += double(clock.getTimeMicroseconds()) / 1000000.;
  276. {
  277. clock.reset();
  278. deltaTimeInSeconds = 0.f;
  279. switch (args->m_cs->getSharedParam(1))
  280. {
  281. case eUDP_ConnectRequest:
  282. {
  283. bool connected = args->connectUDP();
  284. if (connected)
  285. {
  286. args->m_cs->setSharedParam(1, eUDP_Connected);
  287. }
  288. else
  289. {
  290. args->m_cs->setSharedParam(1, eUDP_ConnectionFailed);
  291. }
  292. break;
  293. }
  294. default:
  295. {
  296. }
  297. };
  298. if (args->m_isConnected)
  299. {
  300. args->m_cs->lock();
  301. bool hasCommand = args->m_hasCommand;
  302. args->m_cs->unlock();
  303. if (hasCommand)
  304. {
  305. int sz = 0;
  306. ENetPacket* packet = 0;
  307. if (args->m_clientCmd.m_type == CMD_STEP_FORWARD_SIMULATION)
  308. {
  309. sz = sizeof(int);
  310. packet = enet_packet_create(&args->m_clientCmd.m_type, sz, ENET_PACKET_FLAG_RELIABLE);
  311. }
  312. else
  313. {
  314. sz = sizeof(SharedMemoryCommand);
  315. packet = enet_packet_create(&args->m_clientCmd, sz, ENET_PACKET_FLAG_RELIABLE);
  316. }
  317. int res;
  318. res = enet_peer_send(args->m_peer, 0, packet);
  319. args->m_cs->lock();
  320. args->m_hasCommand = false;
  321. args->m_cs->unlock();
  322. }
  323. bool hasNewStatus = args->checkData();
  324. if (hasNewStatus)
  325. {
  326. if (args->m_hasStatus)
  327. {
  328. //overflow: last status hasn't been processed yet
  329. b3Assert(0);
  330. printf("Error: received new status but previous status not processed yet");
  331. }
  332. else
  333. {
  334. args->m_cs->lock();
  335. args->m_hasStatus = hasNewStatus;
  336. args->m_cs->unlock();
  337. }
  338. }
  339. }
  340. }
  341. } while (args->m_cs->getSharedParam(0) != eUDPRequestTerminate);
  342. }
  343. else
  344. {
  345. args->m_cs->lock();
  346. args->m_cs->setSharedParam(0, eUDPInitializationFailed);
  347. args->m_cs->unlock();
  348. }
  349. printf("finished\n");
  350. }
  351. void* UDPlsMemoryFunc()
  352. {
  353. //don't create local store memory, just return 0
  354. return new UDPThreadLocalStorage;
  355. }
  356. void UDPlsMemoryReleaseFunc(void* ptr)
  357. {
  358. UDPThreadLocalStorage* p = (UDPThreadLocalStorage*)ptr;
  359. delete p;
  360. }
  361. UdpNetworkedPhysicsProcessor::UdpNetworkedPhysicsProcessor(const char* hostName, int port)
  362. {
  363. m_data = new UdpNetworkedInternalData;
  364. if (hostName)
  365. {
  366. m_data->m_hostName = hostName;
  367. }
  368. m_data->m_port = port;
  369. }
  370. UdpNetworkedPhysicsProcessor::~UdpNetworkedPhysicsProcessor()
  371. {
  372. disconnect();
  373. delete m_data;
  374. }
  375. bool UdpNetworkedPhysicsProcessor::processCommand(const struct SharedMemoryCommand& clientCmd, struct SharedMemoryStatus& serverStatusOut, char* bufferServerToClient, int bufferSizeInBytes)
  376. {
  377. if (gVerboseNetworkMessagesClient)
  378. {
  379. printf("PhysicsClientUDP::processCommand\n");
  380. }
  381. // int sz = sizeof(SharedMemoryCommand);
  382. b3Clock clock;
  383. double startTime = clock.getTimeInSeconds();
  384. double timeOutInSeconds = m_data->m_timeOutInSeconds;
  385. m_data->m_cs->lock();
  386. m_data->m_clientCmd = clientCmd;
  387. m_data->m_hasCommand = true;
  388. m_data->m_cs->unlock();
  389. while ((m_data->m_hasCommand) && (clock.getTimeInSeconds() - startTime < timeOutInSeconds))
  390. {
  391. b3Clock::usleep(0);
  392. }
  393. #if 0
  394. bool hasStatus = false;
  395. b3Clock clock;
  396. double startTime = clock.getTimeInSeconds();
  397. double timeOutInSeconds = m_data->m_timeOutInSeconds;
  398. const SharedMemoryStatus* stat = 0;
  399. while ((!hasStatus) && (clock.getTimeInSeconds() - startTime < timeOutInSeconds))
  400. {
  401. hasStatus = receiveStatus(serverStatusOut, bufferServerToClient, bufferSizeInBytes);
  402. b3Clock::usleep(100);
  403. }
  404. return hasStatus;
  405. #endif
  406. return false;
  407. }
  408. bool UdpNetworkedPhysicsProcessor::receiveStatus(struct SharedMemoryStatus& serverStatusOut, char* bufferServerToClient, int bufferSizeInBytes)
  409. {
  410. bool hasStatus = false;
  411. if (m_data->m_hasStatus)
  412. {
  413. if (gVerboseNetworkMessagesClient)
  414. {
  415. printf("UdpNetworkedPhysicsProcessor::receiveStatus\n");
  416. }
  417. hasStatus = true;
  418. serverStatusOut = m_data->m_lastStatus;
  419. int numStreamBytes = m_data->m_stream.size();
  420. if (numStreamBytes < bufferSizeInBytes)
  421. {
  422. for (int i = 0; i < numStreamBytes; i++)
  423. {
  424. bufferServerToClient[i] = m_data->m_stream[i];
  425. }
  426. }
  427. else
  428. {
  429. printf("Error: steam buffer overflow\n");
  430. }
  431. m_data->m_cs->lock();
  432. m_data->m_hasStatus = false;
  433. m_data->m_cs->unlock();
  434. }
  435. return hasStatus;
  436. }
  437. void UdpNetworkedPhysicsProcessor::renderScene(int renderFlags)
  438. {
  439. }
  440. void UdpNetworkedPhysicsProcessor::physicsDebugDraw(int debugDrawFlags)
  441. {
  442. }
  443. void UdpNetworkedPhysicsProcessor::setGuiHelper(struct GUIHelperInterface* guiHelper)
  444. {
  445. }
  446. bool UdpNetworkedPhysicsProcessor::isConnected() const
  447. {
  448. return m_data->m_isConnected;
  449. }
  450. bool UdpNetworkedPhysicsProcessor::connect()
  451. {
  452. if (m_data->m_threadSupport == 0)
  453. {
  454. m_data->m_threadSupport = createUDPThreadSupport(1);
  455. m_data->m_cs = m_data->m_threadSupport->createCriticalSection();
  456. m_data->m_cs->setSharedParam(0, eUDPIsUnInitialized);
  457. m_data->m_threadSupport->runTask(B3_THREAD_SCHEDULE_TASK, (void*)m_data, 0);
  458. while (m_data->m_cs->getSharedParam(0) == eUDPIsUnInitialized)
  459. {
  460. b3Clock::usleep(1000);
  461. }
  462. m_data->m_cs->lock();
  463. m_data->m_cs->setSharedParam(1, eUDP_ConnectRequest);
  464. m_data->m_cs->unlock();
  465. while (m_data->m_cs->getSharedParam(1) == eUDP_ConnectRequest)
  466. {
  467. b3Clock::usleep(1000);
  468. }
  469. }
  470. unsigned int sharedParam = m_data->m_cs->getSharedParam(1);
  471. bool isConnected = (sharedParam == eUDP_Connected);
  472. return isConnected;
  473. }
  474. void UdpNetworkedPhysicsProcessor::disconnect()
  475. {
  476. if (m_data->m_threadSupport)
  477. {
  478. m_data->m_cs->lock();
  479. m_data->m_cs->setSharedParam(0, eUDPRequestTerminate);
  480. m_data->m_cs->unlock();
  481. int numActiveThreads = 1;
  482. while (numActiveThreads)
  483. {
  484. int arg0, arg1;
  485. if (m_data->m_threadSupport->isTaskCompleted(&arg0, &arg1, 0))
  486. {
  487. numActiveThreads--;
  488. printf("numActiveThreads = %d\n", numActiveThreads);
  489. }
  490. else
  491. {
  492. b3Clock::usleep(1000);
  493. }
  494. };
  495. printf("stopping threads\n");
  496. delete m_data->m_threadSupport;
  497. m_data->m_threadSupport = 0;
  498. m_data->m_isConnected = false;
  499. }
  500. }
  501. void UdpNetworkedPhysicsProcessor::setTimeOut(double timeOutInSeconds)
  502. {
  503. m_data->m_timeOutInSeconds = timeOutInSeconds;
  504. }