WCF 扩展之我见: MessageInspector

本系列索引,请见《开篇


这次介绍的 MessageInsepctor 是 WCF 可扩展性中用得最多的一个扩展了,MessageInspector 字面意思 “消息检查员”,可见通过使用这个扩展,可以对消息进行查看,那它到底有什么作用呢?

作 用

先来看看 MSDN 的说明:

Defines the methods that enable custom inspection or modification of inbound and outbound application messages in service applications.

-- 来自《IDispatchMessageInspector Interface

可以用来对请求和响应的消息进行自定义检查和修改。

如果大家阅读过《消息处理流程》,应该会知道 DispatchMessageInspector 被调用的时机正好为于消息抵达具体的 operation 前,或者消息刚刚离开 operation,但还没有传输到网络上。在这两个阶段,可以做很多的事情,比如:

- 监控消息:记录消息抵达的时间和离开的时间,记录消息的内容

- 检查消息:在消息抵达或离开前先对消息进行验证,从而提早发现消息中的问题(比如 XSD 验证)

- 修改消息:对消息正文进行修改,甚至完全替换成一个新的消息


执行时机

选中 Operation,且 Message 经过授权验证后,就会在 ProcessMessage2 中调用 AfterRecieveRequest 进行消息检查 :

internal void ProcessMessage11(ref MessageRpc rpc)
{
    rpc.NextProcessor = this.processMessage2;
 
    if (rpc.Operation.IsOneWay)
    {
        rpc.RequestContext.Reply(null);
        rpc.OperationContext.RequestContext = null;
    }
    else    
    {...}
        
 
    if (this.concurrency.IsConcurrent(ref rpc))
    {
        rpc.Channel.IncrementActivity();
        rpc.SuccessfullyIncrementedActivity = true;
    }
 
    if (this.authenticationBehavior != null)
    {
        this.authenticationBehavior.Authenticate(ref rpc);
    }
 
    if (this.authorizationBehavior != null)
    {
        this.authorizationBehavior.Authorize(ref rpc);
    }
 
    this.instance.EnsureInstanceContext(ref rpc);
    this.TransferChannelFromPendingList(ref rpc);
 
    this.AcquireDynamicInstanceContext(ref rpc);
 
    if (!rpc.IsPaused)
    {
        this.ProcessMessage2(ref rpc);
    }
}

void ProcessMessage2(ref MessageRpc rpc)
{
    rpc.NextProcessor = this.processMessage3;
 
    this.AfterReceiveRequest(ref rpc);
 
    if (!this.ignoreTransactionFlow)
    {
        // Transactions need to have the context in the message        
        rpc.TransactionMessageProperty = TransactionMessageProperty.TryGet(rpc.Request);
    }
 
    this.concurrency.LockInstance(ref rpc);
 
    if (!rpc.IsPaused)
    {
        this.ProcessMessage3(ref rpc);
    }
    else if (this.isOnServer && DiagnosticUtility.ShouldTraceInformation && !this.didTraceProcessMessage2)
    {
        this.didTraceProcessMessage2 = true;
 
        TraceUtility.TraceEvent(
            TraceEventType.Information,
            TraceCode.MessageProcessingPaused,
            SR.GetString(SR.TraceCodeProcessMessage2Paused,
            rpc.Channel.DispatchRuntime.EndpointDispatcher.ContractName,
            rpc.Channel.DispatchRuntime.EndpointDispatcher.EndpointAddress));
    }
}


当所有流程都走完后,消息打算发送到网络上时,就会在 ProcessMessage8 中调用 PrepareReply,这其中便会调用 BeforeSendReply 在返回前对消息进行检查。

void ProcessMessage8(ref MessageRpc rpc)
{
    rpc.NextProcessor = this.processMessage9;
 
    try    
    {
        this.error.ProvideMessageFault(ref rpc);
    }
    catch (Exception e)
    {
        if (Fx.IsFatal(e))
        {
            throw;
        }
 
        this.error.HandleError(e);
    }
 
    this.PrepareReply(ref rpc);
 
    if (rpc.CanSendReply)
    {
        rpc.ReplyTimeoutHelper = new TimeoutHelper(rpc.Channel.OperationTimeout);
 
        if (this.sendAsynchronously)
        {
            this.BeginFinalizeCorrelation(ref rpc);
        }
        else        {
            this.FinalizeCorrelation(ref rpc);
        }
    }
 
    if (!rpc.IsPaused)
    {
        this.ProcessMessage9(ref rpc);
    }
}


void PrepareReply(ref MessageRpc rpc)
{
    RequestContext context = rpc.OperationContext.RequestContext;
    Exception exception = null;
    bool thereIsAnUnhandledException = false;
 
    if (!rpc.Operation.IsOneWay)
    {
        if (DiagnosticUtility.ShouldTraceWarning)
        {
            // If a service both returns null and sets RequestContext null, that            
            // means they handled it (either by calling Close or Reply manually).            
            // These traces catch accidents, where you accidentally return null,            
            // or you accidentally close the context so we can't return your message.            
            if ((rpc.Reply == null) && (context != null))
            {
                TraceUtility.TraceEvent(System.Diagnostics.TraceEventType.Warning,
                    TraceCode.ServiceOperationMissingReply,
                    SR.GetString(SR.TraceCodeServiceOperationMissingReply, rpc.Operation.Name ?? String.Empty),
                    null, null);
            }
            else if ((context == null) && (rpc.Reply != null))
            {
                TraceUtility.TraceEvent(System.Diagnostics.TraceEventType.Warning,
                    TraceCode.ServiceOperationMissingReplyContext,
                    SR.GetString(SR.TraceCodeServiceOperationMissingReplyContext, rpc.Operation.Name ?? String.Empty),
                    null, null);
            }
        }
 
        if ((context != null) && (rpc.Reply != null))
        {
            try            
            {
                rpc.CanSendReply = PrepareAndAddressReply(ref rpc);
            }
            catch (Exception e)
            {
                if (Fx.IsFatal(e))
                {
                    throw;
                }
                thereIsAnUnhandledException = (!this.error.HandleError(e)) || thereIsAnUnhandledException;
                exception = e;
            }
        }
    }
 
    this.BeforeSendReply(ref rpc, ref exception, ref thereIsAnUnhandledException);
 
    if (rpc.Operation.IsOneWay)
    {
        rpc.CanSendReply = false;
    }
 
    if (!rpc.Operation.IsOneWay && (context != null) && (rpc.Reply != null))
    {
        if (exception != null)
        {
            // We don't call ProvideFault again, since we have already passed the            
            // point where SFx addresses the reply, and it is reasonable for            
            // ProvideFault to expect that SFx will address the reply.  Instead            
            // we always just do 'internal server error' processing.            
            rpc.Error = exception;
            this.error.ProvideOnlyFaultOfLastResort(ref rpc);
 
            try            
            {
                rpc.CanSendReply = PrepareAndAddressReply(ref rpc);
            }
            catch (Exception e)
            {
                if (Fx.IsFatal(e))
                {
                    throw;
                }
                this.error.HandleError(e);
            }
        }
    }
    else if ((exception != null) && thereIsAnUnhandledException)
    {
        rpc.Abort();
    }
}


Message 对象

在继续深入介绍 MessageInspector 之前,需要对 Message 对象有一个简单的认识。

WCF 是一个消息处理的框架,其中主要涉及到两种对象:消息实体 和 服务对象。前者,是整个 WCF 的基础对象,所有的消息调用最终都会表现为 Message 的形式(基于 XML),但是开发人员一般很少需要直接接触。而后者与传统的面向对象的编程模型类似,即通过我们熟悉的 ServiceContract、OperationContract、DataContract 等来描述。


Message 对象主要包含两个部分:Header 和 Body。前者一般指 Soap Header 部分,用于随消息体携带一些额外的信息,比如 To、Action、From 等。而后者则携带与业务密切相关的数据。

除此之外,Message 对象还有一个名叫 Properties 的属性,与 Header 类似,Property 也用于随 Message 携带一些额外的数据,但是 Properties 不会被传输到网络上,它只会在接收端或者发送端内部使用。


处理 Message 这两部分的方式并不相同,Header 会被缓存起来,任何时间都可以去读取,且不限次数。而 Body 部分是个一次性的对象(也有可能是个流对象),它内部有一个状态,初始为 Created,当发生了读取、写入、拷贝 和 关闭后,状态就会相应的变成 Read、Written、Copied 和 Closed。而状态一旦不是 Created,就不能再对 Message 进行上述 4 种操作。


MessageInspector 介绍

MessageInspector 分为 DispatchMessageInspector 和 ClientMessageInspector,前者用于服务端,后者用于客户端(本文只介绍前者)。

public interface IDispatchMessageInspector
{
    object AfterReceiveRequest(ref Message request, IClientChannel channel, InstanceContext instanceContext);
    void BeforeSendReply(ref Message reply, object correlationState);
}
 
public interface IClientMessageInspector
{
    void AfterReceiveReply(ref Message reply, object correlationState);
    object BeforeSendRequest(ref Message request, IClientChannel channel);
}


AfterReceiveRequest 会在消息抵达 WCF 运行时,但还未分发到具体的 Operation 前被调用。

BeforeSendReply 会在 Operation 处理完消息后,编码成一个 Message 对象,但还未传输到网络之前被调用。


从上一节中得知 Message 对象是个一次性的对象,如果在 Inspector 中对传入的 request 对象进行了读取、写入、拷贝 和 关闭中的任何一种,都会导致 WCF 运行时的中断,这也是为什么 request 对象是通过 ref 关键字进行引用传递的原因。

而 AfterReceiveRequest 所返回的对象会被作为 correlationState 传给 BeforeSendReply。


如何扩展 MessageInspector

与其它扩展一样,首先实现 IDispatchMessageInspector

public class MySchemaValidationInspector : IDispatchMessageInspector
{
    string _schemaLocation;
 
    public MyMessageInspector(string schemaLocation)
    {
        _schemaLocation = schemaLocation;
    }
 
    public object AfterReceiveRequest(ref Message request, IClientChannel channel, InstanceContext instanceContext)
    {
        MessageBuffer bufferredMsg = request.CreateBufferedCopy(int.MaxValue);
        string value = bufferredMsg.CreateMessage().GetReaderAtBodyContents().ReadOuterXml();
 
        Validate(value);
 
        request = bufferredMsg.CreateMessage();
        return null;
    }
 
    bool Validate(string message)
    {
        XmlSchemaSet schemas = new XmlSchemaSet();
        XmlSchema schema = XmlSchema.Read(new XmlTextReader(_schemaLocation), null);
        schemas.Add(schema);
 
        bool success = true;
        XDocument.Parse(message).Validate(schemas, (o, e) =>
        {
            success = false;
        });
 
        return success;
    }
 
    public void BeforeSendReply(ref Message reply, object correlationState)
    {
        //TODO    
    }
}

上面演示了如何实现一个 schema 验证的 Inspector,通过使用 CreateBufferedCopy 创建一个复本,此时 request 的状态为 "Copied",无法再进行其它操作。因此,想要进行其它操作,可以使用 CreateMessage 方法再重新创建一个全新的 message。


接下去就是通过自定义 Behavior 的方式把上述的 Insepctor 加入到 WCF 运行时中,这部分代码请参考《WCF 扩展之我见: Behaviors

文章索引

[隐 藏]

本站采用知识共享署名 3.0 中国大陆许可协议进行许可。 ©2014 Charley Box | 关于本站 | 浙ICP备13014059号