Răsfoiți Sursa

Merge pull request #322 from bmx-ng/task/add-scheduled-threadpool

Added scheduled thread pool.
Brucey 1 an în urmă
părinte
comite
c0cd77d02a

+ 34 - 0
threadpool.mod/examples/scheduled_01.bmx

@@ -0,0 +1,34 @@
+'
+' Demostrates use of a scheduled thread pool with single-shot tasks.
+'
+SuperStrict
+
+Framework BRL.Standardio
+Import BRL.ThreadPool
+
+Local pool:TScheduledThreadPoolExecutor = TScheduledThreadPoolExecutor.newFixedThreadPool(11)
+
+For Local i:Int = 10 Until 0 Step -1
+	pool.schedule(New TTask(i), 10 - i, ETimeUnit.Seconds)
+Next
+
+Print "Shutting down the pool..."
+
+pool.shutdown()
+
+Print "Done"
+
+Type TTask Extends TRunnable
+
+	Field value:Int
+
+	Method New(value:Int)
+		Self.value = value
+	End Method
+
+	Method run()
+		Print "Number " + value
+	End Method
+	
+End Type
+

+ 35 - 0
threadpool.mod/examples/scheduled_02.bmx

@@ -0,0 +1,35 @@
+'
+' Demostrates use of a scheduled thread pool with recurring tasks.
+'
+SuperStrict
+
+Framework BRL.Standardio
+Import BRL.ThreadPool
+
+Local pool:TScheduledThreadPoolExecutor = TScheduledThreadPoolExecutor.newFixedThreadPool(11)
+
+pool.schedule(New TTask("One-shot Task"), 5, ETimeUnit.Seconds) ' after 5 sceonds
+pool.schedule(New TTask("Recurring Task"), 3, 5, ETimeUnit.Seconds) ' after 3 seconds and then every 5 seconds
+
+Delay(10 * 1000) ' wait for 10 seconds and then shutdown the pool
+
+Print "Shutting down the pool..."
+
+pool.shutdown()
+
+Print "Done"
+
+Type TTask Extends TRunnable
+
+	Field message:String
+
+	Method New(message:String)
+		Self.message = message
+	End Method
+
+	Method run()
+		Print message
+	End Method
+	
+End Type
+

+ 221 - 17
threadpool.mod/threadpool.bmx

@@ -26,11 +26,13 @@ bbdoc: System/ThreadPool
 End Rem
 Module BRL.ThreadPool
 
-ModuleInfo "Version: 1.01"
+ModuleInfo "Version: 1.02"
 ModuleInfo "Author: Bruce A Henderson"
 ModuleInfo "License: zlib/libpng"
 ModuleInfo "Copyright: Bruce A Henderson"
 
+ModuleInfo "History: 1.02"
+ModuleInfo "History: Added scheduled pool executor"
 ModuleInfo "History: 1.01"
 ModuleInfo "History: Added cached pool executor"
 ModuleInfo "History: 1.00"
@@ -40,7 +42,7 @@ ModuleInfo "History: Initial Release"
 
 Import BRL.Threads
 Import BRL.LinkedList
-
+Import pub.stdc
 
 Rem
 bbdoc: An object that is intended to be executed by a thread pool.
@@ -181,24 +183,30 @@ Type TThreadPoolExecutor Extends TExecutor
 	End Rem
 	Method execute(command:TRunnable) Override
 		If Not isShutdown Then
-			If maxThreads < 0 Then
-				Local newThread:Int
-				countLock.Lock()
-				If threadsWorking = threadsAlive Then
-					newThread = True
-				End If
-				countLock.Unlock()
-				If newThread Then
-					threadsLock.Lock()
-					threads.AddLast(New TPooledThread(Self, _processThread))
-					threadsLock.Unlock()
-				End If
+			doExecute(command)
+		End If
+	End Method
+
+Private
+	Method doExecute(command:TRunnable)
+		If maxThreads < 0 Then
+			Local newThread:Int
+			countLock.Lock()
+			If threadsWorking = threadsAlive Then
+				newThread = True
+			End If
+			countLock.Unlock()
+			If newThread Then
+				threadsLock.Lock()
+				threads.AddLast(New TPooledThread(Self, _processThread))
+				threadsLock.Unlock()
 			End If
-			jobQueue.Lock()
-			jobQueue.Add(command)
-			jobQueue.Unlock()
 		End If
+		jobQueue.Lock()
+		jobQueue.Add(command)
+		jobQueue.Unlock()
 	End Method
+Public
 
 	Rem
 	bbdoc: Creates an executor that uses a single worker thread operating off an unbounded queue.
@@ -279,6 +287,202 @@ Type TThreadPoolExecutor Extends TExecutor
 	end method
 End Type
 
+Rem
+bbdoc: A unit of date-time, such as Days or Hours.
+End Rem
+Enum ETimeUnit
+	Milliseconds
+	Seconds
+	Minutes
+	Hours
+	Days
+End Enum
+
+Rem
+bbdoc: An executor that can be used to schedule commands to run after a given delay, or to execute commands periodically.
+End Rem
+Type TScheduledThreadPoolExecutor Extends TThreadPoolExecutor
+
+	Field tasks:TScheduledTask
+
+	Field taskMutex:TMutex
+	Field taskCond:TCondVar
+
+	Field schedulerThread:TThread
+
+	Method New(initial:Int, idleWait:Int = 0)
+		Super.New(initial, idleWait)
+		taskMutex = TMutex.Create()
+		taskCond = TCondVar.Create()
+
+		schedulerThread = CreateThread(taskScheduler, Self)
+	End Method
+
+	Method schedule(command:TRunnable, delay_:Int, unit:ETimeUnit = ETimeUnit.Milliseconds)
+		schedule(command, ULong(delay_), 0, unit)
+	End Method
+
+	Method schedule(command:TRunnable, initialDelay:Int, period:Int, unit:ETimeUnit = ETimeUnit.Milliseconds)
+		schedule(command, ULong(initialDelay), ULong(period), unit)
+	End Method
+	
+	Rem
+	bbdoc: Schedules a one-shot command to run after a given delay.
+	End Rem
+	Method schedule(command:TRunnable, delay_:ULong, unit:ETimeUnit = ETimeUnit.Milliseconds)
+		schedule(command, delay_, 0, unit)
+	End Method
+
+	Rem
+	bbdoc: Schedules a recurring command to run after a given initial delay, and subsequently with the given period.
+	End Rem
+	Method schedule(command:TRunnable, initialDelay:ULong, period:ULong, unit:ETimeUnit = ETimeUnit.Milliseconds)
+			Local now:ULong = CurrentUnixTime()
+
+		Local newTask:TScheduledTask = New TScheduledTask
+
+		Local delayMs:ULong = initialDelay
+		Local periodMs:ULong = period
+		Select unit
+			Case ETimeUnit.Seconds
+				delayMs :* 1000
+				periodMs :* 1000
+			Case ETimeUnit.Minutes
+				delayMs :* 60000
+				periodMs :* 60000
+			Case ETimeUnit.Hours
+				delayMs :* 3600000
+				periodMs :* 3600000
+			Case ETimeUnit.Days
+				delayMs :* 86400000
+				periodMs :* 86400000
+		End Select
+
+		newTask.executeAt = now + delayMs
+		newTask.intervalMs = periodMs
+		newTask.command = command
+		
+		taskMutex.Lock()
+		
+		insertTask(newTask)
+
+		taskMutex.Unlock()
+		
+	End Method
+
+	Rem
+	bbdoc: Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted.
+	End Rem
+	Method shutdown() Override
+		isShutdown = True
+		schedulerThread.Wait()
+		Super.shutdown()
+	End Method
+
+Private
+	Method insertTask(newTask:TScheduledTask)
+		Local headChanged:Int = False
+		If Not tasks Or newTask.executeAt < tasks.executeAt Then
+			newTask.nextTask = tasks
+			tasks = newTask
+			headChanged = True
+		Else
+			Local current:TScheduledTask = tasks
+			While current.nextTask And current.nextTask.executeAt < newTask.executeAt
+				current = current.nextTask
+			Wend
+			newTask.nextTask = current.nextTask
+			current.nextTask = newTask
+		End If
+
+		If headChanged Then
+			taskCond.Signal()
+		End If
+	End Method
+
+	Function taskScheduler:Object( data:Object )
+		Local exec:TScheduledThreadPoolExecutor = TScheduledThreadPoolExecutor(data)
+
+		While True
+
+			exec.taskMutex.Lock()
+
+			While Not exec.tasks
+				
+				If exec.isShutdown Then
+					exec.taskMutex.Unlock()
+					Return Null
+				End If
+
+				exec.taskCond.Wait(exec.taskMutex)
+			Wend
+
+			Local now:ULong = CurrentUnixTime()
+
+			If now < exec.tasks.executeAt Then
+				' Wait until the next task is due or a new task is scheduled
+				exec.taskCond.TimedWait(exec.taskMutex, Int(exec.tasks.executeAt - now))
+			End If
+
+			now = CurrentUnixTime()
+
+			While exec.tasks And exec.tasks.executeAt <= now
+				Local task:TScheduledTask = exec.tasks
+
+				exec.doExecute(task.command)
+
+				If task.intervalMs And Not exec.isShutdown Then
+					' If the task is recurring, reschedule it, unless the executor is shutting down
+					task.executeAt = now + task.intervalMs
+					exec.tasks = task.nextTask
+					exec.insertTask(task)
+				Else
+					' Otherwise, remove it from the list
+					exec.tasks = task.nextTask
+				End If
+			Wend
+
+			exec.taskMutex.Unlock()
+		Wend
+	End Function
+Public
+
+Rem
+	bbdoc: Creates an executor that uses a single worker thread operating off an unbounded queue.
+	End Rem
+	Function newSingleThreadExecutor:TScheduledThreadPoolExecutor()
+		Return New TScheduledThreadPoolExecutor(1)
+	End Function
+	
+	Rem
+	bbdoc: Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue.
+	about: At any point, at most @threads threads will be active processing tasks. If additional tasks are
+	submitted when all threads are active, they will wait in the queue until a thread is available.
+	End Rem
+	Function newFixedThreadPool:TScheduledThreadPoolExecutor(threads:Int)
+		Assert threads > 0
+		Return New TScheduledThreadPoolExecutor(threads)
+	End Function
+	
+	Rem
+	bbdoc: Creates a thread pool that creates new threads as needed, but will reuse previously constructed threads when they are available.
+	about: These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks.
+	Threads that remain idle for more than the specified @idleWait time will be terminated and removed from the pool.
+	End Rem
+	Function newCachedThreadPool:TScheduledThreadPoolExecutor(idleWait:Int = 60000)
+		Return New TScheduledThreadPoolExecutor(-1, idleWait)
+	End Function
+End Type
+
+Type TScheduledTask
+	Field executeAt:Long ' the time to execute the task, in ms since the epoch
+	Field command:TRunnable
+	
+	Field intervalMs:ULong ' zero for one-shot tasks
+
+	Field nextTask:TScheduledTask
+End Type
+
 Private
 
 Type TBinarySemaphore