Browse Source

Added TBlockingQueue

Brucey 1 năm trước cách đây
mục cha
commit
d03454a057

+ 4 - 2
collections.mod/collections.bmx

@@ -5,11 +5,13 @@ bbdoc: Generic Collections.
 End Rem
 End Rem
 Module BRL.Collections
 Module BRL.Collections
 
 
-ModuleInfo "Version: 1.00"
+ModuleInfo "Version: 1.01"
 ModuleInfo "Author: Bruce A Henderson"
 ModuleInfo "Author: Bruce A Henderson"
 ModuleInfo "License: zlib/libpng"
 ModuleInfo "License: zlib/libpng"
-ModuleInfo "Copyright: 2019 Bruce A Henderson"
+ModuleInfo "Copyright: 2019-2024 Bruce A Henderson"
 
 
+ModuleInfo "History: 1.01"
+ModuleInfo "History: Added TBlockingQueue"
 ModuleInfo "History: 1.00"
 ModuleInfo "History: 1.00"
 ModuleInfo "History: Initial Release"
 ModuleInfo "History: Initial Release"
 
 

+ 38 - 0
collections.mod/examples/blockingqueue_01.bmx

@@ -0,0 +1,38 @@
+'
+' Demonstrates how to use a blocking queue to synchronize threads.
+'
+SuperStrict
+
+Framework Brl.StandardIO
+Import Brl.Threads
+Import Brl.Collections
+
+
+Function Producer:Object(data:Object)
+	Local queue:TBlockingQueue<Int> = TBlockingQueue<Int>(data)
+
+    For Local i:Int = 1 To 10
+        Print "Producing " + i
+        queue.Enqueue(i)
+        Delay 500 ' Simulate work
+    Next
+End Function
+
+Function Consumer:Object(data:Object)
+	Local queue:TBlockingQueue<Int> = TBlockingQueue<Int>(data)
+
+    For Local i:Int = 1 To 10
+        Local item:Int = queue.Dequeue()
+        Print "Consuming " + item
+        Delay 1000 ' Simulate work
+    Next
+End Function
+
+Local queue:TBlockingQueue<Int> = New TBlockingQueue<Int>(5)
+Local producerThread:TThread = CreateThread(Producer, queue)
+Local consumerThread:TThread = CreateThread(Consumer, queue)
+
+WaitThread(producerThread)
+WaitThread(consumerThread)
+
+Print "All tasks are done."

+ 92 - 0
collections.mod/queue.bmx

@@ -1,6 +1,9 @@
 SuperStrict
 SuperStrict
 
 
 Import "collection.bmx"
 Import "collection.bmx"
+?threaded
+Import BRL.threads
+?
 
 
 Rem
 Rem
 bbdoc: A first-in, first-out (FIFO) collection of elements.
 bbdoc: A first-in, first-out (FIFO) collection of elements.
@@ -312,3 +315,92 @@ Type TQueueIterator<T> Implements IIterator<T>
 		Return index <> queue.tail
 		Return index <> queue.tail
 	End Method
 	End Method
 End Type
 End Type
+
+?threaded
+
+Rem
+bbdoc: A thread-safe first-in, first-out (FIFO) collection of elements.
+about: Implements a queue as a circular array. Elements stored in a #TBlockingQueue are inserted at one end and removed from the other.
+Use a #TBlockingQueue if you need to access the information in the same order that it is stored in the collection and you need to ensure that the collection is thread-safe.
+A call to #Dequeue will block if the queue is empty. A call to #Enqueue will block if the queue is full.
+The capacity of a #TBlockingQueue is the number of elements the #TBlockingQueue can hold. Once the queue is full, any attempt to add an element will block until space is available.
+End Rem
+Type TBlockingQueue<T> Extends TQueue<T>
+
+	Private
+		Field lock:TMutex
+		Field notEmpty:TCondVar
+		Field notFull:TCondVar
+	Public
+
+	Method New(capacity:Int = 16)
+		Super.New(capacity)
+		lock = TMutex.Create()
+		notEmpty = TCondVar.Create()
+		notFull = TCondVar.Create()
+	End Method
+	
+	Method Enqueue(element:T)
+		lock.Lock()
+		While full
+			notFull.Wait(lock)
+		Wend
+		Super.Enqueue(element)
+		notEmpty.Signal()
+		lock.Unlock()
+	End Method
+	
+	Method Dequeue:T()
+		lock.Lock()
+		While IsEmpty()
+			notEmpty.Wait(lock)
+		Wend
+		Local element:T = Super.Dequeue()
+		notFull.Signal()
+		lock.Unlock()
+		Return element
+	End Method
+	
+	Method TryDequeue:Int(value:T Var)
+		lock.Lock()
+		If IsEmpty()
+			lock.Unlock()
+			Return False
+		End If
+		value = Super.Dequeue()
+		notFull.Signal()
+		lock.Unlock()
+		Return True
+	End Method
+	
+	Method TryPeek:Int(value:T Var)
+		lock.Lock()
+		If IsEmpty()
+			lock.Unlock()
+			Return False
+		End If
+		value = data[head]
+		lock.Unlock()
+		Return True
+	End Method
+	
+	Method Clear()
+		lock.Lock()
+		Super.Clear()
+		notFull.Signal()
+		lock.Unlock()
+	End Method
+	
+	Method TrimExcess()
+		' noop since a blocking queue does not grow beyond its initial capacity
+	End Method
+	
+	Method Resize()
+		lock.Lock()
+		Super.Resize()
+		notFull.Signal()
+		lock.Unlock()
+	End Method
+	
+End Type
+?