srs的hls回调
srs 在国内算是比较有名气的流媒体服务器,底层使用了st_thread
这个协程库,整个进程是单核运行的.
它的hls直播功能支持以下回调:
1
|
on_hls http://example.com/xxxx;
|
也就是在直播时,每生成一个ts片段都会回调上面配置的http地址,用来通知用户,用户可以根据回调做一些业务,例如分发ts片段用于cdn缓存,录制直播时的片段等等.
我测试时发现了一个现象, 每次推流断开后,srs生成了最后一个ts片段,但是我的app server没有收到最后一个片段的回调.这应该是srs的一个bug.
上它的github
项目issuse
上去搜索,已经有人提过这个bug, 详见issue 2068,作者回复"这个后面看看".
这个功能如果真的用于线上,那么这个bug影响还是比较大的,会直接造成业务方忽略最后一个片段,少则几秒,多则十几秒的数据.
原因
调试一下查看这个问题的原因,srs的日志其实已经对这个情况有记录了
1
|
[2021-01-18 21:44:57.483][Warn][256506][194][4] ignore task failed code=1018 : callback on_hls http://localhost:8088/callback/on_hls : http: post http://localhost:8088/callback/on_hls with {"action":"on_hls","client_id":194,"ip":"xxxx","vhost":"defaultVhost","app":"live","stream":"xx","param":"","duration":3.24,"cwd":"/home/hh/srs-3.0release/trunk","file":"./objs/nginx/html/live/xx-1.ts","url":"live/xx-1.ts","m3u8":"./objs/nginx/html/live/xx.m3u8","m3u8_url":"live/xx.m3u8","seq_no":1}, status=0, res= : http: client post : http: connect server : http: tcp connect localhost:8088 to=30000ms, rto=30000ms : tcp: connect localhost:8088 to=30000ms : connect to localhost:8088
|
可以看到这个回调任务结果失败了
从源码中看看逻辑
- 推流断开后,最后一个on_hls通知会作为
asynctask
加到队列中等待在SrsAsyncCallWorker::cycle()
执行,当执行到了SrsAsyncCallWorker::cycle()
中的flush_tasks()
,调用到了http请求的tcp连接st_connect
.
- 这个时候由于推流断开,会通过
unpublish
触发到SrsAsyncCallWorker::stop()
函数中,这个调用与上一个调用是不同的两个协程,然后trd->stop()
会被调用,这个函数最终会调用st_thread_interrupt
- 上面第2步说的
interrupt
刚好打断了第1步的st_connect
,所以最后一个on_hls的http请求发不出去了.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
void SrsAsyncCallWorker::stop()
{
flush_tasks();
srs_cond_signal(wait);
trd->stop();
}
srs_error_t SrsAsyncCallWorker::cycle()
{
srs_error_t err = srs_success;
while (true) {
if ((err = trd->pull()) != srs_success) {
return srs_error_wrap(err, "async call worker");
}
if (tasks.empty()) {
srs_cond_wait(wait);
}
flush_tasks();
}
return err;
}
|
上面这段代码是srs中stop
和协程循环cycle
的函数体
抛离on_hls
这个问题,从宏观上来看上面的代码,stop时内部调用interrupt是可能打断最后的tasks的执行的.所以它们的协作从逻辑上看是有问题的.
修改
我修改的方案是srscoroutine增加一个stopwithoutinterrupt函数供调用,不会调用st_thread_interrupt
去打断正在执行的任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
diff --git a/trunk/src/app/srs_app_async_call.cpp b/trunk/src/app/srs_app_async_call.cpp
index 271964d9..90ba9eb4 100644
--- a/trunk/src/app/srs_app_async_call.cpp
+++ b/trunk/src/app/srs_app_async_call.cpp
@@ -91,7 +91,7 @@ void SrsAsyncCallWorker::stop()
{
flush_tasks();
srs_cond_signal(wait);
- trd->stop();
+ trd->stopwithoutinterrupt();
}
srs_error_t SrsAsyncCallWorker::cycle()
diff --git a/trunk/src/app/srs_app_st.cpp b/trunk/src/app/srs_app_st.cpp
index 831a0613..84710ed6 100755
--- a/trunk/src/app/srs_app_st.cpp
+++ b/trunk/src/app/srs_app_st.cpp
@@ -64,6 +64,9 @@ srs_error_t SrsDummyCoroutine::start()
void SrsDummyCoroutine::stop()
{
}
+void SrsDummyCoroutine::stopwithoutinterrupt()
+{
+}
void SrsDummyCoroutine::interrupt()
{
@@ -130,6 +133,40 @@ srs_error_t SrsSTCoroutine::start()
return err;
}
+void SrsSTCoroutine::stopwithoutinterrupt()
+{
+ if (disposed) {
+ return;
+ }
+ disposed = true;
+
+ interrupted = true;
+ if (trd_err == srs_success) {
+ trd_err = srs_error_new(ERROR_THREAD_INTERRUPED, "interrupted");
+ }
+
+ // When not started, the rd is NULL.
+ if (trd) {
+ void* res = NULL;
+ int r0 = st_thread_join((st_thread_t)trd, &res);
+ srs_assert(!r0);
+
+ srs_error_t err_res = (srs_error_t)res;
+ if (err_res != srs_success) {
+ // When worker cycle done, the error has already been overrided,
+ // so the trd_err should be equal to err_res.
+ srs_assert(trd_err == err_res);
+ }
+ }
+
+ // If there's no error occur from worker, try to set to terminated error.
+ if (trd_err == srs_success && !cycle_done) {
+ trd_err = srs_error_new(ERROR_THREAD_TERMINATED, "terminated");
+ }
+
+ return;
+}
+
void SrsSTCoroutine::stop()
{
if (disposed) {
diff --git a/trunk/src/app/srs_app_st.hpp b/trunk/src/app/srs_app_st.hpp
index 956e00b4..1549cb82 100644
--- a/trunk/src/app/srs_app_st.hpp
+++ b/trunk/src/app/srs_app_st.hpp
@@ -76,6 +76,7 @@ public:
public:
virtual srs_error_t start() = 0;
virtual void stop() = 0;
+ virtual void stopwithoutinterrupt() = 0;
virtual void interrupt() = 0;
// @return a copy of error, which should be freed by user.
// NULL if not terminated and user should pull again.
@@ -93,6 +94,7 @@ public:
public:
virtual srs_error_t start();
virtual void stop();
+ virtual void stopwithoutinterrupt();
virtual void interrupt();
virtual srs_error_t pull();
virtual int cid();
@@ -143,6 +145,7 @@ public:
// many threads to stop like the encoder, use the interrupt to notify all threads
// to terminate then use stop to wait for each to terminate.
virtual void stop();
+ virtual void stopwithoutinterrupt();
// Interrupt the thread and notify it to terminate, it will be wakeup if it's blocked
// in some IO operations, such as st_read or st_write, then it will found should quit,
// finally the thread should terminated normally, user can use the stop to join it.
|
但是这么改是否会导致一些任务永远阻塞呢? 这个需要看看st_thread
库的调用是不是有可能造成永远阻塞.或者srs作者有什么更好的解决方案让任务能正常执行完毕,也许设定超时是个办法.
后续
这个issue到最后还是没有讨论清楚,看起来这个问题并不是必现的,issue里面的修复方式只是把调用顺序换了一下,似乎这个问题就很难再复现了,但是我这里测试仍然是必现的.
仔细想一想,我这里测试的环境是wsl, 网络和协程的调度和真实linux内核应该是有区别的, linux环境里也许st_connect
能立刻得到调度执行完毕,所以被打断的概率变的极小,掩盖了逻辑上的漏洞.而wsl环境下st_connect
被阻塞时间较长,这可能是这个问题在我的环境极大概率出现的原因.
srs版本: 3.0release
测试环境: wsl1