前公司主要是做游戏业务的,有时候搞活动,发发安慰奖或者像王者荣耀老是 BUG 然后维护接着发补偿,在游戏行业是很正常的。
首先要了解下我们发送的用户群大概有多少人,像我们的业务一般就几十万人也不是很多,不过我们发放除了自身业务的处理,可能还要调用甲方供应商的接口进行处理,不过这个不是这里重点。
这篇文章的标题写的是简易版,所以我们使用的方法或者流程其实还是比较简单的。假设我们要给 50W 人发补偿金币,我们会采取多个请求来访问同一个补偿发放接口,通过一些参数让每一次请求都在分段处理业务。
通过监控服务器、数据库的使用情况,来决定要不要增加或者减少访问的请求,或者说你理解成增加或减少消息队列的消费者,但我们这里不是用到消息队列的。
那么要如何分段的处理数据呢?其实就是利用了 SQL 中的 offset 、limit 进行分割,在请求 URL 中带上要分割的值就可以让每一个请求都处理不相同的了。
但是哈,这个不相同其实是很难做到的,假设每个请求都是处理 5W 个用户,A 请求后过一会我们才开始了 B 请求,这时候 A 和 B 拿到要处理的用户列表可能就存在部分相同了,也可能这两个请求相同的用户会在同一时间进行处理,这时就必须通过锁来进行一个阻塞操作了,避免处理了多次相同内容。
同时,请求还要注意会不会 502 之类的请求中止,这时候就需要重新运行了。另外还需要在全部请求完成后在进行一次兜底处理,在查询一次是不是有用户满足条件然后没有被处理到。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
| <?php namespace app\controller;
use app\BaseController; use think\facade\Db; use think\facade\Log;
class Reissue extends BaseController { public function index() { $reissue_key = 'reissue_安慰奖_2022_06_03_1550';
$reissue_pid = 1;
$reissue_value = 100;
$page = (int) request()->param('page');
$limit = (int) request()->param('limit');
if($page < 0) { $page = 1; }
if($limit <= 0) { $limit = 10000; }
$sql = sprintf('SELECT `users`.`id` FROM `users` LEFT JOIN `users_props_logs` ON `users_props_logs`.`uid` = `users`.`id` AND `users_props_logs`.`pid` = %d AND `users_props_logs`.`name` = "%s" WHERE `users_props_logs`.`id` IS NULL LIMIT %d, %d', $reissue_pid, $reissue_key, ($page ? $page - 1 : 0) * $limit, $limit);
$data = Db::query($sql);
if(!count($data)) { return '未查询到满足条件的'; }
$data = array_column($data, 'id');
foreach ($data as $uid) { $log_prefix = sprintf('[游戏补发][%s][%d]', $reissue_key, $uid);
Log::write($log_prefix . "开始处理");
Db::startTrans();
try { $history = Db::table('users_props_logs') ->lock(true) ->where('pid', $reissue_pid) ->where('uid', $uid) ->column('distinct name');
if($history && in_array($reissue_key, $history)) { throw new \Exception('已经发放了'); }
$props = Db::table('users_props_logs') ->lock(true) ->field('pid, uid, surplus') ->where('pid', $reissue_pid) ->where('uid', $uid) ->order('id desc') ->find();
if(!$props) { $props = [ 'pid' => $reissue_pid, 'uid' => $uid, 'surplus' => 0, ];
throw new \Exception('没有初始化?'); }
Db::table('users_props_logs') ->insert([ 'pid' => $props['pid'], 'uid' => $props['uid'], 'name' => $reissue_key, 'value' => $reissue_value, 'surplus' => $props['surplus'] ? (int) bcadd($props['surplus'], $reissue_value) : $reissue_value, ]);
Db::commit();
Log::write($log_prefix . "成功处理"); } catch (\Exception $e) { Db::rollback();
Log::write($log_prefix . "处理失败:" . $e->getMessage()); } } } }
|
在上面代码中,开启事务的处理流程还是有点问题的,比如我现在 A 请求就是处理全部,我现在 A 请求 请求了多次,可能就是第一个请求是正常能进行道具发放的,后面几个被阻塞了,等拿到返回值的时候,它已经不满足继续执行下去的要求了。
其实这里我们可以借助 Redis 锁,可以看一下我之前的文章:FastAdmin 锁的使用
我们在开启事务前,先获取一个处理当前用户的锁,如果能获取到那就走事务流程,如果不可以说明可能有其他请求在处理这个用户了,那这时这个请求就可以避开这个用户继续去走循环了,这时效率就提升上来了,而且相比文章开头提到的做法,现在的这个处理可能就更按照用户的顺序来处理了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| use Predis\Client; use Symfony\Component\Lock\LockFactory; use Symfony\Component\Lock\Store\RedisStore;
$redis = new Client('tcp://127.0.0.1:6379');
$store = new RedisStore($redis); $factory = new LockFactory($store);
foreach ($data as $uid) { $lock = $factory->createLock(sprintf('reissue:%s:%d', md5($reissue_key), $uid), 300, true);
if(!$lock->acquire()) { continue; }
Db::startTrans();
try { Db::commit(); } catch (\Exception $e) { Db::rollback(); } finally { $lock->release(); } }
|