package io.hekate.coordinate.internal;

import io.hekate.cluster.ClusterNode;
import io.hekate.cluster.ClusterTopology;
import io.hekate.coordinate.CoordinationMember;
import io.hekate.coordinate.CoordinationRequestCallback;
import io.hekate.coordinate.internal.CoordinationProtocol;
import io.hekate.core.internal.util.ArgAssert;
import io.hekate.messaging.MessagingChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/hekate/coordinate/internal/DefaultCoordinationMember.class */
/**
 * {@link CoordinationMember} implementation that talks to a single remote node over a messaging channel.
 *
 * <p>
 * Every outgoing request is backed by a {@link CompletableFuture} that is tracked in an identity-based set for as long as the
 * underlying messaging operation is in flight. {@link #dispose()} marks this member as disposed and cancels all tracked futures,
 * which in turn notifies their callbacks via {@link CoordinationRequestCallback#onCancel()}.
 * </p>
 *
 * <p>
 * Callbacks are always notified on the executor service that was supplied to the constructor.
 * </p>
 */
public class DefaultCoordinationMember implements CoordinationMember {
    private static final Logger log = LoggerFactory.getLogger(DefaultCoordinationMember.class);

    private static final boolean DEBUG = log.isDebugEnabled();

    /** In-flight request futures (compared by identity); guarded by {@link #mux}. */
    private final Set<Future<?>> requests = Collections.newSetFromMap(new IdentityHashMap<>());

    /** Lock that guards {@link #requests} and the transition of {@link #disposed} to {@code true}. */
    private final Object mux = new Object();

    private final String process;

    private final ClusterNode remoteNode;

    private final ClusterNode localNode;

    private final CoordinationEpoch epoch;

    private final ClusterTopology topology;

    private final boolean coordinator;

    private final MessagingChannel<CoordinationProtocol> channel;

    private final ExecutorService async;

    /** Written under {@link #mux}; read without locking by the retry predicate and by the completion callbacks. */
    private volatile boolean disposed;

    /**
     * Constructs a new member.
     *
     * @param process Name of the coordination process.
     * @param remoteNode Remote node that this member represents.
     * @param epoch Coordination epoch (also used to decide whether the remote node is the coordinator).
     * @param topology Cluster topology (provides the local node and the topology hash for 'prepare' messages).
     * @param channel Messaging channel (gets narrowed down to the remote node).
     * @param async Executor service for notifying request callbacks.
     */
    public DefaultCoordinationMember(
        String process,
        ClusterNode remoteNode,
        CoordinationEpoch epoch,
        ClusterTopology topology,
        MessagingChannel<CoordinationProtocol> channel,
        ExecutorService async
    ) {
        assert process != null : "Coordination process name is null.";
        assert remoteNode != null : "Remote node is null.";
        assert epoch != null : "Epoch is null.";
        assert topology != null : "Topology is null.";
        assert channel != null : "Channel is null.";
        assert async != null : "Executor service is null.";

        this.process = process;
        this.topology = topology;
        this.localNode = topology.localNode();
        this.remoteNode = remoteNode;
        this.epoch = epoch;
        this.coordinator = remoteNode.id().equals(epoch.coordinator());
        this.async = async;

        // NOTE(review): the raw cast is kept as-is since the declared return type of forNode(...) is not visible from this file.
        this.channel = (MessagingChannel) channel.forNode(remoteNode);
    }

    @Override // io.hekate.coordinate.CoordinationMember
    public boolean isCoordinator() {
        return this.coordinator;
    }

    @Override // io.hekate.coordinate.CoordinationMember
    public ClusterNode node() {
        return this.remoteNode;
    }

    @Override // io.hekate.coordinate.CoordinationMember
    public void request(Object obj, CoordinationRequestCallback coordinationRequestCallback) {
        ArgAssert.notNull(obj, "Request");
        ArgAssert.notNull(coordinationRequestCallback, "Callback");

        doRequest(new CoordinationProtocol.Request(this.process, this.localNode.id(), this.epoch, obj), coordinationRequestCallback);
    }

    /**
     * Sends a 'prepare' message (carries the hash of this member's topology) to the remote node.
     *
     * @param coordinationRequestCallback Callback to be notified once the request completes or gets cancelled.
     */
    public void sendPrepare(CoordinationRequestCallback coordinationRequestCallback) {
        doRequest(
            new CoordinationProtocol.Prepare(this.process, this.localNode.id(), this.epoch, this.topology.hash()),
            coordinationRequestCallback
        );
    }

    /**
     * Sends a 'complete' message to the remote node.
     *
     * @param coordinationRequestCallback Callback to be notified once the request completes or gets cancelled.
     */
    public void sendComplete(CoordinationRequestCallback coordinationRequestCallback) {
        doRequest(new CoordinationProtocol.Complete(this.process, this.localNode.id(), this.epoch), coordinationRequestCallback);
    }

    /**
     * Marks this member as disposed and cancels all requests that are still in flight. Subsequent calls are no-ops.
     */
    public void dispose() {
        List<Future<?>> pending;

        synchronized (this.mux) {
            if (this.disposed) {
                return;
            }

            this.disposed = true;

            // Take a snapshot so that futures can be cancelled without holding the lock.
            pending = new ArrayList<>(this.requests);
        }

        for (Future<?> future : pending) {
            future.cancel(false);
        }
    }

    /**
     * Submits the specified message to the remote node and keeps retrying on the same node until a final reply is received, this
     * member gets disposed or the request's future is completed.
     *
     * @param request Message to send.
     * @param callback Callback to notify.
     */
    private void doRequest(CoordinationProtocol.RequestBase request, CoordinationRequestCallback callback) {
        if (DEBUG) {
            log.debug("Sending coordination request [to={}, message={}]", this.remoteNode, request);
        }

        CompletableFuture<Object> future = newRequestFuture(request, callback);

        // The future is already done if it couldn't be registered (i.e. it was cancelled right away) -> nothing to send.
        if (future.isDone()) {
            return;
        }

        this.channel.newRequest(request).withAffinity(this.process).withRetry(retry -> {
            retry.unlimitedAttempts().alwaysTrySameNode().whileTrue(() -> {
                return !this.disposed && !future.isDone();
            }).whileResponse(rsp -> {
                if (rsp.is(CoordinationProtocol.Reject.class)) {
                    if (DEBUG) {
                        log.debug("Got a reject [from={}, request={}]", this.remoteNode, request);
                    }

                    // Rejected -> keep retrying.
                    return true;
                }

                if (rsp.is(CoordinationProtocol.Confirm.class)) {
                    if (DEBUG) {
                        log.debug("Got a confirmation [from={}, request={}]", this.remoteNode, request);
                    }

                    // Confirmations carry no payload.
                    future.complete(null);

                    return false;
                }

                CoordinationProtocol.Response reply = (CoordinationProtocol.Response) rsp.payload(CoordinationProtocol.Response.class);

                if (DEBUG) {
                    log.debug("Got a response [from={}, response={}]", this.remoteNode, reply.response());
                }

                future.complete(reply.response());

                return false;
            });
        }).submit((err, rsp) -> {
            unregister(future);

            if (err != null) {
                future.completeExceptionally(err);
            }
        });
    }

    /**
     * Creates a future that notifies the specified callback (on the {@link #async} executor) once completed and tries to register
     * it as an in-flight request.
     *
     * <p>
     * The callback receives exactly one notification: {@link CoordinationRequestCallback#onResponse(Object, CoordinationMember)} if
     * the future completed normally and this member is not disposed, or {@link CoordinationRequestCallback#onCancel()} otherwise.
     * Runtime errors thrown by either notification are logged and suppressed.
     * </p>
     *
     * @param request Message that the future belongs to (used for logging only).
     * @param callback Callback to notify.
     *
     * @return New future; already cancelled if this member is disposed.
     */
    private CompletableFuture<Object> newRequestFuture(CoordinationProtocol.RequestBase request, CoordinationRequestCallback callback) {
        CompletableFuture<Object> future = new CompletableFuture<>();

        future.whenCompleteAsync((rsp, err) -> {
            try {
                if (err == null && !this.disposed) {
                    if (DEBUG) {
                        log.debug("Received coordination response [from={}, message={}]", this.remoteNode, rsp);
                    }

                    callback.onResponse(rsp, this);
                } else {
                    if (DEBUG) {
                        log.debug("Canceled coordination request [to={}, message={}]", this.remoteNode, request);
                    }

                    callback.onCancel();
                }
            } catch (Error | RuntimeException e) {
                log.error("Got an unexpected runtime error while notifying coordination request callback.", e);
            }
        }, this.async);

        return tryRegisterOrCancel(future);
    }

    /**
     * Registers the future as an in-flight request or cancels it if registration is not possible.
     *
     * @param future Future.
     *
     * @return The same future.
     */
    private CompletableFuture<Object> tryRegisterOrCancel(CompletableFuture<Object> future) {
        if (!tryRegister(future)) {
            future.cancel(false);
        }

        return future;
    }

    /**
     * Adds the future to the set of in-flight requests unless this member is disposed.
     *
     * @param future Future.
     *
     * @return {@code true} if the future was added to the set.
     */
    private boolean tryRegister(CompletableFuture<Object> future) {
        synchronized (this.mux) {
            return !this.disposed && this.requests.add(future);
        }
    }

    /**
     * Removes the future from the set of in-flight requests.
     *
     * @param future Future.
     */
    private void unregister(CompletableFuture<Object> future) {
        synchronized (this.mux) {
            this.requests.remove(future);
        }
    }

    @Override
    public String toString() {
        return this.remoteNode.toString();
    }
}
