下面图老师小编要向大家介绍下C#消息队列应用程序,看起来复杂实则是简单的,掌握好技巧就OK,喜欢就赶紧收藏起来吧!
【 tulaoshi.com - 编程语言 】
简介
Microsoft 近期推出了一种用于生成集成应用程序的新平台Microsoft .NET 框架。.NET 框架允许开发人员使用任何编程语言迅速生成和部署 Web 服务和应用程序。Microsoft Intermediate Language (MSIL) 和实时 (JIT) 编译器使这种不依赖语言的框架得以实现。
与 .NET 框架同时面世的还有一种新的编程语言 C#(读作C sharp)。C# 是一种简单、新颖、面向对象和类型安全的编程语言。利用 .NET 框架和 C#(除 Microsoft? Visual Basic? 和 Managed C++ 之外),用户可以编写功能强大的 Microsoft Windows? 和 Web 应用程序及服务。本文提供了这样的一个解决方案,它的重点是 .NET 框架和 C# 而不是编程语言。C# 语言的介绍可以在 C# 简介和概述(英文)找到。
近期的文章MSMQ:可伸缩、高可用性的负载平衡解决方案(英文)介绍了一种解决方案,用于高可用性消息队列 (MSMQ) 的可伸缩负载平衡解决方案体系结构。此解决方案中涉及了一种将 Windows 服务用作智能消息路由器的开发方案。这样的解决方案以前只有 Microsoft Visual C++? 程序员才能实现,而 .NET 框架的出现改变了这种情况。从下面的解决方案中,您可以看到这一点。
.NET 框架应用程序
这里介绍的解决方案是一种用来处理若干消息队列的 Windows 服务;其中每个队列都是由多个线程进行处理(接收和处理消息)。处理程序使用循环法技术或应用程序特定值(消息 AppSpecific 属性)从目的队列列表中路由消息,并使用消息属性来调用组件方法。(示例进程也属于这种情况。)在后一种情况下,组件的要求是它能够实现给定的接口 IWebMessage。要处理错误,应用程序需要将不能处理的消息发送到错误队列中。
消息应用程序的结构与以前的活动模板库 (ATL) 应用程序相似,它们之间的主要不同在于用于管理服务的代码的封装和 .NET 框架组件的使用。要创建 Windows 服务,.NET 框架用户仅仅需要创建一个从 ServiceBase(来自 System.ServiceControl 程序集)继承的类。这毫不奇怪,因为 .NET 框架是面向对象的。
应用程序结构
应用程序中主要的类是 ServiceControl,它是从 ServiceBase 继承的。因而,它必须实现 OnStart 和 OnStop 方法,以及可选的 OnPause 和 OnContinue 方法。事实上,类是在静态方法 Main 内构造的:
using System;using System.ServiceProcess;public class ServiceControl: ServiceBase{ // 创建服务对象的主入口点 public static void Main() { ServiceBase.Run(new ServiceControl()); } // 定义服务参数的构造对象 public ServiceControl() { CanPauseAndContinue = true; ServiceName = "MSDNMessageService"; AutoLog = false; } protected override void OnStart(string[] args) {...} protected override void OnStop() {...} protected override void OnPause() {...} protected override void OnContinue() {...}}
ServiceControl 类创建一系列 CWorker 对象,即,为需要处理的每个消息队列创建 CWorker 类的一个实例。根据定义中处理队列所需的线程数目,CWorker 类依次创建了一系列的 CWorkerThread 对象。CWorkerThread 类创建的一个处理线程将执行实际的服务工作。
使用 CWorker 和 CWorkerThread 类的主要目的是确认服务控件 Start、Stop、Pause 和 Continue 命令。因为这些进程必须是无阻塞的,命令操作最终将在后台处理线程上执行。
CWorkerThread 是一个抽象类,被 CWorkerThreadAppSpecific 、CWorkerThreadRoundRobin 和 CWorkerThreadAssembly 继承。这些类以不同的方式处理消息。前两个类通过给另一队列发送消息来处理消息(其不同之处在于确定接收队列路径的方式),最后一个类则使用消息属性来调用组件方法。
.NET 框架内部的错误处理是以基类 Exception 为基础的。当系统引发或捕获错误时,这些错误必须是从 Exception 中导出的类。CWorkerThreadException 类就是这样一种实现,它通过附加额外属性(用于定义服务是否应继续运行)来扩展基类。
最后,应用程序包含两种结构。这些值类型定义了辅助进程或线程的运行时参数,以简化 CWorker 和 CWorkerThread 对象的结构。使用值类型结构(而不是引用类型类)能够确保这些运行时参数维护的是数值(而不是引用)。
IWebMessage 接口
CWorkerThread 的实现之一是一个调用组件方法的类。这个名为 CWorkerThreadAssembly 的类使用 IWebMessage 接口来定义服务和组件之间的约定。
与当前版本的 Microsoft Visual Studio? 不同,C# 接口可以在任何语言中显式定义,而不需要创建和编译 IDL 文件。C# IWebMessage 接口的定义如下:
public interface IWebMessage{ WebMessageReturn Process(string sMessageLabel, string sMessageBody, int iAppSpecific); void Release();}
ATL 代码中的 Process 方法是为处理消息而指定的。Process 方法的返回代码定义为枚举类型 WebMessageReturn:
(本文来源于图老师网站,更多请访问http://www.tulaoshi.com/bianchengyuyan/)
public enum WebMessageReturn{ ReturnGood, ReturnBad, ReturnAbort}
枚举的定义如下:Good 表示继续处理,Bad 表示将消息写入错误队列,Abort 表示终止处理。Release 方法为服务提供了轻松清除类实例的途径。因为仅在垃圾回收的过程中才调用类实例的析构函数,所以确保所有占用昂贵资源(例如数据库连接)的类都有一个能够在析构之前被调用的方法,用来释放这些资源,这是一种非常好的构思。
名称空间
在这里先简单介绍一下名称空间。名称空间允许在内部和外部表示中将应用程序组织成为逻辑元素。服务内的所有代码都包含在 MSDNMessageService.Service 名称空间内。尽管服务代码包含在若干文件中,但是由于它们包含在同一名称空间中,因此用户不需要引用其他文件。
由于 IWebMessage 接口包含在 MSDNMessageService.Interface 名称空间中,因此使用此接口的线程类具有一个接口名称空间。
服务类
应用程序的目的是监视和处理消息队列,每一队列在收到消息时都执行不同的进程。应用程序是作为 Windows 服务来实现的。
ServiceBase 类
如前所述,服务的基本结构是从 ServiceBase 继承的类。重要的方法包括 OnStart、OnStop、OnPause 和 OnContinue,每一个替代方法都与一个服务控制操作直接对应。OnStart 方法的目的是创建 CWorker 对象,而 CWorker 类又创建 CWorkerThread 对象,然后在该对象中创建执行服务工作的线程。
服务的运行时配置(以及 CWorker 和 CWorkerThread 对象的属性)是在基于 XML 的配置文件中维护的。它的名称与创建的 .exe 文件相同,但带有一个 .cfg 后缀。配置示例如下:
<?xml version="1.0"?><configuration><ProcessList> <ProcessDefinition ProcessName="Worker1" ProcessDesc="Message Worker with 2 Threads" ProcessType="AppSpecific" ProcessThreads="2" InputQueue=".private$test_load1" ErrorQueue=".private$test_error"> <OutputList> <OutputDefinition OutputName=".private$test_out11" /> <OutputDefinition OutputName=".private$test_out12" /> </OutputList> </ProcessDefinition> <ProcessDefinition ProcessName="Worker2" ProcessDesc="Assembly Worker with 1 Thread" ProcessType="Assembly" ProcessThreads="1" InputQueue=".private$test_load2" ErrorQueue=".private$test_error"> <OutputList> <OutputDefinition OutputName="C:MSDNMessageServiceMessageExample.dll" /> <OutputDefinition OutputName="MSDNMessageService.MessageSample.ExampleClass"/> </OutputList> </ProcessDefinition></ProcessList></configuration>
对此信息的访问通过来自 System.Configuration 程序集的 ConfigManager 类来管理。静态 Get 方法返回信息的集合,这些集合将被枚举以获得单个属性。这些属性集的设置决定了辅助对象的运行时特征。除了这一配置文件,您还应该创建定义 XML 文件结构的图元文件,并在其中引用位于服务器 machine.cfg 配置文件中的图元文件:
<?xml version ="1.0"?><MetaData xmlns="x-schema:CatMeta.xms"> <DatabaseMeta InternalName="MessageService"> <ServerWiring Interceptor="Core_XMLInterceptor"/> <Collection InternalName="Process" PublicName="ProcessList" PublicRowName="ProcessDefinition" SchemaGeneratorFlags="EMITXMLSCHEMA"> <Property InternalName="ProcessName" Type="String" MetaFlags="PRIMARYKEY" /> <Property InternalName="ProcessDesc" Type="String" /> <Property InternalName="ProcessType" Type="Int32" DefaultValue="RoundRobin" > <Enum InternalName="RoundRobin" Value="0"/> <Enum InternalName="AppSpecific" Value="1"/> <Enum InternalName="Assembly" Value="2"/> </Property> <Property InternalName="ProcessThreads" Type="Int32" DefaultValue="1" /> <Property InternalName="InputQueue" Type="String" /> <Property InternalName="ErrorQueue" Type="String" /> <Property InternalName="OutputName" Type="String" /> <QueryMeta InternalName="All" MetaFlags="ALL" /> <QueryMeta InternalName="QueryByFile" CellName="__FILE" Operator="EQUAL" /> </Collection> <Collection InternalName="Output" PublicName="OutputList" PublicRowName="OutputDefinition" SchemaGeneratorFlags="EMITXMLSCHEMA"> <Property InternalName="ProcessName" Type="String" MetaFlags="PRIMARYKEY" /> <Property InternalName="OutputName" Type="String" MetaFlags="PRIMARYKEY" /> <QueryMeta InternalName="All" MetaFlags="ALL" /> <QueryMeta InternalName="QueryByFile" CellName="__FILE" Operator="EQUAL" /> </Collection> </DatabaseMeta> <RelationMeta PrimaryTable="Process" PrimaryColumns="ProcessName" ForeignTable="Output" ForeignColumns="ProcessName" MetaFlags="USECONTAINMENT"/></MetaData>
由于 Service 类必须维护一个已创建辅助对象的列表,因此使用了 Hashtable 集合,用于保持类型对象的名称/数值对列表。Hashtable 不仅支持枚举,还允许通过关键字来查询值。在应用程序中,XML 进程名称是唯一的关键字:
private Hashtable htWorkers = new Hashtable();IConfigCollection cWorkers = ConfigManager.Get("ProcessList", new AppDomainSelector());foreach (IConfigItem ciWorker in cWorkers){ WorkerFormatter sfWorker = new WorkerFormatter(); sfWorker.ProcessName = (string)ciWorker["ProcessName"]; sfWorker.ProcessDesc = (string)ciWorker["ProcessDesc"]; sfWorker.NumberThreads = (int)ciWorker["ProcessThreads"]; sfWorker.InputQueue = (string)ciWorker["InputQueue"]; sfWorker.ErrorQueue = (string)ciWorker["ErrorQueue"]; // 计算并定义进程类型 switch ((int)ciWorker["ProcessType"]) { case 0: sfWorker.ProcessType = WorkerFormatter.SFProcessType.ProcessRoundRobin; break; case 1: sfWorker.ProcessType = WorkerFormatter.SFProcessType.ProcessAppSpecific; break; case 2: sfWorker.ProcessType = WorkerFormatter.SFProcessType.ProcessAssembly; break; default: throw new Exception("Unknown Processing Type"); } // 执行更多的工作以读取输出信息 string sProcessName = (string)ciWorker["ProcessName"]; if (htWorkers.ContainsKey(sProcessName)) throw new ArgumentException("Process Name Must be Unique: " + sProcessName); htWorkers.Add(sProcessName, new CWorker(sfWorker));}
在这段代码中没有包含的主要信息是输出数据的获取。每一个进程定义中都有一组相应的输出定义项。该信息是通过如下的简单查询读取的:
string sQuery = "SELECT * FROM OutputList WHERE ProcessName=" +sfWorker.ProcessName + " AND Selector=appdomain://";ConfigQuery qQuery = new ConfigQuery(sQuery);IConfigCollection cOutputs = ConfigManager.Get("OutputList", qQuery);int iSize = cOutputs.Count, iLoop = 0;sfWorker.OutputName = new string[iSize];foreach (IConfigItem ciOutput in cOutputs)sfWorker.OutputName[iLoop++] = (string)ciOutput["OutputName"];
CWorkerThread 和 Cworker 类都有相应的服务控制方法,根据服务控制操作进行调用。由于 Hashtable 中引用了每一个 CWorker 对象,因此需要枚举 Hashtable 的内容,以调用适当的服务控制方法:
foreach (CWorker cWorker in htWorkers.Values) cWorker.Start();
类似地,实现的 OnPause、OnContinue 和 OnStop 方法是通过调用 CWorker 对象上的相应方法来执行操作的。
CWorker 类
CWorker 类的主要功能是创建和管理 CWorkerThread 对象。Start、Stop、Pause 和 Continue 方法调用相应的 CWorkerThread 方法。实际的 CWorkerThread 对象是在Start 方法中创建的。与使用 Hashtable 管理辅助对象引用的 Service 类相似,CWorker 使用 ArrayList(简单的动态数组)来维护线程对象的列表。
在这个数组内部,CWorker 类创建了 CWorkerThread 类的一个实现版本。CWorkerThread 类(将在下面讨论)是一个必须继承的抽象类。导出类定义了消息的处理方式:
aThreads = new ArrayList();for (int idx=0; idx<sfWorker.NumberThreads; idx++){ WorkerThreadFormatter wfThread = new WorkerThreadFormatter(); wfThread.ProcessName = sfWorker.ProcessName; wfThread.ProcessDesc = sfWorker.ProcessDesc; wfThread.ThreadNumber = idx; wfThread.InputQueue = sfWorker.InputQueue; wfThread.ErrorQueue = sfWorker.ErrorQueue; wfThread.OutputName = sfWorker.OutputName; // 定义辅助类型,并将其插入辅助线程结构 CWorkerThread wtBase; switch (sfWorker.ProcessType) { case WorkerFormatter.SFProcessType.ProcessRoundRobin: wtBase = new CWorkerThreadRoundRobin(this, wfThread); break; case WorkerFormatter.SFProcessType.ProcessAppSpecific: wtBase = new CWorkerThreadAppSpecific(this, wfThread); break; case WorkerFormatter.SFProcessType.ProcessAssembly: wtBase = new CWorkerThreadAssembly(this, wfThread); break; default: throw new Exception("Unknown Processing Type"); } // 添加对数组的调用 aThreads.Insert(idx, wtBase);}
一旦所有的对象都已创建,就可以通过调用每个线程对象的 Start 方法来启动它们:
foreach(CWorkerThread cThread in aThreads) cThread.Start();
Stop、Pause 和 Continue 方法在 foreach 循环里执行的操作类似。Stop 方法具有如下的垃圾收集操作:
GC.SuppressFinalize(this);
在类析构函数中将调用 Stop 方法,这样,在没有显式调用 Stop 方法的情况下也可以正确地终止对象。如果调用了 Stop 方法,将不需要析构函数。SuppressFinalize 方法能够防止调用对象的 Finalize 方法(析构函数的实际实现)。
CWorkerThread 抽象类
CWorkerThread 是一个由 CWorkerThreadAppSpecifc、CWorkerThreadRoundRobin 和 CWorkerThreadAssembly 继承的抽象类。无论如何处理消息,队列的大部分处理是相同的,所以 CWorkerThread 类提供了这一功能。这个类提供了抽象方法(必须被实际方法替代)以管理资源和处理消息。
类的工作再一次通过 Start、Stop、Pause 和 Continue 方法来实现。在 Start 方法中引用了输入和错误队列。在 .NET 框架中,消息由 System.Messaging 名称空间处理:
// 尝试打开队列,并设置默认的读写属性MessageQueue mqInput = new MessageQueue(sInputQueue);mqInput.MessageReadPropertyFilter.Body = true;mqInput.MessageReadPropertyFilter.AppSpecific = true;MessageQueue mqError = new MessageQueue(sErrorQueue);// 如果使用 MSMQ COM,则将格式化程序设置为 ActiveXmqInput.Formatter = new ActiveXMessageFormatter();mqError.Formatter = new ActiveXMessageFormatter();
一旦定义了消息队列引用,即会创建一个线程用于实际的处理函数(称为 ProcessMessages)。在 .NET 框架中,使用 System.Threading 名称空间很容易实现线程处理:
procMessage = new Thread(new ThreadStart(ProcessMessages));procMessage.Start();
ProcessMessages 函数是基于 Boolean 值的处理循环。当数值设为 False,处理循环将终止。因此,线程对象的 Stop 方法只设置这一 Boolean 值,然后关闭打开的消息队列,并加入带有主线程的线程:
// 加入服务线程和处理线程bRun = false;procMessage.Join();// 关闭打开的消息队列mqInput.Close();mqError.Close();
Pause 方法只设置一个 Boolean 值,使处理线程休眠半秒钟:
if (bPause) Thread.Sleep(500);
最后,每一个 Start、Stop、Pause 和 Continue 方法将调用抽象的 OnStart、OnStop、OnPause 和 OnContinue 方法。这些抽象方法为实现的类提供了挂钩,以捕获和释放所需的资源。
ProcessMessages 循环具有如下基本结构:
1、接收 Message。
2、如果 Message 具有成功的 Receive,则调用抽象 ProcessMessage 方法。
3、如果 Receive 或 ProcessMessage 失败,将 Message 发送至错误队列中。
Message mInput;try{ // 从队列中读取,并等候 1 秒 mInput = mqInput.Receive(new TimeSpan(0,0,0,1));}catch (MessageQueueException mqe){ // 将消息设置为 null mInput = null; // 查看错误代码,了解是否超时 if (mqe.ErrorCode != (-1072824293) ) //0xC00E001B { // 如果未超时,发出一个错误并记录错误号 LogError("Error: " + mqe.Message); throw mqe; }}if (mInput != null){ // 得到一个要处理的消息,调用处理消息抽象方法 try { ProcessMessage(mInput); } // 捕获已知异常状态的错误 catch (CWorkerThreadException ex) { ProcessError(mInput, ex.Terminate); } // 捕获未知异常,并调用 Terminate catch { ProcessError(mInput, true); }}
ProcessError 方法将错误的消息发送至错误队列。另外,它也可能引发异常来终止线程。如果ProcessMessage 方法引发了终止错误或 CWorkerThreadException 类型,它将执行此操作。
CworkerThread 导出类
任何从 CWorkerThread 中继承的类都必须提供 OnStart、OnStop、OnPause、OnContinue 和 ProcessMessage 方法。OnStart 和 OnStop 方法获取并释放处理资源。OnPause 和 OnContinue 方法允许临时释放和重新获取这些资源。ProcessMessage 方法应该处理消息,并在出现失败事件时引发 CWorkerThreadException 异常。
由于 CWorkerThread 构造函数定义运行时参数,导出类必须调用基类构造函数:
public CWorkerThreadDerived(CWorker v_cParent, WorkerThreadFormatter v_wfThread): base (v_cParent, v_wfThread) {}
导出类提供了两种类型的处理:将消息发送至另一队列,或者调用组件方法。接收和发送消息的两种实现使用了循环技术或应用程序偏移(保留在消息 AppSpecific 属性中),作为使用哪一队列的决定因素。此方案中的配置文件应该包括队列路径的列表。实现的 OnStart 和 OnStop 方法应该打开和关闭对这些队列的引用:
iQueues = wfThread.OutputName.Length;mqOutput = new MessageQueue[iQueues];for (int idx=0; idx<iQueues; idx++){ mqOutput[idx] = new MessageQueue(wfThread.OutputName[idx]); mqOutput[idx].Formatter = new ActiveXMessageFormatter();}
在这些方案中,消息的处理很简单:将消息发送必要的输出队列。在循环情况下,这个进程为:
try{ mqOutput[iNextQueue].Send(v_mInput);}catch (Exception ex){ // 如果错误强制终止异常 throw new CWorkerThreadException(ex.Message, true);}// 计算下一个队列号iNextQueue++;iNextQueue %= iQueues;
后一种调用带消息参数的组件的实现方法比较有趣。ProcessMessage 方法使用 IWebMessage 接口调入一个 .NET 组件。OnStart 和 OnStop 方法获取和释放此组件的引用。
此方案中的配置文件应该包含两个项目:完整的类名和类所在文件的位置。按照 IWebMessage 接口中的定义,在组件上调用 Process 方法。
要获取对象引用,需要使用 Activator.CreateInstance 方法。此函数需要一个程序集类型。在这里,它是从程序集文件路径和类名中导出的。一旦获取对象引用,它将被放入合适的接口:
private IWebMessage iwmSample;private string sFilePath, sTypeName;// 保存程序集路径和类型名称sFilePath = wfThread.OutputName[0];sTypeName = wfThread.OutputName[1];// 获取对必要对象的引用Assembly asmSample = Assembly.LoadFrom(sFilePath);Type typSample = asmSample.GetType(sTypeName);object objSample = Activator.CreateInstance(typSample);// 定义给对象的必要接口iwmSample = (IWebMessage)objSample;获取对象引用后,ProcessMessage 方法将在 IWebMessage 接口上调用 Process 方法:WebMessageReturn wbrSample;try{ // 定义方法调用的参数 string sLabel = v_mInput.Label; string sBody = (string)v_mInput.Body; int iAppSpecific = v_mInput.AppSpecific; // 调用方法并捕捉返回代码 wbrSample = iwmSample.Process(sLabel, sBody, iAppSpecific);}catch (InvalidCastException ex){ // 如果在消息内容中发生错误,则强制发出一个非终止异常 throw new CWorkerThreadException(ex.Message, false);}catch (Exception ex){ // 如果错误调用程序集,则强制发出终止异常 throw new CWorkerThreadException(ex.Message, true);}// 如果没有错误,则检查对象调用的返回状态switch (wbrSample){ case WebMessageReturn.ReturnBad: throw new CWorkerThreadException ("Unable to process message: Message marked bad", false); case WebMessageReturn.ReturnAbort: throw new CWorkerThreadException ("Unable to process message: Process terminating", true); default: break;}
提供的示例组件将消息正文写入数据库表。如果捕获到严重数据库错误,您可能希望终止处理过程,但是在这里,仅仅将消息标记为错误的消息。
由于此示例中创建的类实例可能会获取并保留昂贵的数据库资源,所以用 OnPause 和 OnContinue 方法释放和重新获取对象引用。
检测设备
就象在所有优秀的应用程序中一样,检测设备用于监测应用程序的状态。.NET 框架大大简化了将事件日志、性能计数器和 Windows 管理检测设备 (WMI) 纳入应用程序的过程。消息应用程序使用时间日志和性能计数器,二者都是来自 System.Diagnostics 程序集。
在 ServiceBase 类中,您可以自动启用事件日志。另外,ServiceBase EventLog 成员支持写入应用程序事件日志:
EventLog.WriteEntry(sMyMessage, EventLogEntryType.Information);(本文来源于图老师网站,更多请访问http://www.tulaoshi.com/bianchengyuyan/)
来源:http://www.tulaoshi.com/n/20160219/1614342.html