- 類型:服務器區 大小:21KB 語言:中文 評分:6.6
- 標簽:
立即下載
Delphi 什么樣的線程池更好呢?我覺得使用起來要可靠,并且一定要簡單,這樣才是更好的。我寫的線程池就是按照這樣一個標準,使用非常簡單,只傳入自己要執行的方法就可以了,其實大家最后關注的就是自己要操作的方法,其余的交給線程池。
unit uThreadPool;
{ aPool.AddRequest(TMyRequest.Create(RequestParam1, RequestParam2, ...)); }
interface
uses
Windows,
Classes;// 是否記錄日志
// {$DEFINE NOLOGS}
type
// Thin object wrapper around a Win32 TRTLCriticalSection record.
TCriticalSection = class(TObject)
protected
// The wrapped Win32 critical-section record
FSection: TRTLCriticalSection;
public
// Initializes FSection
constructor Create;
// Deletes FSection
destructor Destroy; override;
// Enter the critical section
procedure Enter;
// Leave the critical section
procedure Leave;
// Try to enter; returns the result of TryEnterCriticalSection
function TryEnter: Boolean;
end;
type
// Base class for the request data stored in the pool's queue
TWorkItem = class(TObject)
public
// Duplicate-task check: tells whether DataObj represents the same task as
// this item. The base implementation always returns False.
function IsTheSame(DataObj: TWorkItem): Boolean; virtual;
// Text describing this item in log messages; the base implementation
// returns 'Request'.
// NOTE(review): the original comment says this is disabled when NOLOGS is
// defined, but the declaration itself is not conditional - confirm.
function TextForLog: string; virtual;
end;
type
TThreadsPool = class;
// State of a worker thread
TThreadState = (tcsInitializing, tcsWaiting, tcsGetting, tcsProcessing,
tcsProcessed, tcsTerminating, tcsCheckingDown);
// Worker thread. Used only inside the thread pool; do not create or call it directly.
TProcessorThread = class(TThread)
private
// Temporary Event object used while the thread is being created; the
// creator blocks on it until initialization has finished
hInitFinished: THandle;
// Initialization error message
sInitError: string;
// Write a log entry
procedure WriteLog(const Str: string; Level: Integer = 0);
protected
// Critical-section object guarding FProcessingDataObject
csProcessingDataObject: TCriticalSection;
// Average processing time
FAverageProcessing: Integer;
// Average time spent waiting for a request
FAverageWaitingTime: Integer;
// Current state of this thread instance
FCurState: TThreadState;
// Thread pool this thread instance belongs to
FPool: TThreadsPool;
// Data object currently being processed.
FProcessingDataObject: TWorkItem;
// Thread-stop Event; signaled in TProcessorThread.Terminate
hThreadTerminated: THandle;
// Time processing started, obtained from GetTickCount.
uProcessingStart: DWORD;
// Time waiting started, obtained from GetTickCount.
uWaitingStart: DWORD;
// Compute the average processing time
function AverageProcessingTime: DWORD;
// Compute the average waiting time
function AverageWaitingTime: DWORD;
// Thread body: initialization event, then the wait/process loop
procedure Execute; override;
// True when the item currently being processed reports IsTheSame for DataObj
function IamCurrentlyProcess(DataObj: TWorkItem): Boolean;
// Describe the thread (id, state name, averages) as a string
function InfoText: string;
// Has the thread been processing the same request for too long (dead)?
function IsDead: Boolean;
// Has the thread finished running (its handle is signaled)?
function isFinished: Boolean;
// Is the thread idle?
function isIdle: Boolean;
// Running-average correction: (OldAvg * 2 + NewVal) div 3.
function NewAverage(OldAvg, NewVal: Integer): Integer;
public
// Free-use integer; not read or written anywhere in this unit
Tag: Integer;
constructor Create(APool: TThreadsPool);
destructor Destroy; override;
// Sets the Terminated flag and signals hThreadTerminated
procedure Terminate;
end;
// Event fired when a worker thread is initializing
TProcessorThreadInitializing = procedure(Sender: TThreadsPool; aThread:
TProcessorThread) of object;
// Event fired when a worker thread is finishing
TProcessorThreadFinalizing = procedure(Sender: TThreadsPool; aThread:
TProcessorThread) of object;
// Event fired when a worker thread processes a request
TProcessRequest = procedure(Sender: TThreadsPool; WorkItem: TWorkItem;
aThread: TProcessorThread) of object;
// Reason passed to the queue-empty event
TEmptyKind = (
ekQueueEmpty, // the last queued task has just been taken from the queue
ekProcessingFinished // the last task has finished processing
);
// Event fired when the task queue is empty
TQueueEmpty = procedure(Sender: TThreadsPool; EmptyKind: TEmptyKind) of
object;
// Thread pool component: owns the request queue, the worker-thread lists and
// the events fired around request processing.
TThreadsPool = class(TComponent)
private
// Critical section taken around accesses to the request queue
csQueueManagment: TCriticalSection;
// Critical section taken around accesses to the thread lists
csThreadManagment: TCriticalSection;
FProcessRequest: TProcessRequest;
// Pending requests (TWorkItem); slots already handed out are set to nil
FQueue: TList;
FQueueEmpty: TQueueEmpty;
// Thread timeout threshold
FThreadDeadTimeout: DWORD;
FThreadFinalizing: TProcessorThreadFinalizing;
FThreadInitializing: TProcessorThreadInitializing;
// Working threads
FThreads: TList;
// Threads that have been sent Terminate and are shutting down.
FThreadsKilling: TList;
// Maximum number of threads
FThreadsMax: Integer;
// Minimum number of threads
FThreadsMin: Integer;
// Average waiting time over the pool
function PoolAverageWaitingTime: Integer;
procedure WriteLog(const Str: string; Level: Integer = 0);
protected
// Index in FQueue of the next request to hand out
FLastGetPoint: Integer;
// Semaphore counting the queued tasks
hSemRequestCount: THandle;
// Waitable timer, set with a 30000 ms period in the constructor
hTimCheckPoolDown: THandle;
// Shrink the pool (check for and remove idle and dead threads)
procedure CheckPoolDown;
// Remove dead threads and create missing worker threads
procedure CheckThreadsForGrow;
// Called by a worker after each request; fires ekProcessingFinished when
// the queue is drained and no thread is in the tcsProcessing state
procedure DoProcessed;
procedure DoProcessRequest(aDataObj: TWorkItem; aThread: TProcessorThread);
virtual;
procedure DoQueueEmpty(EmptyKind: TEmptyKind); virtual;
procedure DoThreadFinalizing(aThread: TProcessorThread); virtual;
// Fire the event
procedure DoThreadInitializing(aThread: TProcessorThread); virtual;
// Free the threads in the FThreadsKilling list
procedure FreeFinishedThreads;
// Fetch a task
procedure GetRequest(out Request: TWorkItem);
// Remove dead threads
procedure KillDeadThreads;
public
constructor Create(AOwner: TComponent); override;
destructor Destroy; override;
// Queue a request. With CheckForDoubles = True the request is first checked
// for duplicates; False is returned when a duplicate is found
function AddRequest(aDataObject: TWorkItem; CheckForDoubles: Boolean =
False): Boolean; overload;
// Textual description of the pool state (see the implementation)
function InfoText: string;
published
// Event fired when a thread processes a task
property OnProcessRequest: TProcessRequest read FProcessRequest write
FProcessRequest;
// Event fired when the task list becomes empty
property OnQueueEmpty: TQueueEmpty read FQueueEmpty write FQueueEmpty;
// Event fired when a thread is finishing
property OnThreadFinalizing: TProcessorThreadFinalizing read
FThreadFinalizing write FThreadFinalizing;
// Event fired when a thread is initializing
property OnThreadInitializing: TProcessorThreadInitializing read
FThreadInitializing write FThreadInitializing;
// Thread timeout (milliseconds); a thread that processes longer than this is
// treated as dead
property ThreadDeadTimeout: DWORD read FThreadDeadTimeout write
FThreadDeadTimeout default 0;
// Maximum number of threads
property ThreadsMax: Integer read FThreadsMax write FThreadsMax default 1;
// Minimum number of threads
property ThreadsMin: Integer read FThreadsMin write FThreadsMin default 0;
end;
type
// Signature of the log-writing procedure
TLogWriteProc = procedure(
const Str: string; // log text
LogID: Integer = 0; // the pool passes 0, worker threads pass their ThreadID
Level: Integer = 0 // Level = 0 - trace information, 10 - fatal error
);
var
WriteLog: TLogWriteProc; // procedure variable all log output of this unit is routed through
implementation
uses
SysUtils;
// 儲存請求數(shù)據(jù)的基本類
{
********************************** TWorkItem ***********************************
}
{ Default duplicate check. Descendants override this to compare themselves
  with DataObj; the base class never reports a duplicate. }
function TWorkItem.IsTheSame(DataObj: TWorkItem): Boolean;
begin
  // Base items are never considered equal to anything
  Result := False;
end; { TWorkItem.IsTheSame }
{ Text used to identify this item in log messages. Descendants may override
  it; the base class returns a fixed label. }
function TWorkItem.TextForLog: string;
begin
  // Fixed label for items that do not describe themselves
  Result := 'Request';
end; { TWorkItem.TextForLog }
{
********************************* TThreadsPool *********************************
}
{ Creates the pool: locks, lists, default limits, the request-counting
  semaphore and the periodic "check pool down" timer. No worker thread is
  started here. }
constructor TThreadsPool.Create(AOwner: TComponent);
var
  FirstDue: Int64;
begin
{$IFNDEF NOLOGS}
  WriteLog('創(chuàng)建線程池', 5);
{$ENDIF}
  inherited Create(AOwner);

  // Request queue and its lock
  csQueueManagment := TCriticalSection.Create;
  FQueue := TList.Create;
  FLastGetPoint := 0;

  // Thread lists and their lock
  csThreadManagment := TCriticalSection.Create;
  FThreads := TList.Create;
  FThreadsKilling := TList.Create;

  // Defaults matching the published property declarations
  FThreadsMin := 0;
  FThreadsMax := 1;
  FThreadDeadTimeout := 0;

  // Semaphore counting queued requests; starts at zero
  hSemRequestCount := CreateSemaphore(nil, 0, $7FFFFFFF, nil);

  // Waitable timer (Windows NT4 or later only)
  hTimCheckPoolDown := CreateWaitableTimer(nil, False, nil);
  if hTimCheckPoolDown <> 0 then
  begin
    // Negative due time = relative; then repeat every 30000 ms
    FirstDue := -1;
    SetWaitableTimer(hTimCheckPoolDown, FirstDue, 30000, nil, nil, False);
  end
  else
    // Win9x has no waitable timers: fall back to an event that is never
    // signaled, so the number of threads never decreases there
    hTimCheckPoolDown := CreateEvent(nil, False, False, nil);
end; { TThreadsPool.Create }
{ Shuts the pool down: asks every worker to terminate, waits up to 30 seconds
  for them, frees the workers, the already finished killed threads, any
  requests still queued, and finally the synchronization objects.

  Changes against the earlier version:
  - no WaitForMultipleObjects call on an empty handle array (n = 0);
  - handles are waited for in chunks of at most MAXIMUM_WAIT_OBJECTS;
  - csThreadManagment is NOT held while the workers are freed: Free waits for
    the thread, and a worker that still needs that lock (DoProcessed,
    CheckPoolDown) would otherwise deadlock with the destructor;
  - the critical sections are no longer deleted while owned;
  - finished threads left in FThreadsKilling are freed instead of leaked.
    Threads in that list that are still running are left alone, because
    freeing them would block on a possibly hung thread. }
destructor TThreadsPool.Destroy;
const
  SHUTDOWN_TIMEOUT = 30000; // total time (ms) granted to the workers to stop
var
  n, i, Offset, Chunk: Integer;
  Handles: array of THandle;
  WaitStart, Elapsed: DWORD;
begin
{$IFNDEF NOLOGS}
  WriteLog('線程池銷毀', 5);
{$ENDIF}
  // Signal every worker and collect the thread handles to wait on
  csThreadManagment.Enter;
  try
    SetLength(Handles, FThreads.Count);
    n := 0;
    for i := 0 to FThreads.Count - 1 do
      if FThreads[i] <> nil then
      begin
        Handles[n] := TProcessorThread(FThreads[i]).Handle;
        TProcessorThread(FThreads[i]).Terminate;
        Inc(n);
      end;
  finally
    // Must be released before waiting, otherwise workers that need the lock
    // on their way out can never finish
    csThreadManagment.Leave;
  end;

  // Wait for the workers to stop, sharing one overall timeout between chunks
  WaitStart := GetTickCount;
  Offset := 0;
  while Offset < n do
  begin
    Elapsed := GetTickCount - WaitStart;
    if Elapsed >= SHUTDOWN_TIMEOUT then
      Break;
    Chunk := n - Offset;
    if Chunk > MAXIMUM_WAIT_OBJECTS then
      Chunk := MAXIMUM_WAIT_OBJECTS;
    WaitForMultipleObjects(Chunk, @Handles[Offset], True,
      SHUTDOWN_TIMEOUT - Elapsed);
    Inc(Offset, Chunk);
  end;

  // Free the workers (Free on a nil entry is a no-op)
  for i := 0 to FThreads.Count - 1 do
    TProcessorThread(FThreads[i]).Free;
  FThreads.Free;

  // Free the killed threads that have really ended
  for i := FThreadsKilling.Count - 1 downto 0 do
    if TProcessorThread(FThreadsKilling[i]).isFinished then
      TProcessorThread(FThreadsKilling[i]).Free;
  FThreadsKilling.Free;
  csThreadManagment.Free;

  // Requests that were never processed are owned by the queue
  for i := FQueue.Count - 1 downto 0 do
    TObject(FQueue[i]).Free;
  FQueue.Free;
  csQueueManagment.Free;

  CloseHandle(hSemRequestCount);
  CloseHandle(hTimCheckPoolDown);
  inherited;
end; { TThreadsPool.Destroy }
{ Queues aDataObject for processing by a worker thread.

  aDataObject     - the request; once queued it is freed by the worker that
                    processed it (or by the pool destructor).
  CheckForDoubles - when True, the request is compared (IsTheSame) with the
                    queued requests and with those currently being processed.
  Result          - True when the request was queued; False when a duplicate
                    was found, in which case the object is not queued.

  The log text is captured before the item is queued: after FQueue.Add and
  ReleaseSemaphore a worker may already have processed and freed the object,
  so it must not be touched again. }
function TThreadsPool.AddRequest(aDataObject: TWorkItem; CheckForDoubles:
  Boolean = False): Boolean;
var
  i: Integer;
{$IFNDEF NOLOGS}
  LogText: string;
{$ENDIF}
begin
{$IFNDEF NOLOGS}
  LogText := aDataObject.TextForLog;
  WriteLog('AddRequest(' + LogText + ')', 2);
{$ENDIF}
  Result := False;
  csQueueManagment.Enter;
  try
    // With CheckForDoubles = True, look for the same task in the queue
    if CheckForDoubles then
      for i := 0 to FQueue.Count - 1 do
        if (FQueue[i] <> nil)
          and aDataObject.IsTheSame(TWorkItem(FQueue[i])) then
          Exit; // an identical task is already queued
    csThreadManagment.Enter;
    try
      // Remove dead threads and create missing worker threads
      CheckThreadsForGrow;
      // With CheckForDoubles = True, look for the same task among the ones
      // being processed right now
      if CheckForDoubles then
        for i := 0 to FThreads.Count - 1 do
          if TProcessorThread(FThreads[i]).IamCurrentlyProcess(aDataObject) then
            Exit; // an identical task is being processed
    finally
      csThreadManagment.Leave;
    end;
    // Put the task into the queue
    FQueue.Add(aDataObject);
    // Release the semaphore once so that one worker wakes up
    ReleaseSemaphore(hSemRequestCount, 1, nil);
{$IFNDEF NOLOGS}
    WriteLog('釋放一個同步信號量)', 1);
{$ENDIF}
    Result := True;
  finally
    csQueueManagment.Leave;
  end;
{$IFNDEF NOLOGS}
  // Debug information; uses the text captured above, not the object itself
  WriteLog('增加一個任務(wù)(' + LogText + ')', 1);
{$ENDIF}
end; { TThreadsPool.AddRequest }
{
  Name:        TThreadsPool.CheckPoolDown
  Description: Shrinks the pool: replaces dead threads, frees the threads
               that have finished terminating, and terminates at most one
               idle thread above the ThreadsMin limit.
  Parameters:  none
  Returns:     nothing
  Created:     2006.10.22 11:31
  Author:      Kook
}
procedure TThreadsPool.CheckPoolDown;
var
  i, LowestIndex: Integer;
  Candidate: TProcessorThread;
begin
{$IFNDEF NOLOGS}
  WriteLog('TThreadsPool.CheckPoolDown', 1);
{$ENDIF}
  csThreadManagment.Enter;
  try
{$IFNDEF NOLOGS}
    WriteLog(InfoText, 2);
{$ENDIF}
    // Replace dead threads
    KillDeadThreads;
    // Free the threads in the FThreadsKilling list
    FreeFinishedThreads;
    // Only the entries at index >= FThreadsMin may be removed
    LowestIndex := FThreadsMin;
    i := FThreads.Count - 1;
    while i >= LowestIndex do
    begin
      Candidate := TProcessorThread(FThreads[i]);
      if Candidate.isIdle then
      begin
        // Ask the idle thread to stop
        Candidate.Terminate;
        // Remember it so it can be freed once it has ended
        FThreadsKilling.Add(Candidate);
        // Remove it from the list of working threads
        FThreads.Delete(i);
        // todo (original author): only one thread is stopped per call
        Break;
      end;
      Dec(i);
    end;
  finally
    csThreadManagment.Leave;
  end;
end; { TThreadsPool.CheckPoolDown }
{
  Name:        TThreadsPool.CheckThreadsForGrow
  Description: Replaces dead threads and creates missing worker threads.
  Parameters:  none
  Returns:     nothing
  Created:     2006.10.22 11:31
  Author:      Kook

  A new thread is created when:
    1. there are fewer working threads than ThreadsMin (the gap is filled), or
    2. there are fewer working threads than ThreadsMax and the pool's average
       waiting time is below 100 ms (the system is busy).
  NOTE(review): the original comment also listed "3. more than four times as
  many tasks as working threads", but no such rule exists in the code.
}
procedure TThreadsPool.CheckThreadsForGrow;
var
  AvgWait: Integer;
  i, Missing: Integer;

  // Creates one worker and appends it to FThreads; a failure is only logged.
  procedure SpawnWorker;
  begin
    try
      FThreads.Add(TProcessorThread.Create(Self));
    except
      on e: Exception do
        WriteLog(
          'TProcessorThread.Create raise: ' + e.ClassName +
          #13#10#9'Message: ' + e.Message,
          9
          );
    end;
  end;

begin
  csThreadManagment.Enter;
  try
    KillDeadThreads;
    if FThreads.Count < FThreadsMin then
    begin
{$IFNDEF NOLOGS}
      WriteLog('工作線程數(shù)小于最小線程數(shù)', 4);
{$ENDIF}
      // One creation attempt per missing thread, whether or not it succeeds
      Missing := FThreadsMin - FThreads.Count;
      for i := 1 to Missing do
        SpawnWorker;
    end
    else if FThreads.Count < FThreadsMax then
    begin
{$IFNDEF NOLOGS}
      WriteLog('工作線程數(shù)小于最大線程數(shù) and 線程池平均等待時間 < 100ms', 3);
{$ENDIF}
      AvgWait := PoolAverageWaitingTime;
{$IFNDEF NOLOGS}
      WriteLog(Format(
        'FThreads.Count (%d)<FThreadsMax(%d), AvgWait=%d',
        [FThreads.Count, FThreadsMax, AvgWait]),
        4
        );
{$ENDIF}
      // Short waits mean requests arrive faster than they are handled
      if AvgWait < 100 then
        SpawnWorker;
    end;
  finally
    csThreadManagment.Leave;
  end;
end; { TThreadsPool.CheckThreadsForGrow }
{ Called by a worker after it has finished a request. Fires
  OnQueueEmpty(ekProcessingFinished) when every queued request has been
  handed out and no thread is in the tcsProcessing state. }
procedure TThreadsPool.DoProcessed;
var
  i: Integer;
  AnyBusy: Boolean;
begin
  // Requests are still waiting in the queue: nothing to report
  if FLastGetPoint >= FQueue.Count then
  begin
    AnyBusy := False;
    csThreadManagment.Enter;
    try
      i := 0;
      while (i < FThreads.Count) and not AnyBusy do
      begin
        AnyBusy := TProcessorThread(FThreads[i]).FCurState = tcsProcessing;
        Inc(i);
      end;
    finally
      csThreadManagment.Leave;
    end;
    // The event is fired outside the thread lock
    if not AnyBusy then
      DoQueueEmpty(ekProcessingFinished);
  end;
end; { TThreadsPool.DoProcessed }
{ Dispatches OnProcessRequest for aDataObj on behalf of aThread; does nothing
  when no handler is assigned. }
procedure TThreadsPool.DoProcessRequest(aDataObj: TWorkItem; aThread:
  TProcessorThread);
begin
  if not Assigned(FProcessRequest) then
    Exit;
  FProcessRequest(Self, aDataObj, aThread);
end; { TThreadsPool.DoProcessRequest }
{ Dispatches OnQueueEmpty with the given reason; does nothing when no handler
  is assigned. }
procedure TThreadsPool.DoQueueEmpty(EmptyKind: TEmptyKind);
begin
  if not Assigned(FQueueEmpty) then
    Exit;
  FQueueEmpty(Self, EmptyKind);
end; { TThreadsPool.DoQueueEmpty }
{ Dispatches OnThreadFinalizing for aThread; does nothing when no handler is
  assigned. }
procedure TThreadsPool.DoThreadFinalizing(aThread: TProcessorThread);
begin
  if not Assigned(FThreadFinalizing) then
    Exit;
  FThreadFinalizing(Self, aThread);
end; { TThreadsPool.DoThreadFinalizing }
{ Dispatches OnThreadInitializing for aThread; does nothing when no handler is
  assigned. }
procedure TThreadsPool.DoThreadInitializing(aThread: TProcessorThread);
begin
  if not Assigned(FThreadInitializing) then
    Exit;
  FThreadInitializing(Self, aThread);
end; { TThreadsPool.DoThreadInitializing }
{
  Name:        TThreadsPool.FreeFinishedThreads
  Description: Frees the threads in the FThreadsKilling list that have
               finished running. Does nothing when the thread lock cannot be
               taken immediately.
  Parameters:  none
  Returns:     nothing
  Created:     2006.10.22 11:34
  Author:      Kook
}
procedure TThreadsPool.FreeFinishedThreads;
var
  i: Integer;
  Killed: TProcessorThread;
begin
  // Non-blocking: skip this round if somebody else holds the lock
  if not csThreadManagment.TryEnter then
    Exit;
  try
    // Walk backwards so Delete does not shift the entries still to visit
    i := FThreadsKilling.Count - 1;
    while i >= 0 do
    begin
      Killed := TProcessorThread(FThreadsKilling[i]);
      if Killed.isFinished then
      begin
        Killed.Free;
        FThreadsKilling.Delete(i);
      end;
      Dec(i);
    end;
  finally
    csThreadManagment.Leave
  end;
end; { TThreadsPool.FreeFinishedThreads }
{
  Name:        TThreadsPool.GetRequest
  Description: Hands out the next queued task.
  Parameters:  out Request: TWorkItem - receives the task; ownership passes
               to the caller.
  Returns:     nothing
  Created:     2006.10.22 11:34
  Author:      Kook

  Notes: the caller must have been released by hSemRequestCount, so at least
  one task is expected in the queue (asserted below).
  When the last task is taken, the queue is now reset BEFORE
  OnQueueEmpty(ekQueueEmpty) is fired. The handler runs on this thread while
  the (re-entrant) queue lock is held, so it may call AddRequest; with the
  previous order the following FQueue.Clear discarded - and leaked - such a
  request while its semaphore count stayed released.
}
procedure TThreadsPool.GetRequest(out Request: TWorkItem);
begin
{$IFNDEF NOLOGS}
  WriteLog('申請任務(wù)', 2);
{$ENDIF}
  csQueueManagment.Enter;
  try
    // Skip the empty (already handed out) queue slots
    while (FLastGetPoint < FQueue.Count) and (FQueue[FLastGetPoint] = nil) do
      Inc(FLastGetPoint);
    Assert(FLastGetPoint < FQueue.Count);
    // Compact the queue (drop the empty slots) once most of a long queue
    // has been consumed
    if (FQueue.Count > 127) and (FLastGetPoint >= (3 * FQueue.Count) div 4) then
    begin
{$IFNDEF NOLOGS}
      WriteLog('FQueue.Pack', 1);
{$ENDIF}
      FQueue.Pack;
      FLastGetPoint := 0;
    end;
    Request := TWorkItem(FQueue[FLastGetPoint]);
    FQueue[FLastGetPoint] := nil;
    Inc(FLastGetPoint);
    if (FLastGetPoint = FQueue.Count) then // no task left in the queue
    begin
      // Reset first, so that requests added by the event handler survive
      FQueue.Clear;
      FLastGetPoint := 0;
      DoQueueEmpty(ekQueueEmpty);
    end;
  finally
    csQueueManagment.Leave;
  end;
end; { TThreadsPool.GetRequest }
// Returns a textual description of the pool state.
// The detailed implementation below is commented out; as the code stands the
// function always returns an empty string (CheckPoolDown logs that result).
// NOTE(review): the disabled code takes csQueueManagment and then
// csThreadManagment, while CheckPoolDown calls InfoText with
// csThreadManagment already held - check the lock order before re-enabling.
function TThreadsPool.InfoText: string;
begin
Result := '';
//end;
//{$ELSE}
//var
// i: Integer;
//begin
// csQueueManagment.Enter;
// csThreadManagment.Enter;
// try
// if (FThreads.Count = 0) and (FThreadsKilling.Count = 1) and
// TProcessorThread(FThreadsKilling[0]).isFinished then
// FreeFinishedThreads;
//
// Result := Format(
// 'Pool thread: Min=%d, Max=%d, WorkingThreadsCount=%d, TerminatedThreadCount=%d, QueueLength=%d'#13#10,
// [ThreadsMin, ThreadsMax, FThreads.Count, FThreadsKilling.Count,
// FQueue.Count]
// );
// if FThreads.Count > 0 then
// Result := Result + 'Working threads:'#13#10;
// for i := 0 to FThreads.Count - 1 do
// Result := Result + TProcessorThread(FThreads[i]).InfoText + #13#10;
// if FThreadsKilling.Count > 0 then
// Result := Result + 'Terminated threads:'#13#10;
// for i := 0 to FThreadsKilling.Count - 1 do
// Result := Result + TProcessorThread(FThreadsKilling[i]).InfoText + #13#10;
// finally
// csThreadManagment.Leave;
// csQueueManagment.Leave;
// end;
//end;
//{$ENDIF}
end; { TThreadsPool.InfoText }
{
  Name:        TThreadsPool.KillDeadThreads
  Description: Moves dead threads (see TProcessorThread.IsDead) to the
               FThreadsKilling list and creates a replacement for each.
               Does nothing when the thread lock cannot be taken immediately.
  Parameters:  none
  Returns:     nothing
  Created:     2006.10.22 11:32
  Author:      Kook

  Notes: when the replacement cannot be created, the slot is now removed from
  FThreads instead of being set to nil. The other routines that walk FThreads
  (CheckPoolDown, DoProcessed, AddRequest, PoolAverageWaitingTime and this
  one) dereference every entry without a nil check, so a nil slot crashed
  them on the next pass. The list is walked backwards so that Delete does not
  disturb the entries still to be visited.
}
procedure TThreadsPool.KillDeadThreads;
var
  i: Integer;
  Worker: TProcessorThread;
begin
  // Check for dead threads
  if csThreadManagment.TryEnter then
  try
    for i := FThreads.Count - 1 downto 0 do
    begin
      Worker := TProcessorThread(FThreads[i]);
      if Worker = nil then
      begin
        // Defensive: drop an empty slot instead of dereferencing it
        FThreads.Delete(i);
        Continue;
      end;
      if Worker.IsDead then
      begin
        // The dead thread is moved to the other list and a new thread is
        // created to replace it
        Worker.Terminate;
        FThreadsKilling.Add(Worker);
        try
          FThreads[i] := TProcessorThread.Create(Self);
        except
          on e: Exception do
          begin
            // No replacement available: shrink the list rather than keep nil
            FThreads.Delete(i);
{$IFNDEF NOLOGS}
            WriteLog(
              'TProcessorThread.Create raise: ' + e.ClassName +
              #13#10#9'Message: ' + e.Message,
              9
              );
{$ENDIF}
          end;
        end;
      end;
    end;
  finally
    csThreadManagment.Leave
  end;
end; { TThreadsPool.KillDeadThreads }
{ Average of AverageWaitingTime over all working threads. Returns 1 when the
  pool has no working thread. }
function TThreadsPool.PoolAverageWaitingTime: Integer;
var
  i, Total: Integer;
begin
  // An empty pool reports a minimal waiting time
  if FThreads.Count <= 0 then
  begin
    Result := 1;
    Exit;
  end;
  Total := 0;
  for i := 0 to FThreads.Count - 1 do
    Inc(Total, TProcessorThread(FThreads[i]).AverageWaitingTime);
  Result := Total div FThreads.Count;
end; { TThreadsPool.PoolAverageWaitingTime }
{ Forwards a pool-level message (LogID = 0) to the unit's log procedure.
  Compiled to a no-op when NOLOGS is defined. The procedure variable is
  checked first, so an application that sets uThreadPool.WriteLog to nil
  switches logging off instead of causing an access violation. }
procedure TThreadsPool.WriteLog(const Str: string; Level: Integer = 0);
begin
{$IFNDEF NOLOGS}
  if Assigned(uThreadPool.WriteLog) then
    uThreadPool.WriteLog(Str, 0, Level);
{$ENDIF}
end; { TThreadsPool.WriteLog }
// 工作線程僅用于線程池內(nèi), 不要直接創(chuàng)建并調(diào)用它。
{
******************************* TProcessorThread *******************************
}
// Creates a worker thread for APool, starts it and blocks until the thread
// has signaled that its initialization (the pool's DoThreadInitializing call
// in Execute) is over. Raises Exception with the recorded message when that
// initialization failed.
constructor TProcessorThread.Create(APool: TThreadsPool);
begin
WriteLog('創(chuàng)建工作線程', 5);
// Created suspended so the fields below are set before Execute runs
inherited Create(True);
FPool := aPool;
// Initial values for the running averages
FAverageWaitingTime := 1000;
FAverageProcessing := 3000;
sInitError := '';
{
CreateEvent parameters:
1: security attributes - nil is sufficient.
2: whether the event is reset manually.
3: initial state; False means non-signaled ("red light").
4: event name. Equal names refer to the same object, so two distinct events
need two distinct names; with nil every call creates a new event.
Return value: the event handle.
}
hInitFinished := CreateEvent(nil, True, False, nil);
hThreadTerminated := CreateEvent(nil, True, False, nil);
csProcessingDataObject := TCriticalSection.Create;
try
WriteLog('TProcessorThread.Create::Resume', 3);
Resume;
// Block until initialization has finished
WaitForSingleObject(hInitFinished, INFINITE);
if sInitError <> '' then
raise Exception.Create(sInitError);
finally
// The init event is only needed during construction
CloseHandle(hInitFinished);
end;
WriteLog('TProcessorThread.Create::Finished', 3);
end; { TProcessorThread.Create }
{ Stops the thread and releases its resources.

  The thread is now asked to stop through TProcessorThread.Terminate (which
  also signals hThreadTerminated) and is waited for BEFORE the event handle
  and the critical section are released. Previously both were destroyed
  first, while Execute could still be waiting on the handle or entering the
  critical section, and the Terminate issued by the inherited destructor
  never signaled the wake-up event.
  A thread that is still suspended is not waited for here; the inherited
  destructor deals with it. }
destructor TProcessorThread.Destroy;
begin
  WriteLog('工作線程銷毀', 5);
  // Wake Execute out of its wait and let it run to completion
  Terminate;
  if not Suspended then
    WaitFor;
  // Nothing uses these any more
  CloseHandle(hThreadTerminated);
  csProcessingDataObject.Free;
  inherited;
end; { TProcessorThread.Destroy }
{ Average processing time. While a request is being processed, the time spent
  on it so far is folded into the stored average. }
function TProcessorThread.AverageProcessingTime: DWORD;
begin
  if FCurState <> tcsProcessing then
    // Not processing: the stored average is current
    Result := FAverageProcessing
  else
    Result := NewAverage(FAverageProcessing, GetTickCount - uProcessingStart);
end; { TProcessorThread.AverageProcessingTime }
{ Average waiting time. While the thread is waiting (or checking the pool
  down), the time waited so far is folded into the stored average. }
function TProcessorThread.AverageWaitingTime: DWORD;
begin
  if not (FCurState in [tcsWaiting, tcsCheckingDown]) then
    // Not waiting: the stored average is current
    Result := FAverageWaitingTime
  else
    Result := NewAverage(FAverageWaitingTime, GetTickCount - uWaitingStart);
end; { TProcessorThread.AverageWaitingTime }
// Thread body. Runs the pool's initialization event, signals the constructor,
// then loops waiting on three handles: the terminate event, the request
// semaphore and the pool's "check pool down" timer. The pool's finalization
// event is run before the thread ends.
procedure TProcessorThread.Execute;
type
// Indexes into Handles; the order gives the priority when several handles
// are signaled at the same time
THandleID = (hidTerminateThread, hidRequest, hidCheckPoolDown);
var
WaitedTime: Integer;
Handles: array[THandleID] of THandle;
begin
WriteLog('工作線程進常運行', 3);
// Current state: initializing
FCurState := tcsInitializing;
try
// Run the external event
FPool.DoThreadInitializing(Self);
except
on e: Exception do
// Recorded for the constructor, which raises it in the creating thread
sInitError := e.Message;
end;
// Initialization is over: signal the init event
SetEvent(hInitFinished);
WriteLog('TProcessorThread.Execute::Initialized', 3);
// Reference the synchronization handles of the thread and the pool
Handles[hidTerminateThread] := hThreadTerminated;
Handles[hidRequest] := FPool.hSemRequestCount;
Handles[hidCheckPoolDown] := FPool.hTimCheckPoolDown;
// Time stamp
// todo (original author): GetTickCount seems to misbehave inside a thread
uWaitingStart := GetTickCount;
// No task yet
FProcessingDataObject := nil;
// Main loop
while not terminated do
begin
// Current state: waiting
FCurState := tcsWaiting;
// Block the thread until one of the handles is signaled.
// WAIT_OBJECT_0 is subtracted from the result and added again in each label.
case WaitForMultipleObjects(Length(Handles), @Handles, False, INFINITE) -
WAIT_OBJECT_0 of
WAIT_OBJECT_0 + ord(hidTerminateThread):
begin
WriteLog('TProcessorThread.Execute:: Terminate event signaled ', 5);
// Current state: terminating the thread
FCurState := tcsTerminating;
// Leave the main loop (end the thread)
Break;
end;
WAIT_OBJECT_0 + ord(hidRequest):
begin
WriteLog('TProcessorThread.Execute:: Request semaphore signaled ', 3);
// Time spent waiting
WaitedTime := GetTickCount - uWaitingStart;
// Recompute the average waiting time
FAverageWaitingTime := NewAverage(FAverageWaitingTime, WaitedTime);
// Current state: fetching a task
FCurState := tcsGetting;
// A very short wait: check whether there are enough worker threads
if WaitedTime < 5 then
FPool.CheckThreadsForGrow;
// Take a task from the pool's task queue
FPool.GetRequest(FProcessingDataObject);
// Time stamp of the start of processing
uProcessingStart := GetTickCount;
// Current state: processing the task
FCurState := tcsProcessing;
try
{$IFNDEF NOLOGS}
WriteLog('Processing: ' + FProcessingDataObject.TextForLog, 2);
{$ENDIF}
// Process the task
FPool.DoProcessRequest(FProcessingDataObject, Self);
except
// An exception raised by the handler is logged and the thread carries on
on e: Exception do
WriteLog(
'OnProcessRequest for ' + FProcessingDataObject.TextForLog +
#13#10'raise Exception: ' + e.Message,
8
);
end;
// Free the task object; the lock is shared with IamCurrentlyProcess
// and InfoText, which read FProcessingDataObject
csProcessingDataObject.Enter;
try
FProcessingDataObject.Free;
FProcessingDataObject := nil;
finally
csProcessingDataObject.Leave;
end;
// Recompute the average processing time
FAverageProcessing := NewAverage(FAverageProcessing, GetTickCount -
uProcessingStart);
// Current state: task processed
FCurState := tcsProcessed;
// Let the pool fire its "processing finished" event if applicable
FPool.DoProcessed;
uWaitingStart := GetTickCount;
end;
WAIT_OBJECT_0 + ord(hidCheckPoolDown):
begin
// !!! Never called under Win9x
WriteLog('TProcessorThread.Execute:: CheckPoolDown timer signaled ',
4);
// Current state: shrinking the pool (check for and remove idle and dead threads)
FCurState := tcsCheckingDown;
FPool.CheckPoolDown;
end;
end;
end;
FCurState := tcsTerminating;
FPool.DoThreadFinalizing(Self);
end; { TProcessorThread.Execute }
{ True when this thread currently holds a request for which
  DataObj.IsTheSame returns True. The check runs under
  csProcessingDataObject, the lock Execute takes when it frees the request. }
function TProcessorThread.IamCurrentlyProcess(DataObj: TWorkItem): Boolean;
begin
  csProcessingDataObject.Enter;
  try
    if FProcessingDataObject = nil then
      // Nothing is being processed right now
      Result := False
    else
      Result := DataObj.IsTheSame(FProcessingDataObject);
  finally
    csProcessingDataObject.Leave;
  end;
end; { TProcessorThread.IamCurrentlyProcess }
{ Describes the thread for the log: thread id, state name, the two running
  averages, the time spent in the current wait or processing step and the
  log text of the request being processed.
  Returns an empty string when NOLOGS is defined; previously Result was left
  unassigned in that configuration, so the caller received whatever the
  result variable happened to hold. }
function TProcessorThread.InfoText: string;
const
  ThreadStateNames: array[TThreadState] of string =
  (
    'tcsInitializing',
    'tcsWaiting',
    'tcsGetting',
    'tcsProcessing',
    'tcsProcessed',
    'tcsTerminating',
    'tcsCheckingDown'
    );
begin
  // Defined result in every build configuration
  Result := '';
{$IFNDEF NOLOGS}
  Result := Format(
    '%5d: %15s, AverageWaitingTime=%6d, AverageProcessingTime=%6d',
    [ThreadID, ThreadStateNames[FCurState], AverageWaitingTime,
    AverageProcessingTime]
      );
  // Duration of the step the thread is currently in
  case FCurState of
    tcsWaiting:
      Result := Result + ', WaitingTime=' + IntToStr(GetTickCount -
        uWaitingStart);
    tcsProcessing:
      Result := Result + ', ProcessingTime=' + IntToStr(GetTickCount -
        uProcessingStart);
  end;
  // The request may be freed by Execute at any time: read it under the lock
  csProcessingDataObject.Enter;
  try
    if FProcessingDataObject <> nil then
      Result := Result + ' ' + FProcessingDataObject.TextForLog;
  finally
    csProcessingDataObject.Leave;
  end;
{$ENDIF}
end; { TProcessorThread.InfoText }
{ A thread is dead when it has been terminated, or when the pool has a
  ThreadDeadTimeout > 0 and the thread has been processing its current
  request for longer than that timeout. A dead thread is logged. }
function TProcessorThread.IsDead: Boolean;
begin
  Result := Terminated;
  if not Result then
    Result :=
      (FPool.ThreadDeadTimeout > 0) and (FCurState = tcsProcessing) and
      (GetTickCount - uProcessingStart > FPool.ThreadDeadTimeout);
  if Result then
    WriteLog('Thread dead', 5);
end; { TProcessorThread.IsDead }
{ True when the thread has ended, i.e. its handle is signaled. The handle is
  polled with a zero timeout, so the call never blocks. }
function TProcessorThread.isFinished: Boolean;
var
  WaitResult: DWORD;
begin
  WaitResult := WaitForSingleObject(Handle, 0);
  Result := WaitResult = WAIT_OBJECT_0;
end; { TProcessorThread.isFinished }
{ The thread is considered idle when
    - its state is tcsWaiting or tcsCheckingDown, and
    - its average waiting time is above 100 ms, and
    - twice its average waiting time exceeds its average processing time. }
function TProcessorThread.isIdle: Boolean;
begin
  Result := False;
  // Only a waiting thread can be idle
  if not (FCurState in [tcsWaiting, tcsCheckingDown]) then
    Exit;
  // It must have been waiting noticeably long on average
  if not (AverageWaitingTime > 100) then
    Exit;
  Result := AverageWaitingTime * 2 > AverageProcessingTime;
end; { TProcessorThread.isIdle }
{ Running-average update: the old average counts twice, the new sample once.
  Uses integer division. }
function TProcessorThread.NewAverage(OldAvg, NewVal: Integer): Integer;
var
  Weighted: Integer;
begin
  Weighted := OldAvg * 2 + NewVal;
  Result := Weighted div 3;
end; { TProcessorThread.NewAverage }
{ Asks the thread to stop: sets the inherited Terminated flag and signals
  hThreadTerminated, the event Execute waits on, so a sleeping thread wakes
  up at once. }
procedure TProcessorThread.Terminate;
begin
  WriteLog('TProcessorThread.Terminate', 5);
  // Flag first, so the loop condition already fails when the thread wakes up
  inherited Terminate;
  // Then wake the thread out of WaitForMultipleObjects
  SetEvent(hThreadTerminated);
end; { TProcessorThread.Terminate }
{ Forwards a message of this thread (LogID = ThreadID) to the unit's log
  procedure. Compiled to a no-op when NOLOGS is defined. The procedure
  variable is checked first, so an application that sets uThreadPool.WriteLog
  to nil switches logging off instead of causing an access violation. }
procedure TProcessorThread.WriteLog(const Str: string; Level: Integer = 0);
begin
{$IFNDEF NOLOGS}
  if Assigned(uThreadPool.WriteLog) then
    uThreadPool.WriteLog(Str, ThreadID, Level);
{$ENDIF}
end; { TProcessorThread.WriteLog }
{
******************************* TCriticalSection *******************************
}
{ Creates the wrapper and initializes the Win32 critical section. The
  inherited constructor is now called, as is conventional for every class in
  the hierarchy. }
constructor TCriticalSection.Create;
begin
  inherited Create;
  InitializeCriticalSection(FSection);
end; { TCriticalSection.Create }
{ Deletes the Win32 critical section and then runs the inherited destructor,
  which was previously skipped. The critical section must not be owned by any
  thread at this point. }
destructor TCriticalSection.Destroy;
begin
  DeleteCriticalSection(FSection);
  inherited Destroy;
end; { TCriticalSection.Destroy }
{ Enters the critical section through EnterCriticalSection. }
procedure TCriticalSection.Enter;
begin
  EnterCriticalSection(FSection);
end; { TCriticalSection.Enter }
{ Leaves the critical section through LeaveCriticalSection. }
procedure TCriticalSection.Leave;
begin
  LeaveCriticalSection(FSection);
end; { TCriticalSection.Leave }
{ Attempts to enter the critical section and returns the result of
  TryEnterCriticalSection. When it returns True the caller must call Leave. }
function TCriticalSection.TryEnter: Boolean;
var
  Entered: Boolean;
begin
  Entered := TryEnterCriticalSection(FSection);
  Result := Entered;
end; { TCriticalSection.TryEnter }
{ Log procedure that discards every message; installed as the default
  WriteLog target by the initialization section. }
procedure NoLogs(const Str: string; LogID: Integer = 0; Level: Integer = 0);
begin
  // Intentionally empty
end;
initialization
// Install the discard-everything log procedure as the default log target
WriteLog := NoLogs;
end.
用法:
// 創建線程池
FThreadPool := TThreadsPool.Create(Self); // 創建線程池
FThreadPool.ThreadsMin := 5; // 初始工作線程數
FThreadPool.ThreadsMax := 50; // 最大允許工作線程數
FThreadPool.OnProcessRequest := DealwithCommRecvData; // 線程工作函數(DealwithCommRecvData 在工作者線程的 Execute 方法中被調用)
// 使用線程池
var
AWorkItem: TRecvCommDataWorkItem; // 繼承自TWorkItem
begin
AWorkItem := TRecvCommDataWorkItem.Create;
Move(PData[0], AWorkItem.FRecvData[0], PDataLen);
AWorkItem.FRecvDataLen := PDataLen;
FThreadPool.AddRequest(AWorkItem); // 向線程池分配一個任務
end;