feat(timer): 新增 run_type 调度、host_id 投递和日志清理

T7: TimerBase shouldExecuteTask() - main/auto/all/manual/disabled modes
    with two-phase DB row lock for auto mode
T8: TimerControllerBase - host_id param, logStart/logEnd methods
    TimerServiceBase - inject host_id into site URLs
T9: TimerLogClean command - php think admin:timer:log:clean --days=30
This commit is contained in:
augushong
2026-05-26 02:41:10 +08:00
parent abeac2c3cb
commit 25fab093fa
7 changed files with 215 additions and 0 deletions

View File

@@ -14,6 +14,7 @@ use think\console\input\Option;
use think\console\Output;
use think\facade\Cache;
use think\facade\Config;
use think\facade\Db;
use think\facade\Log;
use Workerman\Http\Client as HttpClient;
use Workerman\Timer;
@@ -100,6 +101,112 @@ class TimerBase extends Command
}
}
/**
* 基于 run_type 判断当前节点是否应执行该任务(仅对 site 类型生效).
* 在 Cache 节流之后、实际执行之前调用。
*
* @param array|\ArrayAccess $task_item 任务实例(含 name/type/frequency 等)
* @return bool true=应执行 false=跳过
*/
protected function shouldExecuteTask($task_item): bool
{
// call 类型任务不受 run_type 调度层影响
if (($task_item['type'] ?? '') === 'call') {
return true;
}
$task_name = $task_item['name'] ?? '';
try {
$config = Db::name('system_timer_config')
->where('task_name', $task_name)
->find();
} catch (\Throwable $e) {
Log::warning('shouldExecuteTask: 查询 system_timer_config 失败 - ' . $e->getMessage());
return true;
}
// 无数据库记录时不阻塞(回退到默认行为)
if (empty($config)) {
return true;
}
// status=0 表示任务已禁用
if (isset($config['status']) && (int) $config['status'] === 0) {
return false;
}
$run_type = $config['run_type'] ?? 'auto';
switch ($run_type) {
case 'main':
// 仅主节点执行
$master_node = HostService::getMasterNode();
$node_id = HostService::getNodeId();
return ($master_node === $node_id);
case 'auto':
// 两阶段 DB 行锁竞争:竞争成功后释放锁,再执行任务
Db::startTrans();
try {
$row = Db::name('system_timer_config')
->where('task_name', $task_name)
->lock(true)
->find();
if ($row && !empty($row['last_execute_time']) && $row['last_execute_time'] > 0) {
if ((time() - (int) $row['last_execute_time']) < ($task_item['frequency'] ?? 0)) {
Db::commit();
return false; // 另一个节点刚执行过
}
}
Db::name('system_timer_config')
->where('task_name', $task_name)
->update([
'last_execute_node' => HostService::getNodeId(),
'last_execute_time' => time(),
'update_time' => time(),
]);
Db::commit();
return true;
} catch (\Throwable $e) {
Db::rollback();
Log::warning('shouldExecuteTask auto mode exception: ' . $e->getMessage());
return false;
}
case 'all':
// 所有节点都执行
return true;
case 'manual':
// 手动触发:检查 manual_trigger=1执行一次后重置为 0
if (!empty($config['manual_trigger'])) {
Db::name('system_timer_config')
->where('task_name', $task_name)
->update([
'manual_trigger' => 0,
'update_time' => time(),
]);
return true;
}
return false;
default:
// 未知 run_type 不阻塞
return true;
}
}
public function runParallel()
{
// 重新构造命令行参数,以便兼容workerman的命令
@@ -152,6 +259,12 @@ class TimerBase extends Command
$request_item['is_running'] = true;
$request_item['last_run_time'] = time();
// run_type 调度检查(节流之后、实际执行之前)
if (!$this->shouldExecuteTask($request_item)) {
$request_item['is_running'] = false;
continue;
}
//
$http->request($host . $request_item['target'], [
@@ -231,6 +344,11 @@ class TimerBase extends Command
Cache::tag($cache_tag)->set($cache_key, time());
// run_type 调度检查Cache 节流之后、实际执行之前)
if (!$this->shouldExecuteTask($request_item)) {
continue;
}
$output->writeln(date('Y-m-d H:i:s') . ': build site request async: ' . $request_item['target']);
$list_promises[$request_item['name']] = $client->getAsync($request_item['target']);
}