游戏补偿处理之简易版 中,我们使用分页的方式来处理补偿的分发,为了更好的满足业务发展,这篇我们将使用消息队列,通过先进先出的方式来更好完成补偿的分发。
可能你会好奇,简易版出没几天,怎么又来了个队列版。其实很多业务的处理,我们要先弄简单的,然后慢慢优化,增强性能,一步到位可能业务又变了呢,而且初期我们的时间够用吗?
所以队列版我们的选型也不可以太复杂,Redis 其实对我们现阶段是够用了的,暂时没有必要直接就上了 Kafka 、 Apache RocketMQ 、 RabbitMQ 这些的。选 MySQL 又性能可能有些低,而且 Redis 我们也是较为熟悉了的。
大概流程是:
有个命令行来充当消息的生产者,把满足的用户循环推到队列中。
监听队列然后进行消费处理,完成后删除记录。
Redis 的可靠性也是很高的,虽然不如上面那三个专业的,但怕丢失,可以在第一次完成后,再重新跑一遍,毕竟我们代码已经做了严格处理,避免了会重复补偿多次的情况。
那开始我们的实现吧。
第一步要安装两个需要到的依赖。
1 $ composer require topthink/think-queue:~3.0 topthink/think-migration:~3.0 -vvv
完成依赖安装后,我们需要生成一个用于存储执行失败记录的表格。这部分记录相当于没有正常补偿到,后期自己要根据记录信息进行额外的排查、处理。
1 $ php think queue:failed-table
好了,现在我们需要进行一些配置,才能连上我们的 Reids ,和错误的记录处理。
config/queue.php
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 <?php return [ 'default' => 'redis' , 'connections' => [ 'sync' => [ 'type' => 'sync' , ], 'database' => [ 'type' => 'database' , 'queue' => 'default' , 'table' => 'jobs' , 'connection' => null , ], 'redis' => [ 'type' => 'redis' , 'queue' => 'default' , 'host' => '127.0.0.1' , 'port' => 6379 , 'password' => '' , 'select' => 0 , 'timeout' => 0 , 'persistent' => false , ], ], 'failed' => [ 'type' => 'Database' , 'table' => 'failed_jobs' , ], ];
创建一个生产者的命令。使用命令而不是 HTTP 请求,主要考虑了超时问题。换成命令行方式,相当于我们可以把参数直接通过命令行中传进来。
1 $ php think make:command Reissue reissue
app/command/Reissue.php
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 <?php namespace app \command ;use think \facade \Db ;use think \facade \Queue ;use think \console \Command ;use think \console \Input ;use think \console \input \Argument ;use think \console \input \Option ;use think \console \Output ;class Reissue extends Command { protected function configure ( ) { $this ->setName ('reissue' ) ->setDescription ('补偿' ) ->addOption ('key' , null , Option ::VALUE_REQUIRED , '补偿标识 ID' , null ) ->addOption ('pid' , null , Option ::VALUE_REQUIRED , '补偿发放道具ID' , 1 ) ->addOption ('value' , null , Option ::VALUE_REQUIRED , '补偿发放道具数量' , null ); } protected function execute (Input $input , Output $output ) { $reissue_key = (string ) $input ->getOption ('key' ); $reissue_pid = (int ) $input ->getOption ('pid' ); if (!$reissue_pid ) { $output ->writeln ('补偿发放道具ID需要大于 0' ); return ; } $reissue_value = (int ) $input ->getOption ('value' ); if (!$reissue_value ) { $output ->writeln ('补偿发放道具数量需要大于 0' ); return ; } $sql = sprintf ('SELECT `users`.`id` FROM `users` LEFT JOIN `users_props_logs` ON `users_props_logs`.`uid` = `users`.`id` AND `users_props_logs`.`pid` = %d AND `users_props_logs`.`name` = "%s" WHERE `users_props_logs`.`id` IS NULL' , $reissue_pid , $reissue_key ); $data = Db ::query ($sql ); if (!count ($data )) { $output ->writeln ('未查询到满足条件的' ); return ; } $data = array_column ($data , 'id' ); $output ->writeln ('返回数:' . count ($data )); foreach ($data as $uid ) { Queue ::push ('app\job\Reissue' , [ 'uid' => $uid , 'reissue_key' => $reissue_key , 'reissue_pid' => $reissue_pid , 'reissue_value' => $reissue_value , ]); } } }
命令已经完成了,我们可以打印下 help 看看。
1 2 3 4 5 6 7 8 9 10 $ php think reissue --help Usage: reissue [options] Options: --key=KEY 补偿标识 ID --pid=PID 补偿发放道 ID [default: 1] --value=VALUE 补偿发放道 数量
执行生产命令。
1 2 $ php think reissue --key "reissue_安慰奖_2022_06_20_1400" --pid 1 --value 100 返回数:10032
可以看见 Redis 中已经是有数据了的。
那现在我们要来处理消费这一边了。
app\job\Reissue
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 <?php namespace app \job ;use think \facade \Db ;use think \facade \Log ;use think \queue \Job ;class Reissue { public function fire (Job $job , $data ) { if (!isset ($data ['reissue_key' ]) || !isset ($data ['reissue_pid' ]) || !isset ($data ['reissue_value' ]) || !isset ($data ['uid' ])) { $job ->delete (); return ; } $uid = $data ['uid' ]; $reissue_key = $data ['reissue_key' ]; $reissue_pid = $data ['reissue_pid' ]; $reissue_value = $data ['reissue_value' ]; $log_prefix = sprintf ('[游戏补发][%s][%d]' , $reissue_key , $uid ); Log ::write ($log_prefix . "开始处理" ); Db ::startTrans (); $has_complete = false ; try { $history = Db ::table ('users_props_logs' ) ->lock (true ) ->where ('pid' , $reissue_pid ) ->where ('uid' , $uid ) ->column ('distinct name' ); if ($history && in_array ($reissue_key , $history )) { $has_complete = true ; throw new \Exception ('已经发放了' ); } $props = Db ::table ('users_props_logs' ) ->lock (true ) ->field ('pid, uid, surplus' ) ->where ('pid' , $reissue_pid ) ->where ('uid' , $uid ) ->order ('id desc' ) ->find (); if (!$props ) { $props = [ 'pid' => $reissue_pid , 'uid' => $uid , 'surplus' => 0 , ]; $has_complete = true ; throw new \Exception ('没有初始化?' ); } Db ::table ('users_props_logs' ) ->insert ([ 'pid' => $props ['pid' ], 'uid' => $props ['uid' ], 'name' => $reissue_key , 'value' => $reissue_value , 'surplus' => $props ['surplus' ] ? (int ) bcadd ($props ['surplus' ], $reissue_value ) : $reissue_value , ]); Db ::commit (); Log ::write ($log_prefix . "成功处理" ); } catch (\Exception $e ) { Db ::rollback (); if ($has_complete ) { Log ::write ($log_prefix . $e ->getMessage ()); $job ->delete (); } else { Log ::write ($log_prefix . "处理失败:" . $e ->getMessage ()); $job ->failed ($e ); } } } }
上面代码中,可能会觉得奇怪,为什么完成了处理,还要抛出来一个异常呢?
其实是一开始我们在获取这个用户相同货币的日志记录时,有一个锁锁住了,然后我们后续的流程,比如检查到已经发放了,那这时候下面的流程是不会继续走了的,但是锁还是在的,我们要怎么去解开呢?
在 try 中执行回滚操作来解开锁吗?好像不是很优雅的,这样子也会导致逻辑比较混乱。
所以就采用了 抛出异常+标记完成 的方式,来进行一个解锁处理,相当于 catch 中,我们不管什么情况都要进行一个回滚,但接下来的处理,比如这个消费是不是成功的,要不要删除,则要看有没有标记成功了的。
开始执行下我们的消费队列,这里的命令参数可以自行 help 看一下。
1 2 3 4 5 6 7 $ php think queue:work --queue=default --tries=3 --timeout=5 [2022-06-20 14:19:44][qUideswfqZJgoreNrDhHxZJZcmuQxuam] Processing: app\job\Reissue [2022-06-20 14:19:44][qUideswfqZJgoreNrDhHxZJZcmuQxuam] Processed: app\job\Reissue [2022-06-20 14:19:44][tuYZQBjGQkfRyMdmTAXEnXmzXLMhvBDg] Processing: app\job\Reissue [2022-06-20 14:19:44][tuYZQBjGQkfRyMdmTAXEnXmzXLMhvBDg] Processed: app\job\Reissue [2022-06-20 14:19:44][QrnlFUaJBbvjVRbcZfByoosfHkKncVLI] Processing: app\job\Reissue [2022-06-20 14:19:44][QrnlFUaJBbvjVRbcZfByoosfHkKncVLI] Processed: app\job\Reissue
但好像出现了点问题,我在代码里面加了个随机抛出异常,但数据表中还是一片空白,经过的依赖的排查,原来这块需要我们手动进行一个处理,驱动、队列名称这些好像都不能直接获取,需要我们写死来赋值。
app\job\Reissue
1 2 3 4 5 6 7 public function failed ($payload , $e ) { $connection = 'redis' ; $queue = 'default' ; app ()->get ('queue.failer' )->log ($connection , $queue , $payload , $e ); }
OK,这样子我们的队列也能成功的记录到了错误的信息。
如果消息少,那我们可能开一个消费终端就可以来处理了,如果消息多了,也可以通过服务器、MySQL、Redis 的监控,然后在慢慢增加处理的消费终端,就如同下图一样。
如果你使用了 supervisor 来让消费者一直处于后台运行状态,记得有个 numprocs
变量,可以增加你同时参与监听的消费者数量,更好进行减少消息堆积。