本系列索引,请见《开篇》
这次介绍的 MessageInsepctor 是 WCF 可扩展性中用得最多的一个扩展了,MessageInspector 字面意思 “消息检查员”,可见通过使用这个扩展,可以对消息进行查看,那它到底有什么作用呢?
作 用
先来看看 MSDN 的说明:
Defines the methods that enable custom inspection or modification of inbound and outbound application messages in service applications.
可以用来对请求和响应的消息进行自定义检查和修改。
如果大家阅读过《消息处理流程》,应该会知道 DispatchMessageInspector 被调用的时机正好为于消息抵达具体的 operation 前,或者消息刚刚离开 operation,但还没有传输到网络上。在这两个阶段,可以做很多的事情,比如:
- 监控消息:记录消息抵达的时间和离开的时间,记录消息的内容
- 检查消息:在消息抵达或离开前先对消息进行验证,从而提早发现消息中的问题(比如 XSD 验证)
- 修改消息:对消息正文进行修改,甚至完全替换成一个新的消息
执行时机
当选中 Operation,且 Message 经过授权验证后,就会在 ProcessMessage2 中调用 AfterRecieveRequest 进行消息检查 :
internal void ProcessMessage11(ref MessageRpc rpc) { rpc.NextProcessor = this.processMessage2; if (rpc.Operation.IsOneWay) { rpc.RequestContext.Reply(null); rpc.OperationContext.RequestContext = null; } else {...} if (this.concurrency.IsConcurrent(ref rpc)) { rpc.Channel.IncrementActivity(); rpc.SuccessfullyIncrementedActivity = true; } if (this.authenticationBehavior != null) { this.authenticationBehavior.Authenticate(ref rpc); } if (this.authorizationBehavior != null) { this.authorizationBehavior.Authorize(ref rpc); } this.instance.EnsureInstanceContext(ref rpc); this.TransferChannelFromPendingList(ref rpc); this.AcquireDynamicInstanceContext(ref rpc); if (!rpc.IsPaused) { this.ProcessMessage2(ref rpc); } } void ProcessMessage2(ref MessageRpc rpc) { rpc.NextProcessor = this.processMessage3; this.AfterReceiveRequest(ref rpc); if (!this.ignoreTransactionFlow) { // Transactions need to have the context in the message rpc.TransactionMessageProperty = TransactionMessageProperty.TryGet(rpc.Request); } this.concurrency.LockInstance(ref rpc); if (!rpc.IsPaused) { this.ProcessMessage3(ref rpc); } else if (this.isOnServer && DiagnosticUtility.ShouldTraceInformation && !this.didTraceProcessMessage2) { this.didTraceProcessMessage2 = true; TraceUtility.TraceEvent( TraceEventType.Information, TraceCode.MessageProcessingPaused, SR.GetString(SR.TraceCodeProcessMessage2Paused, rpc.Channel.DispatchRuntime.EndpointDispatcher.ContractName, rpc.Channel.DispatchRuntime.EndpointDispatcher.EndpointAddress)); } }
当所有流程都走完后,消息打算发送到网络上时,就会在 ProcessMessage8 中调用 PrepareReply,这其中便会调用 BeforeSendReply 在返回前对消息进行检查。
void ProcessMessage8(ref MessageRpc rpc) { rpc.NextProcessor = this.processMessage9; try { this.error.ProvideMessageFault(ref rpc); } catch (Exception e) { if (Fx.IsFatal(e)) { throw; } this.error.HandleError(e); } this.PrepareReply(ref rpc); if (rpc.CanSendReply) { rpc.ReplyTimeoutHelper = new TimeoutHelper(rpc.Channel.OperationTimeout); if (this.sendAsynchronously) { this.BeginFinalizeCorrelation(ref rpc); } else { this.FinalizeCorrelation(ref rpc); } } if (!rpc.IsPaused) { this.ProcessMessage9(ref rpc); } } void PrepareReply(ref MessageRpc rpc) { RequestContext context = rpc.OperationContext.RequestContext; Exception exception = null; bool thereIsAnUnhandledException = false; if (!rpc.Operation.IsOneWay) { if (DiagnosticUtility.ShouldTraceWarning) { // If a service both returns null and sets RequestContext null, that // means they handled it (either by calling Close or Reply manually). // These traces catch accidents, where you accidentally return null, // or you accidentally close the context so we can't return your message. if ((rpc.Reply == null) && (context != null)) { TraceUtility.TraceEvent(System.Diagnostics.TraceEventType.Warning, TraceCode.ServiceOperationMissingReply, SR.GetString(SR.TraceCodeServiceOperationMissingReply, rpc.Operation.Name ?? String.Empty), null, null); } else if ((context == null) && (rpc.Reply != null)) { TraceUtility.TraceEvent(System.Diagnostics.TraceEventType.Warning, TraceCode.ServiceOperationMissingReplyContext, SR.GetString(SR.TraceCodeServiceOperationMissingReplyContext, rpc.Operation.Name ?? String.Empty), null, null); } } if ((context != null) && (rpc.Reply != null)) { try { rpc.CanSendReply = PrepareAndAddressReply(ref rpc); } catch (Exception e) { if (Fx.IsFatal(e)) { throw; } thereIsAnUnhandledException = (!this.error.HandleError(e)) || thereIsAnUnhandledException; exception = e; } } } this.BeforeSendReply(ref rpc, ref exception, ref thereIsAnUnhandledException); if (rpc.Operation.IsOneWay) { rpc.CanSendReply = false; } if (!rpc.Operation.IsOneWay && (context != null) && (rpc.Reply != null)) { if (exception != null) { // We don't call ProvideFault again, since we have already passed the // point where SFx addresses the reply, and it is reasonable for // ProvideFault to expect that SFx will address the reply. Instead // we always just do 'internal server error' processing. rpc.Error = exception; this.error.ProvideOnlyFaultOfLastResort(ref rpc); try { rpc.CanSendReply = PrepareAndAddressReply(ref rpc); } catch (Exception e) { if (Fx.IsFatal(e)) { throw; } this.error.HandleError(e); } } } else if ((exception != null) && thereIsAnUnhandledException) { rpc.Abort(); } }
Message 对象
在继续深入介绍 MessageInspector 之前,需要对 Message 对象有一个简单的认识。
WCF 是一个消息处理的框架,其中主要涉及到两种对象:消息实体 和 服务对象。前者,是整个 WCF 的基础对象,所有的消息调用最终都会表现为 Message 的形式(基于 XML),但是开发人员一般很少需要直接接触。而后者与传统的面向对象的编程模型类似,即通过我们熟悉的 ServiceContract、OperationContract、DataContract 等来描述。
Message 对象主要包含两个部分:Header 和 Body。前者一般指 Soap Header 部分,用于随消息体携带一些额外的信息,比如 To、Action、From 等。而后者则携带与业务密切相关的数据。
除此之外,Message 对象还有一个名叫 Properties 的属性,与 Header 类似,Property 也用于随 Message 携带一些额外的数据,但是 Properties 不会被传输到网络上,它只会在接收端或者发送端内部使用。
处理 Message 这两部分的方式并不相同,Header 会被缓存起来,任何时间都可以去读取,且不限次数。而 Body 部分是个一次性的对象(也有可能是个流对象),它内部有一个状态,初始为 Created,当发生了读取、写入、拷贝 和 关闭后,状态就会相应的变成 Read、Written、Copied 和 Closed。而状态一旦不是 Created,就不能再对 Message 进行上述 4 种操作。
MessageInspector 介绍
MessageInspector 分为 DispatchMessageInspector 和 ClientMessageInspector,前者用于服务端,后者用于客户端(本文只介绍前者)。
public interface IDispatchMessageInspector { object AfterReceiveRequest(ref Message request, IClientChannel channel, InstanceContext instanceContext); void BeforeSendReply(ref Message reply, object correlationState); } public interface IClientMessageInspector { void AfterReceiveReply(ref Message reply, object correlationState); object BeforeSendRequest(ref Message request, IClientChannel channel); }
AfterReceiveRequest 会在消息抵达 WCF 运行时,但还未分发到具体的 Operation 前被调用。
BeforeSendReply 会在 Operation 处理完消息后,编码成一个 Message 对象,但还未传输到网络之前被调用。
从上一节中得知 Message 对象是个一次性的对象,如果在 Inspector 中对传入的 request 对象进行了读取、写入、拷贝 和 关闭中的任何一种,都会导致 WCF 运行时的中断,这也是为什么 request 对象是通过 ref 关键字进行引用传递的原因。
而 AfterReceiveRequest 所返回的对象会被作为 correlationState 传给 BeforeSendReply。
如何扩展 MessageInspector
与其它扩展一样,首先实现 IDispatchMessageInspector
public class MySchemaValidationInspector : IDispatchMessageInspector { string _schemaLocation; public MyMessageInspector(string schemaLocation) { _schemaLocation = schemaLocation; } public object AfterReceiveRequest(ref Message request, IClientChannel channel, InstanceContext instanceContext) { MessageBuffer bufferredMsg = request.CreateBufferedCopy(int.MaxValue); string value = bufferredMsg.CreateMessage().GetReaderAtBodyContents().ReadOuterXml(); Validate(value); request = bufferredMsg.CreateMessage(); return null; } bool Validate(string message) { XmlSchemaSet schemas = new XmlSchemaSet(); XmlSchema schema = XmlSchema.Read(new XmlTextReader(_schemaLocation), null); schemas.Add(schema); bool success = true; XDocument.Parse(message).Validate(schemas, (o, e) => { success = false; }); return success; } public void BeforeSendReply(ref Message reply, object correlationState) { //TODO } }
上面演示了如何实现一个 schema 验证的 Inspector,通过使用 CreateBufferedCopy 创建一个复本,此时 request 的状态为 "Copied",无法再进行其它操作。因此,想要进行其它操作,可以使用 CreateMessage 方法再重新创建一个全新的 message。
接下去就是通过自定义 Behavior 的方式把上述的 Insepctor 加入到 WCF 运行时中,这部分代码请参考《WCF 扩展之我见: Behaviors》