浏览代码

Merge pull request #326 from bmx-ng/task/more-threaded-types

Added TThreadEvent and TBlockingTaskQueue types.
Brucey 1 年之前
父节点
当前提交
bbf6254ee2
共有 2 个文件被更改,包括 255 次插入0 次删除
  1. 171 0
      collections.mod/queue.bmx
  2. 84 0
      threads.mod/threads.bmx

+ 171 - 0
collections.mod/queue.bmx

@@ -450,5 +450,176 @@ Type TBlockingQueue<T> Extends TQueue<T>
 		lock.Unlock()
 	End Method
 	
+End Type
+
+Rem
+bbdoc: A thread-safe first-in, first-out (FIFO) collection of elements that supports the concept of tasks.
+about: When a task is complete, the task should call the #TaskDone method to signal that the task is done.
+End Rem
+Type TBlockingTaskQueue<T> Extends TQueue<T>
+
+	Private
+		Field lock:TMutex
+		Field notEmpty:TCondVar
+		Field notFull:TCondVar
+		Field allTasksDone:TCondVar
+		Field taskLock:TMutex
+		Field unfinishedTasks:Int
+	Public
+
+	Method New(capacity:Int = 16)
+		Super.New(capacity)
+		lock = TMutex.Create()
+		notEmpty = TCondVar.Create()
+		notFull = TCondVar.Create()
+		allTasksDone = TCondVar.Create()
+		taskLock = TMutex.Create()
+		unfinishedTasks = 0
+	End Method
+	
+	Method Enqueue(element:T)
+		lock.Lock()
+		While full
+			notFull.Wait(lock)
+		Wend
+		Super.Enqueue(element)
+		taskLock.Lock()
+		unfinishedTasks :+ 1
+		taskLock.Unlock()
+		notEmpty.Signal()
+		lock.Unlock()
+	End Method
+	
+	Rem
+	bbdoc: Adds an element to the end of the #TBlockingTaskQueue, waiting up to the specified wait time if necessary for space to become available
+	about: If the queue is full, the operation will block until space becomes available or the specified timeout elapses.
+	Throws a #TTimeoutException if the operation times out.
+	End Rem
+	Method Enqueue(element:T, timeout:ULong, unit:ETimeUnit = ETimeUnit.Milliseconds)
+		Local timeoutMs:ULong = TimeUnitToMillis(timeout, unit)
+	
+		Local startTime:ULong = CurrentUnixTime()
+		lock.Lock()
+		While full
+			Local now:ULong = CurrentUnixTime()
+			If timeout > 0 And now - startTime >= timeoutMs
+				lock.Unlock()
+				Throw New TTimeoutException("The operation timed out after " + timeoutMs + "ms")
+			End If
+			notFull.TimedWait(lock, Int(timeoutMs - (now - startTime)))
+		Wend
+		Super.Enqueue(element)
+		taskLock.Lock()
+		unfinishedTasks :+ 1
+		taskLock.Unlock()
+		notEmpty.Signal()
+		lock.Unlock()
+	End Method
+	
+	Rem
+	bbdoc: Removes and returns the element at the beginning of the #TBlockingTaskQueue, waiting up to the specified wait time if necessary for an element to become available.
+	about: If the queue is empty, the operation will block until an element becomes available or the specified timeout elapses.
+	Throws a #TTimeoutException if the operation times out.
+	End Rem
+	Method Dequeue:T(timeout:ULong, unit:ETimeUnit = ETimeUnit.Milliseconds)
+		Local timeoutMs:ULong = TimeUnitToMillis(timeout, unit)
+	
+		Local startTime:Long = CurrentUnixTime()
+		lock.Lock()
+		While IsEmpty()
+			Local now:ULong = CurrentUnixTime()
+			If timeout > 0 And now - startTime >= timeoutMs
+				lock.Unlock()
+				Throw New TTimeoutException("The operation timed out after " + timeoutMs + "ms")
+			End If
+			notEmpty.TimedWait(lock, Int(timeoutMs - (now - startTime)))
+		Wend
+		Local element:T = Super.Dequeue()
+		notFull.Signal()
+		lock.Unlock()
+		Return element
+	End Method
+
+	Method Dequeue:T()
+		lock.Lock()
+		While IsEmpty()
+			notEmpty.Wait(lock)
+		Wend
+		Local element:T = Super.Dequeue()
+		notFull.Signal()
+		lock.Unlock()
+		Return element
+	End Method
+	
+	Method TryDequeue:Int(value:T Var)
+		lock.Lock()
+		If IsEmpty()
+			lock.Unlock()
+			Return False
+		End If
+		value = Super.Dequeue()
+		notFull.Signal()
+		lock.Unlock()
+		Return True
+	End Method
+	
+	Method TryPeek:Int(value:T Var)
+		lock.Lock()
+		If IsEmpty()
+			lock.Unlock()
+			Return False
+		End If
+		value = data[head]
+		lock.Unlock()
+		Return True
+	End Method
+	
+	Method Clear()
+		lock.Lock()
+		Super.Clear()
+		taskLock.Lock()
+		unfinishedTasks = 0
+		allTasksDone.Signal()
+		taskLock.Unlock()
+		notFull.Signal()
+		lock.Unlock()
+	End Method
+	
+	Method TrimExcess()
+		' noop since a blocking queue does not grow beyond its initial capacity
+	End Method
+	
+	Method Resize()
+		lock.Lock()
+		Super.Resize()
+		notFull.Signal()
+		lock.Unlock()
+	End Method
+
+	Rem
+	bbdoc: Signals that a task is done.
+	End Rem
+	Method TaskDone()
+		taskLock.Lock()
+		If unfinishedTasks > 0 Then
+			unfinishedTasks :- 1
+			If unfinishedTasks = 0 Then
+				allTasksDone.Signal()
+			End If
+		End If
+		taskLock.Unlock()
+	End Method
+
+	Rem
+	bbdoc: Waits until all tasks are done.
+	End Rem
+	Method Join()
+		taskLock.Lock()
+		While unfinishedTasks > 0
+			allTasksDone.Wait(taskLock)
+		Wend
+		taskLock.Unlock()
+	End Method
+	
 End Type
 ?

+ 84 - 0
threads.mod/threads.bmx

@@ -25,6 +25,8 @@ Import "threads_mac.m"
 
 ?Threaded
 
+Import Pub.Stdc
+Import BRL.Time
 Import "threads.c"
 
 Private
@@ -407,6 +409,88 @@ Public
 
 End Type
 
+Rem
+bbdoc: A thread event object.
+about: A basic synchronization object that allows one thread to signal an event to other threads.
+It manages an internal flag that can be set or cleared, and provides methods to wait for the event to be set.
+End rem
+Type TThreadEvent
+    Private
+        Field lock:TMutex
+        Field condition:TCondVar
+        Field _isSet:Int
+	Public
+    Method New()
+        lock = TMutex.Create()
+        condition = TCondVar.Create()
+        _isSet = False
+    End Method
+
+    Rem
+	bboc: Sets the internal flag to #True and signals any waiting threads.
+	about: All threads waiting for it to become #True are awakened. Threads that call #Wait once the flag is true will not block at all.
+	End Rem
+    Method Set()
+        lock.Lock()
+        _isSet = True
+        condition.Broadcast()
+        lock.Unlock()
+    End Method
+
+    Rem
+	bbdoc: Resets the internal flag to false.
+	about: After clearing, threads calling #Wait will block until #Set is called to set the internal flag to #True again.
+	End Rem
+    Method Clear()
+        lock.Lock()
+        _isSet = False
+        lock.Unlock()
+    End Method
+
+    Rem
+	bbdoc: Waits for the event to be set.
+	about: This method could block indefinitely if the event is never set.
+	If the event is already set, the method returns immediately.
+	End Rem
+    Method Wait()
+        lock.Lock()
+        While Not _isSet
+            condition.Wait(lock)
+        Wend
+        lock.Unlock()
+    End Method
+
+    Rem
+	bbdoc: Waits for the event to be set, with a timeout.
+	about: If the timeout is reached before the event is set, the method returns #False.
+	End Rem
+    Method Wait:Int(timeout:ULong, unit:ETimeUnit = ETimeUnit.Milliseconds)
+        lock.Lock()
+		Local timeoutMs:ULong = TimeUnitToMillis(timeout, unit)
+        Local endTime:ULong = CurrentUnixTime() + timeoutMs
+        While Not _isSet
+            Local now:ULong = CurrentUnixTime()
+            If now >= timeoutMs Then
+                lock.Unlock()
+                Return False
+            End If
+            condition.TimedWait(lock, Int(timeoutMs - now))
+        Wend
+        lock.Unlock()
+        Return True
+    End Method
+
+    Rem
+	bbdoc: Returns whether the event is set or not.
+	End Rem
+    Method IsSet:Int()
+        lock.Lock()
+        Local result:Int = _isSet
+        lock.Unlock()
+        Return result
+    End Method
+End Type
+
 Rem
 bbdoc: Create a thread
 returns: A new thread object.