Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implements system signal handling for archiving interruption / termination #22487

Open
wants to merge 11 commits into
base: 5.x-dev
Choose a base branch
from
47 changes: 45 additions & 2 deletions core/CliMulti.php
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ class CliMulti
*/
private $logger;

/**
* @var int|null
*/
private $signal = null;

public function __construct(LoggerInterface $logger = null)
{
$this->supportsAsync = $this->supportsAsync();
Expand All @@ -103,6 +108,26 @@ public function __construct(LoggerInterface $logger = null)
$this->logger = $logger ?: new NullLogger();
}

public function handleSignal(int $signal): void
{
$this->signal = $signal;

if (\SIGTERM !== $signal) {
return;
}

foreach ($this->processes as $process) {
if ($process instanceof ProcessSymfony) {
sgiehl marked this conversation as resolved.
Show resolved Hide resolved
$this->logger->debug(
'Aborting command: {command} [method = asyncCliSymfony]',
['command' => $process->getCommandLine()]
);

$process->stop(0);
}
}
}

/**
* It will request all given URLs in parallel (async) using the CLI and wait until all requests are finished.
* If multi cli is not supported (eg windows) it will initiate an HTTP request instead (not async).
Expand All @@ -124,13 +149,21 @@ public function request(array $piwikUrls)
}
}

$chunks = array($piwikUrls);
$chunks = [$piwikUrls];

if ($this->concurrentProcessesLimit) {
$chunks = array_chunk($piwikUrls, $this->concurrentProcessesLimit);
}

$results = array();
$results = [];

foreach ($chunks as $urlsChunk) {
if (null !== $this->signal) {
$this->logSkippedRequests($urlsChunk);

continue;
}

$results = array_merge($results, $this->requestUrls($urlsChunk));
}

Expand Down Expand Up @@ -616,6 +649,16 @@ private function requestUrls(array $piwikUrls)
return $results;
}

private function logSkippedRequests(array $urls): void
{
foreach ($urls as $url) {
$this->logger->debug(
'Skipped climulti:request after abort signal received: {url}',
['url' => $url]
);
}
}

private static function getSuperUserTokenAuth()
{
return Piwik::requestTemporarySystemAuthToken('CliMultiNonAsyncArchive', 36);
Expand Down
96 changes: 89 additions & 7 deletions core/CronArchive.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
use Piwik\Plugins\UsersManager\API as APIUsersManager;
use Piwik\Plugins\UsersManager\UserPreferences;
use Piwik\Log\LoggerInterface;
use Piwik\Scheduler\Scheduler;

/**
* ./console core:archive runs as a cron and is a useful tool for general maintenance,
Expand Down Expand Up @@ -230,6 +231,28 @@ class CronArchive
*/
private $supportsAsync;

/**
* @var null|int
*/
private $signal = null;

/**
* @var CliMulti|null
*/
private $cliMultiHandler = null;

/**
* @var Scheduler|null
*/
private $scheduler = null;

private $step = 0;

private const STEP_INIT = 1;
private const STEP_ARCHIVING = 2;
private const STEP_SCHEDULED_TASKS = 3;
private const STEP_FINISH = 4;

/**
* Constructor.
*
Expand Down Expand Up @@ -272,16 +295,50 @@ public function main()
$self = $this;
Access::doAsSuperUser(function () use ($self) {
try {
$this->step = self::STEP_INIT;
$self->init();

$this->step = self::STEP_ARCHIVING;
$self->run();

$this->step = self::STEP_SCHEDULED_TASKS;
$self->runScheduledTasks();

$this->step = self::STEP_FINISH;
$self->end();
} catch (StopArchiverException $e) {
$this->logger->info("Archiving stopped by stop archiver exception" . $e->getMessage());
}
});
}

public function handleSignal(int $signal): void
{
$this->logger->info('Received system signal to stop archiving: ' . $signal);

$this->signal = $signal;

// initialisation and finishing can be stopped directly.
if (in_array($this->step, [self::STEP_INIT, self::STEP_FINISH])) {
$this->logger->info('Archiving stopped');
exit;
}

// stop archiving
if (!empty($this->cliMultiHandler)) {
$this->logger->info('Trying to stop running cli processes...');
$this->cliMultiHandler->handleSignal($signal);
}

// stop scheduled tasks
if (!empty($this->scheduler)) {
$this->logger->info('Trying to stop running tasks...');
$this->scheduler->handleSignal($signal);
}

// Note: finishing the archiving process will be handled in `run()`
}

public function init()
{
$this->segmentArchiving = StaticContainer::get(SegmentArchiving::class);
Expand Down Expand Up @@ -386,6 +443,11 @@ public function run()
$queueConsumer->setMaxSitesToProcess($this->maxSitesToProcess);

while (true) {
if (null !== $this->signal) {
$this->logger->info("Archiving will stop now because signal to abort received");
return;
}

if ($this->isMaintenanceModeEnabled()) {
$this->logger->info("Archiving will stop now because maintenance mode is enabled");
return;
Expand Down Expand Up @@ -498,18 +560,29 @@ private function launchArchivingFor($archives, QueueConsumer $queueConsumer)
return 0; // all URLs had no visits and were using the tracker
}

$cliMulti = $this->makeCliMulti();
$cliMulti->timeRequests();
$this->cliMultiHandler = $this->makeCliMulti();
$this->cliMultiHandler->timeRequests();

$responses = $cliMulti->request($urls);
$responses = $this->cliMultiHandler->request($urls);

$this->disconnectDb();

$timers = $cliMulti->getTimers();
$timers = $this->cliMultiHandler->getTimers();
$successCount = 0;

foreach ($urls as $index => $url) {
$content = array_key_exists($index, $responses) ? $responses[$index] : null;

if (null !== $this->signal && empty($content)) {
// processes killed by system
$idinvalidation = $archivesBeingQueried[$index]['idinvalidation'];

$this->model->releaseInProgressInvalidation($idinvalidation);
$this->logger->info('Archiving process killed, reset invalidation with id ' . $idinvalidation);

continue;
}

$checkInvalid = $this->checkResponse($content, $url);

$stats = json_decode($content, $assoc = true);
Expand All @@ -527,7 +600,6 @@ private function launchArchivingFor($archives, QueueConsumer $queueConsumer)

$visitsForPeriod = $this->getVisitsFromApiResponse($stats);


$this->logArchiveJobFinished(
$url,
$timers[$index],
Expand All @@ -537,7 +609,6 @@ private function launchArchivingFor($archives, QueueConsumer $queueConsumer)
!$checkInvalid
);


$this->deleteInvalidatedArchives($archivesBeingQueried[$index]);

$this->repairInvalidationsIfNeeded($archivesBeingQueried[$index]);
Expand Down Expand Up @@ -618,6 +689,11 @@ public function getErrors()
*/
public function end()
{
if (null !== $this->signal) {
// Skip if abort signal has been received
return;
}

/**
* This event is triggered after archiving.
*
Expand Down Expand Up @@ -650,6 +726,11 @@ public function logFatalError($m)

public function runScheduledTasks()
{
if (null !== $this->signal) {
// Skip running scheduled task if abort signal has been received
return;
}

$this->logSection("SCHEDULED TASKS");

if ($this->disableScheduledTasks) {
Expand All @@ -673,7 +754,8 @@ public function runScheduledTasks()
// enable/disable the task
Rules::$disablePureOutdatedArchive = true;

CoreAdminHomeAPI::getInstance()->runScheduledTasks();
$this->scheduler = StaticContainer::get(Scheduler::class);
$this->scheduler->run();

$this->logSection("");
}
Expand Down
40 changes: 39 additions & 1 deletion core/Plugin/ConsoleCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
namespace Piwik\Plugin;

use Symfony\Component\Console\Command\Command as SymfonyCommand;
use Symfony\Component\Console\Command\SignalableCommandInterface;
use Symfony\Component\Console\Exception\LogicException;
use Symfony\Component\Console\Helper\ProgressBar;
use Symfony\Component\Console\Helper\QuestionHelper;
Expand All @@ -28,7 +29,7 @@
*
* @api
*/
class ConsoleCommand extends SymfonyCommand
class ConsoleCommand extends SymfonyCommand implements SignalableCommandInterface
{
/**
* @var ProgressBar|null
Expand Down Expand Up @@ -124,6 +125,43 @@ final public function run(InputInterface $input, OutputInterface $output): int
return parent::run($input, $output);
}

/**
* Method is final to make it impossible to overwrite it in plugin commands
* use getSystemSignalsToHandle() instead.
*
* @return array<int>
*/
final public function getSubscribedSignals(): array
{
return $this->getSystemSignalsToHandle();
}

/**
* Method is final to make it impossible to overwrite it in plugin commands
* use handleSystemSignal() instead.
*/
final public function handleSignal(int $signal): void
{
$this->handleSystemSignal($signal);
}

/**
* Returns the list of system signals to subscribe.
*
* @return array<int>
*/
public function getSystemSignalsToHandle(): array
{
return [];
}

/**
* The method will be called when the application is signaled.
*/
public function handleSystemSignal(int $signal): void
{
}

/**
* Adds a negatable option (e.g. --ansi / --no-ansi)
*
Expand Down
15 changes: 15 additions & 0 deletions core/Scheduler/Scheduler.php
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ class Scheduler
*/
private $lock;

/**
* @var int|null
*/
private $signal = null;

public function __construct(TaskLoader $loader, LoggerInterface $logger, ScheduledTaskLock $lock)
{
$this->timetable = new Timetable();
Expand All @@ -89,6 +94,11 @@ public function __construct(TaskLoader $loader, LoggerInterface $logger, Schedul
$this->lock = $lock;
}

public function handleSignal(int $signal): void
{
$this->signal = $signal;
}

/**
* Executes tasks that are scheduled to run, then reschedules them.
*
Expand Down Expand Up @@ -121,6 +131,11 @@ public function run()

// loop through each task
foreach ($tasks as $task) {
if (in_array($this->signal, [\SIGINT, \SIGTERM], true)) {
$this->logger->info("Scheduler: Aborting due to received signal");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be helpful to log the signal that is received, SIGINT or SIGTERM

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might not hurt, but it also doesn't make a difference between the signals here.
Also this is currently only triggered through core archiving, and that one prints the signal once it receives one: https://github.com/matomo-org/matomo/pull/22487/files#diff-f41c275e883996b8b69cb765b1818e12e82cbe034e648bc3f7d03610cc7abad1R317

return $executionResults;
}

// if the task does not have the current priority level, don't execute it yet
if ($task->getPriority() != $priority) {
continue;
Expand Down
Loading
Loading