Skip to content

Commit

Permalink
优化完善 提升Queue的命名空间
Browse files Browse the repository at this point in the history
  • Loading branch information
yunwuxin committed Nov 30, 2016
1 parent 8ce6895 commit 9a9d5e0
Show file tree
Hide file tree
Showing 24 changed files with 195 additions and 120 deletions.
9 changes: 5 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@
> composer require topthink/think-queue
## 配置
> 配置文件位于 `application/extra/queue.php`
### 公共配置

```
'queue'=>[
'type'=>'sync' //驱动类型,可选择 sync(默认):同步执行,database:数据库驱动,redis:Redis驱动,topthink:Topthink驱动
[
'connector'=>'sync' //驱动类型,可选择 sync(默认):同步执行,database:数据库驱动,redis:Redis驱动,topthink:Topthink驱动
//或其他自定义的完整的类名
]
```

### 驱动配置
> 各个驱动的具体可用配置项在`think\queue\driver`目录下各个驱动类里的`options`属性中,写在上面的`queue`配置里即可覆盖
> 各个驱动的具体可用配置项在`think\queue\connector`目录下各个驱动类里的`options`属性中,写在上面的`queue`配置里即可覆盖

## 使用 Database
Expand Down Expand Up @@ -108,7 +109,7 @@ class Job2{


## 发布任务
> `think\queue\Queue:push($job, $data = '', $queue = null)``think\queue\Queue::later($delay, $job, $data = '', $queue = null)` 两个方法,前者是立即执行,后者是在`$delay`秒后执行
> `think\Queue:push($job, $data = '', $queue = null)``think\Queue::later($delay, $job, $data = '', $queue = null)` 两个方法,前者是立即执行,后者是在`$delay`秒后执行
`$job` 是任务名
单模块的,且命名空间是`app\job`的,比如上面的例子一,写`Job1`类名即可
Expand Down
15 changes: 12 additions & 3 deletions composer.json
Original file line number Diff line number Diff line change
@@ -1,20 +1,29 @@
{
"name": "topthink/think-queue",
"description": "The ThinkPHP5 Queue Package",
"type": "think-extend",
"authors": [
{
"name": "yunwuxin",
"email": "[email protected]"
}
],
"license": "Apache-2.0",
"minimum-stability": "dev",
"autoload": {
"psr-4": {
"think\\queue\\": "src"
"think\\": "src"
},
"files": [
"src/config.php"
"src/common.php"
]
},
"require": {
"topthink/think-helper": "^1.0",
"topthink/think-installer": ">=1.0.10"
},
"extra": {
"think-config": {
"queue": "src/config.php"
}
}
}
84 changes: 25 additions & 59 deletions src/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,75 +9,41 @@
// | Author: yunwuxin <[email protected]>
// +----------------------------------------------------------------------

namespace think\queue;

use think\Config;

namespace think;

use think\helper\Str;
use think\queue\Connector;

/**
* Class Queue
* @package think\queue
*
* @method static push($job, $data = '', $queue = null)
* @method static later($delay, $job, $data = '', $queue = null)
* @method static pop($queue = null)
* @method static marshal()
*/
class Queue
{
protected static $instance = [];

/**
* 添加任务到队列
* @param $job
* @param string $data
* @param null $queue
*/
public static function push($job, $data = '', $queue = null)
{
self::handle()->push($job, $data, $queue);

}
/** @var Connector */
protected static $connector;

/**
* 添加延迟任务到队列
* @param $delay
* @param $job
* @param string $data
* @param null $queue
*/
public static function later($delay, $job, $data = '', $queue = null)
{
self::handle()->later($delay, $job, $data, $queue);
}

/**
* 获取第一个任务
* @param null $queue
* @return mixed
*/
public static function pop($queue = null)
{
if (!method_exists(self::handle(), 'pop'))
throw new \RuntimeException('pop queues not support for this type');

return self::handle()->pop($queue);
}


/**
* 由订阅的推送执行任务
*/
public static function marshal()
{
if (!method_exists(self::handle(), 'marshal'))
throw new \RuntimeException('push queues not support for this type');

self::handle()->marshal();
}

private static function handle()
private static function buildConnector()
{
$options = Config::get('queue');
$type = !empty($options['type']) ? $options['type'] : 'Sync';

if (!isset(self::$instance[$type])) {
if (!isset(self::$connector)) {

$class = false !== strpos($type, '\\') ? $type : '\\think\\queue\\driver\\' . ucwords($type);
$class = false !== strpos($type, '\\') ? $type : '\\think\\queue\\connector\\' . Str::studly($type);

self::$instance[$type] = new $class($options);
self::$connector = new $class($options);
}
return self::$instance[$type];
return self::$connector;
}

public static function __callStatic($name, $arguments)
{
return call_user_func_array([self::buildConnector(), $name], $arguments);
}
}
17 changes: 17 additions & 0 deletions src/common.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <[email protected]>
// +----------------------------------------------------------------------

\think\Console::addDefaultCommands([
"think\\queue\\command\\Work",
"think\\queue\\command\\Restart",
"think\\queue\\command\\Listen",
"think\\queue\\command\\Subscribe"
]);
11 changes: 4 additions & 7 deletions src/config.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,13 @@
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// | Copyright (c) 2006-2016 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <[email protected]>
// +----------------------------------------------------------------------

\think\Console::addDefaultCommands([
"think\\queue\\command\\Work",
"think\\queue\\command\\Restart",
"think\\queue\\command\\Listen",
"think\\queue\\command\\Subscribe"
]);
return [
'connector' => 'Sync'
];
36 changes: 36 additions & 0 deletions src/queue/CallQueuedHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2016 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <[email protected]>
// +----------------------------------------------------------------------

namespace think\queue;

class CallQueuedHandler
{

public function call(Job $job, array $data)
{
$command = unserialize($data['command']);

call_user_func([$command, 'handle']);

if (!$job->isDeletedOrReleased()) {
$job->delete();
}
}

public function failed(array $data, $e)
{
$command = unserialize($data['command']);

if (method_exists($command, 'failed')) {
$command->failed($e);
}
}
}
69 changes: 69 additions & 0 deletions src/queue/Connector.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2016 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <[email protected]>
// +----------------------------------------------------------------------

namespace think\queue;

use InvalidArgumentException;

abstract class Connector
{
protected $options = [];

abstract function push($job, $data = '', $queue = null);

abstract function later($delay, $job, $data = '', $queue = null);

abstract public function pop($queue = null);

public function marshal()
{
throw new \RuntimeException('pop queues not support for this type');
}

protected function createPayload($job, $data = '', $queue = null)
{
if (is_object($job)) {
$payload = json_encode([
'job' => 'think\queue\CallQueuedHandler@call',
'data' => [
'commandName' => get_class($job),
'command' => serialize(clone $job),
],
]);
} else {
$payload = json_encode($this->createPlainPayload($job, $data));
}

if (JSON_ERROR_NONE !== json_last_error()) {
throw new InvalidArgumentException('Unable to create payload: ' . json_last_error_msg());
}

return $payload;
}

protected function createPlainPayload($job, $data)
{
return ['job' => $job, 'data' => $data];
}

protected function setMeta($payload, $key, $value)
{
$payload = json_decode($payload, true);
$payload[$key] = $value;
$payload = json_encode($payload);

if (JSON_ERROR_NONE !== json_last_error()) {
throw new InvalidArgumentException('Unable to create payload: ' . json_last_error_msg());
}

return $payload;
}
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
11 changes: 3 additions & 8 deletions src/driver/Database.php → src/queue/connector/Database.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@
// | Author: yunwuxin <[email protected]>
// +----------------------------------------------------------------------

namespace think\queue\driver;
namespace think\queue\connector;

use think\Db;
use think\queue\Connector;
use think\queue\job\Database as DatabaseJob;

class Database
class Database extends Connector
{
protected $db;

Expand Down Expand Up @@ -44,12 +45,6 @@ public function later($delay, $job, $data = '', $queue = null)
return $this->pushToDatabase($delay, $queue, $this->createPayload($job, $data));
}


protected function createPayload($job, $data)
{
return json_encode(['job' => $job, 'data' => $data]);
}

public function pop($queue = null)
{
$queue = $this->getQueue($queue);
Expand Down
Loading

0 comments on commit 9a9d5e0

Please sign in to comment.