top
Loading...
C#消息隊列應用程序-2
在這個數組內部,CWorker 類創建了 CWorkerThread類的一個實現版
本。CWorkerThread 類(將在下面討論)是一個必須繼承的抽象類。導出
類定義了消息的處理方式:
aThreads = new ArrayList();
for (int idx=0; idx〈sfWorker.NumberThreads; idx++)
{
WorkerThreadFormatter wfThread = new WorkerThreadFormatter();
wfThread.ProcessName = sfWorker.ProcessName;
wfThread.ProcessDesc = sfWorker.ProcessDesc;
wfThread.ThreadNumber = idx;
wfThread.InputQueue = sfWorker.InputQueue;
wfThread.ErrorQueue = sfWorker.ErrorQueue;
wfThread.OutputName = sfWorker.OutputName;
// 定義輔助類型,并將其插入輔助線程結構
CWorkerThread wtBase;
switch (sfWorker.ProcessType)
{
case WorkerFormatter.SFProcessType.ProcessRoundRobin:
wtBase = new CWorkerThreadRoundRobin(this, wfThread);
break;
case WorkerFormatter.SFProcessType.ProcessAppSpecific:
wtBase = new CWorkerThreadAppSpecific(this, wfThread);
break;
case WorkerFormatter.SFProcessType.ProcessAssembly:
wtBase = new CWorkerThreadAssembly(this, wfThread);
break;
default:
throw new Exception("Unknown Processing Type");
}
// 添加對數組的調用
aThreads.Insert(idx, wtBase);
}

一旦所有的對象都已創建,就可以通過調用每個線程對象的 Start方
法來啟動它們:
foreach(CWorkerThread cThread in aThreads)
cThread.Start();

Stop、Pause 和 Continue 方法在 foreach循環里執行的操作類似。
Stop方法具有如下的垃圾收集操作:
GC.SuppressFinalize(this);

在類析構函數中將調用 Stop 方法,這樣,在沒有顯式調用 Stop 方
法的情況下也可以正確地終止對象。如果調用了 Stop 方法,將不需要析
構函數。SuppressFinalize方法能夠防止調用對象的 Finalize 方法(析
構函數的實際實現)。

CWorkerThread 抽象類

CWorkerThread 是一個由 CWorkerThreadAppSpecifc、CWorkerThread
RoundRobin 和 CWorkerThreadAssembly繼承的抽象類。無論如何處理消
息,隊列的大部分處理是相同的,所以 CWorkerThread類提供了這一功能。
這個類提供了抽象方法(必須被實際方法替代)以管理資源和處理消息。

類的工作再一次通過 Start、Stop、Pause 和 Continue 方法來實現。
在 Start方法中引用了輸入和錯誤隊列。在 .NET 框架中,消息由 System.
Messaging 名稱空間處理:
// 嘗試打開隊列,并設置默認的讀寫屬性
MessageQueue mqInput = new MessageQueue(sInputQueue);
mqInput.MessageReadPropertyFilter.Body = true;
mqInput.MessageReadPropertyFilter.AppSpecific = true;
MessageQueue mqError = new MessageQueue(sErrorQueue);
// 如果使用 MSMQ COM,則將格式化程序設置為 ActiveX
mqInput.Formatter = new ActiveXMessageFormatter();
mqError.Formatter = new ActiveXMessageFormatter();

一旦定義了消息隊列引用,即會創建一個線程用于實際的處理函數
(稱為 ProcessMessages)。在 .NET 框架中,使用 System.Threading
名稱空間很容易實現線程處理:
procMessage = new Thread(new ThreadStart(ProcessMessages));
procMessage.Start();

ProcessMessages 函數是基于 Boolean值的處理循環。當數值設為
False,處理循環將終止。因此,線程對象的 Stop 方法只設置這一Boolean
值,然后關閉打開的消息隊列,并加入帶有主線程的線程:
// 加入服務線程和處理線程
bRun = false;
procMessage.Join();
// 關閉打開的消息隊列
mqInput.Close();
mqError.Close();

Pause 方法只設置一個 Boolean 值,使處理線程休眠半秒鐘:

if (bPause)
Thread.Sleep(500);

最后,每一個 Start、Stop、Pause 和 Continue 方法將調用抽象的
OnStart 、OnStop、OnPause 和 OnContinue 方法。這些抽象方法為實現
的類提供了掛鉤,以捕獲和釋放所需的資源。

ProcessMessages 循環具有如下基本結構:
●接收Message。
●如果Message具有成功的Receive,則調用抽象ProcessMessage方法。
●如果Receive或ProcessMessage失敗,將Message發送至錯誤隊列中。

Message mInput;
try
{
// 從隊列中讀取,并等候 1 秒
mInput = mqInput.Receive(new TimeSpan(0,0,0,1));
}
catch (MessageQueueException mqe)
{
// 將消息設置為 null
mInput = null;
// 查看錯誤代碼,了解是否超時
if (mqe.ErrorCode != (-1072824293) ) //0xC00E001B
{
// 如果未超時,發出一個錯誤并記錄錯誤號
LogError("Error: " + mqe.Message);
throw mqe;
}
}
if (mInput != null)
{
// 得到一個要處理的消息,調用處理消息抽象方法
try
{
ProcessMessage(mInput);
}
// 捕獲已知異常狀態的錯誤
catch (CWorkerThreadException ex)
{
ProcessError(mInput, ex.Terminate);
}
// 捕獲未知異常,并調用 Terminate
catch
{
ProcessError(mInput, true);
}
}

ProcessError方法將錯誤的消息發送至錯誤隊列。另外,它也可能引
發異常來終止線程。如果ProcessMessage方法引發了終止錯誤或 CWorker
ThreadException類型,它將執行此操作。

CworkerThread 導出類

任何從 CWorkerThread中繼承的類都必須提供 OnStart、OnStop、On
Pause、OnContinue和 ProcessMessage 方法。OnStart 和 OnStop方法獲
取并釋放處理資源。OnPause 和 OnContinue 方法允許臨時釋放和重新獲
取這些資源。ProcessMessage方法應該處理消息,并在出現失敗事件時引
發 CWorkerThreadException 異常。

由于 CWorkerThread構造函數定義運行時參數,導出類必須調用基類
構造函數:
public CWorkerThreadDerived(CWorker v_cParent, WorkerThread
Formatter v_wfThread)
: base (v_cParent, v_wfThread) {}

導出類提供了兩種類型的處理:將消息發送至另一隊列,或者調用組
件方法。接收和發送消息的兩種實現使用了循環技術或應用程序偏移(保
留在消息 AppSpecific屬性中),作為使用哪一隊列的決定因素。此方案
中的配置文件應該包括隊列路徑的列表。實現的 OnStart和 OnStop 方法
應該打開和關閉對這些隊列的引用:
iQueues = wfThread.OutputName.Length;
mqOutput = new MessageQueue[iQueues];
for (int idx=0; idx〈iQueues; idx++)
{
mqOutput[idx] = new MessageQueue(wfThread.OutputName[idx]);
mqOutput[idx].Formatter = new ActiveXMessageFormatter();
}

在這些方案中,消息的處理很簡單:將消息發送必要的輸出隊列。在
循環情況下,這個進程為:
try
{
mqOutput[iNextQueue].Send(v_mInput);
}
catch (Exception ex)
{
// 如果錯誤強制終止異常
throw new CWorkerThreadException(ex.Message, true);
}
// 計算下一個隊列號
iNextQueue++;
iNextQueue %= iQueues;

后一種調用帶消息參數的組件的實現方法比較有趣。ProcessMessage
方法使用 IWebMessage接口調入一個 .NET 組件。OnStart 和 OnStop 方
法獲取和釋放此組件的引用。

此方案中的配置文件應該包含兩個項目:完整的類名和類所在文件的
位置。按照 IWebMessage接口中的定義,在組件上調用 Process方法。

要獲取對象引用,需要使用 Activator.CreateInstance 方法。此函
數需要一個程序集類型。在這里,它是從程序集文件路徑和類名中導出的。
一旦獲取對象引用,它將被放入合適的接口:
private IWebMessage iwmSample;
private string sFilePath, sTypeName;
// 保存程序集路徑和類型名稱
sFilePath = wfThread.OutputName[0];
sTypeName = wfThread.OutputName[1];
// 獲取對必要對象的引用
Assembly asmSample = Assembly.LoadFrom(sFilePath);
Type typSample = asmSample.GetType(sTypeName);
object objSample = Activator.CreateInstance(typSample);
// 定義給對象的必要接口
iwmSample = (IWebMessage)objSample;

獲取對象引用后,ProcessMessage方法將在 IWebMessage接口上調用
Process 方法:
WebMessageReturn wbrSample;
try
{
// 定義方法調用的參數
string sLabel = v_mInput.Label;
string sBody = (string)v_mInput.Body;
int iAppSpecific = v_mInput.AppSpecific;
// 調用方法并捕捉返回代碼
wbrSample = iwmSample.Process(sLabel, sBody, iAppSpecific);
}
catch (InvalidCastException ex)
{
// 如果在消息內容中發生錯誤,則強制發出一個非終止異常
throw new CWorkerThreadException(ex.Message, false);
}
catch (Exception ex)
{
// 如果錯誤調用程序集,則強制發出終止異常
throw new CWorkerThreadException(ex.Message, true);
}
// 如果沒有錯誤,則檢查對象調用的返回狀態
switch (wbrSample)
{
case WebMessageReturn.ReturnBad:
throw new CWorkerThreadException
("Unable to process message: Message marked bad", false);
case WebMessageReturn.ReturnAbort:
throw new CWorkerThreadException
("Unable to process message: Process terminating", true);
default:
break;
}

提供的示例組件將消息正文寫入數據庫表。如果捕獲到嚴重數據庫錯
誤,您可能希望終止處理過程,但是在這里,僅僅將消息標記為錯誤的消
息。

由于此示例中創建的類實例可能會獲取并保留昂貴的數據庫資源,所
以用 OnPause和 OnContinue 方法釋放和重新獲取對象引用。

檢測設備

就象在所有優秀的應用程序中一樣,檢測設備用于監測應用程序的狀
態。。NET 框架大大簡化了將事件日志、性能計數器和 Windows管理檢測
設備(WMI )納入應用程序的過程。消息應用程序使用時間日志和性能計
數器,二者都是來自 System.Diagnostics 程序集。

在 ServiceBase類中,您可以自動啟用事件日志。另外,ServiceBase
EventLog成員支持寫入應用程序事件日志:
EventLog.WriteEntry(sMyMessage, EventLogEntryType.Information);

對于寫入事件日志而不是應用程序日志的應用程序,它能夠很容易地
創建和獲取 EventLog 資源的引用(正如在 CWorker類中所做的一樣),
并能夠使用 WriteEntry 方法記錄日志項:
private EventLog cLog;
string sSource = ServiceControl.ServiceControlName;
string sLog = "Application";
// 查看源是否存在,如果不存在,則創建源
if (!EventLog.SourceExists(sSource))
EventLog.CreateEventSource(sSource, sLog);
// 創建日志對象,并引用現在定義的源
cLog = new EventLog();
cLog.Source = sSource;
// 在日志中寫入條目,表明創建成功
cLog.WriteEntry("已成功創建", EventLogEntryType.Information);

.NET 框架大大簡化了性能計數器。對于每一個處理線程、線程導出
的用戶和整個應用程序,這一消息應用程序都能提供計數器,用于跟蹤消
息數量和每秒鐘處理消息的數量。要提供此功能,您需要定義性能計數器
的類別,然后增加相應的計數器實例。

性能計數器的類別在服務 OnStart方法中定義。這些類別代表兩種計
數器——消息總數和每秒鐘處理的消息數:
CounterCreationData[] cdMessage = new CounterCreationData[2];
cdMessage[0] = new CounterCreationData("Messages/Total", "Total
Messages Processed",
PerformanceCounterType.NumberOfItems64);
cdMessage[1] = new CounterCreationData("Messages/Second",
"Messages Processed a Second",
PerformanceCounterType.RateOfChangePerSecond32);
PerformanceCounterCategory.Create("MSDN Message Service", "MSDN
Message Service Counters", cdMessage);

一旦定義了性能計數器類別,將創建 PerformanceCounter 對象以訪
問計數器實例功能。PerformanceCounter對象需要類別、計數器名稱和一
個可選的實例名稱。對于輔助進程,將使用來自 XML文件的進程名稱,代
碼如下:
pcMsgTotWorker = new PerformanceCounter("MSDN Message Service",
"Messages/Total", sProcessName);
pcMsgSecWorker = new PerformanceCounter("MSDN Message Service",
"Messages/Second", sProcessName);
pcMsgTotWorker.RawValue = 0;
pcMsgSecWorker.RawValue = 0;

要增加計數器的值,僅僅需要調用適當的方法:

pcMsgTotWorker.IncrementBy(1);
pcMsgSecWorker.IncrementBy(1);

最后說明一點,服務終止時,安裝的性能計數器類別應該從系統中刪除:

PerformanceCounterCategory.Delete("MSDN Message Service");

由于性能計數器在 .NET 框架中工作,因此需要運行一項特殊的服務。
此服務(PerfCounterService)提供了共享內存。計數器信息將寫入共享
內存,并被性能計數器系統讀取。

安裝

在結束以前,我們來簡要介紹一下安裝以及稱為 installutil.exe的
安裝工具。由于此應用程序是 Windows服務,它必須使用installutil.exe
來安裝。因此,需要使用一個從 System.Configuration.Install 程序集
中繼承的 Installer類:
public class ServiceRegister: Installer
{
private ServiceInstaller serviceInstaller;
private ServiceProcessInstaller processInstaller;
public ServiceRegister()
{
// 創建服務安裝程序
serviceInstaller = new ServiceInstaller();
serviceInstaller.StartType = ServiceStart.Manual;
serviceInstaller.ServiceName = ServiceControl.ServiceControl
Name;
serviceInstaller.DisplayName = ServiceControl.ServiceControl
Desc;
Installers.Add(serviceInstaller);
// 創建進程安裝程序
processInstaller = new ServiceProcessInstaller();
processInstaller.RunUnderSystemAccount = true;
Installers.Add(processInstaller);
}
}

如此示例類所示,對于一個 Windows服務,服務和服務進程各需要一
個安裝程序,以定義運行服務的帳戶。其他安裝程序允許注冊事件日志和
性能計數器等資源。

總結

從這個 .NET 框架應用程序示例中可以看出,以前只有 Visual C++
程序員能夠編寫的應用程序,現在使用簡單的面向對象程序即可實現。盡
管我們的重點是 C# ,但本文所述的內容也同樣適用于 Visual Basic 和
Managed C++.新的 .NET 框架使開發人員能夠使用任何編程語言來創建功
能強大、可伸縮的 Windows應用程序和服務。

新的 .NET 框架不僅簡化和擴展了編程的種種可能,還能夠輕松地將
人們經常遺忘的應用程序檢測設備(例如性能監測計數器和事件日志通知)
合并到應用程序中。盡管這里的應用程序沒有使用 Windows管理檢測設備
(WMI ),但 .NET 框架同樣也可以應用它。


北斗有巢氏 有巢氏北斗