mirror of
https://gitee.com/ulthon/ulthon_admin.git
synced 2026-07-01 15:32:48 +08:00
feat: 增加call类型的定时任务;
This commit is contained in:
@@ -4,6 +4,7 @@ declare(strict_types=1);
|
||||
|
||||
namespace base\common\command;
|
||||
|
||||
use app\common\service\HostService;
|
||||
use app\common\service\TimerService;
|
||||
use GuzzleHttp\Client;
|
||||
use GuzzleHttp\Promise\Utils;
|
||||
@@ -24,6 +25,7 @@ class TimerBase extends Command
|
||||
protected $siteDomain;
|
||||
protected $siteHost;
|
||||
protected $requestList;
|
||||
protected $callList;
|
||||
|
||||
protected function configure()
|
||||
{
|
||||
@@ -58,23 +60,27 @@ class TimerBase extends Command
|
||||
$output->writeln('站点域名:' . $host);
|
||||
$site_host = parse_url($host, PHP_URL_HOST);
|
||||
|
||||
$request_list = TimerService::generateAllRequestList();
|
||||
// 设置配置的任务
|
||||
$timer_service = new TimerService();
|
||||
$request_list = $timer_service->generateAllRequestList();
|
||||
$call_list = $timer_service->generateAllCallList();
|
||||
|
||||
// 内置的节点注册任务
|
||||
$system_host_register =
|
||||
[
|
||||
'name' => 'system_host_register', // 定时任务的名称,不能重复
|
||||
'type' => 'site', // 定时任务的类型,默认只支持site,你也可以重写定时器命令行以支持其他命令
|
||||
'target' => '/tools/timer.SystemHost/do', // 要访问的地址,如果不是以https开头,那么以后台的系统配置中读取相关配置,如果没有配置则不执行
|
||||
'type' => 'call', // 定时任务的类型,默认只支持site,你也可以重写定时器命令行以支持其他命令
|
||||
'target' => [HostService::class, 'heartbeat'], // 要访问的地址,如果不是以https开头,那么以后台的系统配置中读取相关配置,如果没有配置则不执行
|
||||
'frequency' => 30, // 执行频率,单位:秒,填写10,则每10秒过后执行一次
|
||||
];
|
||||
$system_host_register = TimerService::initConfigItem($system_host_register);
|
||||
$system_host_request_list = TimerService::generateRequestListFromConfig($system_host_register);
|
||||
$request_list = array_merge($request_list, $system_host_request_list);
|
||||
$system_host_call_list = TimerService::generateTaskInstanceFromConfig($system_host_register);
|
||||
$call_list = array_merge($call_list, $system_host_call_list);
|
||||
|
||||
$this->host = $host;
|
||||
$this->siteDomain = $site_domain;
|
||||
$this->siteHost = $site_host;
|
||||
$this->requestList = $request_list;
|
||||
$this->callList = $call_list;
|
||||
|
||||
$timer_mode = Config::get('timer.mode', 'normal');
|
||||
if ($timer_mode == 'normal') {
|
||||
@@ -96,9 +102,11 @@ class TimerBase extends Command
|
||||
$output = $this->output;
|
||||
$input = $this->input;
|
||||
|
||||
$call_list = $this->callList;
|
||||
|
||||
$worker = new Worker();
|
||||
$worker->count = 1;
|
||||
$worker->name = 'start_timer_parallel';
|
||||
$worker->name = 'timer_request';
|
||||
$worker->timerRequestList = $this->requestList;
|
||||
|
||||
$worker->onWorkerStart = function () use ($worker, $host, $site_host, $output, $input) {
|
||||
@@ -154,6 +162,28 @@ class TimerBase extends Command
|
||||
});
|
||||
};
|
||||
|
||||
// 运行call任务
|
||||
foreach ($call_list as $call_item) {
|
||||
$worker_call = new Worker();
|
||||
$worker_call->count = 1;
|
||||
$worker_call->name = 'timer_call_' . $call_item['name'] . '_' . $call_item['concurrency_id'];
|
||||
$worker_call->onWorkerStart = function () use ($worker_call, $call_item, $output, $input) {
|
||||
Timer::add(1, function () use ($worker_call, $call_item, $output, $input) {
|
||||
// TODO:统一通过相同的方法判断任务是否该执行
|
||||
$cache_key = 'timer_request_' . $call_item['name'] . '_' . $call_item['concurrency_id'];
|
||||
$cache_tag = 'system_timer';
|
||||
$last_exec_time = Cache::get($cache_key, 0);
|
||||
if ($last_exec_time >= time() - $call_item['frequency']) {
|
||||
return;
|
||||
}
|
||||
Cache::tag($cache_tag)->set($cache_key, time());
|
||||
|
||||
call_user_func($call_item['target']);
|
||||
$output->writeln(date('Y-m-d H:i:s') . ': call ' . $call_item['name']);
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
Worker::runAll();
|
||||
}
|
||||
|
||||
@@ -175,12 +205,13 @@ class TimerBase extends Command
|
||||
]);
|
||||
|
||||
while (true) {
|
||||
// 运行请求任务
|
||||
try {
|
||||
$list_promises = [];
|
||||
foreach ($request_list as $request_item) {
|
||||
$name = $request_item['name'];
|
||||
|
||||
$cache_key = 'timer_' . $name . '_' . $request_item['concurrency_id'];
|
||||
$cache_key = 'timer_request_' . $name . '_' . $request_item['concurrency_id'];
|
||||
$cache_tag = 'system_timer';
|
||||
$last_exec_time = Cache::get($cache_key, 0);
|
||||
|
||||
@@ -189,18 +220,9 @@ class TimerBase extends Command
|
||||
}
|
||||
|
||||
Cache::tag($cache_tag)->set($cache_key, time());
|
||||
$type = $request_item['type'];
|
||||
switch ($type) {
|
||||
case 'site':
|
||||
$output->writeln(date('Y-m-d H:i:s') . ': build site request async: ' . $request_item['target']);
|
||||
$list_promises[$request_item['name']] = $client->getAsync($request_item['target']);
|
||||
|
||||
break;
|
||||
|
||||
default:
|
||||
$output->writeln(date('Y-m-d H:i:s') . 'unsupport type:' . $type);
|
||||
break;
|
||||
}
|
||||
$output->writeln(date('Y-m-d H:i:s') . ': build site request async: ' . $request_item['target']);
|
||||
$list_promises[$request_item['name']] = $client->getAsync($request_item['target']);
|
||||
}
|
||||
|
||||
if (empty($list_promises)) {
|
||||
@@ -217,6 +239,21 @@ class TimerBase extends Command
|
||||
Log::error($th->getMessage());
|
||||
}
|
||||
|
||||
// 运行call任务
|
||||
$call_list = $this->callList;
|
||||
foreach ($call_list as $call_item) {
|
||||
$name = $call_item['name'];
|
||||
$cache_key = 'timer_call_' . $name . '_' . $call_item['concurrency_id'];
|
||||
$cache_tag = 'call_timer';
|
||||
$last_exec_time = Cache::get($cache_key, 0);
|
||||
if ($last_exec_time >= time() - $call_item['frequency']) {
|
||||
continue;
|
||||
}
|
||||
Cache::tag($cache_tag)->set($cache_key, time());
|
||||
call_user_func($call_item['target']);
|
||||
$output->writeln(date('Y-m-d H:i:s') . ': call function ' . $name . ' finished');
|
||||
}
|
||||
|
||||
if ($input->hasOption('temp')) {
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -6,24 +6,52 @@ use app\common\model\VirtualModel;
|
||||
|
||||
class TimerServiceBase
|
||||
{
|
||||
public static function generateAllRequestList()
|
||||
protected $taskList = null;
|
||||
|
||||
public static function generateAllTaskInstanceList()
|
||||
{
|
||||
$config_list = include app_file_path('common/command/timer/config.php');
|
||||
$request_list = [];
|
||||
$task_list = [];
|
||||
foreach ($config_list as $config_item) {
|
||||
$config_item = static::initConfigItem($config_item);
|
||||
|
||||
if ($config_item['name'] == 'http_demo' && !env('adminsystem.is_demo', false)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$request_list = array_merge($request_list, static::generateRequestListFromConfig($config_item));
|
||||
$task_list = array_merge($task_list, static::generateTaskInstanceFromConfig($config_item));
|
||||
}
|
||||
|
||||
return $task_list;
|
||||
}
|
||||
|
||||
public function generateAllRequestList()
|
||||
{
|
||||
if ($this->taskList === null) {
|
||||
$this->taskList = static::generateAllTaskInstanceList();
|
||||
}
|
||||
|
||||
$request_list = array_filter($this->taskList, function ($item) {
|
||||
return $item['type'] == 'site';
|
||||
});
|
||||
|
||||
return $request_list;
|
||||
}
|
||||
|
||||
public static function generateRequestListFromConfig($config_item)
|
||||
public function generateAllCallList()
|
||||
{
|
||||
if ($this->taskList === null) {
|
||||
$this->taskList = static::generateAllTaskInstanceList();
|
||||
}
|
||||
|
||||
$request_list = array_filter($this->taskList, function ($item) {
|
||||
return $item['type'] == 'call';
|
||||
});
|
||||
|
||||
return $request_list;
|
||||
}
|
||||
|
||||
public static function generateTaskInstanceFromConfig($config_item)
|
||||
{
|
||||
$config_item = static::initConfigItem($config_item);
|
||||
$request_list = [];
|
||||
$concurrency = $config_item['concurrency'];
|
||||
|
||||
@@ -33,14 +61,19 @@ class TimerServiceBase
|
||||
'concurrency_id' => $i,
|
||||
'concurrency_count' => $concurrency,
|
||||
];
|
||||
$target_info = parse_url($target);
|
||||
$query_params = [];
|
||||
if (isset($target_info['query'])) {
|
||||
parse_str($target_info['query'], $query_params);
|
||||
// 处理target
|
||||
if ($config_item['type'] == 'site') {
|
||||
$target_info = parse_url($target);
|
||||
$query_params = [];
|
||||
if (isset($target_info['query'])) {
|
||||
parse_str($target_info['query'], $query_params);
|
||||
}
|
||||
$query_params = array_merge($query_params, $params);
|
||||
$target_info['query'] = http_build_query($query_params);
|
||||
$target = unparse_url($target_info);
|
||||
} elseif ($config_item['type'] == 'call') {
|
||||
$target = $config_item['target'];
|
||||
}
|
||||
$query_params = array_merge($query_params, $params);
|
||||
$target_info['query'] = http_build_query($query_params);
|
||||
$target = unparse_url($target_info);
|
||||
|
||||
$new_config_item = clone $config_item;
|
||||
$new_config_item['target'] = $target;
|
||||
@@ -48,9 +81,9 @@ class TimerServiceBase
|
||||
|
||||
$request_list[] = $new_config_item;
|
||||
}
|
||||
|
||||
return $request_list;
|
||||
}
|
||||
|
||||
|
||||
public static function initConfigItem($config)
|
||||
{
|
||||
@@ -60,6 +93,7 @@ class TimerServiceBase
|
||||
'target' => '',
|
||||
'frequency' => 600,
|
||||
'concurrency' => 1,
|
||||
'run_type' => 'auto',
|
||||
];
|
||||
|
||||
$data = array_merge($default, $config);
|
||||
|
||||
@@ -9,6 +9,7 @@ use app\common\service\HostService;
|
||||
use think\facade\Db;
|
||||
use think\facade\Log;
|
||||
|
||||
// TODO:去掉控制器的心跳注册
|
||||
class SystemHostBase extends TimerController
|
||||
{
|
||||
protected $frequency = 10;
|
||||
|
||||
Reference in New Issue
Block a user