浏览代码

resource: use thread queues

Daniele Bartolini 2 年之前
父节点
当前提交
7e08864b61

+ 5 - 0
src/core/thread/mpsc_queue.inl

@@ -44,6 +44,11 @@ struct MPSCQueue
 	{
 		return _queue.pop(ev);
 	}
+
+	bool empty()
+	{
+		return _queue.empty();
+	}
 };
 
 } // namespace crown

+ 8 - 0
src/core/thread/spsc_queue.inl

@@ -76,6 +76,14 @@ struct SPSCQueue
 		_head.store(head_next, std::memory_order_release);
 		return true;
 	}
+
+	///
+	bool empty()
+	{
+		const int head = _head.load(std::memory_order_relaxed);
+		const int tail = _tail.load(std::memory_order_acquire);
+		return head == tail;
+	}
 };
 
 } // namespace crown

+ 8 - 2
src/device/device.cpp

@@ -434,8 +434,14 @@ void Device::run()
 		boot_dir += CROWN_BOOT_CONFIG;
 
 		const StringId64 config_name(boot_dir.c_str());
-		_resource_manager->load(PACKAGE_RESOURCE_NONE, RESOURCE_TYPE_CONFIG, config_name);
-		_resource_manager->flush();
+
+		while (!_resource_manager->try_load(PACKAGE_RESOURCE_NONE, RESOURCE_TYPE_CONFIG, config_name)) {
+			_resource_manager->complete_requests();
+		}
+		while (!_resource_manager->can_get(RESOURCE_TYPE_CONFIG, config_name)) {
+			_resource_manager->complete_requests();
+		}
+
 		_boot_config.parse((const char *)_resource_manager->get(RESOURCE_TYPE_CONFIG, config_name));
 		_resource_manager->unload(RESOURCE_TYPE_CONFIG, config_name);
 	}

+ 78 - 97
src/resource/resource_loader.cpp

@@ -45,41 +45,13 @@ ResourceLoader::~ResourceLoader()
 	_thread.stop();
 }
 
-void ResourceLoader::add_request(const ResourceRequest &rr)
+bool ResourceLoader::add_request(const ResourceRequest &rr)
 {
-	ScopedMutex sm(_mutex);
-	queue::push_back(_requests, rr);
-	_requests_condition.signal();
-}
-
-void ResourceLoader::flush()
-{
-	while (num_requests()) {}
-}
+	bool success = _requests.push(rr);
+	if (success)
+		_requests_condition.signal();
 
-u32 ResourceLoader::num_requests()
-{
-	ScopedMutex sm(_mutex);
-	return queue::size(_requests);
-}
-
-void ResourceLoader::add_loaded(ResourceRequest rr)
-{
-	ScopedMutex sm(_loaded_mutex);
-	queue::push_back(_loaded, rr);
-}
-
-void ResourceLoader::get_loaded(Array<ResourceRequest> &loaded)
-{
-	ScopedMutex sm(_loaded_mutex);
-
-	const u32 num = queue::size(_loaded);
-	array::reserve(loaded, num);
-
-	for (u32 i = 0; i < num; ++i) {
-		array::push_back(loaded, queue::front(_loaded));
-		queue::pop_front(_loaded);
-	}
+	return success;
 }
 
 void ResourceLoader::register_fallback(StringId64 type, StringId64 name)
@@ -91,27 +63,78 @@ s32 ResourceLoader::run()
 {
 	while (1) {
 		_mutex.lock();
-		while (queue::empty(_requests) && !_exit)
+		while (!_exit && _requests.empty())
 			_requests_condition.wait(_mutex);
 
+		_mutex.unlock();
 		if (_exit)
 			break;
 
-		ResourceRequest rr = queue::front(_requests);
-		_mutex.unlock();
+		ResourceRequest rr;
+		while (!_exit && _requests.pop(rr)) {
+			ResourceId res_id = resource_id(rr.type, rr.name);
+
+			TempAllocator128 ta;
+			DynamicString path(ta);
+			destination_path(path, res_id);
+
+			if (_is_bundle) {
+				if (rr.type == RESOURCE_TYPE_PACKAGE || rr.type == RESOURCE_TYPE_CONFIG) {
+					File *file = _data_filesystem.open(path.c_str(), FileOpenMode::READ);
+					CE_ASSERT(file->is_open(), "Cannot load " RESOURCE_ID_FMT, res_id);
+
+					// Load the resource.
+					if (rr.load_function) {
+						rr.data = rr.load_function(*file, *rr.allocator);
+					} else {
+						const u32 file_size = file->size();
+						rr.data = rr.allocator->allocate(file_size, 16);
+						file->read(rr.data, file_size);
+						CE_ASSERT(*(u32 *)rr.data == RESOURCE_HEADER(rr.version), "Wrong version");
+					}
 
-		ResourceId res_id = resource_id(rr.type, rr.name);
+					_data_filesystem.close(*file);
+				} else {
+					// Get the package containing the resource.
+					const PackageResource *pkg = (PackageResource *)rr.resource_manager->get(RESOURCE_TYPE_PACKAGE, rr.package_name);
+
+					// Find the resource inside the package.
+					for (u32 ii = 0; ii < pkg->num_resources; ++ii) {
+						const ResourceOffset *offt = package_resource::resource_offset(pkg, ii);
+						if (offt->type == rr.type && offt->name == rr.name) {
+							const void *resource_data = package_resource::data(pkg) + offt->offset;
+
+							// Load the resource.
+							if (rr.load_function) {
+								FileMemory fm(resource_data, offt->size);
+								rr.data = rr.load_function(fm, *rr.allocator);
+							} else {
+								rr.allocator = NULL;
+								rr.data = (void *)resource_data;
+								CE_ASSERT(*(u32 *)rr.data == RESOURCE_HEADER(rr.version), "Wrong version");
+							}
+
+							break;
+						}
+					}
+				}
+			} else {
+				File *file = _data_filesystem.open(path.c_str(), FileOpenMode::READ);
+				if (!file->is_open()) {
+					logw(RESOURCE_LOADER, "Cannot load resource: " RESOURCE_ID_FMT ". Falling back...", res_id._id);
 
-		TempAllocator128 ta;
-		DynamicString path(ta);
-		destination_path(path, res_id);
+					StringId64 fallback_name;
+					fallback_name = hash_map::get(_fallback, rr.type, fallback_name);
+					CE_ENSURE(fallback_name._id != 0);
 
-		if (_is_bundle) {
-			if (rr.type == RESOURCE_TYPE_PACKAGE || rr.type == RESOURCE_TYPE_CONFIG) {
-				File *file = _data_filesystem.open(path.c_str(), FileOpenMode::READ);
-				CE_ASSERT(file->is_open(), "Cannot load " RESOURCE_ID_FMT, res_id);
+					res_id = resource_id(rr.type, fallback_name);
+					destination_path(path, res_id);
+
+					_data_filesystem.close(*file);
+					file = _data_filesystem.open(path.c_str(), FileOpenMode::READ);
+				}
+				CE_ASSERT(file->is_open(), "Cannot load fallback resource: " RESOURCE_ID_FMT, res_id._id);
 
-				// Load the resource.
 				if (rr.load_function) {
 					rr.data = rr.load_function(*file, *rr.allocator);
 				} else {
@@ -122,66 +145,24 @@ s32 ResourceLoader::run()
 				}
 
 				_data_filesystem.close(*file);
-			} else {
-				// Get the package containing the resource.
-				const PackageResource *pkg = (PackageResource *)rr.resource_manager->get(RESOURCE_TYPE_PACKAGE, rr.package_name);
-
-				// Find the resource inside the package.
-				for (u32 ii = 0; ii < pkg->num_resources; ++ii) {
-					const ResourceOffset *offt = package_resource::resource_offset(pkg, ii);
-					if (offt->type == rr.type && offt->name == rr.name) {
-						const void *resource_data = package_resource::data(pkg) + offt->offset;
-
-						// Load the resource.
-						if (rr.load_function) {
-							FileMemory fm(resource_data, offt->size);
-							rr.data = rr.load_function(fm, *rr.allocator);
-						} else {
-							rr.allocator = NULL;
-							rr.data = (void *)resource_data;
-							CE_ASSERT(*(u32 *)rr.data == RESOURCE_HEADER(rr.version), "Wrong version");
-						}
+			}
 
+#define MAX_TRIES 16
+			while (1) {
+				u32 num_tries = 0;
+				while (num_tries++ < MAX_TRIES) {
+					if (_loaded.push(rr))
 						break;
-					}
 				}
-			}
-		} else {
-			File *file = _data_filesystem.open(path.c_str(), FileOpenMode::READ);
-			if (!file->is_open()) {
-				logw(RESOURCE_LOADER, "Cannot load resource: " RESOURCE_ID_FMT ". Falling back...", res_id._id);
+				if (num_tries < MAX_TRIES)
+					break;
 
-				StringId64 fallback_name;
-				fallback_name = hash_map::get(_fallback, rr.type, fallback_name);
-				CE_ENSURE(fallback_name._id != 0);
-
-				res_id = resource_id(rr.type, fallback_name);
-				destination_path(path, res_id);
-
-				_data_filesystem.close(*file);
-				file = _data_filesystem.open(path.c_str(), FileOpenMode::READ);
-			}
-			CE_ASSERT(file->is_open(), "Cannot load fallback resource: " RESOURCE_ID_FMT, res_id._id);
-
-			if (rr.load_function) {
-				rr.data = rr.load_function(*file, *rr.allocator);
-			} else {
-				const u32 file_size = file->size();
-				rr.data = rr.allocator->allocate(file_size, 16);
-				file->read(rr.data, file_size);
-				CE_ASSERT(*(u32 *)rr.data == RESOURCE_HEADER(rr.version), "Wrong version");
+				os::sleep(16);
 			}
-
-			_data_filesystem.close(*file);
+#undef MAX_TRIES
 		}
-
-		add_loaded(rr);
-		_mutex.lock();
-		queue::pop_front(_requests);
-		_mutex.unlock();
 	}
 
-	_mutex.unlock();
 	return 0;
 }
 

+ 5 - 13
src/resource/resource_loader.h

@@ -10,6 +10,7 @@
 #include "core/strings/string_id.h"
 #include "core/thread/condition_variable.h"
 #include "core/thread/mutex.h"
+#include "core/thread/spsc_queue.inl"
 #include "core/thread/thread.h"
 #include "core/types.h"
 #include "resource/types.h"
@@ -38,19 +39,15 @@ struct ResourceLoader
 	Filesystem &_data_filesystem;
 	bool _is_bundle;
 
-	Queue<ResourceRequest> _requests;
-	Queue<ResourceRequest> _loaded;
+	SPSCQueue<ResourceRequest, 128> _requests;
+	SPSCQueue<ResourceRequest, 128> _loaded;
 	HashMap<StringId64, StringId64> _fallback;
 
 	Thread _thread;
 	Mutex _mutex;
 	ConditionVariable _requests_condition;
-	Mutex _loaded_mutex;
 	bool _exit;
 
-	///
-	u32 num_requests();
-
 	///
 	void add_loaded(ResourceRequest rr);
 
@@ -65,13 +62,8 @@ struct ResourceLoader
 	~ResourceLoader();
 
 	/// Adds a request for loading the resource described by @a rr.
-	void add_request(const ResourceRequest &rr);
-
-	/// Blocks until all pending requests have been processed.
-	void flush();
-
-	/// Returns all the resources that have been loaded.
-	void get_loaded(Array<ResourceRequest> &loaded);
+	/// Returns true on success, false otherwise.
+	bool add_request(const ResourceRequest &rr);
 
 	/// Registers a fallback resource @a name for the given resource @a type.
 	void register_fallback(StringId64 type, StringId64 name);

+ 19 - 24
src/resource/resource_manager.cpp

@@ -69,7 +69,7 @@ ResourceManager::~ResourceManager()
 	}
 }
 
-void ResourceManager::load(StringId64 package_name, StringId64 type, StringId64 name)
+bool ResourceManager::try_load(StringId64 package_name, StringId64 type, StringId64 name)
 {
 	ResourcePair id = { type, name };
 	ResourceEntry &entry = hash_map::get(_rm, id, ResourceEntry::NOT_FOUND);
@@ -93,17 +93,15 @@ void ResourceManager::load(StringId64 package_name, StringId64 type, StringId64
 		rr.allocator = &_resource_heap;
 		rr.data = NULL;
 
-		_loader->add_request(rr);
-		return;
+		return _loader->add_request(rr);
 	}
 
 	entry.references++;
+	return true;
 }
 
 void ResourceManager::unload(StringId64 type, StringId64 name)
 {
-	flush();
-
 	ResourcePair id = { type, name };
 	ResourceEntry &entry = hash_map::get(_rm, id, ResourceEntry::NOT_FOUND);
 
@@ -125,8 +123,9 @@ void ResourceManager::reload(StringId64 type, StringId64 name)
 		return;
 
 	unload(type, name);
-	load(PACKAGE_RESOURCE_NONE, type, name);
-	flush();
+	while (!try_load(PACKAGE_RESOURCE_NONE, type, name)) {
+		complete_requests();
+	}
 
 	ResourceEntry &new_entry = hash_map::get(_rm, id, ResourceEntry::NOT_FOUND);
 	new_entry.references = old_refs;
@@ -145,8 +144,13 @@ const void *ResourceManager::get(StringId64 type, StringId64 name)
 	CE_ASSERT(can_get(type, name), "Resource not loaded: " RESOURCE_ID_FMT, resource_id(type, name)._id);
 
 	if (_autoload && !hash_map::has(_rm, id)) {
-		load(PACKAGE_RESOURCE_NONE, type, name);
-		flush();
+		while (!try_load(PACKAGE_RESOURCE_NONE, type, name)) {
+			complete_requests();
+		}
+
+		while (!hash_map::has(_rm, id)) {
+			complete_requests();
+		}
 	}
 
 	const ResourceEntry &entry = hash_map::get(_rm, id, ResourceEntry::NOT_FOUND);
@@ -158,29 +162,20 @@ void ResourceManager::enable_autoload(bool enable)
 	_autoload = enable;
 }
 
-void ResourceManager::flush()
-{
-	_loader->flush();
-	complete_requests();
-}
-
 void ResourceManager::complete_requests()
 {
-	TempAllocator1024 ta;
-	Array<ResourceRequest> loaded(ta);
-	_loader->get_loaded(loaded);
-
-	for (u32 ii = 0; ii < array::size(loaded); ++ii) {
+	ResourceRequest rr;
+	while (_loader->_loaded.pop(rr)) {
 		ResourceEntry entry;
 		entry.references = 1;
-		entry.data = loaded[ii].data;
-		entry.allocator = loaded[ii].allocator;
+		entry.data = rr.data;
+		entry.allocator = rr.allocator;
 
-		ResourcePair id = { loaded[ii].type, loaded[ii].name };
+		ResourcePair id = { rr.type, rr.name };
 
 		hash_map::set(_rm, id, entry);
 
-		on_online(loaded[ii].type, loaded[ii].name);
+		on_online(rr.type, rr.name);
 	}
 }
 

+ 6 - 7
src/resource/resource_manager.h

@@ -70,9 +70,11 @@ struct ResourceManager
 	///
 	~ResourceManager();
 
-	/// Loads the resource (@a type, @a name) from the @a package.
-	/// You can check whether the resource is available with can_get().
-	void load(StringId64 package_name, StringId64 type, StringId64 name);
+	/// Tries to load the resource (@a type, @a name) from @a package.
+	/// When the load queue is full, it may fail returning false. In such case,
+	/// you must call complete_requests() and try again later until true is returned.
+	/// Use can_get() to check whether the resource can be used.
+	bool try_load(StringId64 package_name, StringId64 type, StringId64 name);
 
 	/// Unloads the resource @a type @a name.
 	void unload(StringId64 type, StringId64 name);
@@ -90,10 +92,7 @@ struct ResourceManager
 	/// Sets whether resources should be automatically loaded when accessed.
 	void enable_autoload(bool enable);
 
-	/// Blocks until all load() requests have been completed.
-	void flush();
-
-	/// Completes all load() requests which have been loaded by ResourceLoader.
+	/// Completes all load requests which have been loaded by ResourceLoader.
 	void complete_requests();
 
 	/// Registers a new resource @a type into the resource manager.

+ 39 - 9
src/resource/resource_package.cpp

@@ -4,6 +4,7 @@
  */
 
 #include "core/containers/array.inl"
+#include "core/os.h"
 #include "core/strings/string_id.inl"
 #include "resource/package_resource.h"
 #include "resource/resource_id.inl"
@@ -18,6 +19,9 @@ ResourcePackage::ResourcePackage(StringId64 id, ResourceManager &resman)
 	, _resource_manager(&resman)
 	, _package_resource_name(id)
 	, _package_resource(NULL)
+	, _num_resources_queued(0)
+	, _package_resource_queued(false)
+	, _loaded(false)
 {
 }
 
@@ -29,13 +33,28 @@ ResourcePackage::~ResourcePackage()
 
 void ResourcePackage::load()
 {
-	_resource_manager->load(PACKAGE_RESOURCE_NONE, RESOURCE_TYPE_PACKAGE, _package_resource_name);
-	_resource_manager->flush();
-	_package_resource = (const PackageResource *)_resource_manager->get(RESOURCE_TYPE_PACKAGE, _package_resource_name);
+	// Load the package resource itself.
+	if (!_package_resource_queued) {
+		_package_resource_queued = _resource_manager->try_load(PACKAGE_RESOURCE_NONE, RESOURCE_TYPE_PACKAGE, _package_resource_name);
+	} else {
+		if (_package_resource == NULL) {
+			if (!_resource_manager->can_get(RESOURCE_TYPE_PACKAGE, _package_resource_name)) {
+				_resource_manager->complete_requests();
+				return;
+			}
 
-	for (u32 ii = 0; ii < _package_resource->num_resources; ++ii) {
-		const ResourceOffset *ro = package_resource::resource_offset(_package_resource, ii);
-		_resource_manager->load(_package_resource_name, ro->type, ro->name);
+			_package_resource = (PackageResource *)_resource_manager->get(RESOURCE_TYPE_PACKAGE, _package_resource_name);
+		}
+
+		// Now that the package resource has been loaded, issue loading requests for all the
+		// resources it contains.
+		for (u32 ii = _num_resources_queued; ii < _package_resource->num_resources; ++ii) {
+			const ResourceOffset *ro = package_resource::resource_offset(_package_resource, ii);
+			if (!_resource_manager->try_load(_package_resource_name, ro->type, ro->name))
+				break;
+
+			++_num_resources_queued;
+		}
 	}
 }
 
@@ -49,18 +68,29 @@ void ResourcePackage::unload()
 
 void ResourcePackage::flush()
 {
-	_resource_manager->flush();
+	while (!has_loaded()) {
+		_resource_manager->complete_requests();
+	}
 }
 
-bool ResourcePackage::has_loaded() const
+bool ResourcePackage::has_loaded()
 {
+	if (_loaded)
+		return _loaded;
+
+	load();
+
+	if (_package_resource == NULL)
+		return false;
+
 	for (u32 ii = 0; ii < _package_resource->num_resources; ++ii) {
 		const ResourceOffset *ro = package_resource::resource_offset(_package_resource, ii);
 		if (!_resource_manager->can_get(ro->type, ro->name))
 			return false;
 	}
 
-	return true;
+	_loaded = true;
+	return _loaded;
 }
 
 } // namespace crown

+ 4 - 1
src/resource/resource_package.h

@@ -18,6 +18,9 @@ struct ResourcePackage
 	ResourceManager *_resource_manager;
 	StringId64 _package_resource_name;
 	const PackageResource *_package_resource;
+	u32 _num_resources_queued;
+	bool _package_resource_queued;
+	bool _loaded;
 
 	///
 	ResourcePackage(StringId64 id, ResourceManager &resman);
@@ -38,7 +41,7 @@ struct ResourcePackage
 	void flush();
 
 	/// Returns whether the package has been loaded.
-	bool has_loaded() const;
+	bool has_loaded();
 };
 
 } // namespace crown