2
0

PolyPeer.cpp 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. /*
  2. * PolyPeer.cpp
  3. * Poly
  4. *
  5. * Created by Ivan Safrin on 3/6/09.
  6. * Copyright 2009 __MyCompanyName__. All rights reserved.
  7. *
  8. */
  9. #include "PolyPeer.h"
  10. using namespace Polycode;
  11. void PeerConnection::ackPackets(unsigned int ack) {
  12. for(int i=0; i < reliablePacketQueue.size(); i++) {
  13. if(reliablePacketQueue[i].packet->header.sequence == ack) {
  14. delete reliablePacketQueue[i].packet;
  15. reliablePacketQueue.erase(reliablePacketQueue.begin()+i);
  16. }
  17. }
  18. }
  19. Peer::Peer(unsigned int port) : EventDispatcher(), Threaded() {
  20. socket = new Socket(port);
  21. socket->addEventListener(this, SocketEvent::EVENT_DATA_RECEIVED);
  22. #if USE_THREADED_SOCKETS == 1
  23. CoreServices::getInstance()->getCore()->createThread(this);
  24. updateTimer = NULL;
  25. #else
  26. updateTimer = new Timer(true, SOCKET_POLL_INTERVAL);
  27. updateTimer->addEventListener(this, Timer::EVENT_TRIGGER);
  28. #endif
  29. }
  30. Peer::~Peer() {
  31. delete socket;
  32. }
  33. PeerConnection *Peer::getPeerConnection(const Address &address) {
  34. for(int i=0; i < peerConnections.size(); i++) {
  35. if(peerConnections[i]->address == address) {
  36. return peerConnections[i];
  37. }
  38. }
  39. return NULL;
  40. }
  41. PeerConnection *Peer::addPeerConnection(const Address &address) {
  42. PeerConnection *newConnection = new PeerConnection();
  43. newConnection->address = address;
  44. peerConnections.push_back(newConnection);
  45. handlePeerConnection(newConnection);
  46. return newConnection;
  47. }
  48. Packet *Peer::createPacket(const Address &target, char *data, unsigned int size, unsigned short type) {
  49. PeerConnection *connection = getPeerConnection(target);
  50. if(!connection)
  51. connection = addPeerConnection(target);
  52. Packet *packet = new Packet();
  53. packet->header.sequence = connection->localSequence;
  54. packet->header.headerHash = 20;
  55. packet->header.reliableID = 0;
  56. packet->header.ack = connection->remoteSequence;
  57. packet->header.ackBitfield = 0;
  58. packet->header.size = size;
  59. packet->header.type = type;
  60. if(size > 0)
  61. memcpy(packet->data, data, size);
  62. connection->localSequence++;
  63. return packet;
  64. }
  65. void Peer::sendReliableData(const Address &target, char *data, unsigned int size, unsigned short type) {
  66. PeerConnection *connection = getPeerConnection(target);
  67. Packet *packet = createPacket(target, data, size, type);
  68. packet->header.reliableID = connection->reliableID;
  69. connection->reliableID++;
  70. sendPacket(target, packet);
  71. SentPacketEntry entry;
  72. entry.packet = packet;
  73. entry.timestamp = CoreServices::getInstance()->getCore()->getTicks();
  74. connection->reliablePacketQueue.push_back(entry);
  75. }
  76. void Peer::sendDataToAll(char *data, unsigned int size, unsigned short type) {
  77. for(int i=0; i < peerConnections.size(); i++) {
  78. sendData(peerConnections[i]->address, data, size, type);
  79. }
  80. }
  81. void Peer::sendReliableDataToAll(char *data, unsigned int size, unsigned short type) {
  82. for(int i=0; i < peerConnections.size(); i++) {
  83. sendReliableData(peerConnections[i]->address, data, size, type);
  84. }
  85. }
  86. void Peer::sendData(const Address &target, char *data, unsigned int size, unsigned short type) {
  87. Packet *packet = createPacket(target, data, size, type);
  88. sendPacket(target, packet);
  89. delete packet;
  90. }
  91. void Peer::sendPacket(const Address &target, Packet *packet) {
  92. unsigned int packetSize = packet->header.size + sizeof(packet->header);
  93. socket->sendData(target, (char*)packet, packetSize);
  94. }
  95. bool Peer::checkPacketAcks(PeerConnection *connection, Packet *packet) {
  96. bool retVal = true;
  97. if(packet->header.sequence > connection->remoteSequence)
  98. connection->remoteSequence = packet->header.sequence;
  99. else // ignore old packets
  100. retVal = false;
  101. // if this is a reliable packet, check if it was recently received
  102. if(packet->header.reliableID != 0) {
  103. retVal = true;
  104. for(int i=0; i < connection->recentReliableIDs.size(); i++) {
  105. if(connection->recentReliableIDs[i] == packet->header.reliableID)
  106. retVal = false;
  107. }
  108. // if still good, push the id into recent reliable acks
  109. if(retVal) {
  110. connection->recentReliableIDs.push_back(packet->header.reliableID);
  111. if(connection->recentReliableIDs.size() > 50)
  112. connection->recentReliableIDs.erase(connection->recentReliableIDs.begin());
  113. }
  114. }
  115. for(int i=0; i < peerConnections.size(); i++) {
  116. peerConnections[i]->ackPackets(packet->header.ack);
  117. }
  118. return retVal;
  119. }
  120. void Peer::handleEvent(Event *event) {
  121. if(event->getDispatcher() == socket) {
  122. SocketEvent *socketEvent = (SocketEvent*) event;
  123. switch(socketEvent->getEventCode()) {
  124. case SocketEvent::EVENT_DATA_RECEIVED:
  125. PeerConnection *connection = getPeerConnection(socketEvent->fromAddress);
  126. if(!connection)
  127. connection = addPeerConnection(socketEvent->fromAddress);
  128. if(checkPacketAcks(connection, (Packet*)socketEvent->data))
  129. handlePacket((Packet*)socketEvent->data, connection);
  130. break;
  131. }
  132. } else if(event->getDispatcher() == updateTimer) {
  133. updateThread();
  134. }
  135. }
  136. void Peer::updateThread() {
  137. for(int i=0; i < peerConnections.size(); i++) {
  138. for(int j=0; j < peerConnections[i]->reliablePacketQueue.size(); j++) {
  139. if(peerConnections[i]->reliablePacketQueue[j].timestamp < CoreServices::getInstance()->getCore()->getTicks() - 1000) {
  140. peerConnections[i]->reliablePacketQueue[j].timestamp = CoreServices::getInstance()->getCore()->getTicks();
  141. sendPacket(peerConnections[i]->address, peerConnections[i]->reliablePacketQueue[j].packet);
  142. }
  143. }
  144. }
  145. int received = 1;
  146. while( received > 0) {
  147. received = socket->receiveData();
  148. }
  149. }