srs使用的开源c语言网络协程库 state thread 源码分析
state thread是一个开源的c语言网络协程库,它在用户空间实现了协程调度
st最初是由网景(Netscape)公司的MSPR(Netscape Portable Runtime library)项目中剥离出来,后由SGI(Silicon Graphic Inc)和Yahoo!公司(前者是主力)共同开发维护。
2001年发布v1.0以来一直到2009年v1.9稳定版后未再变动
State Threads:回调终结者(必读)
https://blog.csdn.net/caoshangpa/article/details/79565411
st-1.9.tar.gz 是原版, http://state-threads.sourceforge.net/
state-threads-1.9.1.tar.gz 是srs修改版, https://github.com/ossrs/state-threads
st源码编译
tar zxvf st-1.9.tar.gz
cd st-1.9
make linux-debug // make命令可以查看支持的编译选项
obj目录有编译生成的文件st.h, lib*.so,lib*.a
examples目录有几个例子lookupdns,proxy,server
需要的知识点
1 汇编语言(非必需)
2 线程的栈管理(非必需)
3 线程的调度和同步(必须)。线程不同步的测试代码thread.c
4 setjmp/longjmp的使用(必须)。测试代码setjmp.c
5 epoll原理和使用(必须)。测试代码epoll_server.c 和 epoll_client.c
测试代码以及文档下载地址
链接: https://pan.baidu.com/s/1kQz3S1YIt6zUwMKScrnHaQ
提取码: pu9z
分析state_thread源码的目的,是为了正确的使用它
st中thread其实是协程的概念
st_xxx分为 io类 和 延迟类
一些重要的数据结构
_st_vp_t _st_this_vp; virtual processor 虚拟处理器
_st_thread_t *_st_this_thread;
_st_clist_t run_q, io_q, zombie_q, thread_q
_st_thread_t *idle_thread, *sleep_q
代码分析
st库自带的example业务逻辑较为复杂,有兴趣可以看下。
为了简化问题,编写了测试代码st-1.9/examples/st_epoll.c,依据此代码提出问题分析问题。
st_init()做了什么?
_st_idle_thread_start()做了什么?
st_thread_create()做了什么?
st_thread_exit()做了什么?
st_usleep()做了什么?
主业务逻辑(无限循环)协程是如何调度的?
监听的文件描述符是如何调度的?
协程如何正常退出?
1 没有设置终止条件变量(不可以被join)的协程直接return即可退出;
2 设置了终止条件变量(可以被join)的协程退出时,先把自己加入到zombie_q中,然后通知等待的协程,等待的协程退出后,自己在退出。
协程的join(连接)是什么意思?
1 创建协程a的时候 st_thread_create(handle_cycle, NULL, 1, 0) 要设置为1, 表示该协程可以被join
2 协程b代码里要掉用st_thread_join(thread, retvalp),表示我要join到协程a上
3 join的意思是 协程a和协程b 有一定关联行,在协程退出时,要先退出协程b 才能退出协程a
4 st中一个协程只能被另一个协程join,不能被多个协程join
5 可以被join的协程a,在没有其他协程join时,协程a无法正常退出
st里的mutex有什么用?
通常情况下st的多协程是不需要加锁的,但是在有些情况下需要锁来保证原子操作,下面会详细说明。
st_mutex_new(void); 创建锁
st_mutex_destroy(st_mutex_t lock); 等待队列必须为空才能销毁锁
st_mutex_lock(st_mutex_t lock); 第一次掉用能获得锁,以后掉用会加入锁的等待队列中(FIFO)
st_mutex_unlock(st_mutex_t lock); 释放锁并激活等待队列的协程
st_mutex_trylock(st_mutex_t lock); 尝试获得锁不会加入到等待队列
st里的cond有什么用?
通常情况下st的多协程是不需要条件变量的,但是有些情况下需要条件变量来保证协程执行的先后顺序,比如:协程a要先于协程b执行
st_cond_new(void); 创建条件变量
st_cond_destroy(st_cond_t cvar); 等待队列必须为空才能销毁条件变量
st_cond_timedwait(st_cond_t cvar, st_utime_t timeout); 限时等待条件变量,会加入条件变量的等待队列中(FIFO),并加入到sleep_q队列中(可能先于FIFO的顺序被调度到)
st_cond_wait(st_cond_t cvar); 阻塞等待条件变量,会加入条件变量的等待队列中(FIFO)
st_cond_signal(st_cond_t cvar); 唤醒阻塞在条件变量上的一个协程
st_cond_broadcast(st_cond_t cvar); 唤醒阻塞在条件变量上的全部协程
这个图要配合测试代码 st-1.9/examples/st_epoll.c
st中与调度有关的函数
st的setjmp
#define _ST_SWITCH_CONTEXT(_thread) \ 协程切换的两个宏函数之一,停止当前协程并运行其他协程
ST_BEGIN_MACRO \
ST_SWITCH_OUT_CB(_thread); \ 协程切走时调用的函数,一般不管用
if (!MD_SETJMP((_thread)->context)) \ 汇编语言实现 应该跟setjmp()一样 首次掉用返回0
{ \
_st_vp_schedule(); \ 核心调度函数
} \
ST_DEBUG_ITERATE_THREADS(); \
ST_SWITCH_IN_CB(_thread); \ 协程切回时调用的函数,一般不管用
ST_END_MACRO
st的longjmp
#define _ST_RESTORE_CONTEXT(_thread) \ 协程切换的两个宏函数之一,恢复线程运行
ST_BEGIN_MACRO \
_ST_SET_CURRENT_THREAD(_thread); \ 设置全局变量 _st_this_thread = _thread
MD_LONGJMP((_thread)->context, 1); \ 汇编语言实现 应该跟longjmp()一样, 返回值永远为1
ST_END_MACRO
MD_SETJMP的时候,会使用汇编把所有寄存器的信息保留下来,而MD_LONGJMP则会把所有的寄存器信息重新加载出来。两者配合使用的时候,可以完成函数间的跳转。
st的核心调度函数
void _st_vp_schedule(void)
{
_st_thread_t *thread;
printf("in _st_vp_schedule\n");
printf("_st_active_count = %d\n", _st_active_count);
if (_ST_RUNQ.next != &_ST_RUNQ)
{
printf("use runq\n");
/* Pull thread off of the run queue */
thread = _ST_THREAD_PTR(_ST_RUNQ.next);
_ST_DEL_RUNQ(thread);
}
else
{
printf("use idle\n");
/* If there are no threads to run, switch to the idle thread */
thread = _st_this_vp.idle_thread;
}
ST_ASSERT(thread->state == _ST_ST_RUNNABLE);
/* Resume the thread */
thread->state = _ST_ST_RUNNING;
_ST_RESTORE_CONTEXT(thread);
}
st辅助调度函数
void *_st_idle_thread_start(void *arg)
{
printf("i'm in _st_idle_thread_start()\n");
_st_thread_t *me = _ST_CURRENT_THREAD();
while (_st_active_count > 0)
{
/* Idle vp till I/O is ready or the smallest timeout expired */
printf("call _st_epoll_dispatch()\n");
_ST_VP_IDLE(); 处理io类事件
/* Check sleep queue for expired threads */
_st_vp_check_clock(); 处理延时类事件
me->state = _ST_ST_RUNNABLE;
_ST_SWITCH_CONTEXT(me); 从这里恢复运行,然后判断_st_active_count的值
}
/* No more threads */
exit(0); 整个程序退出
/* NOTREACHED */
return NULL;
}
会触发协程切换的函数有哪些?
sched.c:86: _ST_SWITCH_CONTEXT(me); 59 int st_poll(struct pollfd *pds, int npds, st_utime_t timeout)
sched.c:234: _ST_SWITCH_CONTEXT(me); 221 void *_st_idle_thread_start(void *arg)
sched.c:261: _ST_SWITCH_CONTEXT(thread); 244 void st_thread_exit(void *retval)
sched.c:276: _ST_SWITCH_CONTEXT(thread); 244 void st_thread_exit(void *retval)
sync.c:131: _ST_SWITCH_CONTEXT(me); 115 int st_usleep(st_utime_t usecs)
sync.c:198: _ST_SWITCH_CONTEXT(me); 180 int st_cond_timedwait(_st_cond_t *cvar, st_utime_t timeout)
sync.c:315: _ST_SWITCH_CONTEXT(me); 290 int st_mutex_lock(_st_mutex_t *lock)
sched.c:134: _ST_RESTORE_CONTEXT(thread); 115 void _st_vp_schedule(void)
st中的interrupt
显示调用void st_thread_interrupt(_st_thread_t *thread)会对协程设置interrupt状态,interrupt状态会中断协程的本次运行(可能是个循环任务),是否导致协程退出,要看协程内部对interrupt返回值的处理。下面以st_usleep()函数为例进行说明。
[ykMac:st-1.9]# grep -nr "_ST_FL_INTERRUPT" *
common.h:311:#define _ST_FL_INTERRUPT 0x08 interrupt的宏定义
sched.c:68: if (me->flags & _ST_FL_INTERRUPT) 59 int st_poll() ,调用函数时,判断是否设置interrupt
sched.c:70: me->flags &= ~_ST_FL_INTERRUPT; 如果设置就退出,退出前对interrupt取反
sched.c:107: if (me->flags & _ST_FL_INTERRUPT) 59 int st_poll(),变为运行协程时,判断是否设置interrupt
sched.c:109: me->flags &= ~_ST_FL_INTERRUPT; 如果设置就退出,退出前对interrupt取反
sched.c:551: thread->flags |= _ST_FL_INTERRUPT; 在545 void st_thread_interrupt()中设置为interrupt
sync.c:119: if (me->flags & _ST_FL_INTERRUPT) { 115 int st_usleep(st_utime_t usecs),调用函数时
sync.c:120: me->flags &= ~_ST_FL_INTERRUPT;
sync.c:133: if (me->flags & _ST_FL_INTERRUPT) { 115 int st_usleep(st_utime_t usecs),变为运行协程时
sync.c:134: me->flags &= ~_ST_FL_INTERRUPT;
sync.c:185: if (me->flags & _ST_FL_INTERRUPT) { 180 int st_cond_timedwait(),调用函数时
sync.c:186: me->flags &= ~_ST_FL_INTERRUPT;
sync.c:208: if (me->flags & _ST_FL_INTERRUPT) { 180 int st_cond_timedwait(),变为运行协程时
sync.c:209: me->flags &= ~_ST_FL_INTERRUPT;
sync.c:294: if (me->flags & _ST_FL_INTERRUPT) { 290 int st_mutex_lock(),调用函数时
sync.c:295: me->flags &= ~_ST_FL_INTERRUPT;
sync.c:319: if ((me->flags & _ST_FL_INTERRUPT) && lock->owner != me) { 290 int st_mutex_lock(),变运行时
sync.c:320: me->flags &= ~_ST_FL_INTERRUPT;
115 int st_usleep(st_utime_t usecs)
116 {
117 _st_thread_t *me = _ST_CURRENT_THREAD();
118
119 if (me->flags & _ST_FL_INTERRUPT) {
120 me->flags &= ~_ST_FL_INTERRUPT; 退出前对interrupt取反
121 errno = EINTR;
122 return -1; 如果不对errno或返回值做处理,循环还是会继续的
123 }
124
125 if (usecs != ST_UTIME_NO_TIMEOUT) {
126 me->state = _ST_ST_SLEEPING;
127 _ST_ADD_SLEEPQ(me, usecs);
128 } else
129 me->state = _ST_ST_SUSPENDED;
130
131 _ST_SWITCH_CONTEXT(me);
132
133 if (me->flags & _ST_FL_INTERRUPT) {
134 me->flags &= ~_ST_FL_INTERRUPT;
135 errno = EINTR;
136 return -1;
137 }
138
139 return 0;
140 }
st的优缺点
优点:
1 用户空间实现协程调度,降低了用户空间和内核空间的切换,一定程度上提高了程序效率。
2 由于是在单核上的单线程多协程,同一时间只会有一个协程在运行,所以对于全局变量也不需要做协程同步。
共享资源释放函数只需做到可重入就行,所谓的可重入就是释放之前先判断是否为空值,释放后要赋空值。
3 协程使用完,直接return即可,st会回收协程资源并做协程切换。
4 可以通过向run_q链表头部加入协程,来实现优先调度。
5 st支持多个操作系统,比如 AIX,CYGWIN,DARWIN,FREEBSD,HPUX,IRIX,LINUX,NETBSD,OPENBSD,SOLARIS
缺点:
1 所有I/O操作必须使用st提供的API,只有这样协程才能被调度器管理。
2 所有协程里不能使用sleep(),sleep()会造成整个线程sleep。
3 被调度到的协程不会被限制运行时长,如果有协程是cpu密集型或死循环,就会严重阻碍其他协程运行。
4 单进程单线程,只能使用单核,想要通过多个cpu提高并发能力,只能开多个程序(进程),多进程通信较麻烦。
补充知识点
1 线程为什么要同步?
线程由内核自动调度
同一个进程上的线程共享该进程的整个虚拟地址空间
同一个进程上的线程代码区是共享的,即不同的线程可以执行同样的函数
所以在并发环境中,多个线程同时对同一个内存地址进行写入,由于CPU寄存器时间调度上的问题,写入数据会被多次的覆盖,会造成共享数据损坏,所以就要使线程同步。
2 什么情况下需要线程同步?
线程同步指的是 不同时发生,就是线程要排队
1 多核,单进程多线程,不同线程会对全局变量读写,这种情况才需要对线程做同步控制
2 单核,单进程多线程,不同线程会对全局变量读写,这种情况不需要对线程做同步控制
3 多核,单进程多线程,不同线程不对全局变量读写,这种情况不需要对线程做同步控制
4 多核,单进程多线程,不同线程会对全局变量读,这种情况不需要对线程做同步控制
问题:对于第2条,这个应该是有点儿片面,有依赖有优先级抢占也是要同步,除非像abcde这种几个线程干一摸一样的事情,而且项目之间不依赖。
有依赖 可以理解为 生产消费关系,虽然是 单核 单进程 多线程 但是同一时间只能有一个线程在运行,也就是说 生产和消费不会同时发生,同样多个生产也不会同时发生,所以不需要锁。
线程是有优先级控制,但是不管怎么控制,只要保证同一时间只能有一个线程在运行,就不需要锁了。
问题:对于第2条,这个应该是有点儿片面,有原子操作且原子操作过程中有线程切换,这种是需要锁的。
比如,线程a 第一次读取全局变量x并做处理,然后发生线程切换(线程由内和自动调度)后切回,然后第二次读取全局变量x并做处理,我们想确保两次读取
x值相同,但是发生了线程切换x值可能被改变。
如何 确保 第一次读取并处理和第二次读取并处理是原子操作呢? 使用st_mutex_t
3 accept()序列化
亦称惊群效应,亦亦称Zeeg难题
https://uwsgi-docs-zh.readthedocs.io/zh_CN/latest/articles/SerializingAccept.html
在多次fork自己之后,每个进程一般将会开始阻塞在 accept() 上
每当socket上尝试进行一个连接,阻塞在 accept() 上的每个进程的 accept() 都会被唤醒。
只有其中一个进程能够真正接收到这个连接,而剩余的进程将会获得一个无聊的 EAGAIN 这导致了大量的CPU周期浪费,实际解决方法是把一个锁放在 accept() 调用之前,来序列化它的使用
4 Internet Applications网络程序架构
多进程架构 Multi-Process
一个进程服务一个连接,要解决数据共享问题
单进程多线程架构 Multi-Threaded
一个线程服务一个连接,要解决数据同步问题
事件驱动的状态机架构 Event-Driven State Machine
事件触发回调函数(缺点是嵌套) 或 用户空间实现协程调度
实际上 EDSM架构 用很复杂的方式模拟了多线程
st提供的就是EDSM机制,它在用户空间实现协程调度
https://blog.csdn.net/caoshangpa/article/details/53282330
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
戴尔科技集团将Dell EMC PowerStore的性能和自动化水平 提升至新高度
4月21日,“数耀新程 瞰见未来”2021戴尔科技集团存储论坛暨新品发布会盛大召开,并携存储新品Dell EMC PowerStore 500亮相,进一步完善戴尔存储产品组合,适应多样化的企业环境,满足各类应用场景工作负载需求。此次PowerStore更新是戴尔科技集团持续创新的又一力作,通过业界领先的产品技术赋能企业打造健壮的“数字原生企业”技术底座,直面数字化转型挑战,专注业务创新。 2021戴尔科技集团存储论坛暨新品发布 戴尔科技集团同时宣布与中国艺术档案学会和博鳌文创院签约合作《数字人文 留住记忆》的科技文化融合研发项目,共同参与关于“史诗档案数字化”的整备。戴尔科技集团战略性地将企业社会责任与文化科技融合,助力中国相关文化遗产数字化保护与存储,提供数字技术解决方案,促进数字化时代下,科技与文化的深度融合与创新发展。作为IT企业参与文化保护的首批典范,戴尔科技集团继续为“在中国,为中国”的战略里程碑添砖加瓦。 戴尔科技集团正通过新的软件和自动化功能升级Dell EMC PowerStore企业存储系统,同时推出PowerStore 500,适用于更广泛的业务和用例,同时具备企业...
- 下一篇
5000字 | 24张图带你彻底理解Java中的21种锁
本篇主要内容如下: 帮你总结好的锁: 序号 锁名称 应用 1 乐观锁 CAS 2 悲观锁 synchronized、vector、hashtable 3 自旋锁 CAS 4 可重入锁 synchronized、Reentrantlock、Lock 5 读写锁 ReentrantReadWriteLock,CopyOnWriteArrayList、CopyOnWriteArraySet 6 公平锁 Reentrantlock(true) 7 非公平锁 synchronized、reentrantlock(false) 8 共享锁 ReentrantReadWriteLock中读锁 9 独占锁 synchronized、vector、hashtable、ReentrantReadWriteLock中写锁 10 重量级锁 synchronized 11 轻量级锁 锁优化技术 12 偏向锁 锁优化技术 13 分段锁 concurrentHashMap 14 互斥锁 synchronized 15 同步锁 synchronized 16 死锁 相互请求对方的资源 17 锁粗化 锁优化技术 18 ...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- Mario游戏-低调大师作品
- Docker安装Oracle12C,快速搭建Oracle学习环境
- Red5直播服务器,属于Java语言的直播服务器
- CentOS7,CentOS8安装Elasticsearch6.8.6
- CentOS8编译安装MySQL8.0.19
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS6,CentOS7官方镜像安装Oracle11G
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装