Files
ulthon_admin/extend/base/common/command/TimerBase.php

228 lines
8.2 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<?php
declare(strict_types=1);
namespace base\common\command;
use app\common\service\TimerService;
use GuzzleHttp\Client;
use GuzzleHttp\Promise\Utils;
use think\console\Command;
use think\console\Input;
use think\console\input\Option;
use think\console\Output;
use think\facade\Cache;
use think\facade\Config;
use think\facade\Log;
use Workerman\Http\Client as HttpClient;
use Workerman\Timer;
use Workerman\Worker;
/**
 * Console command `timer`: a built-in scheduler with one-second resolution.
 *
 * It collects a list of request items from TimerService, appends the built-in
 * `system_host_register` task, and then repeatedly issues HTTP GET requests to
 * each item's target once its `frequency` (in seconds) has elapsed.
 *
 * Two run modes are selected through the `timer.mode` config value:
 *  - `normal`   : blocking loop using Guzzle async requests (see runNormal()).
 *  - any other  : Workerman worker with a 1-second timer (see runParallel()).
 */
class TimerBase extends Command
{
    /** @var string Base URL requests are sent to (site domain, or local host:port when --local is given). */
    protected $host;

    /** @var string Value of the `site.site_domain` system config. */
    protected $siteDomain;

    /** @var string|false|null Host component of $host as returned by parse_url(); sent as the HTTP Host header. */
    protected $siteHost;

    /** @var array Request items to schedule. */
    protected $requestList;

    /**
     * Declares the command name, its options and its description.
     *
     * @return void
     */
    protected function configure()
    {
        // Command configuration
        $this->setName('timer')
            ->addOption('temp', null, Option::VALUE_NONE)
            ->addOption('quit', null, Option::VALUE_NONE)
            ->addOption('local', null, Option::VALUE_NONE)
            ->addOption('local-host', null, Option::VALUE_OPTIONAL, '本地域名', 'http://localhost')
            ->addOption('local-port', null, Option::VALUE_OPTIONAL, '本地端口', '80')
            ->setDescription('内置秒级定时器');
    }

    /**
     * Resolves the target host, builds the request list and starts the
     * scheduler in the configured mode.
     *
     * Returns early (after printing a hint) when `site.site_domain` is empty.
     *
     * @param Input  $input
     * @param Output $output
     *
     * @return void
     */
    protected function execute(Input $input, Output $output)
    {
        // Command output
        $output->writeln('start timer');

        $site_domain = sysconfig('site', 'site_domain');
        if (empty($site_domain)) {
            $output->writeln('请前往后台设置站点域名site_domain配置项');

            return;
        }

        // With --local the requests go to local-host:local-port instead of the site domain.
        $host = $site_domain;
        if ($input->hasOption('local')) {
            $host = $input->getOption('local-host') . ':' . $input->getOption('local-port');
        }
        $output->writeln('站点域名:' . $host);

        $site_host = parse_url($host, PHP_URL_HOST);

        $request_list = TimerService::generateAllRequestList();

        // Built-in task that is always scheduled in addition to the configured ones.
        $system_host_register = [
            'name' => 'system_host_register', // Task name; must be unique
            'type' => 'site', // Task type; only "site" is supported by default, override the timer command to support others
            'target' => '/tools/timer.SystemHost/do', // Address to request; if it does not start with https the related setting is read from the backend system config, and nothing runs when it is not configured
            'frequency' => 30, // Execution frequency in seconds: 10 means run once every 10 seconds
        ];
        $system_host_register = TimerService::initConfigItem($system_host_register);
        $system_host_request_list = TimerService::generateRequestListFromConfig($system_host_register);
        $request_list = array_merge($request_list, $system_host_request_list);

        $this->host = $host;
        $this->siteDomain = $site_domain;
        $this->siteHost = $site_host;
        $this->requestList = $request_list;

        $timer_mode = Config::get('timer.mode', 'normal');
        if ($timer_mode === 'normal') {
            $this->runNormal();
        } else {
            $this->runParallel();
        }
    }

    /**
     * Runs the scheduler inside a single Workerman worker.
     *
     * A 1-second Workerman timer walks the request list and dispatches a
     * non-blocking HTTP request for every item that is due and not already in
     * flight. Per-item state (`is_running`, `last_run_time`) is stored back on
     * `$worker->timerRequestList` so that it survives between ticks and is
     * visible to the completion callbacks.
     *
     * @return void
     */
    public function runParallel()
    {
        // Rebuild the command line arguments so they are compatible with Workerman's own commands.
        global $argv;
        $argv = ['think', 'start'];

        $host = $this->host;
        $site_host = $this->siteHost;
        $output = $this->output;

        $worker = new Worker();
        $worker->count = 1;
        $worker->name = 'start_timer_parallel';
        $worker->timerRequestList = $this->requestList;

        $worker->onWorkerStart = function () use ($worker, $host, $site_host, $output) {
            $options = [
                'max_conn_per_addr' => Config::get('timer.max_conn_per_addr', 1000),
                'keepalive_timeout' => Config::get('timer.keepalive_timeout', 86400),
                'connect_timeout' => Config::get('timer.connect_timeout', 86400),
                'timeout' => Config::get('timer.timeout', 86400),
            ];
            $http = new HttpClient($options);

            Timer::add(1, function () use ($worker, $host, $site_host, $output, $http) {
                $now = time();

                // Iterate by key: state must be written back to the shared list,
                // a by-value foreach copy would lose it at the end of the tick.
                foreach (array_keys($worker->timerRequestList) as $key) {
                    $request_item = $worker->timerRequestList[$key];

                    // Not due yet.
                    if ($now - ($request_item['last_run_time'] ?? 0) < $request_item['frequency']) {
                        continue;
                    }

                    $output->writeln(date('Y-m-d H:i:s') . ': build site request async: ' . $request_item['target']);

                    // Previous request for this item has not completed yet.
                    if ($request_item['is_running'] ?? false) {
                        $output->writeln('进行中,跳过');
                        continue;
                    }

                    $worker->timerRequestList[$key]['is_running'] = true;
                    $worker->timerRequestList[$key]['last_run_time'] = $now;

                    $http->request($host . $request_item['target'], [
                        'headers' => [
                            'Host' => $site_host,
                            'Accept' => 'application/json,text/plain',
                        ],
                        'success' => function ($response) use ($worker, $key, $output) {
                            $worker->timerRequestList[$key]['is_running'] = false;
                            $output->writeln((string) $response->getBody());
                        },
                        'error' => function ($exception) use ($worker, $key, $output) {
                            $worker->timerRequestList[$key]['is_running'] = false;
                            // Explicit cast: under strict_types an object is not accepted where a string is expected.
                            $output->writeln((string) $exception);
                        },
                    ]);
                }
            });
        };

        Worker::runAll();
    }

    /**
     * Runs the scheduler as a blocking loop in the current process.
     *
     * Every iteration builds a Guzzle async request for each due item of type
     * `site`, waits for all of them, then sleeps one second. The last dispatch
     * time of each item is kept in the cache under the `system_timer` tag.
     * With the `temp` option the loop exits after a single iteration.
     *
     * Any Throwable raised during an iteration is printed and logged, and the
     * loop continues.
     *
     * @return void
     */
    public function runNormal()
    {
        $request_list = $this->requestList;
        $output = $this->output;
        $input = $this->input;

        $client = new Client([
            'base_uri' => $this->host,
            'headers' => [
                'Host' => $this->siteHost,
                'Accept' => 'application/json,text/plain',
            ],
            // NOTE(review): TLS certificate verification is disabled for every request.
            'verify' => false,
        ]);
        $cache_tag = 'system_timer';

        while (true) {
            try {
                $list_promises = [];

                foreach ($request_list as $request_item) {
                    $cache_key = 'timer_' . $request_item['name'] . '_' . $request_item['concurrency_id'];

                    $last_exec_time = Cache::get($cache_key, 0);
                    if ($last_exec_time >= time() - $request_item['frequency']) {
                        continue;
                    }
                    Cache::tag($cache_tag)->set($cache_key, time());

                    $type = $request_item['type'];
                    switch ($type) {
                        case 'site':
                            $output->writeln(date('Y-m-d H:i:s') . ': build site request async: ' . $request_item['target']);
                            // Keyed by name + concurrency_id (not name alone) so that items
                            // sharing a name do not overwrite each other's promise.
                            $list_promises[$cache_key] = $client->getAsync($request_item['target']);
                            break;
                        default:
                            $output->writeln(date('Y-m-d H:i:s') . 'unsupport type:' . $type);
                            break;
                    }
                }

                if (empty($list_promises)) {
                    if (!$input->hasOption('quit')) {
                        $output->writeln(date('Y-m-d H:i:s') . ' no request');
                    }
                } else {
                    // Blocks until the promises resolve; throws if any of them is rejected.
                    Utils::unwrap($list_promises);
                    $output->writeln(date('Y-m-d H:i:s') . ': request all finished');
                }
            } catch (\Throwable $th) {
                // Keep the scheduler alive: report the failure and carry on with the next tick.
                $output->writeln('error:' . $th->getMessage());
                Log::error($th->getMessage());
            }

            if ($input->hasOption('temp')) {
                break;
            }

            sleep(1);
        }
    }
}