Files
ulthon_admin/extend/base/common/command/TimerBase.php
augushong 25fab093fa feat(timer): 新增 run_type 调度、host_id 投递和日志清理
T7: TimerBase shouldExecuteTask() - main/auto/all/manual/disabled modes
    with two-phase DB row lock for auto mode
T8: TimerControllerBase - host_id param, logStart/logEnd methods
    TimerServiceBase - inject host_id into site URLs
T9: TimerLogClean command - php think admin:timer:log:clean --days=30
2026-05-26 18:33:42 +08:00

393 lines
14 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<?php
declare(strict_types=1);
namespace base\common\command;
use app\common\console\Command;
use app\common\service\HostService;
use app\common\service\TimerService;
use GuzzleHttp\Client;
use GuzzleHttp\Promise\Utils;
use think\console\Input;
use think\console\input\Option;
use think\console\Output;
use think\facade\Cache;
use think\facade\Config;
use think\facade\Db;
use think\facade\Log;
use Workerman\Http\Client as HttpClient;
use Workerman\Timer;
use Workerman\Worker;
class TimerBase extends Command
{
protected $host;
protected $siteDomain;
protected $siteHost;
protected $requestList;
protected $callList;
protected function configure()
{
parent::configure();
// 指令配置
$this->setName('timer')
->addOption('temp', null, Option::VALUE_NONE)
->addOption('quit', null, Option::VALUE_NONE)
->addOption('local', null, Option::VALUE_NONE)
->addOption('local-host', null, Option::VALUE_OPTIONAL, '本地域名', 'http://localhost')
->addOption('local-port', null, Option::VALUE_OPTIONAL, '本地端口', '80')
->setDescription('内置秒级定时器');
}
protected function execute(Input $input, Output $output)
{
try {
// 指令输出
$output->writeln('start timer');
$site_domain = sysconfig('site', 'site_domain');
if (empty($site_domain)) {
$output->writeln('请前往后台设置站点域名site_domain配置项');
return;
}
$host = $site_domain;
if ($input->hasOption('local')) {
$host = $input->getOption('local-host') . ':' . $input->getOption('local-port');
}
$output->writeln('站点域名:' . $host);
$site_host = parse_url($host, PHP_URL_HOST);
// 同步配置到数据库
TimerService::syncConfigToDatabase();
// 设置配置的任务
$timer_service = new TimerService();
$request_list = $timer_service->generateAllRequestList();
$call_list = $timer_service->generateAllCallList();
// 内置的节点注册任务
$system_host_register =
[
'name' => 'system_host_register', // 定时任务的名称,不能重复
'type' => 'call', // 定时任务的类型默认只支持site你也可以重写定时器命令行以支持其他命令
'target' => [HostService::class, 'heartbeat'], // 要访问的地址如果不是以https开头那么以后台的系统配置中读取相关配置如果没有配置则不执行
'frequency' => 30, // 执行频率单位填写10则每10秒过后执行一次
];
$system_host_call_list = TimerService::generateTaskInstanceFromConfig($system_host_register);
$call_list = array_merge($call_list, $system_host_call_list);
$this->host = $host;
$this->siteDomain = $site_domain;
$this->siteHost = $site_host;
$this->requestList = $request_list;
$this->callList = $call_list;
// 文本模式:正常运行定时器
$timer_mode = Config::get('timer.mode', 'normal');
if ($timer_mode == 'normal') {
$this->runNormal();
} else {
$this->runParallel();
}
} catch (\Throwable $e) {
throw $e;
}
}
/**
* 基于 run_type 判断当前节点是否应执行该任务(仅对 site 类型生效).
* 在 Cache 节流之后、实际执行之前调用。
*
* @param array|\ArrayAccess $task_item 任务实例(含 name/type/frequency 等)
* @return bool true=应执行 false=跳过
*/
protected function shouldExecuteTask($task_item): bool
{
// call 类型任务不受 run_type 调度层影响
if (($task_item['type'] ?? '') === 'call') {
return true;
}
$task_name = $task_item['name'] ?? '';
try {
$config = Db::name('system_timer_config')
->where('task_name', $task_name)
->find();
} catch (\Throwable $e) {
Log::warning('shouldExecuteTask: 查询 system_timer_config 失败 - ' . $e->getMessage());
return true;
}
// 无数据库记录时不阻塞(回退到默认行为)
if (empty($config)) {
return true;
}
// status=0 表示任务已禁用
if (isset($config['status']) && (int) $config['status'] === 0) {
return false;
}
$run_type = $config['run_type'] ?? 'auto';
switch ($run_type) {
case 'main':
// 仅主节点执行
$master_node = HostService::getMasterNode();
$node_id = HostService::getNodeId();
return ($master_node === $node_id);
case 'auto':
// 两阶段 DB 行锁竞争:竞争成功后释放锁,再执行任务
Db::startTrans();
try {
$row = Db::name('system_timer_config')
->where('task_name', $task_name)
->lock(true)
->find();
if ($row && !empty($row['last_execute_time']) && $row['last_execute_time'] > 0) {
if ((time() - (int) $row['last_execute_time']) < ($task_item['frequency'] ?? 0)) {
Db::commit();
return false; // 另一个节点刚执行过
}
}
Db::name('system_timer_config')
->where('task_name', $task_name)
->update([
'last_execute_node' => HostService::getNodeId(),
'last_execute_time' => time(),
'update_time' => time(),
]);
Db::commit();
return true;
} catch (\Throwable $e) {
Db::rollback();
Log::warning('shouldExecuteTask auto mode exception: ' . $e->getMessage());
return false;
}
case 'all':
// 所有节点都执行
return true;
case 'manual':
// 手动触发:检查 manual_trigger=1执行一次后重置为 0
if (!empty($config['manual_trigger'])) {
Db::name('system_timer_config')
->where('task_name', $task_name)
->update([
'manual_trigger' => 0,
'update_time' => time(),
]);
return true;
}
return false;
default:
// 未知 run_type 不阻塞
return true;
}
}
public function runParallel()
{
// 重新构造命令行参数,以便兼容workerman的命令
global $argv;
$argv = [];
array_unshift($argv, 'think', 'start');
$host = $this->host;
$site_host = $this->siteHost;
$output = $this->output;
$input = $this->input;
$call_list = $this->callList;
$worker = new Worker();
$worker->count = 1;
$worker->name = 'timer_request';
$worker->timerRequestList = $this->requestList;
$worker->onWorkerStart = function () use ($worker, $host, $site_host, $output, $input) {
$options = [
'max_conn_per_addr' => Config::get('timer.max_conn_per_addr', 1000),
'keepalive_timeout' => Config::get('timer.keepalive_timeout', 86400),
'connect_timeout' => Config::get('timer.connect_timeout', 86400),
'timeout' => Config::get('timer.timeout', 86400),
];
$http = new HttpClient($options);
Timer::add(1, function () use ($worker, $host, $site_host, $output, $input, $http) {
$request_list = $worker->timerRequestList;
foreach ($request_list as $request_item) {
$output->writeln(date('Y-m-d H:i:s') . ': build site request async: ' . $request_item['target']);
if (!isset($request_item['is_running'])) {
$request_item['is_running'] = false;
}
if ($request_item['is_running']) {
$output->writeln('进行中,跳过');
continue;
}
if (!isset($request_item['last_run_time'])) {
$request_item['last_run_time'] = 0;
}
if (time() - $request_item['last_run_time'] < $request_item['frequency']) {
continue;
}
$request_item['is_running'] = true;
$request_item['last_run_time'] = time();
// run_type 调度检查(节流之后、实际执行之前)
if (!$this->shouldExecuteTask($request_item)) {
$request_item['is_running'] = false;
continue;
}
//
$http->request($host . $request_item['target'], [
'headers' => [
'Host' => $site_host,
'Accept' => 'application/json,text/plain',
],
'success' => function ($response) use ($request_item, $output) {
$request_item['is_running'] = false;
$output->writeln((string) $response->getBody());
},
'error' => function ($exception) use ($request_item, $output) {
$request_item['is_running'] = false;
$output->writeln($exception);
},
]);
}
});
};
// 运行call任务
foreach ($call_list as $call_item) {
$worker_call = new Worker();
$worker_call->count = 1;
$worker_call->name = 'timer_call_' . $call_item['name'] . '_' . $call_item['concurrency_id'];
$worker_call->onWorkerStart = function () use ($worker_call, $call_item, $output, $input) {
Timer::add(1, function () use ($worker_call, $call_item, $output, $input) {
// TODO:统一通过相同的方法判断任务是否该执行
$cache_key = 'timer_request_' . $call_item['name'] . '_' . $call_item['concurrency_id'];
$cache_tag = 'system_timer';
$last_exec_time = Cache::get($cache_key, 0);
if ($last_exec_time >= time() - $call_item['frequency']) {
return;
}
Cache::tag($cache_tag)->set($cache_key, time());
call_user_func($call_item['target']);
$output->writeln(date('Y-m-d H:i:s') . ': call ' . $call_item['name']);
});
};
}
Worker::runAll();
}
public function runNormal()
{
$host = $this->host;
$site_host = $this->siteHost;
$request_list = $this->requestList;
$output = $this->output;
$input = $this->input;
$client = new Client([
'base_uri' => $host,
'headers' => [
'Host' => $site_host,
'Accept' => 'application/json,text/plain',
],
'verify' => false,
]);
while (true) {
// 运行请求任务
try {
$list_promises = [];
foreach ($request_list as $request_item) {
$name = $request_item['name'];
$cache_key = 'timer_request_' . $name . '_' . $request_item['concurrency_id'];
$cache_tag = 'system_timer';
$last_exec_time = Cache::get($cache_key, 0);
if ($last_exec_time >= time() - $request_item['frequency']) {
continue;
}
Cache::tag($cache_tag)->set($cache_key, time());
// run_type 调度检查Cache 节流之后、实际执行之前)
if (!$this->shouldExecuteTask($request_item)) {
continue;
}
$output->writeln(date('Y-m-d H:i:s') . ': build site request async: ' . $request_item['target']);
$list_promises[$request_item['name']] = $client->getAsync($request_item['target']);
}
if (empty($list_promises)) {
if (!$input->hasOption('quit')) {
$output->writeln(date('Y-m-d H:i:s') . ' no request');
}
} else {
$results = Utils::unwrap($list_promises);
$output->writeln(date('Y-m-d H:i:s') . ': request all finished');
}
} catch (\Throwable $th) {
// throw $th;
$output->writeln('error:' . $th->getMessage());
Log::error($th->getMessage());
}
// 运行call任务
$call_list = $this->callList;
foreach ($call_list as $call_item) {
$name = $call_item['name'];
$cache_key = 'timer_call_' . $name . '_' . $call_item['concurrency_id'];
$cache_tag = 'call_timer';
$last_exec_time = Cache::get($cache_key, 0);
if ($last_exec_time >= time() - $call_item['frequency']) {
continue;
}
Cache::tag($cache_tag)->set($cache_key, time());
call_user_func($call_item['target']);
$output->writeln(date('Y-m-d H:i:s') . ': call function ' . $name . ' finished');
}
if ($input->hasOption('temp')) {
break;
}
sleep(1);
}
}
}