Skip to content

Commit

Permalink
Merge pull request #10 from tarosky/issue/7
Browse files Browse the repository at this point in the history
Support updated DB schema of Cavalcade
Close #7.
  • Loading branch information
harai authored Feb 28, 2021
2 parents 38cf9e0 + 7de066f commit 8e6b5ce
Show file tree
Hide file tree
Showing 12 changed files with 315 additions and 103 deletions.
1 change: 1 addition & 0 deletions bin/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/cavalcade-test
18 changes: 14 additions & 4 deletions bin/cavalcade
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ use Exception;

include dirname(__DIR__) . '/bootstrap.php';

$options = getopt('l:p:b:w:c:');
$options = getopt('l:p:b:w:c:i:d:');
$log_path = $options['l'] ?? '/var/log/wp/cron.log';
$pid_file = $options['p'] ?? '/run/cavalcade/cavalcade.pid';
$wp_base_path = $options['b'] ?? '/var/web/wp';
$max_workers_count = $options['w'] ?? 2;
$max_workers_count = intval($options['w'] ?? 2);
$wpcli_path = $options['c'] ?? '/usr/local/bin/wp';
$cleanup_interval = intval($options['i'] ?? 60);
$cleanup_delay = intval($options['d'] ?? 24 * 60 * 60);

date_default_timezone_set('UTC');
$log = Logger::create($log_path);
Expand All @@ -25,8 +27,16 @@ if ($pid === null) {
file_put_contents($pid_file, $pid);

try {
$runner = Runner::instance($log, $max_workers_count, $wpcli_path);
$runner->bootstrap($wp_base_path);
$runner = Runner::instance(
$log,
$max_workers_count,
$wpcli_path,
$cleanup_interval,
$cleanup_delay,
$wp_base_path,
);
$runner->bootstrap();
/*CAVALCADE_HOOKS_FOR_TESTING*/
$log->info('Cavalcade Runner started');
$runner->run();
} catch (SignalInterrupt $e) {
Expand Down
2 changes: 1 addition & 1 deletion build/cavalcade-runner/entrypoint
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ step="$1"

mkdir -p work

command='bin/cavalcade -l work/cavalcade-runner.log -p work/cavalcade-runner.pid -b /www -w 10'
command='bin/cavalcade-test -l work/cavalcade-runner.log -p work/cavalcade-runner.pid -b /www -w 10 -i 2 -d 6'

case "$step" in
validate)
Expand Down
2 changes: 1 addition & 1 deletion build/envs.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ export GROUP_ID="$(id -g)"
export PHP_VERSION="${PHP_VERSION:-7.4}"
export WP_VERSION="${WP_VERSION:-latest}"
export WP_MULTISITE="${WP_MULTISITE:-0}"
export CAVALCADE_VERSION="${CAVALCADE_VERSION:-5}"
export CAVALCADE_VERSION="${CAVALCADE_VERSION:-10}"
2 changes: 1 addition & 1 deletion build/wordpress/install-wp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ tmp_caval="$(mktemp)"
tmpd_caval="$(mktemp -d)"
trap "rm -fr $tmp_caval $tmpd_caval" EXIT
curl -fsSL \
"https://github.com/tarosky/Cavalcade/releases/download/build-5/cavalcade.build-$cavalcade_version.zip" \
"https://github.com/tarosky/Cavalcade/releases/download/build-$cavalcade_version/cavalcade.build-$cavalcade_version.zip" \
> "$tmp_caval"
unzip -q -d "$tmpd_caval" "$tmp_caval"
mkdir -p /wp/wp-content/mu-plugins
Expand Down
47 changes: 33 additions & 14 deletions inc/class-job.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,21 @@
use DateTimeZone;
use PDO;

const MYSQL_DATE_FORMAT = 'Y-m-d H:i:s';

class Job
{
public $id;
public $site;
public $hook;
public $args;
public $start;
public $nextrun;
public $interval;
public $status;
public $schedule;
public $registered_at;
public $revised_at;
public $started_at;
public $finished_at;
public $deleted_at;

protected $db;
protected $table_prefix;
Expand Down Expand Up @@ -60,12 +63,16 @@ public function get_site_url()
*/
public function acquire_lock()
{
$started_at = new DateTime('now', new DateTimeZone('UTC'));
$this->started_at = $started_at->format(MYSQL_DATE_FORMAT);

$query = "UPDATE {$this->table_prefix}cavalcade_jobs
SET status = \"running\"
SET status = \"running\", started_at = :started_at
WHERE status = \"waiting\" AND id = :id";

$statement = $this->db->prepare($query);
$statement->bindValue(':id', $this->id);
$statement->bindValue(':started_at', $this->started_at);
$statement->execute();

$rows = $statement->rowCount();
Expand All @@ -74,16 +81,19 @@ public function acquire_lock()

public function mark_completed()
{
$data = [];
$finished_at = new DateTime('now', new DateTimeZone('UTC'));
$this->finished_at = $finished_at->format(MYSQL_DATE_FORMAT);

if ($this->interval) {
$this->reschedule();
} else {
$query = "UPDATE {$this->table_prefix}cavalcade_jobs
SET status = \"completed\"
SET status = \"completed\", finished_at = :finished_at
WHERE id = :id";

$statement = $this->db->prepare($query);
$statement->bindValue(':id', $this->id);
$statement->bindValue(':finished_at', $this->finished_at);
$statement->execute();
}
}
Expand All @@ -97,13 +107,14 @@ public function reschedule()
$this->status = 'waiting';

$query = "UPDATE {$this->table_prefix}cavalcade_jobs
SET status = :status, nextrun = :nextrun
SET status = :status, nextrun = :nextrun, finished_at = :finished_at
WHERE id = :id";

$statement = $this->db->prepare($query);
$statement->bindValue(':id', $this->id);
$statement->bindValue(':status', $this->status);
$statement->bindValue(':nextrun', $this->nextrun);
$statement->bindValue(':finished_at', $this->finished_at);
$statement->execute();
}

Expand All @@ -112,14 +123,22 @@ public function reschedule()
*
* @param string $message failure detail message
*/
public function mark_failed($message = '')
public function mark_failed()
{
$query = "UPDATE {$this->table_prefix}cavalcade_jobs
SET status = \"failed\"
WHERE id = :id";
$finished_at = new DateTime('now', new DateTimeZone('UTC'));
$this->finished_at = $finished_at->format(MYSQL_DATE_FORMAT);

$statement = $this->db->prepare($query);
$statement->bindValue(':id', $this->id);
$statement->execute();
if ($this->interval) {
$this->reschedule();
} else {
$query = "UPDATE {$this->table_prefix}cavalcade_jobs
SET status = \"failed\", finished_at = :finished_at
WHERE id = :id";

$statement = $this->db->prepare($query);
$statement->bindValue(':id', $this->id);
$statement->bindValue(':finished_at', $this->finished_at);
$statement->execute();
}
}
}
9 changes: 9 additions & 0 deletions inc/class-logger.php
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,15 @@ protected static function worker_values(Worker $worker)
'job_id' => intval($job->id),
'hook' => $job->hook,
'args' => $job->args,
'nextrun' => $job->nextrun,
'interval' => $job->interval,
'status' => $job->status,
'schedule' => $job->schedule,
'registered_at' => $job->registered_at,
'revised_at' => $job->revised_at,
'started_at' => $job->started_at,
'finished_at' => $job->finished_at,
'deleted_at' => $job->deleted_at,
'stdout' => $worker->get_stdout(),
'stderr' => $worker->get_stderr(),
'error_log' => $worker->get_error_log(),
Expand Down
90 changes: 77 additions & 13 deletions inc/class-runner.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,21 @@

namespace HM\Cavalcade\Runner;

use DateInterval;
use DateTime;
use DateTimeZone;
use Exception;
use PDO;

const LOOP_INTERVAL = 1;
const MYSQL_DATE_FORMAT = 'Y-m-d H:i:s';

class Runner
{
public $max_workers;
public $wpcli_path;
public $cleanup_interval;
public $cleanup_delay;

/**
* Hook system for the Runner.
Expand All @@ -31,10 +37,19 @@ class Runner
*/
protected static $instance;

public function __construct($log, $max_workers, $wpcli_path)
{
public function __construct(
$log,
$max_workers,
$wpcli_path,
$cleanup_interval,
$cleanup_delay,
$wp_base_path
) {
$this->max_workers = $max_workers;
$this->wpcli_path = $wpcli_path;
$this->cleanup_interval = $cleanup_interval;
$this->cleanup_delay = $cleanup_delay;
$this->wp_path = realpath($wp_base_path);
$this->hooks = new Hooks();
$this->log = $log;
}
Expand All @@ -44,35 +59,46 @@ public function __construct($log, $max_workers, $wpcli_path)
*
* @return self
*/
public static function instance($log, $max_workers, $wpcli_path)
{
public static function instance(
$log,
$max_workers,
$wpcli_path,
$cleanup_interval,
$cleanup_delay,
$wp_base_path
) {
if (empty(static::$instance)) {
static::$instance = new static($log, $max_workers, $wpcli_path);
static::$instance = new static(
$log,
$max_workers,
$wpcli_path,
$cleanup_interval,
$cleanup_delay,
$wp_base_path,
);
}

return static::$instance;
}

public function bootstrap($wp_path = '.')
public function bootstrap()
{
// Check some requirements first
if (!function_exists('pcntl_signal')) {
throw new Exception('pcntl extension is required');
}

$config_path = realpath($wp_path . '/wp-config.php');
$config_path = $this->wp_path . '/wp-config.php';
if (!file_exists($config_path)) {
$config_path = realpath($wp_path . '/../wp-config.php');
$config_path = realpath($this->wp_path . '/../wp-config.php');
if (!file_exists($config_path)) {
throw new Exception(sprintf(
'Could not find config file at %s',
realpath($wp_path) . '/wp-config.php or next level up.'
$this->wp_path . '/wp-config.php or next level up.'
));
}
}

$this->wp_path = realpath($wp_path);

// Load WP config
define('ABSPATH', dirname(__DIR__) . '/fakewp/');
if (!isset($_SERVER['HTTP_HOST'])) {
Expand All @@ -91,10 +117,40 @@ public function bootstrap($wp_path = '.')

// Connect to the database!
$this->connect_to_db();

$this->upgrade_db();
}

protected function upgrade_db()
{
$output = $retval = null;
exec("$this->wpcli_path --path=$this->wp_path cavalcade upgrade", $output, $retval);
$this->log->info('wp cavalcade upgrade executed', [
'output' => $output,
'retval' => $retval,
]);
}

public function cleanup()
{
$expired = new DateTime('now', new DateTimeZone('UTC'));
$expired->sub(new DateInterval("PT{$this->cleanup_delay}S"));

$query = "DELETE FROM {$this->table_prefix}cavalcade_jobs
WHERE
(deleted_at < :expired1 AND status IN ('completed', 'waiting', 'failed'))
OR
(finished_at < :expired2 AND status IN ('completed', 'failed'))";
$statement = $this->db->prepare($query);
$expired_str = $expired->format(MYSQL_DATE_FORMAT);
$statement->bindValue(':expired1', $expired_str);
$statement->bindValue(':expired2', $expired_str);
$statement->execute();
$count = $statement->rowCount();

$this->log->debug('db cleaned up', ['deleted_rows' => $count]);
}

// TODO: Run updating database schema every time runner is started.
// Keep DB schema updated.
public function run()
{
pcntl_signal(SIGTERM, [$this, 'terminate']);
Expand All @@ -103,9 +159,17 @@ public function run()

$this->hooks->run('Runner.run.before');

$prev_cleanup = time();
while (true) {
pcntl_signal_dispatch();
$this->hooks->run('Runner.run.loop_start', $this);

$now = time();
if ($this->cleanup_interval <= $now - $prev_cleanup) {
$prev_cleanup = $now;
$this->cleanup();
}

$this->check_workers();

if (count($this->workers) === $this->max_workers) {
Expand Down
Loading

0 comments on commit 8e6b5ce

Please sign in to comment.