swoole 第4章 异步TCP服务器-task初体验 swoole 第4章 异步TCP服务器-task初体验

2021-06-09

①、task 初体验

在上文和 IO 模型中我们都对同步和异步进行了详细的解释,可能你们都懂了,可能部分人还是没懂,毕竟异步始终是个抽象的概念。

今天我们再来强化下这个概念,说一说 Async Task。

AsyncTask,即异步任务。我们可以利用 AsyncTask 将一个耗时的任务投递到队列中,由进程池异步去执行。

博主你说人话,啥是异步任务?

总有些人吐槽不知道 swoole 的应用场景是啥,我们就以实际中遇到的问题为例:

  • 情景一:管理员需要给指定的用户发送邮件,当勾选 10 封甚至更多封的时候,点击发送,浏览器会一直转圈,直到邮件全部发送完毕。

  • 情景二:大家都爱看小说,我们以某小说网站的一个需求为例:要求作者可以把他事先写好的小说直接批量导入到网站(根据某种规则),这个操作起来同样会比较耗时。

从我们理解的角度思考,这其实都是 php 线程一直被阻塞,客户端才一直在等待服务端的响应。

对用户而言,这就是漫长的等待。如何优雅的提高用户体验就是一个非常棘手的问题。

我们的目的就是当用户选了 10000 封邮件或者提交了他含有 500 章节的内容之后,及时的通知用户邮件正在发送中或者提示用户章节内容正在上传中,对不对?明白我们今天的重点了吗?

对,你没理解错,AsyncTask 的目的就是这个。下面我们来介绍下 AsyncTask 的使用。

②、先创建一个server

$serv = new swoole_server("127.0.0.1", 9501);

③、开启task功能

task 功能默认是关闭的,开启 task 功能需要满足两个条件:

  • 配置 task 进程的数量

  • 注册 task 的回调函数 onTask 和 onFinish

配置 task 进程的数量,即配置 task_worker_num 这个配置项。比如我们开启一个 task 进程

$serv->set([
    "task_worker_num" => 1,
]);

④、task 怎么使用?

task 进程其实是要在 worker 进程内发起的,即我们把需要投递的任务,通过 worker 进程投递到 task 进程中去处理。

怎么操作呢?我们可以利用 swoole_server->task 函数把任务数据投递到 task 进程池中。

swoole_server->task 函数是非阻塞函数,任务投递到 task 进程中后会立即返回,即不管任务需要在 task 进程内处理多久,worker 进程也不需要任何的等待,不会影响到 worker 进程的其他操作。但是 task 进程却是阻塞的,如果当前 task 进程都处于繁忙状态即都在处理任务,你又投递过来 100 个甚至更多任务,这个时候新投递的任务就只能乖乖的排队等 task 进程空闲才能继续处理。

如果投递的任务量总是大于 task 进程的处理能力,建议适当的调大 task_worker_num 的数量,增加 task 进程数,不然一旦 task 塞满缓冲区,就会导致 worker 进程阻塞,这将是我们不期望的结果。

我们写一个例子来解释下上面所说的内容。

$serv->on("Connect", function ($serv, $fd) {
    echo "new client connected." . PHP_EOL;
});
$serv->on("Receive", function ($serv, $fd, $fromId, $data) {
    echo "worker received data: {$data}" . PHP_EOL;

    // 投递一个任务到task进程中
    $serv->task($data);

    // 通知客户端server收到数据了
    $serv->send($fd, "This is a message from server.");

    // 为了校验task是否是异步的,这里和task进程内都输出内容,看看谁先输出
    echo "worker continue run."  . PHP_EOL;
});

⑤、注册 onTask 回调

/**
 * $serv swoole_server
 * $taskId 投递的任务id,因为task进程是由worker进程发起,所以多worker多task下,该值可能会相同
 * $fromId 来自那个worker进程的id
 * $data 要投递的任务数据
 */
$serv->on("Task", function ($serv, $taskId, $fromId, $data) {
    echo "task start. --- from worker id: {$fromId}." . PHP_EOL;
    for ($i=0; $i < 5; $i++) {
        sleep(1);
        echo "task runing. --- {$i}" . PHP_EOL;
    }
    echo "task end." . PHP_EOL;
});

为了模拟判断到底是不是异步的,我们在 task 的回调中循环一个耗时任务,另一个需要注意的地方,我们在 task 回调内的结尾并没有 return 任何内容。

⑥、注册 onFinish 回调

/**
 * 只有在task进程中调用了finish方法或者return了结果,才会触发finish
 */
$serv->on("Finish", function ($serv, $taskId, $data) {
    echo "finish received data {$data}" . PHP_EOL;
});

⑦、最后,调用 server 的 start 方法

$serv->start();

整个过程是这样的:我们在 worker 进程收到数据后,直接调用 swoole_server->task 函数把数据投递给task进程,随后在 swoole_server->task 调用后和 task 进程内都输出内容。

⑧、执行结果

准备就绪之后我们在终端下启动 server,执行

php server.php

客户端的测试,我们仍然利用上文在 client.php 写好的代码进行测试,新开一个终端,执行

php client.php

一起看下测试结果:

服务端

new client connected.
worker received data: hello server.
worker continue run.
task start. --- from worker id: 3.
client closed
task runing. --- 0
task runing. --- 1
task runing. --- 2
task runing. --- 3
task runing. --- 4
task end.

客户端

This is a message from server.

从测试结果中,我们看到在 swoole_server 的 task 函数之后输出的内容“worker continue run”在 task 进程开始之前输出。第二个应该引起你注意的是在结果中我们并没有看到在 onFinish 回调中输出的信息,我们把 task 回调函数的最后一句 echo 改为 return 再试一次。

return "task end." . PHP_EOL;

如果你修改了代码之后,直接去执行 client.php,你会发现结果并没有任何变化。

我们在 server 启动的那个终端下,按 Ctrl+C 退出,然后再重新启动 server

php server.php

发现了什么?有没有看到 server 终端下面的最后一行显示的信息变了?

finish received data "task end.";

怎么回事,为什么是这样的呢?大白天见鬼啦?为什么要重启下 server 代码才生效呢?

这个问题跟常驻内存有关,我们准备后面单独增加一个章节说说这个事。

在结果中我们看到了在 onFinish 回调中打印的信息。为什么这个时候能输出 onFinish 回调的内容了呢?

这是因为 task 进程内一旦 return 或者调用 swoole_server->finish 方法,就会通知到 worker 进程该任务已经完成,worker 进程会继续触发 onFinish 回调,进一步对投递的结果进行处理。

这个过程有没有必要呢?讲真话,还真得看自己的业务需求。比如我们以开篇抛出的情境一发送邮件为例,如果我们在 task 进程内发送完邮件就完事了,不需要关注邮件是否发送成功,反正发不发也无所谓,这个时候就没必要调onFinish 回调了。但是如果说我们还需要确认发送的邮件是否成功,没成功还要再继续发,这个时候我们就可以在 onFinish 回调中继续处理 task 的结果了。

⑨、总结

  • 没有耗时任务的情况下,worker 直接运行,无需开启 task

  • 对于耗时的任务,可以在 worker 内调用 task 函数,把异步任务投递给 task 进程进行处理,task 进程的数量取决于 task_worker_num 的配置

  • task 进程内可以选择调用 finish 方法或者 return,来通知 worker 进程此任务已完成,worker 进程会在 onFinish 回调中对 task 的执行结果进一步处理。如果 worker 进程不关心任务的结果,finish 就不需要了。

阅读 1187