西西軟件下載最安全的下載網(wǎng)站、值得信賴的軟件下載站!

首頁編程開發(fā)Delphi → Delphi實現(xiàn)的線程池代碼單元

Delphi實現(xiàn)的線程池代碼單元

相關(guān)軟件相關(guān)文章發(fā)表評論 來源:西西整理時間:2012/12/6 11:26:43字體大。A-A+

作者:lucasli點擊:0次評論:0次標簽: 線程

  • 類型:服務(wù)器區(qū)大。21KB語言:中文 評分:6.6
  • 標簽:
立即下載
Delphi">Delphi什么樣的線程池更好呢?我覺得使用起來要可靠,并且一定要簡單,這樣才是更好的。我寫的線程池就是這樣一個標準,使用非常簡單,只傳入自己要執(zhí)行的方法就可以了,其實大家最后就是關(guān)注自己要操作的方法,其余的交給線程池。
unit uThreadPool;

{   aPool.AddRequest(TMyRequest.Create(RequestParam1, RequestParam2, ...)); }

interface
uses
  Windows,
  Classes;

// 是否記錄日志
// {$DEFINE NOLOGS}

type
  TCriticalSection = class(TObject)
  protected
    FSection: TRTLCriticalSection;
  public
    constructor Create;
    destructor Destroy; override;
    // 進入臨界區(qū)
    procedure Enter;
    // 離開臨界區(qū)
    procedure Leave;
    // 嘗試進入
    function TryEnter: Boolean;
  end;

type
  // 儲存請求數(shù)據(jù)的基本類
  TWorkItem = class(TObject)
  public
    // 是否有重復(fù)任務(wù)
    function IsTheSame(DataObj: TWorkItem): Boolean; virtual;
    // 如果 NOLOGS 被定義,則禁用。
    function TextForLog: string; virtual;
  end;

type
  TThreadsPool = class;

  //線程狀態(tài)
  TThreadState = (tcsInitializing, tcsWaiting, tcsGetting, tcsProcessing,
    tcsProcessed, tcsTerminating, tcsCheckingDown);
  // 工作線程僅用于線程池內(nèi), 不要直接創(chuàng)建并調(diào)用它。
  TProcessorThread = class(TThread)
  private
    // 創(chuàng)建線程時臨時的Event對象, 阻塞線程直到初始化完成
    hInitFinished: THandle;
    // 初始化出錯信息
    sInitError: string;
    // 記錄日志
    procedure WriteLog(const Str: string; Level: Integer = 0);
  protected
    // 線程臨界區(qū)同步對像
    csProcessingDataObject: TCriticalSection;
    // 平均處理時間
    FAverageProcessing: Integer;
    // 等待請求的平均時間
    FAverageWaitingTime: Integer;
    // 本線程實例的運行狀態(tài)
    FCurState: TThreadState;
    // 本線程實例所附屬的線程池
    FPool: TThreadsPool;
    // 當前處理的數(shù)據(jù)對像。
    FProcessingDataObject: TWorkItem;
    // 線程停止 Event, TProcessorThread.Terminate 中開綠燈
    hThreadTerminated: THandle;
    uProcessingStart: DWORD;
    // 開始等待的時間, 通過 GetTickCount 取得。
    uWaitingStart: DWORD;
    // 計算平均工作時間
    function AverageProcessingTime: DWORD;
    // 計算平均等待時間
    function AverageWaitingTime: DWORD;
    procedure Execute; override;
    function IamCurrentlyProcess(DataObj: TWorkItem): Boolean;
    // 轉(zhuǎn)換枚舉類型的線程狀態(tài)為字串類型
    function InfoText: string;
    // 線程是否長時間處理同一個請求?(已死掉?)
    function IsDead: Boolean;
    // 線程是否已完成當成任務(wù)
    function isFinished: Boolean;
    // 線程是否處于空閑狀態(tài)
    function isIdle: Boolean;
    // 平均值校正計算。
    function NewAverage(OldAvg, NewVal: Integer): Integer;
  public
    Tag: Integer;
    constructor Create(APool: TThreadsPool);
    destructor Destroy; override;
    procedure Terminate;
  end;

  // 線程初始化時觸發(fā)的事件
  TProcessorThreadInitializing = procedure(Sender: TThreadsPool; aThread:
    TProcessorThread) of object;
  // 線程結(jié)束時觸發(fā)的事件
  TProcessorThreadFinalizing = procedure(Sender: TThreadsPool; aThread:
    TProcessorThread) of object;
  // 線程處理請求時觸發(fā)的事件
  TProcessRequest = procedure(Sender: TThreadsPool; WorkItem: TWorkItem;
    aThread: TProcessorThread) of object;
  TEmptyKind = (
    ekQueueEmpty, //任務(wù)被取空后
    ekProcessingFinished // 最后一個任務(wù)處理完畢后
    );
  // 任務(wù)隊列空時觸發(fā)的事件
  TQueueEmpty = procedure(Sender: TThreadsPool; EmptyKind: TEmptyKind) of
    object;

  TThreadsPool = class(TComponent)
  private
    csQueueManagment: TCriticalSection;
    csThreadManagment: TCriticalSection;
    FProcessRequest: TProcessRequest;
    FQueue: TList;
    FQueueEmpty: TQueueEmpty;
    // 線程超時閥值
    FThreadDeadTimeout: DWORD;
    FThreadFinalizing: TProcessorThreadFinalizing;
    FThreadInitializing: TProcessorThreadInitializing;
    // 工作中的線程
    FThreads: TList;
    // 執(zhí)行了 terminat 發(fā)送退出指令, 正在結(jié)束的線程.
    FThreadsKilling: TList;
    // 最少, 最大線程數(shù)
    FThreadsMax: Integer;
    // 最少, 最大線程數(shù)
    FThreadsMin: Integer;
    // 池平均等待時間
    function PoolAverageWaitingTime: Integer;
    procedure WriteLog(const Str: string; Level: Integer = 0);
  protected
    FLastGetPoint: Integer;
    // Semaphore, 統(tǒng)計任務(wù)隊列
    hSemRequestCount: THandle;
    // Waitable timer. 每30觸發(fā)一次的時間量同步
    hTimCheckPoolDown: THandle;
    // 線程池停機(檢查并清除空閑線程和死線程)
    procedure CheckPoolDown;
    // 清除死線程,并補充不足的工作線程
    procedure CheckThreadsForGrow;
    procedure DoProcessed;
    procedure DoProcessRequest(aDataObj: TWorkItem; aThread: TProcessorThread);
      virtual;
    procedure DoQueueEmpty(EmptyKind: TEmptyKind); virtual;
    procedure DoThreadFinalizing(aThread: TProcessorThread); virtual;
    // 執(zhí)行事件
    procedure DoThreadInitializing(aThread: TProcessorThread); virtual;
    // 釋放 FThreadsKilling 列表中的線程
    procedure FreeFinishedThreads;
    // 申請任務(wù)
    procedure GetRequest(out Request: TWorkItem);
    // 清除死線程
    procedure KillDeadThreads;
  public
    constructor Create(AOwner: TComponent); override;
    destructor Destroy; override;
    // 就進行任務(wù)是否重復(fù)的檢查, 檢查發(fā)現(xiàn)重復(fù)就返回 False
    function AddRequest(aDataObject: TWorkItem; CheckForDoubles: Boolean =
      False): Boolean; overload;
    // 轉(zhuǎn)換枚舉類型的線程狀態(tài)為字串類型
    function InfoText: string;
  published
    // 線程處理任務(wù)時觸發(fā)的事件
    property OnProcessRequest: TProcessRequest read FProcessRequest write
      FProcessRequest;
    // 任務(wù)列表為空時解發(fā)的事件
    property OnQueueEmpty: TQueueEmpty read FQueueEmpty write FQueueEmpty;
    // 線程結(jié)束時觸發(fā)的事件
    property OnThreadFinalizing: TProcessorThreadFinalizing read
      FThreadFinalizing write FThreadFinalizing;
    // 線程初始化時觸發(fā)的事件
    property OnThreadInitializing: TProcessorThreadInitializing read
      FThreadInitializing write FThreadInitializing;
    // 線程超時值(毫秒), 如果處理超時,將視為死線程
    property ThreadDeadTimeout: DWORD read FThreadDeadTimeout write
      FThreadDeadTimeout default 0;
    // 最大線程數(shù)
    property ThreadsMax: Integer read FThreadsMax write FThreadsMax default 1;
    // 最小線程數(shù)
    property ThreadsMin: Integer read FThreadsMin write FThreadsMin default 0;
  end;

type
  //日志記志函數(shù)
  TLogWriteProc = procedure(
    const Str: string; //日志
    LogID: Integer = 0;
    Level: Integer = 0 //Level = 0 - 跟蹤信息, 10 - 致命錯誤
    );

var
  WriteLog: TLogWriteProc; // 如果存在實例就寫日志

implementation
uses
  SysUtils;

// 儲存請求數(shù)據(jù)的基本類
{
********************************** TWorkItem ***********************************
}

function TWorkItem.IsTheSame(DataObj: TWorkItem): Boolean;
begin
  Result := False;
end; { TWorkItem.IsTheSame }

function TWorkItem.TextForLog: string;
begin
  Result := 'Request';
end; { TWorkItem.TextForLog }

{
********************************* TThreadsPool *********************************
}

constructor TThreadsPool.Create(AOwner: TComponent);
var
  DueTo: Int64;
begin
{$IFNDEF NOLOGS}
  WriteLog('創(chuàng)建線程池', 5);
{$ENDIF}
  inherited;
  csQueueManagment := TCriticalSection.Create;
  FQueue := TList.Create;
  csThreadManagment := TCriticalSection.Create;
  FThreads := TList.Create;
  FThreadsKilling := TList.Create;
  FThreadsMin := 0;
  FThreadsMax := 1;
  FThreadDeadTimeout := 0;
  FLastGetPoint := 0;
  //
  hSemRequestCount := CreateSemaphore(nil, 0, $7FFFFFFF, nil);

  DueTo := -1;
  //可等待的定時器(只用于Window NT4或更高)
  hTimCheckPoolDown := CreateWaitableTimer(nil, False, nil);

  if hTimCheckPoolDown = 0 then // Win9x不支持
    // In Win9x number of thread will be never decrised
    hTimCheckPoolDown := CreateEvent(nil, False, False, nil)
  else
    SetWaitableTimer(hTimCheckPoolDown, DueTo, 30000, nil, nil, False);
end; { TThreadsPool.Create }

destructor TThreadsPool.Destroy;
var
  n, i: Integer;
  Handles: array of THandle;
begin
{$IFNDEF NOLOGS}
  WriteLog('線程池銷毀', 5);
{$ENDIF}
  csThreadManagment.Enter;

  SetLength(Handles, FThreads.Count);
  n := 0;
  for i := 0 to FThreads.Count - 1 do
    if FThreads[i] <> nil then
    begin
      Handles[n] := TProcessorThread(FThreads[i]).Handle;
      TProcessorThread(FThreads[i]).Terminate;
      Inc(n);
    end;

  csThreadManagment.Leave;  // lixiaoyu 添加于 2009.1.6,如沒有此行代碼無法成功釋放正在執(zhí)行中的工作者線程,死鎖。

  WaitForMultipleObjects(n, @Handles[0], True, 30000);  // 等待工作者線程執(zhí)行終止  lixiaoyu 注釋于 2009.1.6

  csThreadManagment.Enter;  // lixiaoyu 添加于 2009.1.6 再次進入鎖定,并釋放資源
  for i := 0 to FThreads.Count - 1 do
    TProcessorThread(FThreads[i]).Free;
  FThreads.Free;
  FThreadsKilling.Free;
  csThreadManagment.Free;

  csQueueManagment.Enter;
  for i := FQueue.Count - 1 downto 0 do
    TObject(FQueue[i]).Free;
  FQueue.Free;
  csQueueManagment.Free;

  CloseHandle(hSemRequestCount);
  CloseHandle(hTimCheckPoolDown);
  inherited;
end; { TThreadsPool.Destroy }

function TThreadsPool.AddRequest(aDataObject: TWorkItem; CheckForDoubles:
  Boolean = False): Boolean;
var
  i: Integer;
begin
{$IFNDEF NOLOGS}
  WriteLog('AddRequest(' + aDataObject.TextForLog + ')', 2);
{$ENDIF}
  Result := False;
  csQueueManagment.Enter;
  try
    // 如果 CheckForDoubles = TRUE
    // 則進行任務(wù)是否重復(fù)的檢查
    if CheckForDoubles then
      for i := 0 to FQueue.Count - 1 do
        if (FQueue[i] <> nil)
          and aDataObject.IsTheSame(TWorkItem(FQueue[i])) then
          Exit; // 發(fā)現(xiàn)有相同的任務(wù)

    csThreadManagment.Enter;
    try
      // 清除死線程,并補充不足的工作線程
      CheckThreadsForGrow;

      // 如果 CheckForDoubles = TRUE
      // 則檢查是否有相同的任務(wù)正在處理中
      if CheckForDoubles then
        for i := 0 to FThreads.Count - 1 do
          if TProcessorThread(FThreads[i]).IamCurrentlyProcess(aDataObject) then
            Exit; // 發(fā)現(xiàn)有相同的任務(wù)

    finally
      csThreadManagment.Leave;
    end;

    //將任務(wù)加入隊列
    FQueue.Add(aDataObject);

    //釋放一個同步信號量
    ReleaseSemaphore(hSemRequestCount, 1, nil);
{$IFNDEF NOLOGS}
    WriteLog('釋放一個同步信號量)', 1);
{$ENDIF}
    Result := True;
  finally
    csQueueManagment.Leave;
  end;
{$IFNDEF NOLOGS}
  //調(diào)試信息
  WriteLog('增加一個任務(wù)(' + aDataObject.TextForLog + ')', 1);
{$ENDIF}
end; { TThreadsPool.AddRequest }

{
 數(shù) 名:TThreadsPool.CheckPoolDown
功能描述:線程池停機(檢查并清除空閑線程和死線程)
輸入?yún)?shù):無
  值: 
創(chuàng)建日期:2006.10.22 11:31
修改日期:2006.
    者:Kook
附加說明:
}

procedure TThreadsPool.CheckPoolDown;
var
  i: Integer;
begin
{$IFNDEF NOLOGS}
  WriteLog('TThreadsPool.CheckPoolDown', 1);
{$ENDIF}
  csThreadManagment.Enter;
  try
{$IFNDEF NOLOGS}
    WriteLog(InfoText, 2);
{$ENDIF}
    // 清除死線程
    KillDeadThreads;
    // 釋放 FThreadsKilling 列表中的線程
    FreeFinishedThreads;

    // 如果線程空閑,就終止它
    for i := FThreads.Count - 1 downto FThreadsMin do
      if TProcessorThread(FThreads[i]).isIdle then
      begin
        //發(fā)出終止命令
        TProcessorThread(FThreads[i]).Terminate;
        //加入待清除隊列
        FThreadsKilling.Add(FThreads[i]);
        //從工作隊列中除名
        FThreads.Delete(i);
        //todo: ??
        Break;
      end;
  finally
    csThreadManagment.Leave;
  end;
end; { TThreadsPool.CheckPoolDown }

{
 數(shù) 名:TThreadsPool.CheckThreadsForGrow
功能描述:清除死線程,并補充不足的工作線程
輸入?yún)?shù):無
  值: 
創(chuàng)建日期:2006.10.22 11:31
修改日期:2006.
    者:Kook
附加說明:
}

procedure TThreadsPool.CheckThreadsForGrow;
var
  AvgWait: Integer;
  i: Integer;
begin
  {
    New thread created if:
    新建線程的條件:
      1. 工作線程數(shù)小于最小線程數(shù)
      2. 工作線程數(shù)小于最大線程數(shù) and 線程池平均等待時間 < 100ms(系統(tǒng)忙)
      3. 任務(wù)大于工作線程數(shù)的4
  }

  csThreadManagment.Enter;
  try
    KillDeadThreads;
    if FThreads.Count < FThreadsMin then
    begin
{$IFNDEF NOLOGS}
      WriteLog('工作線程數(shù)小于最小線程數(shù)', 4);
{$ENDIF}
      for i := FThreads.Count to FThreadsMin - 1 do
      try
        FThreads.Add(TProcessorThread.Create(Self));
      except
        on e: Exception do

          WriteLog(
            'TProcessorThread.Create raise: ' + e.ClassName + #13#10#9'Message: '
            + e.Message,
            9
            );
      end
    end
    else if FThreads.Count < FThreadsMax then
    begin
{$IFNDEF NOLOGS}
      WriteLog('工作線程數(shù)小于最大線程數(shù) and 線程池平均等待時間 < 100ms', 3);
{$ENDIF}
      AvgWait := PoolAverageWaitingTime;
{$IFNDEF NOLOGS}
      WriteLog(Format(
        'FThreads.Count (%d)<FThreadsMax(%d), AvgWait=%d',
        [FThreads.Count, FThreadsMax, AvgWait]),
        4
        );
{$ENDIF}

      if AvgWait < 100 then
      try
        FThreads.Add(TProcessorThread.Create(Self));
      except
        on e: Exception do
          WriteLog(
            'TProcessorThread.Create raise: ' + e.ClassName +
            #13#10#9'Message: ' + e.Message,
            9
            );
      end;
    end;
  finally
    csThreadManagment.Leave;
  end;
end; { TThreadsPool.CheckThreadsForGrow }

procedure TThreadsPool.DoProcessed;
var
  i: Integer;
begin
  if (FLastGetPoint < FQueue.Count) then
    Exit;
  csThreadManagment.Enter;
  try
    for i := 0 to FThreads.Count - 1 do
      if TProcessorThread(FThreads[i]).FCurState in [tcsProcessing] then
        Exit;
  finally
    csThreadManagment.Leave;
  end;
  DoQueueEmpty(ekProcessingFinished);
end; { TThreadsPool.DoProcessed }

procedure TThreadsPool.DoProcessRequest(aDataObj: TWorkItem; aThread:
  TProcessorThread);
begin
  if Assigned(FProcessRequest) then
    FProcessRequest(Self, aDataObj, aThread);
end; { TThreadsPool.DoProcessRequest }

procedure TThreadsPool.DoQueueEmpty(EmptyKind: TEmptyKind);
begin
  if Assigned(FQueueEmpty) then
    FQueueEmpty(Self, EmptyKind);
end; { TThreadsPool.DoQueueEmpty }

procedure TThreadsPool.DoThreadFinalizing(aThread: TProcessorThread);
begin
  if Assigned(FThreadFinalizing) then
    FThreadFinalizing(Self, aThread);
end; { TThreadsPool.DoThreadFinalizing }

procedure TThreadsPool.DoThreadInitializing(aThread: TProcessorThread);
begin
  if Assigned(FThreadInitializing) then
    FThreadInitializing(Self, aThread);
end; { TThreadsPool.DoThreadInitializing }

{
 數(shù) 名:TThreadsPool.FreeFinishedThreads
功能描述:釋放 FThreadsKilling 列表中的線程
輸入?yún)?shù):無
  值: 
創(chuàng)建日期:2006.10.22 11:34
修改日期:2006.
    者:Kook
附加說明:
}

procedure TThreadsPool.FreeFinishedThreads;
var
  i: Integer;
begin
  if csThreadManagment.TryEnter then
  try
    for i := FThreadsKilling.Count - 1 downto 0 do
      if TProcessorThread(FThreadsKilling[i]).isFinished then
      begin
        TProcessorThread(FThreadsKilling[i]).Free;
        FThreadsKilling.Delete(i);
      end;
  finally
    csThreadManagment.Leave
  end;
end; { TThreadsPool.FreeFinishedThreads }

{
 數(shù) 名:TThreadsPool.GetRequest
功能描述:申請任務(wù)
輸入?yún)?shù):out Request: TRequestDataObject
  值: 
創(chuàng)建日期:2006.10.22 11:34
修改日期:2006.
    者:Kook
附加說明:
}

procedure TThreadsPool.GetRequest(out Request: TWorkItem);
begin
{$IFNDEF NOLOGS}
  WriteLog('申請任務(wù)', 2);
{$ENDIF}
  csQueueManagment.Enter;
  try
    //跳過空的隊列元素
    while (FLastGetPoint < FQueue.Count) and (FQueue[FLastGetPoint] = nil) do
      Inc(FLastGetPoint);

    Assert(FLastGetPoint < FQueue.Count);
    //壓縮隊列,清除空元素
    if (FQueue.Count > 127) and (FLastGetPoint >= (3 * FQueue.Count) div 4) then
    begin
{$IFNDEF NOLOGS}
      WriteLog('FQueue.Pack', 1);
{$ENDIF}
      FQueue.Pack;
      FLastGetPoint := 0;
    end;

    Request := TWorkItem(FQueue[FLastGetPoint]);
    FQueue[FLastGetPoint] := nil;
    inc(FLastGetPoint);
    if (FLastGetPoint = FQueue.Count) then //如果隊列中無任務(wù)
    begin

      DoQueueEmpty(ekQueueEmpty);
      FQueue.Clear;
      FLastGetPoint := 0;
    end;
  finally
    csQueueManagment.Leave;
  end;
end; { TThreadsPool.GetRequest }

function TThreadsPool.InfoText: string;
begin
  Result := '';
  //end;
  //{$ELSE}
  //var
  //  i: Integer;
  //begin
  //  csQueueManagment.Enter;
  //  csThreadManagment.Enter;
  //  try
  //    if (FThreads.Count = 0) and (FThreadsKilling.Count = 1) and
  //      TProcessorThread(FThreadsKilling[0]).isFinished then
  //      FreeFinishedThreads;
  //
  //    Result := Format(
  //      'Pool thread: Min=%d, Max=%d, WorkingThreadsCount=%d, TerminatedThreadCount=%d, QueueLength=%d'#13#10,
  //      [ThreadsMin, ThreadsMax, FThreads.Count, FThreadsKilling.Count,
  //      FQueue.Count]
  //        );
  //    if FThreads.Count > 0 then
  //      Result := Result + 'Working threads:'#13#10;
  //    for i := 0 to FThreads.Count - 1 do
  //      Result := Result + TProcessorThread(FThreads[i]).InfoText + #13#10;
  //    if FThreadsKilling.Count > 0 then
  //      Result := Result + 'Terminated threads:'#13#10;
  //    for i := 0 to FThreadsKilling.Count - 1 do
  //      Result := Result + TProcessorThread(FThreadsKilling[i]).InfoText + #13#10;
  //  finally
  //    csThreadManagment.Leave;
  //    csQueueManagment.Leave;
  //  end;
  //end;
  //{$ENDIF}
end; { TThreadsPool.InfoText }

{
 數(shù) 名:TThreadsPool.KillDeadThreads
功能描述:清除死線程
輸入?yún)?shù):無
  值: 
創(chuàng)建日期:2006.10.22 11:32
修改日期:2006.
    者:Kook
附加說明:
}

procedure TThreadsPool.KillDeadThreads;
var
  i: Integer;
begin
  // Check for dead threads
  if csThreadManagment.TryEnter then
  try
    for i := 0 to FThreads.Count - 1 do
      if TProcessorThread(FThreads[i]).IsDead then
      begin
        // Dead thread moverd to other list.
        // New thread created to replace dead one
        TProcessorThread(FThreads[i]).Terminate;
        FThreadsKilling.Add(FThreads[i]);
        try
          FThreads[i] := TProcessorThread.Create(Self);
        except
          on e: Exception do
          begin
            FThreads[i] := nil;
{$IFNDEF NOLOGS}
            WriteLog(
              'TProcessorThread.Create raise: ' + e.ClassName +
              #13#10#9'Message: ' + e.Message,
              9
              );
{$ENDIF}
          end;
        end;
      end;
  finally
    csThreadManagment.Leave
  end;
end; { TThreadsPool.KillDeadThreads }

function TThreadsPool.PoolAverageWaitingTime: Integer;
var
  i: Integer;
begin
  Result := 0;
  if FThreads.Count > 0 then
  begin
    for i := 0 to FThreads.Count - 1 do
      Inc(result, TProcessorThread(FThreads[i]).AverageWaitingTime);
    Result := Result div FThreads.Count
  end
  else
    Result := 1;
end; { TThreadsPool.PoolAverageWaitingTime }

procedure TThreadsPool.WriteLog(const Str: string; Level: Integer = 0);
begin
{$IFNDEF NOLOGS}
  uThreadPool.WriteLog(Str, 0, Level);
{$ENDIF}
end; { TThreadsPool.WriteLog }

// 工作線程僅用于線程池內(nèi), 不要直接創(chuàng)建并調(diào)用它。
{
******************************* TProcessorThread *******************************
}

constructor TProcessorThread.Create(APool: TThreadsPool);
begin
  WriteLog('創(chuàng)建工作線程', 5);
  inherited Create(True);
  FPool := aPool;

  FAverageWaitingTime := 1000;
  FAverageProcessing := 3000;

  sInitError := '';
  {
  各參數(shù)的意義如下:
   
   參數(shù)一:填上 nil 即可。
   參數(shù)二:是否采用手動調(diào)整燈號。
   參數(shù)三:燈號的起始狀態(tài),False 表示紅燈。
   參數(shù)四:Event 名稱, 對象名稱相同的話,會指向同一個對象,所以想要有兩個Event對象,便要有兩個不同的名稱(這名稱以字符串來存.為NIL的話系統(tǒng)每次會自己創(chuàng)建一個不同的名字,就是被次創(chuàng)建的都是新的EVENT)。
   傳回值:Event handle。
  }
  hInitFinished := CreateEvent(nil, True, False, nil);
  hThreadTerminated := CreateEvent(nil, True, False, nil);
  csProcessingDataObject := TCriticalSection.Create;
  try
    WriteLog('TProcessorThread.Create::Resume', 3);
    Resume;
    //阻塞, 等待初始化完成
    WaitForSingleObject(hInitFinished, INFINITE);
    if sInitError <> '' then
      raise Exception.Create(sInitError);
  finally
    CloseHandle(hInitFinished);
  end;
  WriteLog('TProcessorThread.Create::Finished', 3);
end; { TProcessorThread.Create }

destructor TProcessorThread.Destroy;
begin
  WriteLog('工作線程銷毀', 5);
  CloseHandle(hThreadTerminated);
  csProcessingDataObject.Free;
  inherited;
end; { TProcessorThread.Destroy }

function TProcessorThread.AverageProcessingTime: DWORD;
begin
  if (FCurState in [tcsProcessing]) then
    Result := NewAverage(FAverageProcessing, GetTickCount - uProcessingStart)
  else
    Result := FAverageProcessing
end; { TProcessorThread.AverageProcessingTime }

function TProcessorThread.AverageWaitingTime: DWORD;
begin
  if (FCurState in [tcsWaiting, tcsCheckingDown]) then
    Result := NewAverage(FAverageWaitingTime, GetTickCount - uWaitingStart)
  else
    Result := FAverageWaitingTime
end; { TProcessorThread.AverageWaitingTime }

procedure TProcessorThread.Execute;

type
  THandleID = (hidTerminateThread, hidRequest, hidCheckPoolDown);
var
  WaitedTime: Integer;
  Handles: array[THandleID] of THandle;

begin
  WriteLog('工作線程進常運行', 3);
  //當前狀態(tài):初始化
  FCurState := tcsInitializing;
  try
    //執(zhí)行外部事件
    FPool.DoThreadInitializing(Self);
  except
    on e: Exception do
      sInitError := e.Message;
  end;

  //初始化完成,初始化Event綠燈
  SetEvent(hInitFinished);

  WriteLog('TProcessorThread.Execute::Initialized', 3);

  //引用線程池的同步 Event
  Handles[hidTerminateThread] := hThreadTerminated;
  Handles[hidRequest] := FPool.hSemRequestCount;
  Handles[hidCheckPoolDown] := FPool.hTimCheckPoolDown;

  //時間戳,
  //todo: 好像在線程中用 GetTickCount; 會不正常
  uWaitingStart := GetTickCount;
  //任務(wù)置空
  FProcessingDataObject := nil;

  //大巡環(huán)
  while not terminated do
  begin
    //當前狀態(tài):等待
    FCurState := tcsWaiting;
    //阻塞線程,使線程休眠
    case WaitForMultipleObjects(Length(Handles), @Handles, False, INFINITE) -
      WAIT_OBJECT_0 of

      WAIT_OBJECT_0 + ord(hidTerminateThread):
        begin
          WriteLog('TProcessorThread.Execute:: Terminate event signaled ', 5);
          //當前狀態(tài):正在終止線程
          FCurState := tcsTerminating;
          //退出大巡環(huán)(結(jié)束線程)
          Break;
        end;

      WAIT_OBJECT_0 + ord(hidRequest):
        begin
          WriteLog('TProcessorThread.Execute:: Request semaphore signaled ', 3);
          //等待的時間
          WaitedTime := GetTickCount - uWaitingStart;
          //重新計算平均等待時間
          FAverageWaitingTime := NewAverage(FAverageWaitingTime, WaitedTime);
          //當前狀態(tài):申請任務(wù)
          FCurState := tcsGetting;
          //如果等待時間過短,則檢查工作線程是否足夠
          if WaitedTime < 5 then
            FPool.CheckThreadsForGrow;
          //從線程池的任務(wù)隊列中得到任務(wù)
          FPool.GetRequest(FProcessingDataObject);
          //開始處理的時間戳
          uProcessingStart := GetTickCount;
          //當前狀態(tài):執(zhí)行任務(wù)
          FCurState := tcsProcessing;
          try
{$IFNDEF NOLOGS}
            WriteLog('Processing: ' + FProcessingDataObject.TextForLog, 2);
{$ENDIF}
            //執(zhí)行任務(wù)
            FPool.DoProcessRequest(FProcessingDataObject, Self);
          except
            on e: Exception do
              WriteLog(
                'OnProcessRequest for ' + FProcessingDataObject.TextForLog +
                #13#10'raise Exception: ' + e.Message,
                8
                );
          end;

          //釋放任務(wù)對象
          csProcessingDataObject.Enter;
          try
            FProcessingDataObject.Free;
            FProcessingDataObject := nil;
          finally
            csProcessingDataObject.Leave;
          end;
          //重新計算
          FAverageProcessing := NewAverage(FAverageProcessing, GetTickCount -
            uProcessingStart);
          //當前狀態(tài):執(zhí)行任務(wù)完畢
          FCurState := tcsProcessed;
          //執(zhí)行線程外事件
          FPool.DoProcessed;

          uWaitingStart := GetTickCount;
        end;
      WAIT_OBJECT_0 + ord(hidCheckPoolDown):
        begin
          // !!! Never called under Win9x
          WriteLog('TProcessorThread.Execute:: CheckPoolDown timer signaled ',
            4);
          //當前狀態(tài):線程池停機(檢查并清除空閑線程和死線程)
          FCurState := tcsCheckingDown;
          FPool.CheckPoolDown;
        end;
    end;
  end;
  FCurState := tcsTerminating;

  FPool.DoThreadFinalizing(Self);
end; { TProcessorThread.Execute }

function TProcessorThread.IamCurrentlyProcess(DataObj: TWorkItem): Boolean;
begin
  csProcessingDataObject.Enter;
  try
    Result := (FProcessingDataObject <> nil) and
      DataObj.IsTheSame(FProcessingDataObject);
  finally
    csProcessingDataObject.Leave;
  end;
end; { TProcessorThread.IamCurrentlyProcess }

function TProcessorThread.InfoText: string;

const
  ThreadStateNames: array[TThreadState] of string =
  (
    'tcsInitializing',
    'tcsWaiting',
    'tcsGetting',
    'tcsProcessing',
    'tcsProcessed',
    'tcsTerminating',
    'tcsCheckingDown'
    );

begin
{$IFNDEF NOLOGS}
  Result := Format(
    '%5d: %15s, AverageWaitingTime=%6d, AverageProcessingTime=%6d',
    [ThreadID, ThreadStateNames[FCurState], AverageWaitingTime,
    AverageProcessingTime]
      );
  case FCurState of
    tcsWaiting:
      Result := Result + ', WaitingTime=' + IntToStr(GetTickCount -
        uWaitingStart);
    tcsProcessing:
      Result := Result + ', ProcessingTime=' + IntToStr(GetTickCount -
        uProcessingStart);
  end;

  csProcessingDataObject.Enter;
  try
    if FProcessingDataObject <> nil then
      Result := Result + ' ' + FProcessingDataObject.TextForLog;
  finally
    csProcessingDataObject.Leave;
  end;
{$ENDIF}
end; { TProcessorThread.InfoText }

function TProcessorThread.IsDead: Boolean;
begin
  Result :=
    Terminated or
    (FPool.ThreadDeadTimeout > 0) and (FCurState = tcsProcessing) and
    (GetTickCount - uProcessingStart > FPool.ThreadDeadTimeout);
  if Result then
    WriteLog('Thread dead', 5);
end; { TProcessorThread.IsDead }

function TProcessorThread.isFinished: Boolean;
begin
  Result := WaitForSingleObject(Handle, 0) = WAIT_OBJECT_0;
end; { TProcessorThread.isFinished }

function TProcessorThread.isIdle: Boolean;
begin
  // 如果線程狀態(tài)是 tcsWaiting, tcsCheckingDown
  // 并且 空間時間 > 100ms,
  // 并且 平均等候任務(wù)時間大于平均工作時間的 50%
  // 則視為空閑。
  Result :=
    (FCurState in [tcsWaiting, tcsCheckingDown]) and
    (AverageWaitingTime > 100) and
    (AverageWaitingTime * 2 > AverageProcessingTime);
end; { TProcessorThread.isIdle }

function TProcessorThread.NewAverage(OldAvg, NewVal: Integer): Integer;
begin
  Result := (OldAvg * 2 + NewVal) div 3;
end; { TProcessorThread.NewAverage }

procedure TProcessorThread.Terminate;
begin
  WriteLog('TProcessorThread.Terminate', 5);
  inherited Terminate;
  SetEvent(hThreadTerminated);
end; { TProcessorThread.Terminate }

procedure TProcessorThread.WriteLog(const Str: string; Level: Integer = 0);
begin
{$IFNDEF NOLOGS}
  uThreadPool.WriteLog(Str, ThreadID, Level);
{$ENDIF}
end; { TProcessorThread.WriteLog }

{
******************************* TCriticalSection *******************************
}

constructor TCriticalSection.Create;
begin
  InitializeCriticalSection(FSection);
end; { TCriticalSection.Create }

destructor TCriticalSection.Destroy;
begin
  DeleteCriticalSection(FSection);
end; { TCriticalSection.Destroy }

procedure TCriticalSection.Enter;
begin
  EnterCriticalSection(FSection);
end; { TCriticalSection.Enter }

procedure TCriticalSection.Leave;
begin
  LeaveCriticalSection(FSection);
end; { TCriticalSection.Leave }

function TCriticalSection.TryEnter: Boolean;
begin
  Result := TryEnterCriticalSection(FSection);
end; { TCriticalSection.TryEnter }

procedure NoLogs(const Str: string; LogID: Integer = 0; Level: Integer = 0);
begin
end;

initialization
  WriteLog := NoLogs;
end.

用法:
// 創(chuàng)建線程池
FThreadPool := TThreadsPool.Create(Self); // 創(chuàng)建線程池
FThreadPool.ThreadsMin := 5; // 初始工作線程數(shù)
FThreadPool.ThreadsMax := 50; // 最大允許工作線程數(shù)
FThreadPool.OnProcessRequest := DealwithCommRecvData; // 線程工作函數(shù)(DealwithCommRecvData在工作者線程的Execute方法中被調(diào)用)

// 使用線程池
var
 AWorkItem: TRecvCommDataWorkItem; // 繼承自TWorkItem
begin
 AWorkItem := TRecvCommDataWorkItem.Create;
 Move(PData[0], AWorkItem.FRecvData[0], PDataLen);
 AWorkItem.FRecvDataLen := PDataLen;
 FThreadPool.AddRequest(AWorkItem); // 向線程池分配一個任務(wù)
end; 

    相關(guān)評論

    閱讀本文后您有什么感想? 已有人給出評價!

    • 8 喜歡喜歡
    • 3 頂
    • 1 難過難過
    • 5 囧
    • 3 圍觀圍觀
    • 2 無聊無聊

    熱門評論

    最新評論

    發(fā)表評論 查看所有評論(0)

    昵稱:
    表情: 高興 可 汗 我不要 害羞 好 下下下 送花 屎 親親
    字數(shù): 0/500 (您的評論需要經(jīng)過審核才能顯示)
    推薦文章

    沒有數(shù)據(jù)