Skip to content

Commit 6eff60a

Browse files
committed
【新增】v2.0.0
1 parent 6f16206 commit 6eff60a

21 files changed

+6834
-299
lines changed

README.md

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,14 @@
1414
### 特性
1515
- 最低毫秒级任务延时
1616
- 自定义重试次数和错误回调
17-
- 自定义超时时间和超时回调
17+
- 自定义超时时间
1818
- 自定义启动预执行函数,方便初始化其他框架与其他项目配合使用
1919
- master协程监听队列降低延时
2020
- 多worker进程消费任务
21-
- worker进程支持一键化协程协程
21+
- worker进程支持一键化协程
2222
- 支持后台守护运行,无需其余进程管理工具
23-
- 支持分布式部署
23+
- 支持多服务器分布式部署
24+
- 高可用,根据超时时间判断,确保进程挂掉后,回调函数执行成功。
2425

2526
### 进程结构图
2627

@@ -45,17 +46,22 @@
4546
|queue[0].memory_limit | int || 128 | 工作进程最大使用内存数(单位mb)(0无限制)|
4647
|queue[0].sleep_seconds | floot || 1 | 监视进程休眠时间(秒,最小到0.001) |
4748
|queue[0].timeout | int || 120 | 超时时间(s)以投递任务方为准 |
48-
|queue[0].fail_number | int || 3 | 最大失败次数以投递任务方为准 |
49-
|queue[0].fail_expire | int || 3 | 失败延时投递时间(s)以投递任务方为准 |
50-
|queue[0].timeout_handle | callable ||| 任务超时执行函数 |
51-
|queue[0].fail_handle | callable ||| 任务失败执行函数 |
49+
|queue[0].fail_number | int || 3 | 最大失败次数以投递任务方为准,达到最大次数前失败后会重新投递 |
50+
|queue[0].fail_expire | int || 3 | 失败延时投递时间(s 支持小数精度到0.001)以投递任务方为准 |
51+
|queue[0].fail_handle | callable ||| 任务失败执行函数(当任务超时或者达到最大投递次数后会执行) |
5252
|queue[0].worker_start_handle | callable ||| worker进程启动加载函数(当前队列有效) |
53+
|queue[0].model | int | 否 | \QueueConfig::MODEl_DISTRIBUTE |队列运行模式(QueueConfig::MODEl_DISTRIBUTE 分发模式 QueueConfig::MODEL_GRAB 抢占模式)
5354

54-
timeout_handle 会传入一个参数 $info 任务详细信息
55+
fail_handle 会传入两个参数 $jobInfo 任务详细信息、$e出错的异常类
56+
fail_handle的执行时间也受timeout的控制
57+
#### 注意:任务超时后会直接记录为失败,不会根据fail_number进行失败重试,可以在fail_handle中根据$jobInfo['type']判断是否是超时任务。
5558

56-
fail_handle 会传入两个参数 $info 任务详细信息、$e出错的异常类
57-
58-
#### 注意:任务超时后会触发timeout_handle会直接记录为失败,不会根据fail_number进行失败重试,也不会触发fail_handle。
59+
##2.0改动
60+
1. 取消超时回调,超时后记录为失败,触发失败回调。
61+
2. 由原来的每次失败都触发失败回调改为,超时或达到失败重试次数后才触发。
62+
3. 增加抢占模式:此模式下worker进程会主动抢任务会增加数据库查询和连接数量(1.0版本的模式为分发模式)
63+
4. 增加高可用性,超时时间适用于失败回调,如果进程在失败回调执行期间挂掉,其余进程会在超时时间到达后再次执行失败回调。
64+
5. 优化分发模式性能,按当前空闲进程数一次获取多个任务进行分发,qps大大提高。
5965

6066
### 配置示例
6167
```

main.php

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,16 @@
99
'name' => 'mp-queue-1',//多个服务器同时启动时需要分别设置名字
1010
'driver' => new \MPQueue\Queue\Driver\Redis('127.0.0.1'),
1111
],
12+
'log'=>[
13+
'level'=>\Monolog\Logger::DEBUG
14+
],
1215
'queue' => [
1316
[
1417
'name' => 'test',//队列名称
15-
'timeout_handle' => function () {
16-
var_dump('超时了');
17-
},//超时后触发函数
18-
'fail_handle' => function () {
18+
'fail_handle' => function ($info,$e) {
19+
var_dump(getmypid());
20+
var_dump($info);
21+
var_dump($e);
1922
var_dump('失败了');
2023
},//失败回调函数
2124
],

src/Config/BasicsConfig.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,6 @@ public static function set($name, $pid_path,DriverInterface $driver,$worker_star
4141
*/
4242
public static function pid_file(): string
4343
{
44-
return self::$pid_path . '/mpQueue.pid';
44+
return self::$pid_path . '/mpQueue-'.BasicsConfig::name().'.pid';
4545
}
4646
}

src/Config/Config.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@ class Config
4242
'timeout' => 120,//超时时间(s)以投递任务方为准
4343
'fail_number' => 3,//最大失败次数以投递任务方为准
4444
'fail_expire' => 3,//失败延时投递时间(s)投递任务方为准
45-
'timeout_handle' => '', //任务超时触发函数
4645
'fail_handle' => '', //任务失败触发函数
4746
'worker_start_handle' => '',//worker进程启动加载函数
47+
'model'=> QueueConfig::MODEl_DISTRIBUTE,//MODEl_DISTRIBUTE分发模式 QUEUE_GRAB 抢占模式
4848
]
4949
],
5050
],

src/Config/QueueConfig.php

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,18 @@
1313
* @method static timeout() 获取当前队列的任务超时时间 秒
1414
* @method static fail_expire() 获取当前队列的配置对象的间隔等待时间 秒
1515
* @method static fail_number() 获取当前队列的允许最大失败次数
16-
* @method static timeout_handle() 获取超时后需要执行的callable
1716
* @method static fail_handle() 获取失败后需要执行的fail_handle
1817
* @method static worker_start_handle() 当前队列的worker进程启动后执行函数
18+
* @method static model() 队列运行模式分发/抢占
1919
* @package MPQueue\Config
2020
*/
2121
class QueueConfig implements ConfigInterface
2222
{
2323
use Config;
2424

25+
const MODEl_DISTRIBUTE = 1;//分发模式
26+
const MODEL_GRAB = 2;//抢占模式
27+
2528
protected static $queues = [];
2629
protected $name;
2730
protected $worker_number;
@@ -30,11 +33,11 @@ class QueueConfig implements ConfigInterface
3033
protected $timeout;
3134
protected $fail_number;
3235
protected $fail_expire;
33-
protected $timeout_handle;
3436
protected $fail_handle;
3537
protected $worker_start_handle;
38+
protected $model;
3639

37-
public function __construct($name, $worker_number, $memory_limit, $sleep_seconds,$timeout,$fail_number,$fail_expire,$timeout_handle,$fail_handle,$worker_start_handle)
40+
public function __construct($name, $worker_number, $memory_limit, $sleep_seconds,$timeout,$fail_number,$fail_expire,$fail_handle,$worker_start_handle,$model)
3841
{
3942
$this->name = $name;
4043
$this->worker_number = $worker_number;
@@ -43,10 +46,9 @@ public function __construct($name, $worker_number, $memory_limit, $sleep_seconds
4346
$this->timeout = $timeout;
4447
$this->fail_number = $fail_number;
4548
$this->fail_expire = $fail_expire;
46-
$this->timeout_handle = $timeout_handle;
4749
$this->fail_handle = $fail_handle;
4850
$this->worker_start_handle = $worker_start_handle;
49-
51+
$this->model = $model;
5052
}
5153

5254
public static function set($queues)
@@ -61,9 +63,9 @@ public static function set($queues)
6163
$value['timeout'],
6264
$value['fail_number'],
6365
$value['fail_expire'],
64-
$value['timeout_handle'],
6566
$value['fail_handle'],
66-
$value['worker_start_handle']
67+
$value['worker_start_handle'],
68+
$value['model']
6769
);
6870
}
6971
}

src/Job.php

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -62,21 +62,14 @@ public function getFailExpire()
6262
*/
6363
abstract public function handle();
6464

65-
/**
66-
* 任务超时后被调用(先于队列的handle调用 除非返回false 否则不再调用队列的fail_handle)
67-
* @param array $jobInfo 任务详细信息数组
68-
*/
69-
public function timeout_handle(array $jobInfo){
70-
return false;
71-
}
7265

7366
/**
7467
* 任务失败后被调用
75-
* @param array $jobInfo 任务详细信息数组
68+
* @param array $jobInfo 任务详细信息数组 $jobInfo['type']标识当前任务类型 1正常任务 2超时的任务
7669
* @param \Throwable $e 错误异常对象
7770
* @return mixed (先于队列的handle调用 除非返回false 否则不再调用队列的fail_handle)
7871
*/
79-
public function fail_handle(array $jobInfo,\Throwable $e)
72+
public function fail_handle(array &$jobInfo,\Throwable $e)
8073
{
8174
return false;
8275
}

src/Log/Log.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,14 @@ public static function getDriver(): LogDriverInterface
5252

5353
public static function __callStatic($name, $arguments)
5454
{
55-
$arguments[0] = '['.ProcessConfig::queue().':'.ProcessConfig::getType().':'.getmypid().']'.$arguments[0];
55+
$arguments[0] = '['.ProcessConfig::queue().':'.ProcessConfig::getType().':'.getmypid().']'.":'".$arguments[0];
5656
if(call_user_func_array([self::getDriver(),$name],$arguments) && !ProcessConfig::daemon()){
5757
//记录成功并且非后台运行则打印对应信息到屏幕
5858
OutPut::{self::$levelOut[$name]}(
5959
(new \DateTimeImmutable())->format('Y-m-d\TH:i:s.v')
6060
."{$name}".$arguments[0]
6161
.(!empty($arguments[1])?JsonSerialize::serialize($arguments[1]):'')
62-
."\n");
62+
."'\n");
6363
}
6464
}
6565

src/Process/ManageProcess.php

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ class ManageProcess
2525
public function __construct()
2626
{
2727
swoole_set_process_name('mpq:manage');
28-
2928
}
3029

3130
/**

src/Process/MasterProcess.php

Lines changed: 43 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ public function __construct(Process $process, string $queue)
4141
$this->queue = $queue;
4242
$this->manageClient = new MasterProcessClient($process->exportSocket(), $this);
4343
$this->queueDriver = new Queue(BasicsConfig::driver(),$queue);
44-
$this->status = ProcessConfig::STATUS_IDLE;
4544
$this->workerChannel = new Channel(QueueConfig::worker_number());
4645
}
4746

@@ -67,11 +66,12 @@ public function start()
6766
$this->setOver();
6867
$this->getStatus();
6968
Log::debug("子进程启动成功");
70-
//必须添加阻塞程序,否则异步信号监听不生效(协程等待时间异步信号监听会被阻塞)
71-
while (true) {
72-
Coroutine::sleep(0.001);
73-
}
74-
69+
//必须添加阻塞程序,否则异步信号监听不生效(协程socket连接等待时间异步信号监听会被阻塞)
70+
go(function (){
71+
while (true) {
72+
sleep(0);
73+
}
74+
});
7575
}
7676

7777

@@ -125,24 +125,30 @@ protected function monitorQueue()
125125
{
126126
//队列定时执行任务
127127
$this->queueDriver->timerInterval();
128-
$this->queueDriver->popInterval();
129-
//处理队列任务
130-
go(function () {
131-
while (true) {
132-
$pid = $this->workerChannel->pop();
133-
$idInfo = $this->queueDriver->pop();
134-
$this->status = ProcessConfig::STATUS_BUSY;
135-
if (!empty($this->workProcess[$pid])) {
136-
Log::debug("获取到队列任务", $idInfo);
137-
$this->status = ProcessConfig::STATUS_BUSY;
138-
//如果进程存在则进行分发
139-
$this->setWorkerStatus(ProcessConfig::STATUS_BUSY, $pid);
140-
$this->sendToWorker($pid, 'consume' . $idInfo['type'], ['id' => $idInfo['id']]);
141-
Log::debug('将队列任务分发给woker进程:' . $pid, $idInfo);
128+
if(QueueConfig::model() == QueueConfig::MODEl_DISTRIBUTE) {
129+
//分发队列任务
130+
go(function () {
131+
while (true) {
132+
$pid = $this->workerChannel->pop();
133+
$ids = $this->queueDriver->pop($this->workerChannel->length() + 1);
134+
Log::debug("获取到队列任务", $ids);
135+
$this->queueDriver->setWorkerNumber(0 - count($ids));
136+
foreach ($ids as $id) {
137+
do{
138+
if (!empty($this->workProcess[$pid])) {
139+
//如果进程存在则进行分发
140+
$this->setWorkerStatus(ProcessConfig::STATUS_BUSY, $pid);
141+
$this->sendToWorker($pid, 'consumeJob', ['id' => $id]);
142+
Log::debug('将队列任务分发给woker进程:' . $pid, [$id]);
143+
$pid = 0;
144+
break;
145+
}
146+
}while($pid = $this->workerChannel->pop());
147+
}
142148
}
143-
$this->status = ProcessConfig::STATUS_IDLE;
144-
}
145-
});
149+
});
150+
}
151+
$this->setStatus(ProcessConfig::STATUS_BUSY);
146152
}
147153

148154

@@ -161,6 +167,13 @@ public function getStatus(){
161167
$this->sendToManage('setProcessStatus', ['status' => $this->status,'startTime'=>$this->startTime]);
162168
}
163169

170+
public function setStatus($status){
171+
if($status != $this->status){
172+
$this->status = $status;
173+
$this->getStatus();
174+
}
175+
}
176+
164177
/**
165178
* 向manage进程发送信息
166179
* @param string $type
@@ -243,6 +256,13 @@ public function setWorkerStatus($status, $pid)
243256
$this->workProcess[$pid]['status'] = $status;
244257
switch ($status) {
245258
case ProcessConfig::STATUS_IDLE:
259+
$count = 1;
260+
foreach ($this->workProcess as $value){
261+
if($value['status'] === ProcessConfig::STATUS_IDLE){
262+
$count++;
263+
}
264+
}
265+
$this->queueDriver->setWorkerNumber($count);
246266
$this->workerChannel->push($pid);
247267
break;
248268
}

0 commit comments

Comments
 (0)