WCF 扩展之我见: 消息处理流程

本系列索引,请见《开篇

最近因为生活和工作的原因,花在这上面的时间少了许多。

上一篇中简单介绍了与 WCF 扩展相关的一些知识点,本文将介绍从启动 WCF host 到接收到第一条消息为止的整个流程,以便于大家能在之后的阅读中能更清楚 WCF 各扩展点是如何在整个 WCF 生命周期中被使用的。

在开始详细介绍前,先粘一张图(图片太大,建议大家另存为后观看):

图1 启动 Host 到接收完第一条消息的流程(下载


上图是本人通过不断调试并结合 WCF 源码而来,基本描述了 WCF 生命周期中的各个主要环节。本文的后续部分即是对 WCF 中的重要步骤的解释。


初始化所有 Description

不管通过哪种方式对 ServiceHost 进行服务寄宿(控制台,IIS,Windows 服务等),基本的流程都是一样的。最开始便是对 ServiceHost 进行实例化。(即,new ServiceHost()。该步骤并不包含在上图中

实例化时所要做的主要事情便是实例化各种 Description (描述),而构成 “描述” 的主要部分则是耳熟能详的 Behavior。根据 Behavior 被添加到 WCF 中的方式不同,又采用了不同的方式对这些 Behavior 进行加载并实例化。

protected void InitializeDescription(UriSchemeKeyedCollection baseAddresses)
{
    foreach (Uri baseAddress in baseAddresses)
    {
        this.baseAddresses.Add(baseAddress);
    }
    IDictionary<string, ContractDescription> implementedContracts = null;
    ServiceDescription description = CreateDescription(out implementedContracts);
    this.description = description;
    this.implementedContracts = implementedContracts;
  
    //从配置文件中进行读取
    ApplyConfiguration();
    this.initializeDescriptionHasFinished = true;
}


Behavior 在 WCF 框架内部可以通过多种方式进行自定义:1,编程方式:通过 Service/Contract/Endpoint/Opeartion 提供的 Behavior 属性进行添加;2,利用传统的 Attribute 方式;3,通过配置文件进行配置。


初始化 ServiceHost 运行时环境

实例化完成后,下一部就会调用 ServiceHost.Open 方法开启服务宿主。在这个开启的过程中涉及到很多步骤,首先便是对服务宿主运行时环境的初始化。

在该步骤中,首先会调用所有 Behavior.Validate 方法对当前的服务进行验证以确定该 Behavior 所需要的环境是正确的,比如绑定是否正确等。

验证成功后,会根据 ListenUri 对 Endpoint 进行分组(一个 ListenUri 上可以有一个或多个 Endpoint)。

然后遍历每个 ListenUri,并创建对应的 ChannelListener 和 ChannelDispatcher。在 WCF 中,Channel 以 Stack 的形式组合在一起,一个 Channel 对应一个 ChannelListener (ChannelFactory) ,一个 ChannelListener 对应一个 ChannelDispatcher。当 Build 最外层的 ChannelListener 的时候,它会逐层 Build,直到最底层的传输层 Channel。

紧接着遍历当前 ListenUri 下的所有 Endpoint,并构建 EndpointDispatcher。而在构建 EndpointDispatcher 的时候,会同时构建 DispatchRuntime 及该 Endpoint 下所有的 DispatchOperation。

最后调用所有 Behavior.ApplyDispatchBehavior。

图2 初始化 ServiceHost 运行时环境


ListenUri 是当前文件所在的实际物理地址,而 Endpoint address 可以理解为给用户看的逻辑地址。一般情况下,这两个地址相同,但也允许一个物理地址对应多个逻辑地址的情况。


启动监听

当 ServiceHost 运行时环境准备完成后,便会遍历所有的 ChannelDispatcher,执行它们的 Open 方法。而该 Open 方法首先便于调用其下的 ChannelListener.Open。该 Open 方法最终会建立一个 Socket 并开始在指定的 IP 和端口位置进行监听。

图3 启动监听


ChannelPump

启动监听后,会调用所有 ChannelListener.BeginAcceptChannel 方法等待 Channel 的建立(如果是无连接协议,则该处会立即返回,否则会一直等待,直到第一个连接的到来)。当连接到来时,EndAcceptChannel 会被调用,在此过程中 Channel 将被创建,而后 Channel.Open 被执行。Channel.Open 将会把对消息处理的方式挂接到消息处理流程中,从而当消息在经过 Channel 时所会采取不同的措施对消息进行处理,比如 Transport Channel 会把二进制转换化 message。


MessagePump

当消息经过所有的 Channel 处理完成后,一串二进制数据已经变成了我们所需要的 Message。这个时候 MessagePump 需要做的就是把这个 Message 转换成对具体方法的调用。这个过程就是之后我们对 WCF 进行扩展时首要关注的区域了 (源代码中所对应的核心方法就是 ChannelHandler.HandlerRequest)。

根据 MessageFilter 对携带的地址进行过滤,直到找到匹配的 EndpointDispatcher。

根据 InstanceContextProvider 来获取 InstanceContext ( WCF 提供了三种 Provider: PerCall, PerSession, Single)。

根据 Message 中的 Action 信息找到对应的 DispatchOperation (使用 OperationSelector)。

创建并初始化 OperationContext (包含 IncomingMessage)。

调用 AuthenticationBehavior 和 AuthorizationBehavior 对 Message 进行授权及验证。

调用 MessageInspector.AfterReceiveRequest 对消息进行检查或修改。

Transaction 相关操作。

调用 InstanceProvider 创建 Instance 实例。

初始化调用上下文 (CallContextInitializer)。

把 Message 中携带的参数信息反序列化成 parameter。

调用 ParemterInspector.BeforeCall 对参数进行检查或修改。

使用 OperationInvoker 来发起对 Method 的调用。

调用 ParameterInspector.AfterCall 对返回的参数进行检查或修改。

对返回的参数进行序列化。

调用 MessageInspector.BeforeSendReply 对返回的 Message 进行检查或修改。

图4 MessagePump


作为程序猿,不结合源代码来看一下总觉得虚:

bool HandleRequest(RequestContext request, OperationContext currentOperationContext)
{
    if (request == null)
    {
        // channel EOF, stop receiving        
        return false;
    }
 
    ServiceModelActivity activity = DiagnosticUtility.ShouldUseActivity ? TraceUtility.ExtractActivity(request.RequestMessage) : null;
    using (ServiceModelActivity.BoundOperation(activity))
    {
        if (this.HandleRequestAsReply(request))
        {
            this.ReleasePump();
            return true;
        }
 
        if (this.isChannelTerminated)
        {
            this.ReleasePump();
            this.ReplyChannelTerminated(request);
            return true;
        }
 
        if (this.requestInfo.RequestContext != null)
        {
            Fx.Assert("ChannelHandler.HandleRequest: this.requestInfo.RequestContext != null");
        }
 
        this.requestInfo.RequestContext = request;
 
        if (!this.TryAcquireCallThrottle(request))
        {
            // this.ThrottleAcquiredForCall will be called to continue            
            return false;
        }
 
        // NOTE: from here on down, ensure that this code is the same as ThrottleAcquiredForCall (see 55460)        
        if (this.requestInfo.ChannelHandlerOwnsCallThrottle)
        {
            Fx.Assert("ChannelHandler.HandleRequest: this.requestInfo.ChannelHandlerOwnsCallThrottle");
        }
        this.requestInfo.ChannelHandlerOwnsCallThrottle = true;
 
        if (!this.TryRetrievingInstanceContext(request))
        {
            //Would have replied and close the request.            
            return true;
        }
 
        this.requestInfo.Channel.CompletedIOOperation();
 
        //Only acquire InstanceContext throttle if one doesnt already exist.        
        if (!this.TryAcquireThrottle(request, (this.requestInfo.ExistingInstanceContext == null)))
        {
            // this.ThrottleAcquired will be called to continue            
            return false;
        }
        if (this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle)
        {
            Fx.Assert("ChannelHandler.HandleRequest: this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle");
        }
        this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle = (this.requestInfo.ExistingInstanceContext == null);
 
        if (!this.DispatchAndReleasePump(request, true, currentOperationContext))
        {
            // this.DispatchDone will be called to continue            
            return false;
        }
    }
    return true;
}

上面的 HandleRequest 可以看成是整个 Service Model Layer 的起点:

其中的 TryRetrievingInstanceContext 会通过 MessageFilter 找到对应的 EndpointDispatcher 并创建 InstanceContext,即上述步骤中的1、2。

DispatchAndReleasePump 则会走完剩下的其它步骤,而该方法最后的 Dispatch 方法中最终会调用 MessageRpc 的 Process 方法,然后便开始以类似管道的方式对 Message 进行处理,下述代码中已经省略部分无关逻辑:

bool DispatchAndReleasePump(RequestContext request, bool cleanThread, OperationContext currentOperationContext)
{
    ServiceChannel channel = this.requestInfo.Channel;
    EndpointDispatcher endpoint = this.requestInfo.Endpoint;
    bool releasedPump = false;
 
    try    
    {
        DispatchRuntime dispatchBehavior = this.requestInfo.DispatchRuntime;
 
        if (channel == null || dispatchBehavior == null)
        {
            return true;
        }
        
        // GetOperation         
        DispatchOperationRuntime operation = dispatchBehavior.GetOperation(ref message);
        if (operation == null)
        {
            throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(String.Format(CultureInfo.InvariantCulture, "No DispatchOperationRuntime found to process message.")));
        }
 
        // Init OperationContext        
        bool hasOperationContextBeenSet;
        if (currentOperationContext != null)
        {
            hasOperationContextBeenSet = true;
            currentOperationContext.ReInit(request, message, channel);
        }
        else        
        {
            hasOperationContextBeenSet = false;
            currentOperationContext = new OperationContext(request, message, channel, this.host);
        }
 
        // Init MessageRpc        
        MessageRpc rpc = new MessageRpc(request, message, operation, channel, this.host,
            this, cleanThread, currentOperationContext, this.requestInfo.ExistingInstanceContext, eventTraceActivity);

        rpc.TransactedBatchContext = this.transactedBatchContext;
 
        // passing responsibility for call throttle to MessageRpc        
        // (MessageRpc implicitly owns this throttle once it's created)        
        this.requestInfo.ChannelHandlerOwnsCallThrottle = false;
        
        // explicitly passing responsibility for instance throttle to MessageRpc        
        rpc.MessageRpcOwnsInstanceContextThrottle = this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle;
        this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle = false;
 
        // These need to happen before Dispatch but after accessing any ChannelHandler        
        // state, because we go multi-threaded after this until we reacquire pump mutex.        
        this.ReleasePump();
        releasedPump = true;
 
        return operation.Parent.Dispatch(ref rpc, hasOperationContextBeenSet);
    }
    catch (Exception e)
    {...}
}


//operation.Parent.Dispatch
internal bool Dispatch(ref MessageRpc rpc, bool isOperationContextSet)
{
    rpc.ErrorProcessor = this.processMessage8;
    rpc.NextProcessor = this.processMessage1;
    return rpc.Process(isOperationContextSet);
}

processMessage 共有14个,分别用于处理上述的不同步骤及一些异常处理,请查阅 ImmutableDispatchRuntime.cs 来了解每个委托所指向的方法,这里不再赘述。

readonly MessageRpcProcessor processMessage1;
readonly MessageRpcProcessor processMessage11;
readonly MessageRpcProcessor processMessage2;
readonly MessageRpcProcessor processMessage3;
readonly MessageRpcProcessor processMessage31;
readonly MessageRpcProcessor processMessage4;
readonly MessageRpcProcessor processMessage41;
readonly MessageRpcProcessor processMessage5;
readonly MessageRpcProcessor processMessage6;
readonly MessageRpcProcessor processMessage7;
readonly MessageRpcProcessor processMessage8;
readonly MessageRpcProcessor processMessage9;
readonly MessageRpcProcessor processMessageCleanup;
readonly MessageRpcProcessor processMessageCleanupError;


总 结

本文介绍了 WCF 宿主在启动,及完成对第一条消息的接收过程中,所经过的一系列流程。通过此流程,可以比较直观的知道消息在它的整个生命周期中是如何被处理的,是如何从一串进制数据最终转换成对一个方法的调用的。


扩展阅读

Inside WCF Runtime

Data Transfer Architectural Overview

文章索引

[隐 藏]

本站采用知识共享署名 3.0 中国大陆许可协议进行许可。 ©2014 Charley Box | 关于本站 | 浙ICP备13014059号