|
@@ -138,15 +138,16 @@ void SceneReplicationInterface::on_network_process() {
|
|
|
spawn_queue.clear();
|
|
|
}
|
|
|
|
|
|
- // Process timed syncs.
|
|
|
- uint64_t msec = OS::get_singleton()->get_ticks_msec();
|
|
|
+ // Process syncs.
|
|
|
+ uint64_t usec = OS::get_singleton()->get_ticks_usec();
|
|
|
for (KeyValue<int, PeerInfo> &E : peers_info) {
|
|
|
const HashSet<ObjectID> to_sync = E.value.sync_nodes;
|
|
|
if (to_sync.is_empty()) {
|
|
|
continue; // Nothing to sync
|
|
|
}
|
|
|
uint16_t sync_net_time = ++E.value.last_sent_sync;
|
|
|
- _send_sync(E.key, to_sync, sync_net_time, msec);
|
|
|
+ _send_sync(E.key, to_sync, sync_net_time, usec);
|
|
|
+ _send_delta(E.key, to_sync, usec, E.value.last_watch_usecs);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -280,6 +281,7 @@ Error SceneReplicationInterface::on_replication_stop(Object *p_obj, Variant p_co
|
|
|
sync_nodes.erase(sid);
|
|
|
for (KeyValue<int, PeerInfo> &E : peers_info) {
|
|
|
E.value.sync_nodes.erase(sid);
|
|
|
+ E.value.last_watch_usecs.erase(sid);
|
|
|
if (sync->get_net_id()) {
|
|
|
E.value.recv_sync_ids.erase(sync->get_net_id());
|
|
|
}
|
|
@@ -357,6 +359,7 @@ Error SceneReplicationInterface::_update_sync_visibility(int p_peer, Multiplayer
|
|
|
E.value.sync_nodes.insert(sid);
|
|
|
} else {
|
|
|
E.value.sync_nodes.erase(sid);
|
|
|
+ E.value.last_watch_usecs.erase(sid);
|
|
|
}
|
|
|
}
|
|
|
return OK;
|
|
@@ -369,6 +372,7 @@ Error SceneReplicationInterface::_update_sync_visibility(int p_peer, Multiplayer
|
|
|
peers_info[p_peer].sync_nodes.insert(sid);
|
|
|
} else {
|
|
|
peers_info[p_peer].sync_nodes.erase(sid);
|
|
|
+ peers_info[p_peer].last_watch_usecs.erase(sid);
|
|
|
}
|
|
|
return OK;
|
|
|
}
|
|
@@ -670,8 +674,126 @@ Error SceneReplicationInterface::on_despawn_receive(int p_from, const uint8_t *p
|
|
|
return OK;
|
|
|
}
|
|
|
|
|
|
-void SceneReplicationInterface::_send_sync(int p_peer, const HashSet<ObjectID> p_synchronizers, uint16_t p_sync_net_time, uint64_t p_msec) {
|
|
|
- MAKE_ROOM(sync_mtu);
|
|
|
+bool SceneReplicationInterface::_verify_synchronizer(int p_peer, MultiplayerSynchronizer *p_sync, uint32_t &r_net_id) {
|
|
|
+ r_net_id = p_sync->get_net_id();
|
|
|
+ if (r_net_id == 0 || (r_net_id & 0x80000000)) {
|
|
|
+ int path_id = 0;
|
|
|
+ bool verified = multiplayer->get_path_cache()->send_object_cache(p_sync, p_peer, path_id);
|
|
|
+ ERR_FAIL_COND_V_MSG(path_id < 0, false, "This should never happen!");
|
|
|
+ if (r_net_id == 0) {
|
|
|
+ // First time path based ID.
|
|
|
+ r_net_id = path_id | 0x80000000;
|
|
|
+ p_sync->set_net_id(r_net_id | 0x80000000);
|
|
|
+ }
|
|
|
+ return verified;
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+}
|
|
|
+
|
|
|
+MultiplayerSynchronizer *SceneReplicationInterface::_find_synchronizer(int p_peer, uint32_t p_net_id) {
|
|
|
+ MultiplayerSynchronizer *sync = nullptr;
|
|
|
+ if (p_net_id & 0x80000000) {
|
|
|
+ sync = Object::cast_to<MultiplayerSynchronizer>(multiplayer->get_path_cache()->get_cached_object(p_peer, p_net_id & 0x7FFFFFFF));
|
|
|
+ } else if (peers_info[p_peer].recv_sync_ids.has(p_net_id)) {
|
|
|
+ const ObjectID &sid = peers_info[p_peer].recv_sync_ids[p_net_id];
|
|
|
+ sync = get_id_as<MultiplayerSynchronizer>(sid);
|
|
|
+ }
|
|
|
+ return sync;
|
|
|
+}
|
|
|
+
|
|
|
+void SceneReplicationInterface::_send_delta(int p_peer, const HashSet<ObjectID> p_synchronizers, uint64_t p_usec, const HashMap<ObjectID, uint64_t> p_last_watch_usecs) {
|
|
|
+ MAKE_ROOM(/* header */ 1 + /* element */ 4 + 8 + 4 + delta_mtu);
|
|
|
+ uint8_t *ptr = packet_cache.ptrw();
|
|
|
+ ptr[0] = SceneMultiplayer::NETWORK_COMMAND_SYNC | (1 << SceneMultiplayer::CMD_FLAG_0_SHIFT);
|
|
|
+ int ofs = 1;
|
|
|
+ for (const ObjectID &oid : p_synchronizers) {
|
|
|
+ MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(oid);
|
|
|
+ ERR_CONTINUE(!sync || !sync->get_replication_config().is_valid() || !sync->is_multiplayer_authority());
|
|
|
+ uint32_t net_id;
|
|
|
+ if (!_verify_synchronizer(p_peer, sync, net_id)) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ uint64_t last_usec = p_last_watch_usecs.has(oid) ? p_last_watch_usecs[oid] : 0;
|
|
|
+ uint64_t indexes;
|
|
|
+ List<Variant> delta = sync->get_delta_state(p_usec, last_usec, indexes);
|
|
|
+
|
|
|
+ if (!delta.size()) {
|
|
|
+ continue; // Nothing to update.
|
|
|
+ }
|
|
|
+
|
|
|
+ Vector<const Variant *> varp;
|
|
|
+ varp.resize(delta.size());
|
|
|
+ const Variant **vptr = varp.ptrw();
|
|
|
+ int i = 0;
|
|
|
+ for (const Variant &v : delta) {
|
|
|
+ vptr[i] = &v;
|
|
|
+ }
|
|
|
+ int size;
|
|
|
+ Error err = MultiplayerAPI::encode_and_compress_variants(vptr, varp.size(), nullptr, size);
|
|
|
+ ERR_CONTINUE_MSG(err != OK, "Unable to encode delta state.");
|
|
|
+
|
|
|
+ ERR_CONTINUE_MSG(size > delta_mtu, vformat("Synchronizer delta bigger than MTU will not be sent (%d > %d): %s", size, delta_mtu, sync->get_path()));
|
|
|
+
|
|
|
+ if (ofs + 4 + 8 + 4 + size > delta_mtu) {
|
|
|
+ // Send what we got, and reset write.
|
|
|
+ _send_raw(packet_cache.ptr(), ofs, p_peer, true);
|
|
|
+ ofs = 1;
|
|
|
+ }
|
|
|
+ if (size) {
|
|
|
+ ofs += encode_uint32(sync->get_net_id(), &ptr[ofs]);
|
|
|
+ ofs += encode_uint64(indexes, &ptr[ofs]);
|
|
|
+ ofs += encode_uint32(size, &ptr[ofs]);
|
|
|
+ MultiplayerAPI::encode_and_compress_variants(vptr, varp.size(), &ptr[ofs], size);
|
|
|
+ ofs += size;
|
|
|
+ }
|
|
|
+#ifdef DEBUG_ENABLED
|
|
|
+ _profile_node_data("delta_out", oid, size);
|
|
|
+#endif
|
|
|
+ peers_info[p_peer].last_watch_usecs[oid] = p_usec;
|
|
|
+ }
|
|
|
+ if (ofs > 1) {
|
|
|
+ // Got some left over to send.
|
|
|
+ _send_raw(packet_cache.ptr(), ofs, p_peer, true);
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+Error SceneReplicationInterface::on_delta_receive(int p_from, const uint8_t *p_buffer, int p_buffer_len) {
|
|
|
+ int ofs = 1;
|
|
|
+ while (ofs + 4 + 8 + 4 < p_buffer_len) {
|
|
|
+ uint32_t net_id = decode_uint32(&p_buffer[ofs]);
|
|
|
+ ofs += 4;
|
|
|
+ uint64_t indexes = decode_uint64(&p_buffer[ofs]);
|
|
|
+ ofs += 8;
|
|
|
+ uint32_t size = decode_uint32(&p_buffer[ofs]);
|
|
|
+ ofs += 4;
|
|
|
+ ERR_FAIL_COND_V(size > uint32_t(p_buffer_len - ofs), ERR_INVALID_DATA);
|
|
|
+ MultiplayerSynchronizer *sync = _find_synchronizer(p_from, net_id);
|
|
|
+ Node *node = sync ? sync->get_root_node() : nullptr;
|
|
|
+ if (!sync || sync->get_multiplayer_authority() != p_from || !node) {
|
|
|
+ ofs += size;
|
|
|
+ ERR_CONTINUE_MSG(true, "Ignoring delta for non-authority or invalid synchronizer.");
|
|
|
+ }
|
|
|
+ List<NodePath> props = sync->get_delta_properties(indexes);
|
|
|
+ ERR_FAIL_COND_V(props.size() == 0, ERR_INVALID_DATA);
|
|
|
+ Vector<Variant> vars;
|
|
|
+ vars.resize(props.size());
|
|
|
+ int consumed = 0;
|
|
|
+ Error err = MultiplayerAPI::decode_and_decompress_variants(vars, p_buffer + ofs, size, consumed);
|
|
|
+ ERR_FAIL_COND_V(err != OK, err);
|
|
|
+ ERR_FAIL_COND_V(uint32_t(consumed) != size, ERR_INVALID_DATA);
|
|
|
+ err = MultiplayerSynchronizer::set_state(props, node, vars);
|
|
|
+ ERR_FAIL_COND_V(err != OK, err);
|
|
|
+ ofs += size;
|
|
|
+ sync->emit_signal(SNAME("delta_synchronized"));
|
|
|
+#ifdef DEBUG_ENABLED
|
|
|
+ _profile_node_data("delta_in", sync->get_instance_id(), size);
|
|
|
+#endif
|
|
|
+ }
|
|
|
+ return OK;
|
|
|
+}
|
|
|
+
|
|
|
+void SceneReplicationInterface::_send_sync(int p_peer, const HashSet<ObjectID> p_synchronizers, uint16_t p_sync_net_time, uint64_t p_usec) {
|
|
|
+ MAKE_ROOM(/* header */ 3 + /* element */ 4 + 4 + sync_mtu);
|
|
|
uint8_t *ptr = packet_cache.ptrw();
|
|
|
ptr[0] = SceneMultiplayer::NETWORK_COMMAND_SYNC;
|
|
|
int ofs = 1;
|
|
@@ -681,26 +803,16 @@ void SceneReplicationInterface::_send_sync(int p_peer, const HashSet<ObjectID> p
|
|
|
for (const ObjectID &oid : p_synchronizers) {
|
|
|
MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(oid);
|
|
|
ERR_CONTINUE(!sync || !sync->get_replication_config().is_valid() || !sync->is_multiplayer_authority());
|
|
|
- if (!sync->update_outbound_sync_time(p_msec)) {
|
|
|
+ if (!sync->update_outbound_sync_time(p_usec)) {
|
|
|
continue; // nothing to sync.
|
|
|
}
|
|
|
|
|
|
Node *node = sync->get_root_node();
|
|
|
ERR_CONTINUE(!node);
|
|
|
uint32_t net_id = sync->get_net_id();
|
|
|
- if (net_id == 0 || (net_id & 0x80000000)) {
|
|
|
- int path_id = 0;
|
|
|
- bool verified = multiplayer->get_path_cache()->send_object_cache(sync, p_peer, path_id);
|
|
|
- ERR_CONTINUE_MSG(path_id < 0, "This should never happen!");
|
|
|
- if (net_id == 0) {
|
|
|
- // First time path based ID.
|
|
|
- net_id = path_id | 0x80000000;
|
|
|
- sync->set_net_id(net_id | 0x80000000);
|
|
|
- }
|
|
|
- if (!verified) {
|
|
|
- // The path based sync is not yet confirmed, skipping.
|
|
|
- continue;
|
|
|
- }
|
|
|
+ if (!_verify_synchronizer(p_peer, sync, net_id)) {
|
|
|
+ // The path based sync is not yet confirmed, skipping.
|
|
|
+ continue;
|
|
|
}
|
|
|
int size;
|
|
|
Vector<Variant> vars;
|
|
@@ -711,7 +823,7 @@ void SceneReplicationInterface::_send_sync(int p_peer, const HashSet<ObjectID> p
|
|
|
err = MultiplayerAPI::encode_and_compress_variants(varp.ptrw(), varp.size(), nullptr, size);
|
|
|
ERR_CONTINUE_MSG(err != OK, "Unable to encode sync state.");
|
|
|
// TODO Handle single state above MTU.
|
|
|
- ERR_CONTINUE_MSG(size > 3 + 4 + 4 + sync_mtu, vformat("Node states bigger then MTU will not be sent (%d > %d): %s", size, sync_mtu, node->get_path()));
|
|
|
+ ERR_CONTINUE_MSG(size > sync_mtu, vformat("Node states bigger than MTU will not be sent (%d > %d): %s", size, sync_mtu, node->get_path()));
|
|
|
if (ofs + 4 + 4 + size > sync_mtu) {
|
|
|
// Send what we got, and reset write.
|
|
|
_send_raw(packet_cache.ptr(), ofs, p_peer, false);
|
|
@@ -735,6 +847,10 @@ void SceneReplicationInterface::_send_sync(int p_peer, const HashSet<ObjectID> p
|
|
|
|
|
|
Error SceneReplicationInterface::on_sync_receive(int p_from, const uint8_t *p_buffer, int p_buffer_len) {
|
|
|
ERR_FAIL_COND_V_MSG(p_buffer_len < 11, ERR_INVALID_DATA, "Invalid sync packet received");
|
|
|
+ bool is_delta = (p_buffer[0] & (1 << SceneMultiplayer::CMD_FLAG_0_SHIFT)) != 0;
|
|
|
+ if (is_delta) {
|
|
|
+ return on_delta_receive(p_from, p_buffer, p_buffer_len);
|
|
|
+ }
|
|
|
uint16_t time = decode_uint16(&p_buffer[1]);
|
|
|
int ofs = 3;
|
|
|
while (ofs + 8 < p_buffer_len) {
|
|
@@ -743,13 +859,7 @@ Error SceneReplicationInterface::on_sync_receive(int p_from, const uint8_t *p_bu
|
|
|
uint32_t size = decode_uint32(&p_buffer[ofs]);
|
|
|
ofs += 4;
|
|
|
ERR_FAIL_COND_V(size > uint32_t(p_buffer_len - ofs), ERR_INVALID_DATA);
|
|
|
- MultiplayerSynchronizer *sync = nullptr;
|
|
|
- if (net_id & 0x80000000) {
|
|
|
- sync = Object::cast_to<MultiplayerSynchronizer>(multiplayer->get_path_cache()->get_cached_object(p_from, net_id & 0x7FFFFFFF));
|
|
|
- } else if (peers_info[p_from].recv_sync_ids.has(net_id)) {
|
|
|
- const ObjectID &sid = peers_info[p_from].recv_sync_ids[net_id];
|
|
|
- sync = get_id_as<MultiplayerSynchronizer>(sid);
|
|
|
- }
|
|
|
+ MultiplayerSynchronizer *sync = _find_synchronizer(p_from, net_id);
|
|
|
if (!sync) {
|
|
|
// Not received yet.
|
|
|
ofs += size;
|
|
@@ -782,3 +892,21 @@ Error SceneReplicationInterface::on_sync_receive(int p_from, const uint8_t *p_bu
|
|
|
}
|
|
|
return OK;
|
|
|
}
|
|
|
+
|
|
|
+void SceneReplicationInterface::set_max_sync_packet_size(int p_size) {
|
|
|
+ ERR_FAIL_COND_MSG(p_size < 128, "Sync maximum packet size must be at least 128 bytes.");
|
|
|
+ sync_mtu = p_size;
|
|
|
+}
|
|
|
+
|
|
|
+int SceneReplicationInterface::get_max_sync_packet_size() const {
|
|
|
+ return sync_mtu;
|
|
|
+}
|
|
|
+
|
|
|
+void SceneReplicationInterface::set_max_delta_packet_size(int p_size) {
|
|
|
+ ERR_FAIL_COND_MSG(p_size < 128, "Sync maximum packet size must be at least 128 bytes.");
|
|
|
+ delta_mtu = p_size;
|
|
|
+}
|
|
|
+
|
|
|
+int SceneReplicationInterface::get_max_delta_packet_size() const {
|
|
|
+ return delta_mtu;
|
|
|
+}
|