top
Loading...
C#消息隊列應用程序-1
簡介

Microsoft近期推出一種用于生成集成應用程序的新平臺——Microsoft
.NET框架。.NET 框架允許開發人員使用任何編程語言迅速生成和部署Web
服務和應用程序。Microsoft Intermediate Language (MSIL)和實時
(JIT )編譯器使這種不依賴語言的框架得以實現。

與.NET框架同時面世的還有一種新的編程語言C#(讀“C sharp”)。
C#是一種簡單、新穎、面向對象和類型安全的編程語言。利用 .NET 框架
和 C# (除 Microsoft? Visual Basic ?和 Managed C++之外),用戶
可以編寫功能強大的 Microsoft Windows?和 Web應用程序及服務。本文
提供了這樣的一個解決方案,它的重點是 .NET 框架和 C# 而不是編程語
言。C#語言的介紹可以在“ C# 簡介和概述(英文)”找到。

近期的文章“MSMQ:可伸縮、高可用性的負載平衡解決方案(英文)”
介紹了一種解決方案,用于高可用性消息隊列(MSMQ)的可伸縮負載平衡
解決方案體系結構。此解決方案中涉及了一種將 Windows服務用作智能消
息路由器的開發方案。這樣的解決方案以前只有 Microsoft Visual C++
程序員才能實現,而 .NET 框架的出現改變了這種情況。從下面的解決方
案中,您可以看到這一點。

.NET 框架應用程序

這里介紹的解決方案是一種用來處理若干消息隊列的 Windows服務;
其中每個隊列都是由多個線程進行處理(接收和處理消息)。處理程序使
用循環法技術或應用程序特定值(消息 AppSpecific屬性)從目的隊列列
表中路由消息,并使用消息屬性來調用組件方法。(示例進程也屬于這種
情況。)在后一種情況下,組件的要求是它能夠實現給定的接口IWeb
Message要處理錯誤,應用程序需要將不能處理的消息發送到錯誤隊列中。

消息應用程序的結構與以前的活動模板庫(ATL )應用程序相似,它
們之間的主要不同在于用于管理服務的代碼的封裝和 .NET 框架組件的使
用。要創建Windows服務,.NET框架用戶僅僅需要創建一個從 ServiceBase
(來自System.ServiceControl程序集)繼承的類。這毫不奇怪,因為.NET
框架是面向對象的。

應用程序結構

應用程序中主要的類是 ServiceControl ,它是從 ServiceBase繼承
的。因而,它必須實現 OnStart和 OnStop 方法,以及可選的 OnPause和
OnContinue方法。事實上,類是在靜態方法 Main 內構造的:

using System;
using System.ServiceProcess;

public class ServiceControl: ServiceBase
{
// 創建服務對象的主入口點
public static void Main()
{
ServiceBase.Run(new ServiceControl());
}

// 定義服務參數的構造對象
public ServiceControl()
{
CanPauseAndContinue = true;
ServiceName = "MSDNMessageService";
AutoLog = false;
}

protected override void OnStart(string[] args) {...}
protected override void OnStop() {...}
protected override void OnPause() {...}
protected override void OnContinue() {...}
}

ServiceControl類創建一系列 CWorker對象,即,為需要處理的每個
消息隊列創建 CWorker類的一個實例。根據定義中處理隊列所需的線程數
目,CWorker 類依次創建了一系列的 CWorkerThread對象。CWorkerThread
類創建的一個處理線程將執行實際的服務工作。

使用 CWorker和 CWorkerThread類的主要目的是確認服務控件 Start、
Stop、Pause 和 Continue 命令。因為這些進程必須是無阻塞的,命令操
作最終將在后臺處理線程上執行。

CWorkerThread 是一個抽象類,被 CWorkerThreadAppSpecific 、
CWorkerThreadRoundRobin 和 CWorkerThreadAssembly繼承。這些類以不
同的方式處理消息。前兩個類通過給另一隊列發送消息來處理消息(其不
同之處在于確定接收隊列路徑的方式),最后一個類則使用消息屬性來調
用組件方法。

.NET 框架內部的錯誤處理是以基類 Exception為基礎的。當系統引
發或捕獲錯誤時,這些錯誤必須是從 Exception中導出的類。CWorker
ThreadException 類就是這樣一種實現,它通過附加額外屬性(用于定義
服務是否應繼續運行)來擴展基類。

最后,應用程序包含兩種結構。這些值類型定義了輔助進程或線程的
運行時參數,以簡化 CWorker和 CWorkerThread對象的結構。使用值類型
結構(而不是引用類型類)能夠確保這些運行時參數維護的是數值(而不
是引用)。

IWebMessage 接口

CWorkerThread 的實現之一是一個調用組件方法的類。這個名為
CWorkerThreadAssembly 的類使用 IWebMessage接口來定義服務和組件之
間的約定。

與當前版本的 Microsoft Visual Studio?不同,C#接口可以在任何
語言中顯式定義,而不需要創建和編譯 IDL文件。C# IWebMessage接口的
定義如下:
public interface IWebMessage
{
WebMessageReturn Process(string sMessageLabel, string sMessage
Body, int iAppSpecific);
void Release();
}

ATL 代碼中的 Process 方法是為處理消息而指定的。Process 方法的返
回代碼定義為枚舉類型 WebMessageReturn:

public enum WebMessageReturn
{
ReturnGood,
ReturnBad,
ReturnAbort
}

枚舉的定義如下:Good表示繼續處理,Bad 表示將消息寫入錯誤隊列,
Abort 表示終止處理。Release 方法為服務提供了輕松清除類實例的途徑。
因為僅在垃圾回收的過程中才調用類實例的析構函數,所以確保所有占用
昂貴資源(例如數據庫連接)的類都有一個能夠在析構之前被調用的方法,
用來釋放這些資源,這是一種非常好的構思。

名稱空間

在這里先簡單介紹一下名稱空間。名稱空間允許在內部和外部表示中
將應用程序組織成為邏輯元素。服務內的所有代碼都包含在 MSDNMessage
Service.Service 名稱空間內。盡管服務代碼包含在若干文件中,但是由
于它們包含在同一名稱空間中,因此用戶不需要引用其他文件。

由于 IWebMessage接口包含在 MSDNMessageService.Interface 名稱
空間中,因此使用此接口的線程類具有一個接口名稱空間。

服務類

應用程序的目的是監視和處理消息隊列,每一隊列在收到消息時都執
行不同的進程。應用程序是作為 Windows服務來實現的。

ServiceBase 類

如前所述,服務的基本結構是從 ServiceBase繼承的類。重要的方法
包括 OnStart、OnStop、OnPause 和 OnContinue ,每一個替代方法都與
一個服務控制操作直接對應。OnStart 方法的目的是創建 CWorker對象,
而 CWorker類又創建 CWorkerThread對象,然后在該對象中創建執行服務
工作的線程。

服務的運行時配置(以及 CWorker和 CWorkerThread對象的屬性)是
在基于 XML的配置文件中維護的。它的名稱與創建的 .exe 文件相同,但
帶有一個 .cfg 后綴。配置示例如下:
〈?xml version="1.0"?〉
〈configuration〉
〈ProcessList〉
〈ProcessDefinition
ProcessName="Worker1"
ProcessDesc="Message Worker with 2 Threads"
ProcessType="AppSpecific"
ProcessThreads="2"
InputQueue=".private$est_load1"
ErrorQueue=".private$est_error"〉
〈OutputList〉
〈OutputDefinition OutputName=".private$est_out11" /〉
〈OutputDefinition OutputName=".private$est_out12" /〉
〈/OutputList〉
〈/ProcessDefinition〉
〈ProcessDefinition
ProcessName="Worker2"
ProcessDesc="Assembly Worker with 1 Thread"
ProcessType="Assembly"
ProcessThreads="1"
InputQueue=".private$est_load2"
ErrorQueue=".private$est_error"〉
〈OutputList〉
〈OutputDefinition OutputName="C:MSDNMessageServiceMessage
Example.dll" /〉
〈OutputDefinition OutputName="MSDNMessageService.Message
Sample.ExampleClass"/〉
〈/OutputList〉
〈/ProcessDefinition〉
〈/ProcessList〉
〈/configuration〉

對此信息的訪問通過來自 System.Configuration 程序集的 Config
Manager 類來管理。靜態 Get方法返回信息的集合,這些集合將被枚舉以
獲得單個屬性。這些屬性集的設置決定了輔助對象的運行時特征。除了這
一配置文件,您還應該創建定義 XML文件結構的圖元文件,并在其中引用
位于服務器 machine.cfg配置文件中的圖元文件:

〈?xml version ="1.0"?〉
〈MetaData xmlns="x-schema:CatMeta.xms"〉
〈DatabaseMeta InternalName="MessageService"〉
〈ServerWiring Interceptor="Core_XMLInterceptor"/〉
〈Collection
InternalName="Process" PublicName="ProcessList"
PublicRowName="ProcessDefinition"
SchemaGeneratorFlags="EMITXMLSCHEMA"〉
〈Property InternalName="ProcessName" Type="String" Meta
Flags="PRIMARYKEY" /〉
〈Property InternalName="ProcessDesc" Type="String" /〉
〈Property InternalName="ProcessType" Type="Int32" Default
Value="RoundRobin" 〉
〈Enum InternalName="RoundRobin"Value="0"/〉
〈Enum InternalName="AppSpecific" Value="1"/〉
〈Enum InternalName="Assembly" Value="2"/〉
〈/Property〉
〈Property InternalName="ProcessThreads" Type="Int32"
DefaultValue="1" /〉
〈Property InternalName="InputQueue" Type="String" /〉
〈Property InternalName="ErrorQueue" Type="String" /〉
〈Property InternalName="OutputName" Type="String" /〉
〈QueryMeta InternalName="All" MetaFlags="ALL" /〉
〈QueryMeta InternalName="QueryByFile" CellName="__FILE"
Operator="EQUAL"/〉
〈/Collection〉
〈Collection
InternalName="Output" PublicName="OutputList"
PublicRowName="OutputDefinition"
SchemaGeneratorFlags="EMITXMLSCHEMA"〉
〈Property InternalName="ProcessName" Type="String" Meta
Flags="PRIMARYKEY" /〉
〈Property InternalName="OutputName" Type="String" Meta
Flags="PRIMARYKEY" /〉
〈QueryMeta InternalName="All" MetaFlags="ALL" /〉
〈QueryMeta InternalName="QueryByFile" CellName="__FILE"
Operator="EQUAL"/〉
〈/Collection〉
〈/DatabaseMeta〉
〈RelationMeta
PrimaryTable="Process" PrimaryColumns="ProcessName"
ForeignTable="Output"ForeignColumns="ProcessName"
MetaFlags="USECONTAINMENT"/〉
〈/MetaData〉

由于 Service類必須維護一個已創建輔助對象的列表,因此使用了
Hashtable 集合,用于保持類型對象的名稱/ 數值對列表。Hashtable 不
僅支持枚舉,還允許通過關鍵字來查詢值。在應用程序中,XML 進程名稱
是唯一的關鍵字:
private Hashtable htWorkers = new Hashtable();
IConfigCollection cWorkers = ConfigManager.Get("ProcessList", new
AppDomainSelector());
foreach (IConfigItem ciWorker in cWorkers)
{
WorkerFormatter sfWorker = new WorkerFormatter();
sfWorker.ProcessName = (string)ciWorker["ProcessName"];
sfWorker.ProcessDesc = (string)ciWorker["ProcessDesc"];
sfWorker.NumberThreads = (int)ciWorker["ProcessThreads"];
sfWorker.InputQueue = (string)ciWorker["InputQueue"];
sfWorker.ErrorQueue = (string)ciWorker["ErrorQueue"];
// 計算并定義進程類型
switch ((int)ciWorker["ProcessType"])
{
case 0:
sfWorker.ProcessType = WorkerFormatter.SFProcessType.
ProcessRoundRobin;
break;
case 1:
sfWorker.ProcessType = WorkerFormatter.SFProcessType.
ProcessAppSpecific;
break;
case 2:
sfWorker.ProcessType = WorkerFormatter.SFProcessType.
ProcessAssembly;
break;
default:
throw new Exception("Unknown Processing Type");
}
// 執行更多的工作以讀取輸出信息
string sProcessName = (string)ciWorker["ProcessName"];
if (htWorkers.ContainsKey(sProcessName))
throw new ArgumentException("Process Name Must be Unique: "
+ sProcessName);
htWorkers.Add(sProcessName, new CWorker(sfWorker));
}

在這段代碼中沒有包含的主要信息是輸出數據的獲取。每一個進程定
義中都有一組相應的輸出定義項。該信息是通過如下的簡單查詢讀取的:

string sQuery = "SELECT * FROM OutputList WHERE ProcessName=" +
sfWorker.ProcessName + " AND Selector=appdomain://";
ConfigQuery qQuery = new ConfigQuery(sQuery);
IConfigCollection cOutputs = ConfigManager.Get("OutputList",
qQuery);
int iSize = cOutputs.Count, iLoop = 0;
sfWorker.OutputName = new string[iSize];
foreach (IConfigItem ciOutput in cOutputs)
sfWorker.OutputName[iLoop++] = (string)ciOutput["OutputName"];

CWorkerThread 和 Cworker類都有相應的服務控制方法,根據服務控
制操作進行調用。由于 Hashtable中引用了每一個 CWorker對象,因此需
要枚舉 Hashtable的內容,以調用適當的服務控制方法:
foreach (CWorker cWorker in htWorkers.Values)
cWorker.Start();

類似地,實現的 OnPause、OnContinue和 OnStop 方法是通過調用
CWorker 對象上的相應方法來執行操作的。

CWorker 類

CWorker 類的主要功能是創建和管理 CWorkerThread對象。Start 、
Stop、Pause 和 Continue 方法調用相應的 CWorkerThread方法。實際的
CWorkerThread 對象是在Start 方法中創建的。與使用 Hashtable管理輔
助對象引用的 Service類相似,CWorker 使用 ArrayList(簡單的動態數
組)來維護線程對象的列表。

北斗有巢氏 有巢氏北斗