Files
ulthon_admin/extend/base/common/command/TimerBase.php
augushong abeac2c3cb feat(timer): 新增配置同步到数据库及主节点选举
T5: TimerServiceBase.syncConfigToDatabase() - syncs task config to DB
T6: HostServiceBase - auto master election, stale node detection,
    getMasterNode/setMasterNode methods
2026-05-26 18:33:41 +08:00

275 lines
10 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<?php
declare(strict_types=1);
namespace base\common\command;
use app\common\console\Command;
use app\common\service\HostService;
use app\common\service\TimerService;
use GuzzleHttp\Client;
use GuzzleHttp\Promise\Utils;
use think\console\Input;
use think\console\input\Option;
use think\console\Output;
use think\facade\Cache;
use think\facade\Config;
use think\facade\Log;
use Workerman\Http\Client as HttpClient;
use Workerman\Timer;
use Workerman\Worker;
/**
 * Console command `timer`: a built-in, second-granularity task timer.
 *
 * Two kinds of tasks are dispatched:
 *  - request tasks: HTTP requests sent to the site (list built by TimerService);
 *  - call tasks: PHP callables invoked in-process (list built by TimerService,
 *    plus a built-in HostService::heartbeat task added by this command).
 *
 * The `timer.mode` config value selects the driver: `normal` runs a single
 * blocking loop (runNormal), any other value runs Workerman workers (runParallel).
 */
class TimerBase extends Command
{
    /** @var string Base URL requests are sent to (the site domain, or local-host:local-port with --local). */
    protected $host;

    /** @var string Value of the `site.site_domain` system config. */
    protected $siteDomain;

    /** @var string|false|null Result of parse_url($host, PHP_URL_HOST); sent as the HTTP Host header. */
    protected $siteHost;

    /** @var array Request task instances returned by TimerService::generateAllRequestList(). */
    protected $requestList;

    /** @var array Call task instances: TimerService::generateAllCallList() plus the built-in heartbeat. */
    protected $callList;

    /**
     * Registers the command name, its options and its description.
     */
    protected function configure()
    {
        parent::configure();

        $this->setName('timer')
            ->addOption('temp', null, Option::VALUE_NONE)
            ->addOption('quit', null, Option::VALUE_NONE)
            ->addOption('local', null, Option::VALUE_NONE)
            ->addOption('local-host', null, Option::VALUE_OPTIONAL, '本地域名', 'http://localhost')
            ->addOption('local-port', null, Option::VALUE_OPTIONAL, '本地端口', '80')
            ->setDescription('内置秒级定时器');
    }

    /**
     * Resolves the target host, builds the task lists and starts the configured driver.
     *
     * Returns early (after printing a hint) when no site domain is configured.
     * Any exception raised while preparing or running the timer propagates to the caller.
     *
     * @param Input  $input  console input (reads --local, --local-host, --local-port)
     * @param Output $output console output
     */
    protected function execute(Input $input, Output $output)
    {
        $output->writeln('start timer');

        $site_domain = sysconfig('site', 'site_domain');
        if (empty($site_domain)) {
            $output->writeln('请前往后台设置站点域名site_domain配置项');

            return;
        }

        // With --local the request goes to local-host:local-port instead of the site domain.
        $host = $site_domain;
        if ($input->hasOption('local')) {
            $host = $input->getOption('local-host') . ':' . $input->getOption('local-port');
        }
        $output->writeln('站点域名:' . $host);

        // NOTE(review): parse_url() yields null for a value without a scheme
        // (e.g. "example.com"), which would leave the Host header empty — confirm
        // that site_domain is always stored with its scheme.
        $site_host = parse_url($host, PHP_URL_HOST);

        // Sync the task configuration to the database.
        TimerService::syncConfigToDatabase();

        // Tasks defined by configuration.
        $timer_service = new TimerService();
        $request_list = $timer_service->generateAllRequestList();
        $call_list = $timer_service->generateAllCallList();

        // Built-in node registration task: calls HostService::heartbeat every 30 seconds.
        $system_host_register = [
            'name' => 'system_host_register', // task name
            'type' => 'call', // invoked as a PHP callable rather than an HTTP request
            'target' => [HostService::class, 'heartbeat'], // callable to invoke
            'frequency' => 30, // minimum number of seconds between two runs
        ];
        $system_host_call_list = TimerService::generateTaskInstanceFromConfig($system_host_register);
        $call_list = array_merge($call_list, $system_host_call_list);

        $this->host = $host;
        $this->siteDomain = $site_domain;
        $this->siteHost = $site_host;
        $this->requestList = $request_list;
        $this->callList = $call_list;

        // `normal` runs the blocking loop; any other mode value uses Workerman workers.
        $timer_mode = Config::get('timer.mode', 'normal');
        if ($timer_mode == 'normal') {
            $this->runNormal();
        } else {
            $this->runParallel();
        }
    }

    /**
     * Runs the timer on Workerman: one worker sends all request tasks
     * asynchronously, and one additional worker is created per call task.
     *
     * Per-request state (`is_running`, `last_run_time`) is stored on
     * `$worker->timerRequestList` so that it survives between timer ticks and is
     * visible to the asynchronous success/error callbacks. A request is sent only
     * when it is not already in flight and at least `frequency` seconds have
     * passed since it was last sent.
     */
    public function runParallel()
    {
        // Rebuild the CLI arguments so that Workerman sees its own `start` command.
        global $argv;
        $argv = ['think', 'start'];

        $host = $this->host;
        $site_host = $this->siteHost;
        $output = $this->output;
        $call_list = $this->callList;

        $worker = new Worker();
        $worker->count = 1;
        $worker->name = 'timer_request';
        $worker->timerRequestList = $this->requestList;
        $worker->onWorkerStart = function () use ($worker, $host, $site_host, $output) {
            $http = new HttpClient([
                'max_conn_per_addr' => Config::get('timer.max_conn_per_addr', 1000),
                'keepalive_timeout' => Config::get('timer.keepalive_timeout', 86400),
                'connect_timeout' => Config::get('timer.connect_timeout', 86400),
                'timeout' => Config::get('timer.timeout', 86400),
            ]);

            Timer::add(1, function () use ($worker, $host, $site_host, $output, $http) {
                // Iterate over keys and always read/write through the worker's list:
                // a by-value foreach copy would drop the state after every tick.
                foreach (array_keys($worker->timerRequestList) as $index) {
                    $request_item = $worker->timerRequestList[$index];

                    if (!empty($request_item['is_running'])) {
                        $output->writeln('进行中,跳过');
                        continue;
                    }

                    $last_run_time = $request_item['last_run_time'] ?? 0;
                    if (time() - $last_run_time < $request_item['frequency']) {
                        continue;
                    }

                    $worker->timerRequestList[$index]['is_running'] = true;
                    $worker->timerRequestList[$index]['last_run_time'] = time();

                    $output->writeln(date('Y-m-d H:i:s') . ': build site request async: ' . $request_item['target']);

                    // Clears the in-flight flag on the shared list once the request settles.
                    $mark_finished = function () use ($worker, $index) {
                        $worker->timerRequestList[$index]['is_running'] = false;
                    };

                    $http->request($host . $request_item['target'], [
                        'headers' => [
                            'Host' => $site_host,
                            'Accept' => 'application/json,text/plain',
                        ],
                        'success' => function ($response) use ($mark_finished, $output) {
                            $mark_finished();
                            $output->writeln((string) $response->getBody());
                        },
                        'error' => function ($exception) use ($mark_finished, $output) {
                            $mark_finished();
                            // writeln() needs a string: under strict_types an exception
                            // object is not converted implicitly.
                            $output->writeln('error:' . $exception->getMessage());
                            Log::error($exception->getMessage());
                        },
                    ]);
                }
            });
        };

        // One dedicated worker per call task.
        foreach ($call_list as $call_item) {
            $worker_call = new Worker();
            $worker_call->count = 1;
            $worker_call->name = 'timer_call_' . $call_item['name'] . '_' . $call_item['concurrency_id'];
            $worker_call->onWorkerStart = function () use ($call_item, $output) {
                Timer::add(1, function () use ($call_item, $output) {
                    // NOTE(review): this mode stores call tasks under the `timer_request_`
                    // prefix and the `system_timer` tag, whereas runNormal() uses
                    // `timer_call_` / `call_timer`. Kept as-is so existing cache entries
                    // stay valid — confirm whether the two should be unified.
                    $cache_key = 'timer_request_' . $call_item['name'] . '_' . $call_item['concurrency_id'];
                    if (!$this->claimTaskRun($cache_key, 'system_timer', $call_item['frequency'])) {
                        return;
                    }

                    call_user_func($call_item['target']);
                    $output->writeln(date('Y-m-d H:i:s') . ': call ' . $call_item['name']);
                });
            };
        }

        Worker::runAll();
    }

    /**
     * Runs the timer as a single blocking loop that ticks once per second.
     *
     * Each tick sends every due request task concurrently through Guzzle and
     * waits for all of them, then invokes every due call task in sequence.
     * Failures in the request phase are printed and logged; the loop continues.
     * With --temp the loop stops after the first tick.
     */
    public function runNormal()
    {
        $request_list = $this->requestList;
        $output = $this->output;
        $input = $this->input;

        $client = new Client([
            'base_uri' => $this->host,
            'headers' => [
                'Host' => $this->siteHost,
                'Accept' => 'application/json,text/plain',
            ],
            'verify' => false,
        ]);

        while (true) {
            // Request tasks.
            try {
                $list_promises = [];
                foreach ($request_list as $request_item) {
                    $cache_key = 'timer_request_' . $request_item['name'] . '_' . $request_item['concurrency_id'];
                    if (!$this->claimTaskRun($cache_key, 'system_timer', $request_item['frequency'])) {
                        continue;
                    }

                    $output->writeln(date('Y-m-d H:i:s') . ': build site request async: ' . $request_item['target']);
                    $list_promises[$request_item['name']] = $client->getAsync($request_item['target']);
                }

                if (!empty($list_promises)) {
                    // Blocks until every promise has settled; throws if one is rejected.
                    Utils::unwrap($list_promises);
                    $output->writeln(date('Y-m-d H:i:s') . ': request all finished');
                } elseif (!$input->hasOption('quit')) {
                    $output->writeln(date('Y-m-d H:i:s') . ' no request');
                }
            } catch (\Throwable $th) {
                $output->writeln('error:' . $th->getMessage());
                Log::error($th->getMessage());
            }

            // Call tasks.
            // NOTE(review): an exception thrown by a call task is not caught here and
            // ends the loop, unlike the request phase above — confirm this is intended.
            foreach ($this->callList as $call_item) {
                $name = $call_item['name'];
                $cache_key = 'timer_call_' . $name . '_' . $call_item['concurrency_id'];
                if (!$this->claimTaskRun($cache_key, 'call_timer', $call_item['frequency'])) {
                    continue;
                }

                call_user_func($call_item['target']);
                $output->writeln(date('Y-m-d H:i:s') . ': call function ' . $name . ' finished');
            }

            if ($input->hasOption('temp')) {
                break;
            }

            sleep(1);
        }
    }

    /**
     * Checks whether a task is due and, when it is, records the current time as
     * its last execution time in the cache.
     *
     * A task is due when more than `$frequency` seconds have passed since the
     * cached timestamp (a missing cache entry counts as timestamp 0).
     *
     * @param string $cacheKey  cache entry holding the last execution timestamp
     * @param string $cacheTag  cache tag the entry is written under
     * @param mixed  $frequency minimum number of seconds between runs; left untyped
     *                          because the value comes from task configuration
     *
     * @return bool true when the caller should run the task now
     */
    private function claimTaskRun(string $cacheKey, string $cacheTag, $frequency): bool
    {
        $last_exec_time = Cache::get($cacheKey, 0);
        if ($last_exec_time >= time() - $frequency) {
            return false;
        }

        Cache::tag($cacheTag)->set($cacheKey, time());

        return true;
    }
}