ThinkPHP6 游戏补偿处理之队列版

游戏补偿处理之简易版 中,我们使用分页的方式来处理补偿的分发,为了更好的满足业务发展,这篇我们将使用消息队列,通过先进先出的方式来更好完成补偿的分发。

可能你会好奇,简易版出没几天,怎么又来了个队列版。其实很多业务的处理,我们要先弄简单的,然后慢慢优化,增强性能,一步到位可能业务又变了呢,而且初期我们的时间够用吗?

所以队列版我们的选型也不可以太复杂,Redis 其实对我们现阶段是够用了的,暂时没有必要直接就上了 Kafka 、 Apache RocketMQ 、 RabbitMQ 这些的。选 MySQL 又性能可能有些低,而且 Redis 我们也是较为熟悉了的。

大概流程是:

  1. 有个命令行来充当消息的生产者,把满足的用户循环推到队列中。
  2. 监听队列然后进行消费处理,完成后删除记录。

Redis 的可靠性也是很高的,虽然不如上面那三个专业的,但怕丢失,可以在第一次完成后,再重新跑一遍,毕竟我们代码已经做了严格处理,避免了会重复补偿多次的情况。

那开始我们的实现吧。

第一步要安装两个需要到的依赖。

1
$ composer require topthink/think-queue:~3.0 topthink/think-migration:~3.0 -vvv

完成依赖安装后,我们需要生成一个用于存储执行失败记录的表格。这部分记录相当于没有正常补偿到,后期自己要根据记录信息进行额外的排查、处理。

1
$ php think queue:failed-table

好了,现在我们需要进行一些配置,才能连上我们的 Reids ,和错误的记录处理。

config/queue.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
<?php

return [
// 默认驱动
'default' => 'redis',

// 驱动列表
'connections' => [
'sync' => [
'type' => 'sync',
],
'database' => [
'type' => 'database',
'queue' => 'default',
'table' => 'jobs',
'connection' => null,
],
'redis' => [
'type' => 'redis',
'queue' => 'default',
'host' => '127.0.0.1',
'port' => 6379,
'password' => '',
'select' => 0,
'timeout' => 0,
'persistent' => false,
],
],
// 失败处理
'failed' => [
// 数据表记录,默认 None,处理可见 `vendor/topthink/think-queue/src/queue/failed`
'type' => 'Database',
// 数据表名称
'table' => 'failed_jobs',
],
];

创建一个生产者的命令。使用命令而不是 HTTP 请求,主要考虑了超时问题。换成命令行方式,相当于我们可以把参数直接通过命令行中传进来。

1
$ php think make:command Reissue reissue

app/command/Reissue.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
<?php
namespace app\command;

use think\facade\Db;
use think\facade\Queue;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;

class Reissue extends Command
{
protected function configure()
{
// 指令配置
$this->setName('reissue')
->setDescription('补偿')
->addOption('key', null, Option::VALUE_REQUIRED, '补偿标识 ID', null)
->addOption('pid', null, Option::VALUE_REQUIRED, '补偿发放道具ID', 1)
->addOption('value', null, Option::VALUE_REQUIRED, '补偿发放道具数量', null);
}

protected function execute(Input $input, Output $output)
{
// 补偿标识 ID
$reissue_key = (string) $input->getOption('key');

// 补偿发放道具ID,1 为金币
$reissue_pid = (int) $input->getOption('pid');

if(!$reissue_pid) {
$output->writeln('补偿发放道具ID需要大于 0');

return;
}

// 补偿发放道具数量
$reissue_value = (int) $input->getOption('value');

if(!$reissue_value) {
$output->writeln('补偿发放道具数量需要大于 0');

return;
}

// 查询满足条件的用户
$sql = sprintf('SELECT `users`.`id`
FROM `users`
LEFT JOIN `users_props_logs` ON `users_props_logs`.`uid` = `users`.`id` AND `users_props_logs`.`pid` = %d AND `users_props_logs`.`name` = "%s"
WHERE `users_props_logs`.`id` IS NULL', $reissue_pid, $reissue_key);

$data = Db::query($sql);

if(!count($data)) {
$output->writeln('未查询到满足条件的');

return;
}

$data = array_column($data, 'id');

$output->writeln('返回数:' . count($data));

foreach ($data as $uid) {
// 遍历推到队列中
Queue::push('app\job\Reissue', [
'uid' => $uid,
'reissue_key' => $reissue_key,
'reissue_pid' => $reissue_pid,
'reissue_value' => $reissue_value,
]);
}
}
}

命令已经完成了,我们可以打印下 help 看看。

1
2
3
4
5
6
7
8
9
10
$ php think reissue --help
Usage:
reissue [options]

Options:
--key=KEY 补偿标识 ID
--pid=PID 补偿发放道
ID [default: 1]
--value=VALUE 补偿发放道
数量

执行生产命令。

1
2
$ php think reissue --key "reissue_安慰奖_2022_06_20_1400" --pid 1 --value 100
返回数:10032

可以看见 Redis 中已经是有数据了的。

那现在我们要来处理消费这一边了。

app\job\Reissue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
<?php
namespace app\job;

use think\facade\Db;
use think\facade\Log;
use think\queue\Job;

class Reissue
{
public function fire(Job $job, $data)
{
if (!isset($data['reissue_key'])
|| !isset($data['reissue_pid'])
|| !isset($data['reissue_value'])
|| !isset($data['uid'])) {
// 入参有问题,需要根据业务,判断要直接删除还是报错然后失败记录表。
$job->delete();

return;
}

$uid = $data['uid'];
$reissue_key = $data['reissue_key'];
$reissue_pid = $data['reissue_pid'];
$reissue_value = $data['reissue_value'];

$log_prefix = sprintf('[游戏补发][%s][%d]', $reissue_key, $uid);

Log::write($log_prefix . "开始处理");

// 开启事务
Db::startTrans();

// 是否完成,主要作用来记录异常处理方式
$has_complete = false;

try {
// 判断是否已经发放过了,加锁是为了阻塞,避免重复消费
$history = Db::table('users_props_logs')
->lock(true)
->where('pid', $reissue_pid)
->where('uid', $uid)
->column('distinct name');

if($history && in_array($reissue_key, $history)) {
$has_complete = true;

throw new \Exception('已经发放了');
}

// 获取用户该道具的最新信息
$props = Db::table('users_props_logs')
->lock(true)
->field('pid, uid, surplus')
->where('pid', $reissue_pid)
->where('uid', $uid)
->order('id desc')
->find();

if(!$props) {
// 没有初始化该货币
$props = [
'pid' => $reissue_pid,
'uid' => $uid,
'surplus' => 0,
];

$has_complete = true;

throw new \Exception('没有初始化?');
}

// 插入记录
Db::table('users_props_logs')
->insert([
'pid' => $props['pid'],
'uid' => $props['uid'],
'name' => $reissue_key,
'value' => $reissue_value,
'surplus' => $props['surplus'] ? (int) bcadd($props['surplus'], $reissue_value) : $reissue_value,
]);

// 提交事务
Db::commit();

Log::write($log_prefix . "成功处理");
} catch (\Exception $e) {
// 回滚事务
Db::rollback();

if($has_complete) {
Log::write($log_prefix . $e->getMessage());

$job->delete();
} else {
Log::write($log_prefix . "处理失败:" . $e->getMessage());

$job->failed($e);
}
}
}
}

上面代码中,可能会觉得奇怪,为什么完成了处理,还要抛出来一个异常呢?

其实是一开始我们在获取这个用户相同货币的日志记录时,有一个锁锁住了,然后我们后续的流程,比如检查到已经发放了,那这时候下面的流程是不会继续走了的,但是锁还是在的,我们要怎么去解开呢?

在 try 中执行回滚操作来解开锁吗?好像不是很优雅的,这样子也会导致逻辑比较混乱。

所以就采用了 抛出异常+标记完成 的方式,来进行一个解锁处理,相当于 catch 中,我们不管什么情况都要进行一个回滚,但接下来的处理,比如这个消费是不是成功的,要不要删除,则要看有没有标记成功了的。

开始执行下我们的消费队列,这里的命令参数可以自行 help 看一下。

1
2
3
4
5
6
7
$ php think queue:work --queue=default --tries=3 --timeout=5
[2022-06-20 14:19:44][qUideswfqZJgoreNrDhHxZJZcmuQxuam] Processing: app\job\Reissue
[2022-06-20 14:19:44][qUideswfqZJgoreNrDhHxZJZcmuQxuam] Processed: app\job\Reissue
[2022-06-20 14:19:44][tuYZQBjGQkfRyMdmTAXEnXmzXLMhvBDg] Processing: app\job\Reissue
[2022-06-20 14:19:44][tuYZQBjGQkfRyMdmTAXEnXmzXLMhvBDg] Processed: app\job\Reissue
[2022-06-20 14:19:44][QrnlFUaJBbvjVRbcZfByoosfHkKncVLI] Processing: app\job\Reissue
[2022-06-20 14:19:44][QrnlFUaJBbvjVRbcZfByoosfHkKncVLI] Processed: app\job\Reissue

但好像出现了点问题,我在代码里面加了个随机抛出异常,但数据表中还是一片空白,经过的依赖的排查,原来这块需要我们手动进行一个处理,驱动、队列名称这些好像都不能直接获取,需要我们写死来赋值。

app\job\Reissue

1
2
3
4
5
6
7
public function failed($payload, $e)
{
$connection = 'redis';
$queue = 'default';

app()->get('queue.failer')->log($connection, $queue, $payload, $e);
}

OK,这样子我们的队列也能成功的记录到了错误的信息。

如果消息少,那我们可能开一个消费终端就可以来处理了,如果消息多了,也可以通过服务器、MySQL、Redis 的监控,然后在慢慢增加处理的消费终端,就如同下图一样。

如果你使用了 supervisor 来让消费者一直处于后台运行状态,记得有个 numprocs 变量,可以增加你同时参与监听的消费者数量,更好进行减少消息堆积。

往上