本文共 2552 字,大约阅读时间需要 8 分钟。
WBThrottle 是为让filestore来限流。throttle的核心函数如下:void WBThrottle::throttle(){ Mutex::Locker l(lock); while (!stopping && need_flush()) cond.Wait(lock);}可以看出wbthrottle 主要是通过cond_wait 来让出cpu从而限制filestore写入的速度其等待的条件有两个!stopping && need_flush() bool need_flush() const { if (cur_ios < io_limits.second && pending_wbs.size() < fd_limits.second && cur_size < size_limits.second) return false; else return true; }从need_flush 中可以知道主要从三个方面限流分别是io_limit表示为刷新的io数,size_limit 表示为刷新的字节数,fd_limit 表示为刷新的对象个数由于核心是在cond.Wait,那我们看看什么时候时候触发cond.Signal();我们知道WBThrottle 是一个thread的子类,其入口函数是entryvoid *WBThrottle::entry(){ Mutex::Locker l(lock); boost::tuplewb; #得到wb,然后根据wb中的值更新cur_ios/cur_size while (get_next_should_flush(&wb)) { clearing = wb.get<0>(); cur_ios -= wb.get<2>().ios; cur_size -= wb.get<2>().size; lock.Unlock();#if defined(HAVE_FDATASYNC) ::fdatasync(**wb.get<1>());#else ::fsync(**wb.get<1>());#endif#ifdef HAVE_POSIX_FADVISE if (cct->_conf->filestore_fadvise && wb.get<2>().nocache) { int fa_r = posix_fadvise(**wb.get<1>(), 0, 0, POSIX_FADV_DONTNEED); assert(fa_r == 0); }#endif lock.Lock(); clearing = ghobject_t(); #由于wb已经有更新,触发throttle看是否要限流 cond.Signal(); wb = boost::tuple (); } return 0;}bool WBThrottle::get_next_should_flush( boost::tuple *next){ assert(lock.is_locked()); assert(next); while (!stopping && (!beyond_limit() || pending_wbs.empty())) cond.Wait(lock); if (stopping) return false; assert(!pending_wbs.empty()); ghobject_t obj(pop_object()); ceph::unordered_map >::iterator i = pending_wbs.find(obj); #从pending_wbs 中拿到wb *next = boost::make_tuple(obj, i->second.second, i->second.first); pending_wbs.erase(i); return true;}那我们 看看pending_wbs中是如何插入数据的void WBThrottle::queue_wb( FDRef fd, const ghobject_t &hoid, uint64_t offset, uint64_t len, bool nocache){ Mutex::Locker l(lock); ceph::unordered_map >::iterator wbiter = pending_wbs.find(hoid); #还没有记录过,则新建一个wb,然后插入到pending_wbs if (wbiter == pending_wbs.end()) { wbiter = pending_wbs.insert( make_pair(hoid, make_pair( PendingWB(), fd))).first; logger->inc(l_wbthrottle_inodes_dirtied); } else { #已经有wb的话,则先删除这个wb remove_object(hoid); } #更新io 和字节数 cur_ios++; logger->inc(l_wbthrottle_ios_dirtied); cur_size += len; logger->inc(l_wbthrottle_bytes_dirtied, len); wbiter->second.first.add(nocache, len, 1); insert_object(hoid); #如果已经超过limit了,则唤醒throttle来限流 if (beyond_limit()) cond.Signal();}
转载地址:http://jjnmi.baihongyu.com/