ThinkPHP 队列模板

官方文档的例子有点粗糙,先优化下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
<?php

namespace app\job;

use think\queue\Job;

class Demo
{
public function fire (Job $job, $data)
{
//....这里执行具体的任务

if ($job->attempts() > 3) {
//通过这个方法可以检查这个任务已经重试了几次了
$e = new \Exception('执行失败次数已达上限。');

// 进行异常处理
$job->failed($e);

// 删除任务
$job->delete();

return;
}

//如果任务执行成功后 记得删除任务,不然这个任务会重复执行,直到达到最大重试次数后失败后,执行failed方法
$job->delete();

// 重新发布时间(秒)
$delay = 30;

// 重新发布
$job->release($delay);
}

public function failed ($data)
{
// 异常处理,因为队列会把数据删除,所以这里要进行持久化操作。
}
}

当任务处理成功后,切记一定要 return; ,因为忘记返回,我一直在排查业务代码有没有问题。

业务前期,我们对队列的需求不是很高,出于可用性和尽早数据最终一致性考虑,我们希望队列处理也可以兼容同步的方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public function fire (Job $job, $data)
{
$result = $this->handle($data);

// 返回 true 代表处理成功,可以删除队列
if($result === true) {
$job->delete();

return;
}

if($job->attempts() > 3) {
$e = new \Exception('执行失败次数已达上限。');

$job->failed($e);

$job->delete();

return;
}

$job->release(30);

return;
}

/**
* 业务处理
*
* @param void $data
* @return bool
*/
public function handle($data) :bool
{
return true;
}

业务同步处理:

1
2
3
4
5
$result = (new app\job\Demo())->handle($data);

if($result === true) {
// 业务处理成功
}

后面,我们把微信支付的回调也放到队列中处理,那不可避免我们需要让队列能完成事务。因为前期我们封装了一层 handle,所以我们处理是非常方便的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public function fire (Job $job, $data)
{
// 开启事务
Db::startTrans();

try {
$result = $this->handle($data);

// 返回 true 代表处理成功,可以删除队列
if($result === true) {
// 提交事务
Db::commit();

$job->delete();

return;
}
} catch (\Exception $e) {
// 回滚
Db::rollback();
}
}

事务处理业务中,我们又发现一些业务可能存在不需要提交事务,但是需要删除任务的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
* 是否需要空提交。
* @var bool
*/
protected $null_commit = false;

public function fire (Job $job, $data)
{
// 开启事务
Db::startTrans();

try {
$result = $this->handle($data);

// 返回 true 代表处理成功,可以删除队列
if($result === true) {
if($this->null_commit) {
// 空提交,需要回滚来进行锁释放
Db::rollback();
} else {
// 提交事务
Db::commit();
}

$job->delete();

return;
}
} catch (\Exception $e) {
// 回滚
Db::rollback();
}
}

/**
* 业务处理
*
* @param void $data
* @return bool
*/
public function handle($data) :bool
{
if(!is_array($data)) {
// 数据类型错误,需要删除该任务
$this->null_commit = true;

return true;
}

return true;
}

当然,我们如果每个队列需求都维护这么一个文件实在太麻烦了,所以我们对一些业务放在了同一个处理队列中,通过类型进行一个区分,即我们的最终版。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
<?php

namespace app\job;

use think\Db;
use think\queue\Job;

class Demo
{
// 用户注册
const TYPE_USER_REGISTER = 1;

/**
* 是否需要空提交。
*
* @var bool
*/
protected $null_commit = false;

public function fire (Job $job, $data)
{
// 开启事务
Db::startTrans();

try {
$result = $this->handle($data);

// 返回 true 代表处理成功,可以删除队列
if ($result === true) {
if ($this->null_commit) {
// 空提交,需要回滚来进行锁释放
Db::rollback();
} else {
// 提交事务
Db::commit();
}

$job->delete();

return;
}
}
catch (\Exception $e) {
// 回滚
Db::rollback();
}

if ($job->attempts() > 3) {
$e = new \Exception('执行失败次数已达上限。');

$job->failed($e);

$job->delete();

return;
}

$job->release(30);

return;
}

/**
* 业务处理
*
* @param void $data
*
* @return bool
*/
public function handle ($data): bool
{
if (!is_array($data)) {
// 数据类型错误,需要删除该任务
$this->null_commit = true;

return true;
}

if (false) {
$data = [
'type' => 1,
'uid' => 1,
'data' => [
// data 是 type 的定制数据
'ip' => '8.8.8.8',
'ua' => 'xxx',
],
];
}

if (!(isset($data['type'])
&& isset($data['uid'])
&& isset($data['data'])
&& is_integer($data['type'])
&& is_array($data['data'])
)) {
// 数据不符合要求
$this->null_commit = true;

return true;
}

$type = $data['type'];
$uid = $data['uid'];
$data = $data['data'];

if ($type === Demo::TYPE_USER_REGISTER) {
return $this->TYPE_USER_REGISTER($uid, $data);
}

return true;
}

public function failed ($data)
{
// 异常处理,因为队列会把数据删除,所以这里要进行持久化操作。
}

protected function TYPE_USER_REGISTER ($uid, array $data): bool
{
return true;
}
}

业务使用:

1
2
3
4
5
6
7
8
think\facade\Queue::push(app\job\Demo::class, [
'type' => app\job\Demo::TYPE_USER_REGISTER,
'uid' => 1,
'data' => [
'ip' => '8.8.8.8',
'ua' => 'xxx',
],
]);
往上