Browse Source

Add Channel:supply(), a blocking version of Channel:push()

--HG--
branch : minor
Bart van Strien 12 years ago
parent
commit
be2363fc00

+ 49 - 5
src/modules/thread/Channel.cpp

@@ -22,6 +22,30 @@
 #include <map>
 #include <map>
 #include <string>
 #include <string>
 
 
+namespace
+{
+	union uslong
+	{
+		unsigned long u;
+		long i;
+	};
+
+	// target <= current, but semi-wrapsafe, one wrap, anyway
+	inline bool past(unsigned int target, unsigned int current)
+	{
+		if (target > current)
+			return false;
+		if (target == current)
+			return true;
+
+		uslong t, c;
+		t.u = target;
+		c.u = current;
+
+		return !(t.i < 0 && c.i > 0);
+	}
+}
+
 namespace love
 namespace love
 {
 {
 namespace thread
 namespace thread
@@ -41,14 +65,14 @@ Channel *Channel::getChannel(const std::string &name)
 }
 }
 
 
 Channel::Channel()
 Channel::Channel()
-	: named(false)
+	: named(false), sent(0), received(0)
 {
 {
 	mutex = newMutex();
 	mutex = newMutex();
 	cond = newConditional();
 	cond = newConditional();
 }
 }
 
 
 Channel::Channel(const std::string &name)
 Channel::Channel(const std::string &name)
-	: named(true), name(name)
+	: named(true), name(name), sent(0), received(0)
 {
 {
 	mutex = newMutex();
 	mutex = newMutex();
 	cond = newConditional();
 	cond = newConditional();
@@ -68,10 +92,10 @@ Channel::~Channel()
 		namedChannels.erase(name);
 		namedChannels.erase(name);
 }
 }
 
 
-void Channel::push(Variant *var)
+unsigned long Channel::push(Variant *var)
 {
 {
 	if (!var)
 	if (!var)
-		return;
+		return 0;
 	Lock l(mutex);
 	Lock l(mutex);
 	var->retain();
 	var->retain();
 	// Keep a reference to ourselves
 	// Keep a reference to ourselves
@@ -79,7 +103,24 @@ void Channel::push(Variant *var)
 	if (named && queue.empty())
 	if (named && queue.empty())
 		retain();
 		retain();
 	queue.push(var);
 	queue.push(var);
-	cond->signal();
+	cond->broadcast();
+
+	return ++sent;
+}
+
+void Channel::supply(Variant *var)
+{
+	if (!var)
+		return;
+
+	unsigned long id = push(var);
+
+	mutex->lock();
+	while (!past(id, received))
+	{
+		cond->wait(mutex);
+	}
+	mutex->unlock();
 }
 }
 
 
 Variant *Channel::pop()
 Variant *Channel::pop()
@@ -91,6 +132,9 @@ Variant *Channel::pop()
 	Variant *var = queue.front();
 	Variant *var = queue.front();
 	queue.pop();
 	queue.pop();
 
 
+	received++;
+	cond->broadcast();
+
 	// Release our reference to ourselves
 	// Release our reference to ourselves
 	// if we're empty and named.
 	// if we're empty and named.
 	if (named && queue.empty())
 	if (named && queue.empty())

+ 6 - 2
src/modules/thread/Channel.h

@@ -43,14 +43,18 @@ private:
 	std::string name;
 	std::string name;
 	Channel(const std::string &name);
 	Channel(const std::string &name);
 
 
+	unsigned long sent;
+	unsigned long received;
+
 public:
 public:
 	Channel();
 	Channel();
 	~Channel();
 	~Channel();
 	static Channel *getChannel(const std::string &name);
 	static Channel *getChannel(const std::string &name);
 
 
-	void push(Variant *var);
+	unsigned long push(Variant *var);
+	void supply(Variant *var); // blocking push
 	Variant *pop();
 	Variant *pop();
-	Variant *demand();
+	Variant *demand(); // blocking pop
 	Variant *peek();
 	Variant *peek();
 	int count();
 	int count();
 	void clear();
 	void clear();

+ 10 - 0
src/modules/thread/wrap_Channel.cpp

@@ -38,6 +38,15 @@ namespace thread
 		return 0;
 		return 0;
 	}
 	}
 
 
+	int w_Channel_supply(lua_State *L)
+	{
+		Channel *c = luax_checkchannel(L, 1);
+		Variant *var = Variant::fromLua(L, 2);
+		c->supply(var);
+		var->release();
+		return 0;
+	}
+
 	int w_Channel_pop(lua_State *L)
 	int w_Channel_pop(lua_State *L)
 	{
 	{
 		Channel *c = luax_checkchannel(L, 1);
 		Channel *c = luax_checkchannel(L, 1);
@@ -91,6 +100,7 @@ namespace thread
 
 
 	static const luaL_Reg type_functions[] = {
 	static const luaL_Reg type_functions[] = {
 		{ "push", w_Channel_push },
 		{ "push", w_Channel_push },
+		{ "supply", w_Channel_supply },
 		{ "pop", w_Channel_pop },
 		{ "pop", w_Channel_pop },
 		{ "demand", w_Channel_demand },
 		{ "demand", w_Channel_demand },
 		{ "peek", w_Channel_peek },
 		{ "peek", w_Channel_peek },

+ 1 - 0
src/modules/thread/wrap_Channel.h

@@ -30,6 +30,7 @@ namespace thread
 {
 {
 	Channel *luax_checkchannel(lua_State *L, int idx);
 	Channel *luax_checkchannel(lua_State *L, int idx);
 	int w_Channel_push(lua_State *L);
 	int w_Channel_push(lua_State *L);
+	int w_Channel_supply(lua_State *L);
 	int w_Channel_pop(lua_State *L);
 	int w_Channel_pop(lua_State *L);
 	int w_Channel_demand(lua_State *L);
 	int w_Channel_demand(lua_State *L);
 	int w_Channel_peek(lua_State *L);
 	int w_Channel_peek(lua_State *L);