Skip to content

Commit 4e2d245

Browse files
author
vcorre
committed
Update Pheanstalk class
1 parent 7e5c9ac commit 4e2d245

File tree

2 files changed

+40
-18
lines changed

2 files changed

+40
-18
lines changed

src/Pheanstalk.php

Lines changed: 39 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -264,27 +264,11 @@ private function _dispatch($command)
264264
public function create(Workflow $workflow, $force = false): Workflow
265265
{
266266
try {
267-
$tubes = [];
268-
/** @var Job $job */
269-
foreach ($workflow->getJobs() as $job) {
270-
/** @var Task $task */
271-
foreach ($job->getTasks() as $task) {
272-
$tubes = array_merge($tubes, [$task->getQueue()]);
273-
}
274-
}
275-
foreach ($tubes as $tube) {
276-
if (!$this->getCurrentClass()->tubeExists($tube)) {
277-
$this->getCurrentClass()->createTube(new Tube($tube, 1));
278-
};
279-
}
267+
$this->checkAndCreateTubes($workflow);
280268
$workflow = $this->_dispatch(new Command\CreateCommand($workflow));
281269
} catch (ServerDuplicateEntryException $e) {
282270
if ($force) {
283-
$workflows = $this->_dispatch(new Command\ListWorkflowsCommand());
284-
$workflowToDelete = $workflows->filter(function(Workflow $listedWorkflow) use ($workflow) {
285-
return $listedWorkflow->getName() === $workflow->getName()
286-
&& $listedWorkflow->getGroup() === $workflow->getGroup();
287-
})->first();
271+
$workflowToDelete = $this->findWorkflow($workflow);
288272
$this->getCurrentClass()->delete($workflowToDelete);
289273

290274
return $this->getCurrentClass()->create($workflow);
@@ -295,6 +279,43 @@ public function create(Workflow $workflow, $force = false): Workflow
295279
return $workflow;
296280
}
297281

282+
/**
283+
* @param Workflow $workflow
284+
*
285+
* @return Workflow|bool
286+
* @throws Exception\ClientException
287+
*/
288+
public function findWorkflow(Workflow $workflow)
289+
{
290+
$workflows = $this->_dispatch(new Command\ListWorkflowsCommand());
291+
return $workflows->filter(function(Workflow $listedWorkflow) use ($workflow) {
292+
return $listedWorkflow->getName() === $workflow->getName()
293+
&& $listedWorkflow->getGroup() === $workflow->getGroup();
294+
})->first();
295+
}
296+
297+
/**
298+
* @param Workflow $workflow
299+
*
300+
* @throws Exception\ClientException
301+
*/
302+
public function checkAndCreateTubes(Workflow $workflow)
303+
{
304+
$tubes = [];
305+
/** @var Job $job */
306+
foreach ($workflow->getJobs() as $job) {
307+
/** @var Task $task */
308+
foreach ($job->getTasks() as $task) {
309+
$tubes = array_merge($tubes, [$task->getQueue()]);
310+
}
311+
}
312+
foreach ($tubes as $tube) {
313+
if (!$this->getCurrentClass()->tubeExists($tube)) {
314+
$this->getCurrentClass()->createTube(new Tube($tube, 1));
315+
};
316+
}
317+
}
318+
298319
/**
299320
* {@inheritdoc}
300321
*/

src/PheanstalkInterface.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ public function stats();
186186
* @param Workflow $data The workflow to create
187187
* @param null|bool (optional) $force Will erase already existent old workflow if already exists
188188
*
189+
* @throws Exception\ClientException
189190
* @return Workflow The newly created workflow
190191
*/
191192
public function create(Workflow $data, $force = false): Workflow;

0 commit comments

Comments
 (0)