96SEO 2026-02-19 10:49 0
。

#xff08;PS#xff1a;与其说Communica…WorkFlow源码剖析——Communicator之TCPServer中
上节博客已经详细介绍了workflow的poller的实现这节我们来看看Communicator是如何利用poller的对连接对象生命周期的管理。
PS与其说Communicator利用的是poller其实Communicator使用的是mpoller上节在介绍poller时也提到过mpoller现场帮读者回忆一下mpoller是poller的manager类管理多个poller事件池对外提供的接口负责将各种poller_node负载均衡的分散给不同的poller。
上节在介绍poller时出现了各种回调比如poller-callback()、node-data.accept()、node-data.partial_written()、node-data.create_message()等当时我们总是一笔带过没有去深入分析这些回调会做什么并且每次在IO事件结束都会回调poller-callback()为什么要这样做在poller当中只看到了针对poller_node的malloc函数而没有看到对应的free函数哪里调用了free函数去释放poller_node
同样的注意里还是放在TCPServer上对于SSL和UDP相关的内容直接忽略。
先把TCP给模清楚。
既然谈到对连接对象的管理那Communicator必然有一个数据结构来表示一个连接上下文对象它就是CommConnEntry代码如下
的轮次。
一条连接的服务端和客户端理论上讲seq值是同时递增的并且一定是保持相同的。
int
含义同go-task每次读写完毕或则出错了都会调用该对象的hanle函数CommTarget
连接目的地。
对于客户端该成员是服务器的地址对于服务端该成员是客户端的地址。
CommService
};workflow将客户端、服务端、tcp、udp、ssl的实现都混杂在一个文件当中。
在第一次阅读它的源码时有点双眼摸瞎的感觉。
如果你有足够丰富的网络编程的经验可能还好。
需要注意一点的是该连接上下文在客户端和服务端所使用成员可能是不同的客户端不会使用service成员服务端不会使用seq成员。
一是被挂在服务端的CommService::alive_list上。
可以理解为服务端的http保活池。
二是被挂在客户端的CommTarget::idle_list上。
可以理解为客户端的对同一个ipaddr:port的http连接池。
广义上讲服务端的CommServiceTarget::idle_list也是http连接池。
只是服务端的idle_list上只可能会有一个连接。
随着tcp连接状态的变化state成员所记录的状态也会随之更新。
当ref成员减为零CommConnEntry对象将会被free掉。
状态迁移池——没错类似于事件池状态迁移池也有一个循环它的任务是不断根据IO的结果转换连接上下文的状态并且根据IO的结果去回调必要的处理函数最为代表的是session-handlesession的概念在go-task源码剖析一节中也是存在。
它存在的意义在下一章讲解workflow对TCPServer的时候才适合透露。
我们重点集中在communicator如何管理连接上下文的状态的。
这里其实就引入了一个问题连接上下文为什么存在状态的迁移别急让我一步步道来。
首先是状态池的启动————Communicator::init
(this-create_poller(poller_threads)
(this-create_handler_threads(handler_threads)
0;}mpoller_stop(this-mpoller);mpoller_destroy(this-mpoller);msgqueue_destroy(this-msgqueue);}return
Communicator::init首先会启动mpoller也就是上章我们所讲的事件池。
上节的poller-callback函数以及它的参数poller-context在poller初始化时就是由struct
poller_params提供。
而该结构体的够着在Communicator::create_poller当中是这样被赋值的
Communicator::create_poller(size_t
(size_t)sysconf(_SC_OPEN_MAX),.callback
Communicator::callback,};this-msgqueue
}所以可以看到上章的poller-callback回调会将传进来的poller_result追加到Communicator的状态迁移池的队列当中。
Communicator::init然后会启动状态迁移池。
状态迁移池使用的就是workflow自己造了链式线程池轮子。
特别的是在线程池的每个线程都运行一个routineCommunicator::handler_thread_routine该函数是一个死循环。
在每个线程都分配到一个Communicator::handler_thread_routine后线程池的队列其实就失去了它的意义。
每个Communicator::handler_thread_routine会使用1当中分配的队列。
转到Communicator::handler_thread_routine它的实现如下
Communicator::handler_thread_routine(void
*)msgqueue_get(comm-msgqueue);if
PD_OP_TIMER:comm-handle_sleep_result(res);break;case
PD_OP_READ:comm-handle_read_result(res);break;case
PD_OP_WRITE:comm-handle_write_result(res);break;case
PD_OP_LISTEN:comm-handle_listen_result(res);break;/*
{mpoller_destroy(comm-mpoller);msgqueue_destroy(comm-msgqueue);}
}阅读过上面的代码后我们应该惊喜因为我们看到了free这里我可以自信的回答这个问题上节poller当中只看到了针对poller_node的malloc函数而没有看到对应的free函数哪里调用了free函数去释放poller_node
}最终在状态迁移池启动完毕后结合poller的事件池Communicator最终的系统架构如下图
这三步就是由本节的Communicator执行-----------|V-----------|
______________________________________________V-----------|
从这里开始涉及到的所有函数是poller负责。
-----------|
write当然Communicator会使用mpoller暴露的api对sockfd设置所关心的IO事件。
间接调用了IO系统调用接口。
下面从listen
创建绑定监听三部曲————Communicator::bind
this-nonblock_listen(service);if
}注意到listen套接字被分配了一个Communicator::accept回调上一章介绍poller时每当listen套接字接收到一个客户端的连接都会将IO
socket作为参数回调一下accept函数此处代表Communicator::accept它实际上会为IO
Communicator::nonblock_listen(CommService
service-bind_addr,service-addrlen)
sockfd;}}}close(sockfd);}return
}所以Communicator::bind绑定并启动一个tcpserver的流程是
将监听套接字添加到mpoller事件池当中。
开始接受来自客户端的连接。
接收客户端连接————Communicator::handle_listen_result
了解了前面状态迁移池和bind的流程结合上一章poller的源码分析一旦poller的accept接收到一条连接会回调一下Communicator::accept然后再调用poller-callback并将IOsocket填到res然后res被传回Communicator队列当中。
状态迁移线程会从队列当中取res然后根据res-data.operation会去回调Communicator::handle_listen_result它的实现如下
Communicator::handle_listen_result(struct
Communicator::accept_conn(target,
this-mpoller;res-data.operation
entry-sockfd;res-data.create_message
Communicator::create_request;res-data.context
(this-stop_flag)mpoller_del(res-data.fd,
this-mpoller);break;}__release_conn(entry);}elseclose(target-sockfd);target-decref();break;}
}Communicator::accept_conn函数根据res创建出IOsocket的连接上下文CommConnEntry并且该连接上下文初始的state为CONN_STATE_CONNECTED。
然后将该对象加入到mpoller事件池当中开启对IOsocket的读事件进行监听。
边接收边解析————Communicator::create_request
客户端请求报文解析完毕状态转移————Communicator::handle_read_result
因为这两部分涉及的代码过于庞大详细讲解的话避免不了要贴大量的代码作者表达水平有限。
还是觉得使用图解的方式去呈现比较省事。
所以为避免文章代码比例过高下一小节将会以图画的形式向读者剖析这部分的源码。
服务端监听套接字的绑定就不说了下面使用一张图来讲解从服务端接收客户端连接
接收客户端连接并读取解析客户端发来的报文流程————异步状态机之美
第一步是poller当中的__poller_handle_listen回调函数
服务端在接收到一个客户端连接后首先会为其创建一个CommServiceTarget对象。
socketfd通过回调poller-callback放到队列当中传回给Communicator。
第二步是Communicator的Communicator::handle_listen_result函数
状态变迁池拿到res对像后得知operation为PD_OP_LISTEN的res所以调用Communicator::handle_listen_result函数来处理res。
在Communicator::handle_listen_result函数当中首先会构造一个连接上下文entry它的状态被初始化为CONN_STATE_CONNECTED。
构造一个operation为READ的poller_node。
并且data成员的create_message回调填为Communicator::create_request。
将poller_node加入到mpoller开始对其度事件进行监听。
第三步是poller当中的读事件处理回调__poller_handle_read
__poller_append_message它里面会创建一个poller_message_t对象如果不存在的话一般在一轮请求的最开始会构造一个msg对象。
利用poller_message_t对象对读到的数据进行解析。
这是一个边读边解析的过程中间可能会调用数次。
当msg-append返回值大于0时说明请求报文读并且解析完了。
此时将msg封装在res当中并回调poller-callback。
create_message和append两个回调分别对应Communicator::create_request和Communicator::append_message。
这两个函数核心代码已经在上图③号虚框当中显示读者可以仔细阅读一下。
这里其实涉及到连接上下文entry的两次状态变换。
在create_message时entry-state会变更为CONN_STATE_RECEIVING而在数据解析完毕Communicator::append_message当中的in-append返回大于0进入到下面的if分支又会将entry-state变更为CONN_STATE_SUCCESS。
第四步也是读流程的最后一步Communicator::handle_read_result当中的Communicator::handle_incoming_request函数
状态变迁池拿到res对像后得知operation为PD_OP_READ的res所以调用Communicator::handle_read_result函数来处理res。
因为是服务端所以Communicator::handle_read_result函数会调用Communicator::handle_incoming_request函数。
这里会将session的state设置成CS_STATE_TOREPLY。
CONN_STATE_SUCCESS则将entry挂到target的idle链表上、entry-ref同时entry-state修改成CONN_STATE_IDLE。
session-passive
回调session-handle然后entry-ref–当entry-ref为0时调用__release_conn将连接关闭并free掉entry连接上下文。
[CONN_STATE_IDLE]向客户端发送回复报文————先尽力而为的写然后再异步写。
当服务端需要发送一个回复报文时会调用Communicator::reply接口它的代码如下
Communicator::reply(CommSession
session-in-entry;session-handle(CS_STATE_SUCCESS,
(__sync_sub_and_fetch(entry-ref,
{__release_conn(entry);target-decref();}}
Communicator::reply_reliable(CommSession
-1;pthread_mutex_lock(target-mutex);if
(!list_empty(target-idle_list))
list);list_del(pos);session-out
ENOENT;pthread_mutex_unlock(target-mutex);return
Communicator::send_message(struct
this-send_message_sync(vectors,
}写的设计思路和Muduo的很像muduo源码阅读笔记10、TcpConnection。
这里不过多赘述只讲一下差别。
还是以全面的情况为例子假设现在需要发送一批回复数据并且同步写无法将所有的数据发送完。
那么在同步写一部分我们的数据之后肯定会触发异步写。
而异步写呢就得靠poller层的__poller_handle_write函数。
只要tcp的发送缓存区非满poller_node就会收到通知然后尽力向发送缓存区写一些数据这可能也需要花几轮的功夫去写数据。
在这期间每写一部分数据__poller_handle_write函数就会回调node-data.partial_written从Communicator::send_message_async函数在构造WRITE类型的poller_node时我们可以得知partial_written就是Communicator::partial_written而它的实现如下
Communicator::partial_written(size_t
Communicator::next_timeout(session);mpoller_set_timeout(entry-sockfd,
}在写完部分数据后为什么需要回调一下partial_written呢这里其实就得到了合理的解释既然在规定的写超时时间内我能向发送缓存写一些数据那就说明网没断只是网络状况可能不好。
所以按理来说在规定的时间内发送了部分数据就应该更新一下发送的超时时间避免没有必要的超时。
一旦异步写完成了和__poller_handle_read不同__poller_handle_write会自动将poller_node从epoll上移除然后回调poller-callback。
PS如果你忘了poller的实现建议回顾一下WorkFlow源码剖析——Communicator之TCPServer上
然后同读完成类似在Communicator当中写完成会被Communicator::handle_write_result处理因为是服务断所以会调用Communicator::handle_reply_result。
该函数逻辑如下
Communicator::handle_reply_result(struct
session-keep_alive_timeout();if
{__sync_add_and_fetch(entry-ref,
PD_OP_READ;res-data.create_message
Communicator::create_request;res-data.message
NULL;pthread_mutex_lock(target-mutex);if
以读的方式添加到mpoller当中pthread_mutex_lock(service-mutex);if
entry-state修改成CONN_STATE_KEEPALIVElist_add_tail(entry-list,
CONN_STATE_CLOSING;}pthread_mutex_unlock(service-mutex);}else
出错该释放了__sync_sub_and_fetch(entry-ref,
pthread_mutex_unlock(target-mutex);}if
CS_STATE_SUCCESS;session-handle(state,
(__sync_sub_and_fetch(entry-ref,
{__release_conn(entry);((CommServiceTarget
以读的方式将poller_node加回到mpoller当中继续监听客户端的读请求
将entry-state修改成CONN_STATE_KEEPALIVE并且加到CommService的保活链表当中。
将session的state设置成CS_STATE_SUCCESS再回调session-handle。
所以在经过Communicator::handle_reply_result函数后entry-state状态被修改为CONN_STATE_KEEPALIVE。
然后再处理下一轮客户端请求。
对于session-passive的变化在create_request创建msg请求报文解析器session-passive被置为1在读取并解析完毕请求报文后Communicator::handle_incoming_request函数回将session-passive置为2在调用Communicator::reply向网络发送回复时session-passive会被置为3。
Communicator::send_message_async当中在吧poller_node以WRITE方式加入到mpoller时会走到mpoller_mod分支。
首先一条tcp连接在服务端必定纯在两种状态接收、发送。
而因为追求性能我们不得不采用异步的方式将socketfd设置成非阻塞的。
并且网络传输不管是否阻塞会引来一些问题读不可能一次性读完、写不可能一次性写完所以读的过程当中和写的过程当中都存在一种中间的状态。
所以状态迁移是必然的。
最后在了解了workflow的底层架构之后其实就能感觉到异步编程就是在实现一个状体机的过程。
作为专业的SEO优化服务提供商,我们致力于通过科学、系统的搜索引擎优化策略,帮助企业在百度、Google等搜索引擎中获得更高的排名和流量。我们的服务涵盖网站结构优化、内容优化、技术SEO和链接建设等多个维度。
| 服务项目 | 基础套餐 | 标准套餐 | 高级定制 |
|---|---|---|---|
| 关键词优化数量 | 10-20个核心词 | 30-50个核心词+长尾词 | 80-150个全方位覆盖 |
| 内容优化 | 基础页面优化 | 全站内容优化+每月5篇原创 | 个性化内容策略+每月15篇原创 |
| 技术SEO | 基本技术检查 | 全面技术优化+移动适配 | 深度技术重构+性能优化 |
| 外链建设 | 每月5-10条 | 每月20-30条高质量外链 | 每月50+条多渠道外链 |
| 数据报告 | 月度基础报告 | 双周详细报告+分析 | 每周深度报告+策略调整 |
| 效果保障 | 3-6个月见效 | 2-4个月见效 | 1-3个月快速见效 |
我们的SEO优化服务遵循科学严谨的流程,确保每一步都基于数据分析和行业最佳实践:
全面检测网站技术问题、内容质量、竞争对手情况,制定个性化优化方案。
基于用户搜索意图和商业目标,制定全面的关键词矩阵和布局策略。
解决网站技术问题,优化网站结构,提升页面速度和移动端体验。
创作高质量原创内容,优化现有页面,建立内容更新机制。
获取高质量外部链接,建立品牌在线影响力,提升网站权威度。
持续监控排名、流量和转化数据,根据效果调整优化策略。
基于我们服务的客户数据统计,平均优化效果如下:
我们坚信,真正的SEO优化不仅仅是追求排名,而是通过提供优质内容、优化用户体验、建立网站权威,最终实现可持续的业务增长。我们的目标是与客户建立长期合作关系,共同成长。
Demand feedback