package io.hekate.metrics.cluster.internal;

import io.hekate.cluster.ClusterNode;
import io.hekate.cluster.ClusterNodeFilter;
import io.hekate.cluster.ClusterNodeId;
import io.hekate.cluster.ClusterService;
import io.hekate.cluster.ClusterView;
import io.hekate.cluster.health.DefaultFailureDetectorConfig;
import io.hekate.core.HekateException;
import io.hekate.core.internal.util.ArgAssert;
import io.hekate.core.internal.util.ConfigCheck;
import io.hekate.core.internal.util.HekateThreadFactory;
import io.hekate.core.service.DependencyContext;
import io.hekate.core.service.DependentService;
import io.hekate.core.service.InitializationContext;
import io.hekate.core.service.InitializingService;
import io.hekate.core.service.TerminatingService;
import io.hekate.messaging.Message;
import io.hekate.messaging.MessagingChannel;
import io.hekate.messaging.MessagingChannelConfig;
import io.hekate.messaging.MessagingConfigProvider;
import io.hekate.messaging.MessagingService;
import io.hekate.metrics.Metric;
import io.hekate.metrics.MetricFilter;
import io.hekate.metrics.MetricValue;
import io.hekate.metrics.cluster.ClusterMetricsService;
import io.hekate.metrics.cluster.ClusterMetricsServiceFactory;
import io.hekate.metrics.cluster.ClusterNodeMetrics;
import io.hekate.metrics.cluster.internal.MetricsProtocol;
import io.hekate.metrics.local.LocalMetricsService;
import io.hekate.util.StateGuard;
import io.hekate.util.async.AsyncUtils;
import io.hekate.util.async.Waiting;
import io.hekate.util.format.ToString;
import io.hekate.util.format.ToStringIgnore;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/hekate/metrics/cluster/internal/DefaultClusterMetricsService.class */
public class DefaultClusterMetricsService implements ClusterMetricsService, DependentService, InitializingService, TerminatingService, MessagingConfigProvider {
    /** Name of the messaging channel that this service registers and later looks up during initialization. */
    private static final String CHANNEL_NAME = "hekate.metrics";
    /** Class logger; assigned in the static initializer. */
    private static final Logger log;
    /** Snapshot of {@code log.isDebugEnabled()} taken once in the static initializer. */
    private static final boolean DEBUG;
    // Presumably the factor applied to the replication interval to derive the channel's idle socket timeout
    // (the channel configuration multiplies the interval by 3) - confirm.
    private static final int IDLE_TIMEOUT_MULTIPLY = 3;
    /** Accepts only those cluster nodes that report having a {@link ClusterMetricsService}. */
    private static final ClusterNodeFilter METRICS_SUPPORT_FILTER;
    /** Copied from the factory; when {@code false} the lifecycle methods do nothing and queries return empty results. */
    private final boolean enabled;
    /** Delay and period (in milliseconds) of the scheduled metrics publishing task. */
    private final long replicationInterval;
    /** Optional filter for local metrics that should be replicated; {@code null} means that all metrics are accepted. */
    private final MetricFilter filter;

    // Guards the lifecycle state and all mutable fields below.
    // NOTE(review): the guard is created with ClusterService.class rather than this service's own type - confirm that this is intended.
    @ToStringIgnore
    private final StateGuard guard = new StateGuard(ClusterService.class);

    // Metrics replicas of cluster nodes, keyed by node identifier.
    @ToStringIgnore
    private final Map<ClusterNodeId, Replica> replicas = new HashMap();

    // Source of version numbers that are assigned to each update of the local node's metrics.
    @ToStringIgnore
    private final AtomicLong localVerSeq = new AtomicLong();

    // Resolved in resolve(...).
    @ToStringIgnore
    private LocalMetricsService localMetrics;

    // Resolved in resolve(...).
    @ToStringIgnore
    private MessagingService messaging;

    // Metrics channel; obtained in initialize(...) and reset to null in terminate().
    @ToStringIgnore
    private MessagingChannel<MetricsProtocol> channel;

    // Node that currently receives this node's periodic updates; null if there is no such node.
    @ToStringIgnore
    private ReplicationTarget next;

    // Cluster view restricted to METRICS_SUPPORT_FILTER; resolved in resolve(...).
    @ToStringIgnore
    private ClusterView cluster;

    // Single-threaded scheduler that runs the periodic publishing task; created in initialize(...).
    @ToStringIgnore
    private ScheduledExecutorService worker;

    // Identifier of the local node; set in initialize(...) and reset to null in terminate().
    @ToStringIgnore
    private ClusterNodeId localNode;
    // Explicit form of the compiler's assertion flag (decompiler output); true when assertions are disabled for this class.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* renamed from: io.hekate.metrics.cluster.internal.DefaultClusterMetricsService$1, reason: invalid class name */
    /* loaded from: input_file:io/hekate/metrics/cluster/internal/DefaultClusterMetricsService$1.class */
    static /* synthetic */ class AnonymousClass1 {
        /**
         * Lookup table indexed by {@code MetricsProtocol.Type.ordinal()}: {@code 1} for {@code UPDATE_REQUEST},
         * {@code 2} for {@code UPDATE_RESPONSE} and {@code 0} for any other constant.
         */
        static final /* synthetic */ int[] $SwitchMap$io$hekate$metrics$cluster$internal$MetricsProtocol$Type;

        static {
            int[] table = new int[MetricsProtocol.Type.values().length];

            try {
                table[MetricsProtocol.Type.UPDATE_REQUEST.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // Constant is not available at runtime: its slot keeps the default value 0.
            }

            try {
                table[MetricsProtocol.Type.UPDATE_RESPONSE.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // Constant is not available at runtime: its slot keeps the default value 0.
            }

            $SwitchMap$io$hekate$metrics$cluster$internal$MetricsProtocol$Type = table;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/hekate/metrics/cluster/internal/DefaultClusterMetricsService$Replica.class */
    /**
     * Locally stored copy of a single cluster node's metrics.
     *
     * <p>
     * {@code version} and {@code metrics} are plain fields: code in the enclosing class reads and writes them while synchronizing on the
     * replica instance. The published {@link ClusterNodeMetrics} view is {@code volatile}.
     * </p>
     */
    public static class Replica {
        private final ClusterNode node;
        private long version;
        private Map<String, MetricValue> metrics;
        private volatile Optional<ClusterNodeMetrics> publicMetrics = Optional.empty();

        /**
         * Creates an empty replica (version {@code 0}, no metrics) for the specified node.
         *
         * @param owner Node that owns the metrics.
         */
        public Replica(ClusterNode owner) {
            node = owner;
        }

        /** Returns the node that owns the metrics. */
        public ClusterNode node() {
            return node;
        }

        /** Returns the version of the most recently applied update. */
        public long version() {
            return version;
        }

        /** Returns the most recently applied metrics map, or {@code null} if nothing was applied yet. */
        public Map<String, MetricValue> metrics() {
            return metrics;
        }

        /**
         * Replaces the stored metrics and publishes a new read-only snapshot of them.
         *
         * @param newVersion Version of the update.
         * @param newMetrics Metrics of the update; the map is stored as is, while the published view is built from a copy.
         */
        public void updateMetrics(long newVersion, Map<String, MetricValue> newMetrics) {
            version = newVersion;
            metrics = newMetrics;

            // Copy, so that later changes of the supplied map do not leak into the published view.
            Map<String, MetricValue> snapshot = new HashMap<>(newMetrics);

            publicMetrics = Optional.of(new DefaultClusterNodeMetrics(node(), Collections.unmodifiableMap(snapshot)));
        }

        /** Returns the latest published view, or an empty {@link Optional} if no update was applied yet. */
        public Optional<ClusterNodeMetrics> publicMetrics() {
            return publicMetrics;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/hekate/metrics/cluster/internal/DefaultClusterMetricsService$ReplicationTarget.class */
    /**
     * Remote node that receives metrics updates from the local node, together with the bookkeeping of which replica versions were
     * already sent to it.
     *
     * <p>
     * Locking: the replica monitor and the {@code sentVersions} monitor are never held at the same time by code of this class. Callers
     * may invoke {@link #update(Replica)} while holding the replica's monitor, so acquiring a replica monitor while holding
     * {@code sentVersions} would create a lock-order inversion.
     * </p>
     */
    public static class ReplicationTarget {
        private final ClusterNodeId to;
        private final MessagingChannel<MetricsProtocol> channel;
        private final ClusterNodeId localNode;
        private final Map<ClusterNodeId, Long> sentVersions = new HashMap<>();

        /**
         * @param clusterNodeId Identifier of the target node.
         * @param messagingChannel Channel used to send updates to the target node.
         * @param clusterNodeId2 Identifier of the local node (reported as the sender of each request).
         */
        public ReplicationTarget(ClusterNodeId clusterNodeId, MessagingChannel<MetricsProtocol> messagingChannel, ClusterNodeId clusterNodeId2) {
            this.to = clusterNodeId;
            this.channel = messagingChannel;
            this.localNode = clusterNodeId2;
        }

        /** Returns the identifier of the target node. */
        public ClusterNodeId to() {
            return this.to;
        }

        /**
         * Sends those replicas whose version is newer than the version that was last sent to the target node. Replicas without metrics
         * are skipped. The target's own replica is never sent; its version is only reported back to the target in the request. Nothing
         * is sent if there are no new versions.
         *
         * @param collection Replicas to consider.
         */
        public void send(Collection<Replica> collection) {
            List<MetricsUpdate> updates = new ArrayList<>(collection.size());
            long targetVer = -1;

            for (Replica replica : collection) {
                ClusterNodeId id = replica.node().id();
                long version;
                Map<String, MetricValue> metrics;

                // Take a consistent (version, metrics) pair and release the replica monitor before touching sentVersions.
                synchronized (replica) {
                    version = replica.version();
                    metrics = replica.metrics();
                }

                if (id.equals(this.to)) {
                    targetVer = version;
                } else if (metrics != null && !metrics.isEmpty() && markSent(id, version)) {
                    updates.add(new MetricsUpdate(id, version, metrics));
                }
            }

            if (updates.isEmpty()) {
                return;
            }

            if (DefaultClusterMetricsService.DEBUG) {
                DefaultClusterMetricsService.log.debug("Sending metrics update [to={}, updates={}]", this.to, updates);
            }

            this.channel.send(new MetricsProtocol.UpdateRequest(this.localNode, targetVer, updates));
        }

        /**
         * Records that the target node already knows the current version of the specified replica, so that this version is not sent
         * to it again.
         *
         * @param replica Replica whose current version should be recorded.
         */
        public void update(Replica replica) {
            ClusterNodeId id = replica.node().id();
            long version;

            // Re-entrant when the caller already holds the replica monitor.
            synchronized (replica) {
                version = replica.version();
            }

            markSent(id, version);
        }

        /**
         * Records {@code version} as sent for the node if it is newer than the recorded one.
         *
         * @return {@code true} if the record was advanced (nothing was recorded before, or an older version was).
         */
        private boolean markSent(ClusterNodeId id, long version) {
            synchronized (this.sentVersions) {
                Long sent = this.sentVersions.get(id);

                if (sent == null || sent < version) {
                    this.sentVersions.put(id, version);

                    return true;
                }

                return false;
            }
        }
    }

    /**
     * Creates the service from the factory settings.
     *
     * <p>
     * The replication interval is validated via {@link ConfigCheck} before any field is assigned.
     * </p>
     *
     * @param clusterMetricsServiceFactory Source of the enabled flag, the replication interval and the optional replication filter.
     */
    public DefaultClusterMetricsService(ClusterMetricsServiceFactory clusterMetricsServiceFactory) {
        // Explicit form of 'assert factory != null' (the assertion flag is a declared field of this class).
        if (clusterMetricsServiceFactory == null && !$assertionsDisabled) {
            throw new AssertionError("Factory is null.");
        }

        ConfigCheck.get(ClusterMetricsServiceFactory.class)
            .positive(clusterMetricsServiceFactory.getReplicationInterval(), "replication interval");

        this.enabled = clusterMetricsServiceFactory.isEnabled();
        this.replicationInterval = clusterMetricsServiceFactory.getReplicationInterval();
        this.filter = clusterMetricsServiceFactory.getReplicationFilter();
    }

    /**
     * Looks up the services that this service depends on.
     *
     * @param dependencyContext Context that provides the required services.
     */
    @Override // io.hekate.core.service.DependentService
    public void resolve(DependencyContext dependencyContext) {
        this.localMetrics = (LocalMetricsService) dependencyContext.require(LocalMetricsService.class);
        this.messaging = (MessagingService) dependencyContext.require(MessagingService.class);

        ClusterService clusterService = (ClusterService) dependencyContext.require(ClusterService.class);

        // Only nodes that run the metrics service take part in replication.
        this.cluster = clusterService.filter(METRICS_SUPPORT_FILTER);
    }

    /**
     * Provides the configuration of the metrics replication channel.
     *
     * @return Single channel configuration if this service is enabled; empty collection otherwise.
     */
    @Override // io.hekate.messaging.MessagingConfigProvider
    public Collection<MessagingChannelConfig<?>> configureMessaging() {
        if (!this.enabled) {
            return Collections.emptyList();
        }

        return Collections.singleton(
            MessagingChannelConfig.of(MetricsProtocol.class)
                .withName(CHANNEL_NAME)
                .withLogCategory(getClass().getName())
                .withMessageCodec(MetricsProtocolCodec::new)
                // Idle timeout is a multiple of the replication interval.
                .withIdleSocketTimeout(this.replicationInterval * IDLE_TIMEOUT_MULTIPLY)
                .withClusterFilter(METRICS_SUPPORT_FILTER)
                .withWorkerThreads(1)
                .withReceiver(this::handleMessage)
        );
    }

    /**
     * Starts metrics replication: obtains the metrics channel, subscribes to cluster topology changes and to local metrics updates, and
     * schedules the periodic publishing task. Does nothing if this service is disabled.
     *
     * @param initializationContext Context that provides the local node.
     */
    @Override // io.hekate.core.service.InitializingService
    public void initialize(InitializationContext initializationContext) throws HekateException {
        if (!this.enabled) {
            return;
        }

        this.guard.lockWrite();

        try {
            this.guard.becomeInitialized();

            if (DEBUG) {
                log.debug("Initializing...");
            }

            this.channel = this.messaging.channel(CHANNEL_NAME, MetricsProtocol.class);
            this.localNode = initializationContext.localNode().id();

            this.cluster.addListener(clusterEvent -> updateTopology(clusterEvent.topology().nodes()));

            // Failures are logged and not propagated to the caller of the listener.
            this.localMetrics.addListener(metricsUpdateEvent -> {
                try {
                    updateLocalMetrics(metricsUpdateEvent.allMetrics());
                } catch (Error | RuntimeException e) {
                    log.error("Got an unexpected runtime error while updating local metrics.", e);
                }
            });

            this.worker = Executors.newSingleThreadScheduledExecutor(new HekateThreadFactory("ClusterMetrics"));

            // Failures are caught, since an exception thrown from a periodic task suppresses its subsequent executions.
            Runnable publishTask = () -> {
                try {
                    publishMetrics();
                } catch (Error | RuntimeException e) {
                    log.error("Got an unexpected runtime error while publishing metrics.", e);
                }
            };

            this.worker.scheduleAtFixedRate(publishTask, this.replicationInterval, this.replicationInterval, TimeUnit.MILLISECONDS);

            if (DEBUG) {
                log.debug("Initialized.");
            }
        } finally {
            this.guard.unlockWrite();
        }
    }

    /**
     * Stops metrics replication: shuts down the publishing worker and clears all replication state. Does nothing if this service is
     * disabled.
     *
     * <p>
     * Worker termination is awaited only after the write lock is released. The publishing task acquires the read lock of the same
     * guard, so waiting for it while still holding the write lock could block forever.
     * </p>
     */
    @Override // io.hekate.core.service.TerminatingService
    public void terminate() throws HekateException {
        if (!this.enabled) {
            return;
        }

        Waiting waiting = null;

        this.guard.lockWrite();

        try {
            if (this.guard.becomeTerminated()) {
                if (DEBUG) {
                    log.debug("Terminating...");
                }

                if (this.worker != null) {
                    waiting = AsyncUtils.shutdown(this.worker);
                }

                this.replicas.clear();

                this.worker = null;
                this.next = null;
                this.channel = null;
                this.localNode = null;
            }
        } finally {
            this.guard.unlockWrite();
        }

        // Must happen outside of the lock (see the method description).
        if (waiting != null) {
            waiting.awaitUninterruptedly();

            if (DEBUG) {
                log.debug("Terminated.");
            }
        }
    }

    /**
     * Returns the enabled flag that was copied from the factory at construction time.
     *
     * @return {@code true} if this service is enabled.
     */
    @Override // io.hekate.metrics.cluster.ClusterMetricsService
    public boolean isEnabled() {
        return enabled;
    }

    /**
     * Returns the latest known metrics of the node with the specified identifier.
     *
     * @param clusterNodeId Node identifier (must not be {@code null}).
     *
     * @return Metrics of the node; empty if this service is disabled, if the node has no replica or if no metrics were received for it
     * yet.
     */
    @Override // io.hekate.metrics.cluster.ClusterMetricsService
    public Optional<ClusterNodeMetrics> of(ClusterNodeId clusterNodeId) {
        ArgAssert.notNull(clusterNodeId, "Node");

        if (!this.enabled) {
            return Optional.empty();
        }

        this.guard.lockRead();

        try {
            Replica replica = this.replicas.get(clusterNodeId);

            if (replica == null) {
                return Optional.empty();
            }

            return replica.publicMetrics();
        } finally {
            this.guard.unlockRead();
        }
    }

    /**
     * Returns the latest known metrics of the specified node by delegating to the lookup by node identifier.
     *
     * @param clusterNode Node (must not be {@code null}).
     *
     * @return Metrics of the node, if available.
     */
    @Override // io.hekate.metrics.cluster.ClusterMetricsService
    public Optional<ClusterNodeMetrics> of(ClusterNode clusterNode) {
        ArgAssert.notNull(clusterNode, "Node");

        ClusterNodeId nodeId = clusterNode.id();

        return of(nodeId);
    }

    /**
     * Returns the latest known metrics of all nodes that have published metrics.
     *
     * @return New list with one entry per replica that has a published view; empty list if this service is disabled.
     */
    @Override // io.hekate.metrics.cluster.ClusterMetricsService
    public List<ClusterNodeMetrics> all() {
        if (!this.enabled) {
            return Collections.emptyList();
        }

        this.guard.lockRead();

        try {
            List<ClusterNodeMetrics> result = new ArrayList<>(this.replicas.size());

            for (Replica replica : this.replicas.values()) {
                // Replicas that did not receive any update yet have no published view and are skipped.
                replica.publicMetrics().ifPresent(result::add);
            }

            return result;
        } finally {
            this.guard.unlockRead();
        }
    }

    /**
     * Returns the latest known metrics of those nodes that have at least one metric accepted by the specified filter.
     *
     * @param metricFilter Filter (must not be {@code null}).
     *
     * @return New list of matching nodes' metrics; empty list if this service is disabled.
     */
    @Override // io.hekate.metrics.cluster.ClusterMetricsService
    public List<ClusterNodeMetrics> all(MetricFilter metricFilter) {
        ArgAssert.notNull(metricFilter, "Filter");

        if (!this.enabled) {
            return Collections.emptyList();
        }

        this.guard.lockRead();

        try {
            List<ClusterNodeMetrics> matched = new ArrayList<>(this.replicas.size());

            for (Replica replica : this.replicas.values()) {
                Optional<ClusterNodeMetrics> published = replica.publicMetrics();

                if (!published.isPresent()) {
                    continue;
                }

                ClusterNodeMetrics nodeMetrics = published.get();

                // A node is added once, as soon as the first accepted metric is found.
                for (Metric metric : nodeMetrics.allMetrics().values()) {
                    if (metricFilter.accept(metric)) {
                        matched.add(nodeMetrics);

                        break;
                    }
                }
            }

            return matched;
        } finally {
            this.guard.unlockRead();
        }
    }

    /**
     * Periodic task body: sends pending replica updates to the current replication target. Does nothing if the service is not
     * initialized or if there is no target.
     */
    private void publishMetrics() {
        this.guard.lockRead();

        try {
            if (!this.guard.isInitialized()) {
                return;
            }

            ReplicationTarget target = this.next;

            if (target != null) {
                target.send(this.replicas.values());
            }
        } finally {
            this.guard.unlockRead();
        }
    }

    /**
     * Applies new values of local metrics to the local node's replica under the write lock. Ignored if the service is not initialized.
     *
     * @param map Current local metrics by name.
     */
    private void updateLocalMetrics(Map<String, Metric> map) {
        this.guard.lockWrite();

        try {
            if (!this.guard.isInitialized()) {
                return;
            }

            doUpdateLocalMetrics(map);
        } finally {
            this.guard.unlockWrite();
        }
    }

    /**
     * Captures the values of those local metrics that pass the replication filter and stores them in the local node's replica under a
     * new version. Does nothing beyond the filtering if the local node has no replica. The caller must hold the write lock.
     *
     * @param map Current local metrics by name.
     */
    private void doUpdateLocalMetrics(Map<String, Metric> map) {
        if (!$assertionsDisabled && !this.guard.isWriteLocked()) {
            throw new AssertionError("Thread must hold write lock.");
        }

        Collection<Metric> source = map.values();

        Map<String, MetricValue> captured = new HashMap<>(source.size(), 1.0f);

        for (Metric metric : source) {
            // No filter means that every metric is replicated.
            if (this.filter != null && !this.filter.accept(metric)) {
                continue;
            }

            String name = metric.name();

            captured.put(name, new MetricValue(name, metric.value()));
        }

        Replica local = this.replicas.get(this.localNode);

        if (local == null) {
            return;
        }

        long version = this.localVerSeq.incrementAndGet();

        synchronized (local) {
            local.updateMetrics(version, captured);
        }
    }

    /**
     * Synchronizes replicas with the new cluster topology and re-selects the replication target.
     *
     * <p>
     * Replicas of nodes that left are dropped and empty replicas are registered for new nodes. The replication target is the node whose
     * identifier follows the local node's identifier in sorted order, wrapping around to the smallest identifier; there is no target if
     * no other node is present.
     * </p>
     *
     * @param list Nodes of the new topology.
     */
    private void updateTopology(List<ClusterNode> list) {
        this.guard.lockWrite();

        try {
            if (!this.guard.isInitialized()) {
                return;
            }

            Set<ClusterNodeId> liveIds = list.stream().map(ClusterNode::id).collect(Collectors.toSet());

            this.replicas.keySet().retainAll(liveIds);

            // Checked before new replicas are registered: true on the first topology (or if all known nodes are gone).
            boolean wasEmpty = this.replicas.isEmpty();

            for (ClusterNode node : list) {
                this.replicas.computeIfAbsent(node.id(), id -> new Replica(node));
            }

            if (wasEmpty) {
                // Local replica has just been (re-)created, fill it with the current local metrics.
                doUpdateLocalMetrics(this.localMetrics.allMetrics());
            }

            TreeSet<ClusterNodeId> ring = new TreeSet<>(liveIds);

            ClusterNodeId target = ring.higher(this.localNode);

            if (target == null) {
                // Wrap around: smallest identifier that precedes the local node (if any).
                Iterator<ClusterNodeId> preceding = ring.headSet(this.localNode, false).iterator();

                target = preceding.hasNext() ? preceding.next() : null;
            }

            if (target == null) {
                if (DEBUG && this.next != null) {
                    log.debug("Stopped metrics replication [to={}]", this.next.to());
                }

                this.next = null;
            } else if (this.next == null || !this.next.to().equals(target)) {
                if (DEBUG) {
                    log.debug("Selected new replication target [node={}, ring={}]", target, ring);
                }

                this.next = new ReplicationTarget(target, this.channel.forNode(target), this.localNode);
            }
        } finally {
            this.guard.unlockWrite();
        }
    }

    /**
     * Handles a message of the metrics channel. Messages are ignored if the service is not initialized or has no replicas.
     *
     * <ul>
     * <li>Update request - remote updates are applied, and an update response is sent back to the sender if this node has newer
     * data for any of them.</li>
     * <li>Update response - remote updates are applied, nothing is sent back.</li>
     * </ul>
     *
     * @param message Incoming message.
     *
     * @throws IllegalArgumentException If the message type is not supported.
     */
    private void handleMessage(Message<MetricsProtocol> message) {
        this.guard.lockRead();

        try {
            if (!this.guard.isInitialized() || this.replicas.isEmpty()) {
                return;
            }

            MetricsProtocol msg = message.get();

            // Switch directly on the enum instead of going through an ordinal lookup table.
            switch (msg.type()) {
                case UPDATE_REQUEST: {
                    MetricsProtocol.UpdateRequest request = (MetricsProtocol.UpdateRequest) msg;

                    List<MetricsUpdate> pushBack = processUpdates(msg.from(), request.updates(), true, request.targetVer());

                    if (!pushBack.isEmpty()) {
                        if (DEBUG) {
                            log.debug("Sending push back updates [from={}, metrics={}]", msg.from(), pushBack);
                        }

                        message.endpoint().channel().forNode(msg.from()).send(new MetricsProtocol.UpdateResponse(this.localNode, pushBack));
                    }

                    break;
                }
                case UPDATE_RESPONSE: {
                    MetricsProtocol.UpdateResponse response = (MetricsProtocol.UpdateResponse) msg;

                    processUpdates(msg.from(), response.metrics(), false, -1L);

                    break;
                }
                default: {
                    throw new IllegalArgumentException("Unexpected message type: " + msg.type());
                }
            }
        } finally {
            this.guard.unlockRead();
        }
    }

    /**
     * Applies metrics updates that were received from a remote node and optionally collects the updates that should be pushed back to
     * that node.
     *
     * <p>
     * Updates that describe the local node are ignored, as are updates of nodes that have no replica. A remote update is applied only
     * if its version is greater than the replica's version.
     * </p>
     *
     * @param from Node that sent the updates.
     * @param updates Received updates.
     * @param withPushBack {@code true} if push back updates should be collected.
     * @param targetVer Version of the local node's metrics as known by the sender (used only if {@code withPushBack} is {@code true}).
     *
     * @return Updates that are newer on this node than on the sender (possibly empty), or {@code null} if {@code withPushBack} is
     * {@code false}.
     */
    private List<MetricsUpdate> processUpdates(ClusterNodeId from, List<MetricsUpdate> updates, boolean withPushBack, long targetVer) {
        List<MetricsUpdate> pushBack = null;

        if (withPushBack) {
            pushBack = new ArrayList<>(updates.size());

            // NOTE(review): assumes that the local node always has a replica while messages are processed - confirm.
            Replica local = this.replicas.get(this.localNode);

            synchronized (local) {
                // Sender does not have the current version of local metrics.
                if (local.version() != targetVer) {
                    pushBack.add(newUpdate(local));
                }
            }
        }

        for (MetricsUpdate update : updates) {
            ClusterNodeId nodeId = update.node();

            if (nodeId.equals(this.localNode)) {
                continue;
            }

            Replica replica = this.replicas.get(nodeId);

            if (replica == null) {
                continue;
            }

            synchronized (replica) {
                long remoteVer = update.version();
                long knownVer = replica.version();

                if (remoteVer > knownVer) {
                    Map<String, MetricValue> metrics = update.metrics();

                    if (DEBUG) {
                        log.debug("Updating metrics [node={}, metrics={}]", nodeId, metrics);
                    }

                    replica.updateMetrics(remoteVer, metrics);

                    // Sender is our replication target and already has this version, no need to send it back later.
                    if (this.next != null && from.equals(this.next.to())) {
                        this.next.update(replica);
                    }
                } else if (remoteVer < knownVer && pushBack != null) {
                    pushBack.add(newUpdate(replica));
                }
            }
        }

        return pushBack;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds an update message from the replica's current node identifier, version and metrics. The replica's fields are read without
     * additional synchronization in this method.
     *
     * @param replica Replica to read.
     *
     * @return New update.
     */
    public static MetricsUpdate newUpdate(Replica replica) {
        ClusterNodeId nodeId = replica.node().id();
        long version = replica.version();
        Map<String, MetricValue> metrics = replica.metrics();

        return new MetricsUpdate(nodeId, version, metrics);
    }

    /**
     * Formats this service via {@link ToString}, using {@link ClusterMetricsService} as the type passed to the formatter.
     */
    @Override
    public String toString() {
        return ToString.format(ClusterMetricsService.class, this);
    }

    // Initializes the static constants of this class.
    static {
        $assertionsDisabled = !DefaultClusterMetricsService.class.desiredAssertionStatus();

        log = LoggerFactory.getLogger(DefaultClusterMetricsService.class);

        // Evaluated once, later changes of the logger's level are not picked up.
        DEBUG = log.isDebugEnabled();

        METRICS_SUPPORT_FILTER = node -> node.hasService(ClusterMetricsService.class);
    }
}
