贝壳找房 | Thrift 中 TNonblockingServer 工作流程解析
本文主要介绍 TNonblockingServer 服务模型,这是thrift框架提供的一种非阻塞式IO服务模型,目前是thrift框架中最好的模型,这也是我们重点介绍的模型。
thrift是Facebook开源的一款开源跨语言的RPC通信框架,主要提供三种服务模型:1)TThreadPoolServer 服务模型,这是线程池服务模型,使用标准的阻塞式IO,预先创建一组线程处理请求;2)TSimpleServer 服务模型,这是简单的单线程服务模型,一般用于测试;3)TNonblockingServer 服务模型,这是thrift框架提供的一种非阻塞式IO服务模型,目前是thrift框架中最好的模型,这也是我们重点介绍的模型。
整个TNonblockingServer可以分为3部分,如下图所示,分别是IO线程,状态同步,以及工作线程,下面我们分别介绍这三部分。
1 IO线程
为了更好的叙述,我们简单介绍一下socket,socket是应用层与TCP/IP协议族通信的中间软件抽象层,它是一组接口。在设计模式中,Socket其实就是一个门面模式,它把复杂的TCP/IP协议族隐藏在Socket接口后面,对用户来说,一组简单的接口就是全部,让Socket去组织数据,以符合指定的协议。IO线程在thrift框架可以简单分为两种,一种是只执行读写,另外一种是既执行读写任务,还要负责socket监听,thrift中所有IO线程都保存在IOthread这个数组中, IOthread[0] 可以看成服务的主线程,整个服务的启动也是从 IOthread[0] 开始。
TNonblockingServer::serve() {
if (ioThreads_.empty())
registerEvents(NULL);
// Run the primary (listener) IO thread loop in our main thread; this will
// only return when the server is shutting down.
ioThreads_[0]->run();
// Ensure all threads are finished before exiting serve()
for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
ioThreads_[i]->join();
GlobalOutput.printf("TNonblocking: join done for IO thread #%d", i);
}
}
IOthread[0]就是我们上述所说的既执行读写任务,还要负责socket监听的线程,除此之外,数组IOthread中其他线程只负责读写。
// Register the server event
event_set(&serverEvent_,
listenSocket_,
EV_READ | EV_PERSIST,
TNonblockingIOThread::listenHandler,
server_);
event_base_set(eventBase_, &serverEvent_);
2 状态转移
当IOthread[0] 监听到一个Socket的accept事件时,会同时间建立一个TConnection 并为其分配一个线程进行处理,并将TConnection::eventHandler和TNonblockingIOThread::notifyHandler注册到iothread的event_base上,用于记录状态转移。
TConnection 中主要有两种方法:
2.1 socket状态转移
void TNonblockingServer::TConnection::workSocket()
socket端主要分为3种状态:
/// Three states for sockets: recv frame size, recv data, and send mode
enum TSocketState { SOCKET_RECV_FRAMING, SOCKET_RECV, SOCKET_SEND };
- SOCKET_RECV_FRAMING:主要标记接收帧头部。每个帧分为两个部分:帧头部和帧数据。帧头部以字节为单位标明帧数据的长度。由于整个过程是非阻塞的,读取过程可以不连续,每次尽力而为的从socket读取字节,如果头部没达到4byte(thrift-0.10.0版本),表示没有读取结束,可以存起来,下次调用的时候继续读取,直到获取到完整的帧头部,然后调用transition。(这里不会多读数据。如果对方断开连接,读取的长度会是0,此时关闭socket。server规定帧数据部分的长度不得超过256MB,这已经是相当大的一个范围了。如果帧头部指明的长度超过了server预设的最大值,则认为是读出了非法的帧格式,也会关闭socket);
- SOCKET_RECV:接收帧数据。依然是非阻塞,也支持非连续读取,当获得了完整的帧数据,调用transition;
- SOCKET_SEND:发送帧头部和帧数据。因为是非阻塞,所以要尽力而为的发送。
2.2 server状态转移
void TNonblockingServer::TConnection::transition()
server端一共有6种状态:
/**
* Five states for the nonblocking server:
* 1) initialize
* 2) read 4 byte frame size
* 3) read frame of data
* 4) send back data (if any)
* 5) force immediate connection close
*/
enum TAppState {
APP_INIT,
APP_READ_FRAME_SIZE,
APP_READ_REQUEST,
APP_WAIT_TASK,
APP_SEND_RESULT,
APP_CLOSE_CONNECTION
};
- APP_INIT:server最开始的状态。设置读写缓冲区等等基本工作,需要设置等待读的标记位,并开始注册读时间。
// Clear write buffer variables
writeBuffer_ = NULL;
writeBufferPos_ = 0;
writeBufferSize_ = 0;
// Into read4 state we go
socketState_ = SOCKET_RECV_FRAMING;
appState_ = APP_READ_FRAME_SIZE;
readBufferPos_ = 0;
// Register read event
setRead();
- APP_READ_FRAME_SIZE:server已经读到了帧长度。调整读缓冲区大小,以适应帧数据接收,如果出现残缺,直接放弃。
readWant_ += 4;
// We just read the request length
// Double the buffer size until it is big enough
if (readWant_ > readBufferSize_) {
if (readBufferSize_ == 0) {
readBufferSize_ = 1;
}
uint32_t newSize = readBufferSize_;
while (readWant_ > newSize) {
newSize *= 2;
}
uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
if (newBuffer == NULL) {
// nothing else to be done...
throw std::bad_alloc();
}
readBuffer_ = newBuffer;
readBufferSize_ = newSize;
}
readBufferPos_ = 4;
*((uint32_t*)readBuffer_) = htonl(readWant_ - 4);
// Move into read request state
socketState_ = SOCKET_RECV;
appState_ = APP_READ_REQUEST;
- APP_READ_REQUEST:server已经获得了完整的帧。将输入缓冲区封装为inputTransport结构,并且重置输入缓冲以待将来使用。此时,满足处理请求的条件了。如果后台不是线程池模式,那么立即执行。否则,构造一个Task结构。
// We are done reading the request, package the read buffer into transport
// and get back some data from the dispatch function
if (server_->getHeaderTransport()) {
inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
outputTransport_->resetBuffer();
} else {
// We saved room for the framing size in case header transport needed it,
// but just skip it for the non-header case
inputTransport_->resetBuffer(readBuffer_ + 4, readBufferPos_ - 4);
outputTransport_->resetBuffer();
// Prepend four bytes of blank space to the buffer so we can
// write the frame size there later.
outputTransport_->getWritePtr(4);
outputTransport_->wroteBytes(4);
}
server_->incrementActiveProcessors();
- APP_WAIT_TASK:server已经处理好这个请求,准备回送返回结果。一般的,计算回送帧大小填充到帧首部,然后调用setFlags(EV_WRITE | EV_PERSIST)声明有数据需要写入socket。
// We have now finished processing a task and the result has been written
// into the outputTransport_, so we grab its contents and place them into
// the writeBuffer_ for actual writing by the libevent thread
server_->decrementActiveProcessors();
// Get the result of the operation
outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
// If the function call generated return data, then move into the send
// state and get going
// 4 bytes were reserved for frame size
if (writeBufferSize_ > 4) {
// Move into write state
writeBufferPos_ = 0;
socketState_ = SOCKET_SEND;
// Put the frame size into the write buffer
int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
memcpy(writeBuffer_, &frameSize, 4);
// Socket into write mode
appState_ = APP_SEND_RESULT;
setWrite();
// Try to work the socket immediately
// workSocket();
return;
}
- APP_SEND_RESULT:如果设置了每隔若干请求就重新调整输入输出缓冲区大小再执行。其实对接口比较少的应用来说意义并不大,只是之前一直是缓冲不够的时候变大,在这里把缓冲区收回来,然后就是执行APP_INIT逻辑了。
/ it's now safe to perform buffer size housekeeping.
if (writeBufferSize_ > largestWriteBufferSize_) {
largestWriteBufferSize_ = writeBufferSize_;
}
if (server_->getResizeBufferEveryN() > 0
&& ++callsForResize_ >= server_->getResizeBufferEveryN()) {
checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
server_->getIdleWriteBufferLimit());
callsForResize_ = 0;
}
- APP_CLOSE_CONNECTION:关闭连接,减少活跃Processor计数,不在基本状态机里面。
server_->decrementActiveProcessors();
close();
return;
这6种状态大致是这样的:
3. 工作线程
thrift 框架在处理task时,设置了一个ThreadManager ,这个ThreadManager的主要任务是线程分配和状态同步,IOthread 将待处理的任务统一放到任务队列,ThreadManager 会去监测这个任务队列是否为空,不为空时,会从队列pop任务,分配处理线程。
//标记manager状态(当前工作线程数达到小于最大数或者manager_工作中&&任务队列不为空)
bool isActive() const {
return (manager_->workerCount_ <= manager_->workerMaxCount_)
|| (manager_->state_ == JOINING && !manager_->tasks_.empty());
}
while (active) {
//active=true 只有有空余线程时才会计入这个状态,这个时候框架运行是非阻塞模式
active = isActive();
while (active && manager_->tasks_.empty()) {
//可用线程数加1
manager_->idleCount_++;
//任务队列为空时进入等待
manager_->monitor_.wait();
//manager_离开等待时,必须有空余线程
active = isActive();
//一旦manager_离开等待状态,可用线程数减1
manager_->idleCount_--;
}
shared_ptr<ThreadManager::Task> task;
if (active) {
if (!manager_->tasks_.empty()) {
//从队列中拿取任务
task = manager_->tasks_.front();
manager_->tasks_.pop_front();
if (task->state_ == ThreadManager::Task::WAITING) {
// If the state is changed to anything other than EXECUTING or TIMEDOUT here
// then the execution loop needs to be changed below.
//任务执行时间超时设置
task->state_ =
(task->getExpireTime() && task->getExpireTime() < Util::currentTime()) ?
ThreadManager::Task::TIMEDOUT :
ThreadManager::Task::EXECUTING;
}
}
if (manager_->pendingTaskCountMax_ != 0
&& manager_->tasks_.size() <= manager_->pendingTaskCountMax_ - 1) {
manager_->maxMonitor_.notify();
}
}
if (task) {
if (task->state_ == ThreadManager::Task::EXECUTING) {
//释放锁,任务执行过程中,manager不需要同步信息了
manager_->mutex_.unlock();
try {
//开始执行任务(也就是用户的处理逻辑)
task->run();
} catch (const std::exception& e) {
GlobalOutput.printf("[ERROR] task->run() raised an exception: %s", e.what());
} catch (...) {
GlobalOutput.printf("[ERROR] task->run() raised an unknown exception");
}
// 重新加锁,下次调度线程进入这个过程需要也要同步信息
manager_->mutex_.lock();
} else if (manager_->expireCallback_) {
// The only other state the task could have been in is TIMEDOUT (see above)
manager_->expireCallback_(task->getRunnable());
manager_->expiredCount_++;
}
}
}
manager_->deadWorkers_.insert(this->thread());
if (--manager_->workerCount_ == manager_->workerMaxCount_) {
manager_->workerMonitor_.notify();
}
}
我们可以与evpp(360开源框架)的worker thread机制做下比较。当前evpp处理request方案,是采取轮寻机制,当面临新的请求时,依次将任务加入到各线程处理队列:
evpp与thrift的worker thread处理机制差别:
- evpp:每个线程维护自己的工作队列,当有新任务时,按照顺序加入到线程的任务队列中,如果一个任务处理时间太长,可能整个队列的任务都是超时;
- thrift:所有线程处理一个工作队列,中间会涉及信息同步,由于需要线程见同步信息,所以thrift 锁粒度高于evpp。
参考资料
- https://blog.csdn.net/mumumuwudi/article/details/48480545?utm_medium=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-2.nonecase&depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-2.nonecase
- http://thrift.apache.org/docs/
作者简介
董志雄,毕业于西安交通大学,现于贝壳找房语言智能与搜索部工作。
好文章收藏
哈哈哈, 有收获就好~,贝壳找房出的博客不错的
评论功能关闭了吗