Files
FendxPHP/fendx-framework/fendx-service/src/Discovery/ServiceDiscovery.php
Lawson 2782d765fb feat(database): 添加用户角色权限系统及相关监控功能
- 创建用户表(users)包含基本信息和认证字段
- 创建角色表(roles)用于权限控制
- 创建权限表(permissions)定义系统权限
- 创建用户角色关联表(user_roles)建立用户与角色关系
- 创建角色权限关联表(role_permissions)建立角色与权限关系
- 创建迁移记录表(migrations)追踪数据库变更
- 添加AdminController提供管理员面板功能
- 实现系统监控、配置管理、缓存清理等功能
- 添加AOP切面编程支持的各种通知类型
- 实现告警管理AlertManager支持多渠道告警
- 添加文档注解接口规范
2026-04-08 17:00:28 +08:00

686 lines
19 KiB
PHP

<?php
declare(strict_types=1);
namespace Fendx\Service\Discovery;
use Fendx\Service\Discovery\Resolver\ServiceResolver;
use Fendx\Service\Discovery\Cache\DiscoveryCache;
use Fendx\Service\Discovery\Watcher\ServiceWatcher;
use Fendx\Service\Discovery\LoadBalancer\LoadBalancer;
class ServiceDiscovery
{
protected ServiceResolver $resolver;
protected DiscoveryCache $cache;
protected ServiceWatcher $watcher;
protected LoadBalancer $loadBalancer;
protected array $config = [];
protected array $discoveredServices = [];
protected array $serviceEndpoints = [];
protected array $watchers = [];
public function __construct(array $config = [])
{
$this->config = array_merge($this->getDefaultConfig(), $config);
$this->resolver = new ServiceResolver($this->config);
$this->cache = new DiscoveryCache($this->config);
$this->watcher = new ServiceWatcher($this->config);
$this->loadBalancer = new LoadBalancer($this->config);
$this->initialize();
}
/**
* Discover service instances.
*/
public function discover(string $serviceName, array $options = []): array
{
$cacheKey = $this->generateCacheKey($serviceName, $options);
// Check cache first
if ($this->config['cache_enabled']) {
$cached = $this->cache->get($cacheKey);
if ($cached !== null) {
$this->logDebug("Service discovered from cache: {$serviceName}");
return $cached;
}
}
// Discover from resolver
$instances = $this->resolver->resolve($serviceName, $options);
// Filter and validate instances
$validInstances = $this->filterValidInstances($instances);
// Cache results
if ($this->config['cache_enabled'] && !empty($validInstances)) {
$ttl = $options['cache_ttl'] ?? $this->config['default_cache_ttl'];
$this->cache->set($cacheKey, $validInstances, $ttl);
}
$this->discoveredServices[$serviceName] = $validInstances;
$this->logInfo("Discovered " . count($validInstances) . " instances for service: {$serviceName}");
return $validInstances;
}
/**
* Get a single service instance.
*/
public function getInstance(string $serviceName, array $options = []): ?array
{
$instances = $this->discover($serviceName, $options);
if (empty($instances)) {
return null;
}
// Use load balancer to select instance
$strategy = $options['load_balancing'] ?? $this->config['default_load_balancing'];
return $this->loadBalancer->select($instances, $strategy);
}
/**
* Get service URL.
*/
public function getServiceUrl(string $serviceName, array $options = []): ?string
{
$instance = $this->getInstance($serviceName, $options);
if (!$instance) {
return null;
}
return $this->buildServiceUrl($instance, $options);
}
/**
* Discover multiple services.
*/
public function discoverMultiple(array $serviceNames, array $options = []): array
{
$results = [];
foreach ($serviceNames as $serviceName) {
$results[$serviceName] = $this->discover($serviceName, $options);
}
return $results;
}
/**
* Watch for service changes.
*/
public function watch(string $serviceName, callable $callback, array $options = []): string
{
$watchId = $this->generateWatchId($serviceName);
$this->watchers[$watchId] = [
'service_name' => $serviceName,
'callback' => $callback,
'options' => $options,
'last_instances' => $this->discover($serviceName, $options),
'created_at' => time()
];
$this->watcher->startWatching($serviceName, $callback, $options);
$this->logInfo("Started watching service: {$serviceName} ({$watchId})");
return $watchId;
}
/**
* Stop watching service.
*/
public function stopWatching(string $watchId): bool
{
if (!isset($this->watchers[$watchId])) {
return false;
}
$watch = $this->watchers[$watchId];
$this->watcher->stopWatching($watch['service_name'], $watch['callback']);
unset($this->watchers[$watchId]);
$this->logInfo("Stopped watching service: {$watch['service_name']} ({$watchId})");
return true;
}
/**
* Get all discovered services.
*/
public function getDiscoveredServices(): array
{
return $this->discoveredServices;
}
/**
* Refresh service discovery.
*/
public function refresh(string $serviceName = null): void
{
if ($serviceName) {
// Clear cache for specific service
$this->clearServiceCache($serviceName);
// Rediscover
$this->discover($serviceName);
$this->logInfo("Refreshed service: {$serviceName}");
} else {
// Clear all cache
$this->cache->clear();
// Rediscover all services
$this->discoveredServices = [];
$this->logInfo("Refreshed all services");
}
}
/**
* Add service endpoint.
*/
public function addEndpoint(string $serviceName, array $endpoint): void
{
if (!isset($this->serviceEndpoints[$serviceName])) {
$this->serviceEndpoints[$serviceName] = [];
}
$this->serviceEndpoints[$serviceName][] = $endpoint;
// Clear cache to force rediscovery
$this->clearServiceCache($serviceName);
$this->logInfo("Added endpoint for service: {$serviceName}");
}
/**
* Remove service endpoint.
*/
public function removeEndpoint(string $serviceName, string $endpointId): bool
{
if (!isset($this->serviceEndpoints[$serviceName])) {
return false;
}
foreach ($this->serviceEndpoints[$serviceName] as $key => $endpoint) {
if ($endpoint['id'] === $endpointId) {
unset($this->serviceEndpoints[$serviceName][$key]);
$this->serviceEndpoints[$serviceName] = array_values($this->serviceEndpoints[$serviceName]);
// Clear cache to force rediscovery
$this->clearServiceCache($serviceName);
$this->logInfo("Removed endpoint from service: {$serviceName}");
return true;
}
}
return false;
}
/**
* Get service endpoints.
*/
public function getEndpoints(string $serviceName): array
{
return $this->serviceEndpoints[$serviceName] ?? [];
}
/**
* Check if service is available.
*/
public function isAvailable(string $serviceName, array $options = []): bool
{
$instances = $this->discover($serviceName, $options);
if (empty($instances)) {
return false;
}
// Check if any instance is healthy
foreach ($instances as $instance) {
if ($this->isInstanceHealthy($instance)) {
return true;
}
}
return false;
}
/**
* Get service health status.
*/
public function getHealthStatus(string $serviceName): array
{
$instances = $this->discover($serviceName);
if (empty($instances)) {
return [
'service' => $serviceName,
'status' => 'unknown',
'instances' => 0,
'healthy_instances' => 0,
'unhealthy_instances' => 0
];
}
$healthyCount = 0;
$instanceStatuses = [];
foreach ($instances as $instance) {
$isHealthy = $this->isInstanceHealthy($instance);
if ($isHealthy) {
$healthyCount++;
}
$instanceStatuses[] = [
'id' => $instance['id'],
'host' => $instance['host'],
'port' => $instance['port'],
'healthy' => $isHealthy,
'last_check' => time()
];
}
$status = $healthyCount === count($instances) ? 'healthy' :
($healthyCount > 0 ? 'degraded' : 'unhealthy');
return [
'service' => $serviceName,
'status' => $status,
'instances' => count($instances),
'healthy_instances' => $healthyCount,
'unhealthy_instances' => count($instances) - $healthyCount,
'instance_details' => $instanceStatuses
];
}
/**
* Get discovery statistics.
*/
public function getStatistics(): array
{
$totalServices = count($this->discoveredServices);
$totalInstances = 0;
$healthyInstances = 0;
$cacheStats = $this->cache->getStatistics();
foreach ($this->discoveredServices as $serviceName => $instances) {
$totalInstances += count($instances);
foreach ($instances as $instance) {
if ($this->isInstanceHealthy($instance)) {
$healthyInstances++;
}
}
}
return [
'total_services' => $totalServices,
'total_instances' => $totalInstances,
'healthy_instances' => $healthyInstances,
'unhealthy_instances' => $totalInstances - $healthyInstances,
'health_percentage' => $totalInstances > 0 ? ($healthyInstances / $totalInstances) * 100 : 0,
'active_watchers' => count($this->watchers),
'cache_stats' => $cacheStats,
'endpoints' => array_sum(array_map('count', $this->serviceEndpoints))
];
}
/**
* Set service priority.
*/
public function setPriority(string $serviceName, array $priorities): void
{
$this->loadBalancer->setPriorities($serviceName, $priorities);
// Clear cache to apply new priorities
$this->clearServiceCache($serviceName);
}
/**
* Get service priority.
*/
public function getPriority(string $serviceName): array
{
return $this->loadBalancer->getPriorities($serviceName);
}
/**
* Enable/disable service discovery.
*/
public function setEnabled(bool $enabled): void
{
$this->config['enabled'] = $enabled;
if (!$enabled) {
// Stop all watchers
foreach ($this->watchers as $watchId => $watch) {
$this->stopWatching($watchId);
}
}
$this->logInfo("Service discovery " . ($enabled ? 'enabled' : 'disabled'));
}
/**
* Check if discovery is enabled.
*/
public function isEnabled(): bool
{
return $this->config['enabled'] ?? true;
}
/**
* Clear service cache.
*/
protected function clearServiceCache(string $serviceName): void
{
if ($this->config['cache_enabled']) {
// Clear all cache keys for this service
$pattern = $this->generateCacheKey($serviceName);
$this->cache->clearPattern($pattern);
}
}
/**
* Filter valid instances.
*/
protected function filterValidInstances(array $instances): array
{
return array_filter($instances, function ($instance) {
// Check required fields
if (!isset($instance['host']) || !isset($instance['port'])) {
return false;
}
// Check if enabled
if (isset($instance['enabled']) && !$instance['enabled']) {
return false;
}
// Check health if required
if ($this->config['check_health'] && !$this->isInstanceHealthy($instance)) {
return false;
}
return true;
});
}
/**
* Check if instance is healthy.
*/
protected function isInstanceHealthy(array $instance): bool
{
// If instance has health status, use it
if (isset($instance['healthy'])) {
return $instance['healthy'];
}
// Otherwise, perform health check
return $this->performHealthCheck($instance);
}
/**
* Perform health check on instance.
*/
protected function performHealthCheck(array $instance): bool
{
$timeout = $this->config['health_check_timeout'] ?? 5;
try {
$context = stream_context_create([
'http' => [
'timeout' => $timeout,
'method' => 'GET'
]
]);
$url = $this->buildServiceUrl($instance) . '/health';
$response = @file_get_contents($url, false, $context);
if ($response === false) {
return false;
}
// Try to parse JSON response
$data = json_decode($response, true);
if ($data && isset($data['status'])) {
return $data['status'] === 'healthy' || $data['status'] === 'ok';
}
// If no JSON, consider any response as healthy
return true;
} catch (\Exception $e) {
return false;
}
}
/**
* Build service URL.
*/
protected function buildServiceUrl(array $instance, array $options = []): string
{
$protocol = $options['protocol'] ?? $instance['protocol'] ?? 'http';
$host = $instance['host'];
$port = $instance['port'];
$path = $options['path'] ?? $instance['path'] ?? '/';
$url = "{$protocol}://{$host}";
// Add port if not default
if (($protocol === 'http' && $port != 80) || ($protocol === 'https' && $port != 443)) {
$url .= ":{$port}";
}
$url .= $path;
return $url;
}
/**
* Generate cache key.
*/
protected function generateCacheKey(string $serviceName, array $options = []): string
{
$key = "service:{$serviceName}";
if (!empty($options)) {
ksort($options);
$key .= ':' . md5(serialize($options));
}
return $key;
}
/**
* Generate watch ID.
*/
protected function generateWatchId(string $serviceName): string
{
return $serviceName . '_' . uniqid();
}
/**
* Initialize discovery.
*/
protected function initialize(): void
{
// Initialize resolver
$this->resolver->initialize();
// Initialize cache
if ($this->config['cache_enabled']) {
$this->cache->initialize();
}
// Initialize watcher
$this->watcher->initialize();
// Start background tasks
if ($this->config['background_refresh']) {
$this->startBackgroundRefresh();
}
$this->logInfo("Service discovery initialized");
}
/**
* Start background refresh.
*/
protected function startBackgroundRefresh(): void
{
// This would typically be run as a background process
// For now, we'll just log that it would start
$this->logInfo("Background refresh started");
}
/**
* Log info message.
*/
protected function logInfo(string $message): void
{
if ($this->config['logging_enabled']) {
error_log("[ServiceDiscovery] {$message}");
}
}
/**
* Log debug message.
*/
protected function logDebug(string $message): void
{
if ($this->config['debug_enabled']) {
error_log("[ServiceDiscovery] DEBUG: {$message}");
}
}
/**
* Get default configuration.
*/
protected function getDefaultConfig(): array
{
return [
'enabled' => true,
'cache_enabled' => true,
'default_cache_ttl' => 60,
'check_health' => true,
'health_check_timeout' => 5,
'default_load_balancing' => 'round_robin',
'background_refresh' => true,
'refresh_interval' => 30,
'logging_enabled' => true,
'debug_enabled' => false,
'resolver' => [
'type' => 'consul',
'host' => 'localhost',
'port' => 8500
],
'cache' => [
'type' => 'redis',
'host' => 'localhost',
'port' => 6379,
'prefix' => 'discovery'
]
];
}
/**
* Get configuration.
*/
public function getConfig(): array
{
return $this->config;
}
/**
* Set configuration.
*/
public function setConfig(array $config): void
{
$this->config = array_merge($this->config, $config);
}
/**
* Create discovery instance.
*/
public static function create(array $config = []): self
{
return new self($config);
}
/**
* Create for Consul.
*/
public static function forConsul(string $host = 'localhost', int $port = 8500): self
{
return new self([
'resolver' => [
'type' => 'consul',
'host' => $host,
'port' => $port
]
]);
}
/**
* Create for Eureka.
*/
public static function forEureka(string $host = 'localhost', int $port = 8761): self
{
return new self([
'resolver' => [
'type' => 'eureka',
'host' => $host,
'port' => $port
]
]);
}
/**
* Create for Kubernetes.
*/
public static function forKubernetes(): self
{
return new self([
'resolver' => [
'type' => 'kubernetes',
'in_cluster' => true
]
]);
}
/**
* Create for development.
*/
public static function forDevelopment(): self
{
return new self([
'cache_enabled' => false,
'check_health' => false,
'background_refresh' => false,
'debug_enabled' => true
]);
}
/**
* Create for production.
*/
public static function forProduction(): self
{
return new self([
'cache_enabled' => true,
'default_cache_ttl' => 300,
'check_health' => true,
'health_check_timeout' => 3,
'background_refresh' => true,
'refresh_interval' => 60,
'logging_enabled' => false
]);
}
}