官方文档的例子有点粗糙,先优化下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| <?php
namespace app\job;
use think\queue\Job;
class Demo { public function fire (Job $job, $data) {
if ($job->attempts() > 3) { $e = new \Exception('执行失败次数已达上限。');
$job->failed($e);
$job->delete();
return; }
$job->delete();
$delay = 30;
$job->release($delay); }
public function failed ($data) { } }
|
当任务处理成功后,切记一定要 return;
,因为忘记返回,我一直在排查业务代码有没有问题。
业务前期,我们对队列的需求不是很高,出于可用性和尽早数据最终一致性考虑,我们希望队列处理也可以兼容同步的方式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| public function fire (Job $job, $data) { $result = $this->handle($data);
if($result === true) { $job->delete();
return; }
if($job->attempts() > 3) { $e = new \Exception('执行失败次数已达上限。');
$job->failed($e);
$job->delete();
return; }
$job->release(30);
return; }
public function handle($data) :bool { return true; }
|
业务同步处理:
1 2 3 4 5
| $result = (new app\job\Demo())->handle($data);
if($result === true) { }
|
后面,我们把微信支付的回调也放到队列中处理,那不可避免我们需要让队列能完成事务。因为前期我们封装了一层 handle
,所以我们处理是非常方便的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public function fire (Job $job, $data) { Db::startTrans();
try { $result = $this->handle($data);
if($result === true) { Db::commit();
$job->delete();
return; } } catch (\Exception $e) { Db::rollback(); } }
|
事务处理业务中,我们又发现一些业务可能存在不需要提交事务,但是需要删除任务的情况。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
|
protected $null_commit = false;
public function fire (Job $job, $data) { Db::startTrans();
try { $result = $this->handle($data);
if($result === true) { if($this->null_commit) { Db::rollback(); } else { Db::commit(); }
$job->delete();
return; } } catch (\Exception $e) { Db::rollback(); } }
public function handle($data) :bool { if(!is_array($data)) { $this->null_commit = true;
return true; }
return true; }
|
当然,我们如果每个队列需求都维护这么一个文件实在太麻烦了,所以我们对一些业务放在了同一个处理队列中,通过类型进行一个区分,即我们的最终版。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
| <?php
namespace app\job;
use think\Db; use think\queue\Job;
class Demo { const TYPE_USER_REGISTER = 1;
protected $null_commit = false;
public function fire (Job $job, $data) { Db::startTrans();
try { $result = $this->handle($data);
if ($result === true) { if ($this->null_commit) { Db::rollback(); } else { Db::commit(); }
$job->delete();
return; } } catch (\Exception $e) { Db::rollback(); }
if ($job->attempts() > 3) { $e = new \Exception('执行失败次数已达上限。');
$job->failed($e);
$job->delete();
return; }
$job->release(30);
return; }
public function handle ($data): bool { if (!is_array($data)) { $this->null_commit = true;
return true; }
if (false) { $data = [ 'type' => 1, 'uid' => 1, 'data' => [ 'ip' => '8.8.8.8', 'ua' => 'xxx', ], ]; }
if (!(isset($data['type']) && isset($data['uid']) && isset($data['data']) && is_integer($data['type']) && is_array($data['data']) )) { $this->null_commit = true;
return true; }
$type = $data['type']; $uid = $data['uid']; $data = $data['data'];
if ($type === Demo::TYPE_USER_REGISTER) { return $this->TYPE_USER_REGISTER($uid, $data); }
return true; }
public function failed ($data) { }
protected function TYPE_USER_REGISTER ($uid, array $data): bool { return true; } }
|
业务使用:
1 2 3 4 5 6 7 8
| think\facade\Queue::push(app\job\Demo::class, [ 'type' => app\job\Demo::TYPE_USER_REGISTER, 'uid' => 1, 'data' => [ 'ip' => '8.8.8.8', 'ua' => 'xxx', ], ]);
|