协程是另外一种并发方式,相比多进程或者多线程来说,其上下文的切换成本开销更小,这也意味这协程的执行更加高效。
Lua 作为一种简单的语言,仍然可以支持闭包、协程等较新的特性,在此介绍一下协程相关的内容。
简介
对于并发任务,通常有两种解决方法:A) 抢占式 (preemptive) 多任务,常见的是通过操作系统决定何时执行哪个任务;B) 协作式 (cooperative) 多任务,有任务自身决定何时放弃任务的执行。
协程 (coroutine) 是一种程序控制机制,实际上在上世纪 60 年代就已经被提出,用来方便地实现协作式的多任务。在很多主流的程序语言中,如 C/C++、Java 等,很少能看到协程,但现在不少脚本语言,如 Python、Lua 等,都提供了协程或与之相似的机制。
Lua 实现的是一种非对称式 (asymmetric) 或者称为半对称式 (semi-symmetric) 协程,之所以被称为非对称的,是因为它提供了两种传递程序控制权的操作:A) 利用 resume() (重)调用协程;B) 通过 yield 挂起协程并将程序控制权返回给协程的调用者。这看起来有点像函数的调用过程。
而 libtask 的实现就更像对称式的协程。
协程类似于线程,包含了自己的栈、本地变量、指针,但是包括了其它的全局变量是共享的。与线程不同的是,线程可以真正的在多个核上并发执行,而协程在具体的时间只能运行其中的一个。
常用接口
与协程相关的函数包含在 coroutine table 中,其中的 create() 函数用来创建一个协程,该函数会返回一个 thread 类型的变量,用来表示协程。
co = coroutine.create(function () print("Hello World") end)
print(co) --> thread: 0x24a0b20
Lua 中的协程包含了四种状态:suspended、running、dead 以及 normal,可以通过 satatus() 查看。
print(coroutine.status(co)) --> suspended
刚创建的协程处于 suspended 状态,也就是说创建时不会自动运行;可以通过 resume() 函数运行或者重新运行该协程,将状态从 suspended 转换为 running 。如上的函数中,仅仅打印字符串,那么执行完成之后,状态会变为 dead 。
到此为止,协程就像是一个执行起来比较麻烦的函数;不过其真正有用的是 yield() 函数,可以将当前协程转换为 suspended 状态,然后等待协程后续执行。
co = coroutine.create(function ()
for i = 1, 3 do
print("co", i)
coroutine.yield()
end
end)
----- OK,开始执行,直到第一次到达yield
coroutine.resume(co) --> co 1
----- 此时状态再次转换为suspended
print(coroutine.status(co)) --> suspended
----- 再次运行多次
coroutine.resume(co) --> co 2
coroutine.resume(co) --> co 3
coroutine.resume(co) --> prints nothing
print(coroutine.resume(co)) --> false cannot resume dead coroutine
当执行了三次之后,在第四次执行时,协程将结束循环并返回;如果再此尝试执行,则会返回 false 以及相关的错误信息。
当在协程 A 中 resume() 另外一个协程 B 时,A 的状态不是 suspended,因为我们不能再通过 resume() 调用执行,而且协程 A 也不是 running 状态,协程 B 才是,因此协程 A 处于 normal 状态。
resume()
在调用 resume() 执行时,是处于保护状态的,也就是说如果在协程中有异常时,会返回到 resmue() 调用处。正常来说,yield() 只会返回 true,而实际上还可以通过该函数返回值。
co = coroutine.create(function (a,b)
coroutine.yield(a + b, a - b)
end)
print(coroutine.resume(co, 20, 10)) --> true 30 10
co = coroutine.create(function ()
return 6, 7
end)
print(coroutine.resume(co)) --> true 6 7
如上,通过 return 同样可以返回相应的值。
示例
接下来查看几个经典的示例。
生产者-消费者
这也许是最经典的示例了,其中生产者一直从文件中读取数据,然后将读取的数据传递给消费者,两个程序的执行过程如下:
function producer ()
while true do
local x = io.read() -- 从文件中读取新的值
send(x) -- 将数据发送给消费者
end
end
function consumer ()
while true do
local x = receive() -- 从生产者中接收新的值
io.write(x, "\n") -- 消费之
end
end
从表面看来,两个程序都有自己的循环,而且每个程序都是可以被调用的,但实际上只能有一个是主循环,而另外一个将成为被调用者。通过 resume-yield 刚好可以实现这种逻辑。
当协程调用 yield() 后,将会返回到 resume() 调用;同样,调用 resume() 时会返回 yield() 的结果。正好符合上述的场景,receive() 调用生产者生成新的数据,然后 send() 将新的值返回给消费者。当然,上述的模型中,生产者将成为协程。
如下为完整的示例,生产者将会从 stdin 中读取一行,然后传递给消费者,通过 Ctrl-D 退出。
function receive ()
local status, value = coroutine.resume(producer)
return value
end
function send (x)
coroutine.yield(x)
end
producer = coroutine.create(function ()
while true do
local x = io.read() -- 从文件中读取新的值
send(x) -- 将数据发送给消费者
end
end)
function consumer ()
while true do
local x = receive() -- 从生产者中接收新的值
if x == nil then
return 0
else
io.write(x, "\n") -- 消费之
end
end
end
consumer()
在上述程序中,当消费者需要数据时将会调用生产者,相当于是消费者驱动的;当然,也可以使用生产者驱动,此时消费者将成为协程。
生产者-过滤器-消费者
这是上面生产者-消费者模型的扩展,过滤器用来作些数据处理,例如数据转换、过滤等。此时的过滤器同时是生产者也是消费者,将 resume 调用生产者生成数据,将转换后的值 yield 给消费者。
整个函数的调用过程如下:
function receive (prod)
local status, value = coroutine.resume(prod)
return value
end
function send (x)
coroutine.yield(x)
end
function producer ()
return coroutine.create(function ()
while true do
local x = io.read() -- 从文件中读取新的值
send(x) -- 将数据发送给消费者
end
end)
end
function filter (prod)
return coroutine.create(function ()
for line = 1, math.huge do
local x = receive(prod)
if x ~= nil then
x = string.format("%5d %s", line, x)
end
send(x)
end
end)
end
function consumer (prod)
while true do
local x = receive(prod) -- 从生产者中接收新的值
if x == nil then
return 0
else
io.write(x, "\n") -- 消费之
end
end
end
----- 其调用过程大致如下
p = producer()
f = filter(p)
consumer(f)
----- 也可以简单写为
--consumer(filter(producer()))
非抢占式的多线程
实际上协程更类似于非抢占式的多线程,当一个协程运行时,除非主动通过 yield() 让出执行权,否则不能被动的抢占。很多场景下这一模型都能正常运行,但是当任意协程调用阻塞式的操作时,那么整个程序都将会被阻塞。接下来看看如何解决这一问题。
简单通过下载多个网页作为示例。注意,该示例依赖于 socket 模块,在 CentOS 可以直接通过 yum install lua-socket 进行安装,依赖 EPEL 源。
如下为简单读取 http://www.baidu.com/ 的内容。
#!/usr/bin/lua
local socket = require("socket")
local host, file = "www.baidu.com", "/"
-- 创建一个TCP连接,连接到HTTP连接的标准80端口上
local sock = assert(socket.connect(host, 80))
sock:send("GET " .. file .. " HTTP/1.0\r\n\r\n")
repeat
-- 以1K的字节块来接收数据,并把接收到字节块输出来
local chunk, status, partial = sock:receive(2^10)
io.write(chunk or partial)
until status == "closed"
-- 关闭 TCP 连接
sock:close()
如果采用串行的方式,实际上很大一部分时间都耗费在了等待网络数据上,也就是阻塞在了 receive() 函数调用中。所以,通过并发可以提高读取效率。
通过协程可以很容易实现并发,当一个协程没有数据时返回给一个 dispatcher,然后调度另外的协程。
首先需要将上述的程序改写为一个函数,实际上我们对网页的内容并不感兴趣,因此只统计一下网站返回的字节数,而非其内容。
function receive (connection)
connection:settimeout(0) -- 设置为非阻塞模式
local s, status, partial = connection:receive(2^10)
if status == "timeout" then
coroutine.yield(connection)
end
return s or partial, status
end
function download (host, file)
local c = assert(socket.connect(host, 80))
local count = 0 -- 用于统计读取的字节数
c:send("GET " .. file .. " HTTP/1.0\r\n\r\n")
repeat
local chunk, status = receive(c)
count = count + #chunk
until status == "closed"
c:close()
print(file, count)
end
接下来看看 dispatcher 的内容。
threads = {}
function get (host, file)
-- 创建协程
local co = coroutine.create(function ()
download(host, file)
end)
table.insert(threads, co) -- 将创建的协程添加到列表中,等待被调用
end
function dispatch ()
local i = 1
while true do
if threads[i] == nil then -- 判断是否还有需要执行的线程
if threads[1] == nil then break end -- 判断是否已经都执行完
i = 1 -- 还有,从头开始执行一次
end
local status, res = coroutine.resume(threads[i])
if not res then -- 返回false表示该协程已经执行完
table.remove(threads, i) -- 删除之
else
i = i + 1 -- 然后执行下一个
end
end
end
其中 threads 保存了正在执行的协程,get() 函数用于添加需要下载的页面。dispatch() 则是主要的任务调度过程,其处理也非常简单,就是不断的遍历整个表,查看是否还有需要执行的任务。
然后通过如下方式进行调用。
host = "www.w3.org"
get(host, "/TR/html401/html40.txt")
get(host, "/TR/2002/REC-xhtml1-20020801/xhtml1.pdf")
get(host, "/TR/REC-html32.html")
get(host, "/TR/2000/REC-DOM-Level-2-Core-20001113/DOM2-Core.txt")
dispatch()