package io.hekate.messaging.internal;

import io.hekate.cluster.ClusterAddress;
import io.hekate.cluster.health.DefaultFailureDetectorConfig;
import io.hekate.codec.CodecException;
import io.hekate.core.internal.util.ErrorUtils;
import io.hekate.messaging.MessageQueueOverflowException;
import io.hekate.messaging.MessageReceiver;
import io.hekate.messaging.MessagingEndpoint;
import io.hekate.messaging.internal.MessagingProtocol;
import io.hekate.messaging.operation.SendCallback;
import io.hekate.network.NetworkEndpoint;
import io.hekate.network.NetworkFuture;
import io.hekate.network.NetworkMessage;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/hekate/messaging/internal/MessagingConnectionIn.class */
public class MessagingConnectionIn<T> extends MessagingConnection<T> {
    private final Logger log;
    private final MessagingExecutor async;
    private final MessageReceiver<T> receiver;
    private final SendPressureGuard sendPressure;
    private final MessageInterceptors<T> interceptors;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.hekate.messaging.internal.MessagingConnectionIn$1, reason: invalid class name */
    /* loaded from: input_file:io/hekate/messaging/internal/MessagingConnectionIn$1.class */
    /**
     * Compiler-generated lookup table for a {@code switch} over {@link MessagingProtocol.Type}.
     *
     * <p>The array is indexed by {@code Type.ordinal()} and holds a 1-based case number (1..13) for every
     * constant registered below. Slots that are never assigned keep the array default of {@code 0}.</p>
     */
    public static /* synthetic */ class AnonymousClass1 {
        // One slot per enum constant known at class-initialization time.
        static final /* synthetic */ int[] $SwitchMap$io$hekate$messaging$internal$MessagingProtocol$Type = new int[MessagingProtocol.Type.values().length];

        static {
            // Each constant is registered in its own try/catch: if a constant cannot be resolved
            // (NoSuchFieldError) only that registration is skipped and its slot stays at 0.
            try {
                $SwitchMap$io$hekate$messaging$internal$MessagingProtocol$Type[MessagingProtocol.Type.NOTIFICATION.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$hekate$messaging$internal$MessagingProtocol$Type[MessagingProtocol.Type.AFFINITY_NOTIFICATION.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$io$hekate$messaging$internal$MessagingProtocol$Type[MessagingProtocol.Type.REQUEST.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$io$hekate$messaging$internal$MessagingProtocol$Type[MessagingProtocol.Type.VOID_REQUEST.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$io$hekate$messaging$internal$MessagingProtocol$Type[MessagingProtocol.Type.SUBSCRIBE.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$io$hekate$messaging$internal$MessagingProtocol$Type[MessagingProtocol.Type.AFFINITY_REQUEST.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$io$hekate$messaging$internal$MessagingProtocol$Type[MessagingProtocol.Type.AFFINITY_VOID_REQUEST.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$io$hekate$messaging$internal$MessagingProtocol$Type[MessagingProtocol.Type.AFFINITY_SUBSCRIBE.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$io$hekate$messaging$internal$MessagingProtocol$Type[MessagingProtocol.Type.FINAL_RESPONSE.ordinal()] = 9;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$io$hekate$messaging$internal$MessagingProtocol$Type[MessagingProtocol.Type.RESPONSE_CHUNK.ordinal()] = 10;
            } catch (NoSuchFieldError e10) {
            }
            try {
                $SwitchMap$io$hekate$messaging$internal$MessagingProtocol$Type[MessagingProtocol.Type.VOID_RESPONSE.ordinal()] = 11;
            } catch (NoSuchFieldError e11) {
            }
            try {
                $SwitchMap$io$hekate$messaging$internal$MessagingProtocol$Type[MessagingProtocol.Type.ERROR_RESPONSE.ordinal()] = 12;
            } catch (NoSuchFieldError e12) {
            }
            try {
                $SwitchMap$io$hekate$messaging$internal$MessagingProtocol$Type[MessagingProtocol.Type.CONNECT.ordinal()] = 13;
            } catch (NoSuchFieldError e13) {
            }
        }
    }

    public MessagingConnectionIn(NetworkEndpoint<MessagingProtocol> networkEndpoint, MessagingEndpoint<T> messagingEndpoint, MessagingGatewayContext<T> messagingGatewayContext) {
        super(messagingGatewayContext, messagingEndpoint, networkEndpoint);
        this.log = messagingGatewayContext.log();
        this.async = messagingGatewayContext.async();
        this.sendPressure = messagingGatewayContext.sendGuard();
        this.interceptors = messagingGatewayContext.interceptors();
        this.receiver = messagingGatewayContext.receiver();
    }

    /**
     * Disconnects the underlying network endpoint.
     *
     * @return Future returned by the network endpoint's {@code disconnect()} call.
     */
    public NetworkFuture<MessagingProtocol> disconnect() {
        return this.network().disconnect();
    }

    /**
     * Notifies the message receiver that this connection's endpoint got connected.
     */
    public void onConnect() {
        receiver.onConnect(this.endpoint());
    }

    /**
     * Notifies the message receiver that this connection's endpoint got disconnected and then removes this
     * connection from the gateway.
     */
    public void onDisconnect() {
        // The receiver is notified first, the gateway registration is dropped afterwards.
        receiver.onDisconnect(this.endpoint());

        this.gateway().unregister(this);
    }

    /**
     * Sends a partial response (chunk) for a subscription request.
     *
     * <p>When a back pressure guard is configured the chunk is first registered with it. If that registration
     * fails (queue overflow or thread interruption) the chunk is not sent and the failure is reported to the
     * callback. If the thread was interrupted, its interrupt flag is restored before reporting.</p>
     *
     * @param messagingWorker Worker that is used to notify the callback.
     * @param t Chunk payload.
     * @param subscribeRequest Request that this chunk answers.
     * @param sendCallback Optional callback to be notified on the send result (may be {@code null}).
     */
    public void replyChunk(MessagingWorker messagingWorker, T t, MessagingProtocol.SubscribeRequest<T> subscribeRequest, SendCallback sendCallback) {
        MessagingProtocol.ResponseChunk<T> responseChunk = new MessagingProtocol.ResponseChunk<>(subscribeRequest.requestId(), t);

        responseChunk.prepareSend(this, subscribeRequest);

        gateway().interceptors().serverSend(responseChunk);

        Throwable backPressureError = null;

        if (this.sendPressure != null) {
            try {
                this.sendPressure.onEnqueue();
            } catch (InterruptedException e) {
                // Do not swallow the interruption: code further up the stack must still be able to see it.
                Thread.currentThread().interrupt();

                backPressureError = e;
            } catch (MessageQueueOverflowException e) {
                backPressureError = e;
            }
        }

        if (backPressureError != null) {
            // Nothing was enqueued, hence there is no matching onDequeue() call here.
            notifyResponseSendFailure(messagingWorker, responseChunk.payload(), backPressureError, sendCallback);

            return;
        }

        network().send(responseChunk, (sent, sendError) -> {
            // Release the back pressure slot regardless of the send outcome.
            if (this.sendPressure != null) {
                this.sendPressure.onDequeue();
            }

            if (sendError == null) {
                notifyResponseSendSuccess(messagingWorker, responseChunk.payload(), sendCallback);
            } else {
                notifyResponseSendFailure(messagingWorker, responseChunk.payload(), sendError, sendCallback);
            }
        });
    }

    /**
     * Sends the final response for a request.
     *
     * <p>The response is always registered with the back pressure guard (if any) via
     * {@code onEnqueueIgnorePolicy()} and released again once the network layer reports the send result.
     * If sending fails with a {@link CodecException}, an error response carrying the failure is sent for the
     * same request id before the callback is notified.</p>
     *
     * @param messagingWorker Worker that is used to notify the callback.
     * @param t Response payload.
     * @param requestForResponseBase Request that this response answers.
     * @param sendCallback Optional callback to be notified on the send result (may be {@code null}).
     */
    public void replyFinal(MessagingWorker messagingWorker, T t, MessagingProtocol.RequestForResponseBase<T> requestForResponseBase, SendCallback sendCallback) {
        MessagingProtocol.FinalResponse<T> finalResponse = new MessagingProtocol.FinalResponse<>(requestForResponseBase.requestId(), t);

        finalResponse.prepareSend(this, requestForResponseBase);

        gateway().interceptors().serverSend(finalResponse);

        if (this.sendPressure != null) {
            this.sendPressure.onEnqueueIgnorePolicy();
        }

        network().send(finalResponse, (sent, sendError) -> {
            // Release the back pressure slot regardless of the send outcome.
            if (this.sendPressure != null) {
                this.sendPressure.onDequeue();
            }

            if (sendError == null) {
                notifyResponseSendSuccess(messagingWorker, finalResponse.payload(), sendCallback);
            } else {
                // The payload could not be encoded: report the failure as an error response instead.
                if (sendError instanceof CodecException) {
                    replyError(finalResponse.requestId(), sendError);
                }

                notifyResponseSendFailure(messagingWorker, finalResponse.payload(), sendError, sendCallback);
            }
        });
    }

    /**
     * Sends an empty (void) response for the given request.
     *
     * @param requestBase Request whose id is echoed in the response.
     */
    public void replyVoid(MessagingProtocol.RequestBase<T> requestBase) {
        MessagingProtocol.VoidResponse response = new MessagingProtocol.VoidResponse(requestBase.requestId());

        network().send(response);
    }

    /**
     * Sends an error response that carries the stack trace of the given failure.
     *
     * @param i Id of the request that failed.
     * @param th Failure whose stack trace is sent to the remote side.
     */
    public void replyError(int i, Throwable th) {
        MessagingProtocol.ErrorResponse response = new MessagingProtocol.ErrorResponse(i, ErrorUtils.stackTrace(th));

        network().send(response);
    }

    /**
     * Dispatches an incoming network message to the notification/request handlers.
     *
     * <p>The message type is previewed without decoding the whole message. Notifications and requests are
     * handled on a worker: the pooled worker for plain messages, the worker selected by the previewed affinity
     * for affinity messages. If the worker is asynchronous, the message is handed over to it and decoded there;
     * otherwise it is decoded and processed inline. Any other message type is rejected with an
     * {@link IllegalArgumentException}. Every failure raised here is routed to {@code handleReceiveError}.</p>
     *
     * @param networkMessage Incoming message.
     * @param networkEndpoint Endpoint that received the message.
     */
    public void receive(NetworkMessage<MessagingProtocol> networkMessage, NetworkEndpoint<MessagingProtocol> networkEndpoint) {
        try {
            MessagingProtocol.Type previewType = MessagingProtocolCodec.previewType(networkMessage);

            switch (previewType) {
                case NOTIFICATION: {
                    MessagingWorker worker = this.async.pooledWorker();

                    if (worker.isAsync()) {
                        long receivedAtNanos = receivedAtNanos(networkMessage);

                        onReceiveAsyncEnqueue(networkEndpoint);

                        networkMessage.handleAsync(worker, queued -> {
                            onReceiveAsyncDequeue();

                            try {
                                receiveNotificationAsync((MessagingProtocol.Notification) ((MessagingProtocol) queued.decode()).cast(), receivedAtNanos);
                            } catch (Throwable err) {
                                handleReceiveError(queued, err);
                            }
                        });
                    } else {
                        receiveNotificationSync((MessagingProtocol.Notification) networkMessage.decode().cast());
                    }

                    break;
                }
                case AFFINITY_NOTIFICATION: {
                    int affinity = MessagingProtocolCodec.previewAffinity(networkMessage);
                    long receivedAtNanos = receivedAtNanos(networkMessage);

                    MessagingWorker worker = this.async.workerFor(affinity);

                    if (worker.isAsync()) {
                        onReceiveAsyncEnqueue(networkEndpoint);

                        networkMessage.handleAsync(worker, queued -> {
                            onReceiveAsyncDequeue();

                            try {
                                receiveNotificationAsync((MessagingProtocol.Notification) ((MessagingProtocol) queued.decode()).cast(), receivedAtNanos);
                            } catch (Throwable err) {
                                handleReceiveError(queued, err);
                            }
                        });
                    } else {
                        receiveNotificationSync((MessagingProtocol.Notification) networkMessage.decode().cast());
                    }

                    break;
                }
                case REQUEST:
                case VOID_REQUEST:
                case SUBSCRIBE: {
                    MessagingWorker worker = this.async.pooledWorker();

                    if (worker.isAsync()) {
                        long receivedAtNanos = receivedAtNanos(networkMessage);

                        onReceiveAsyncEnqueue(networkEndpoint);

                        networkMessage.handleAsync(worker, queued -> {
                            onReceiveAsyncDequeue();

                            try {
                                receiveRequestAsync((MessagingProtocol.RequestBase) ((MessagingProtocol) queued.decode()).cast(), worker, receivedAtNanos);
                            } catch (Throwable err) {
                                handleReceiveError(queued, err);
                            }
                        });
                    } else {
                        receiveRequestSync((MessagingProtocol.RequestBase) networkMessage.decode().cast(), worker);
                    }

                    break;
                }
                case AFFINITY_REQUEST:
                case AFFINITY_VOID_REQUEST:
                case AFFINITY_SUBSCRIBE: {
                    MessagingWorker worker = this.async.workerFor(MessagingProtocolCodec.previewAffinity(networkMessage));

                    if (worker.isAsync()) {
                        long receivedAtNanos = receivedAtNanos(networkMessage);

                        onReceiveAsyncEnqueue(networkEndpoint);

                        networkMessage.handleAsync(worker, queued -> {
                            onReceiveAsyncDequeue();

                            try {
                                receiveRequestAsync((MessagingProtocol.RequestBase) ((MessagingProtocol) queued.decode()).cast(), worker, receivedAtNanos);
                            } catch (Throwable err) {
                                handleReceiveError(queued, err);
                            }
                        });
                    } else {
                        receiveRequestSync((MessagingProtocol.RequestBase) networkMessage.decode().cast(), worker);
                    }

                    break;
                }
                // Responses and connect messages are not handled by this method.
                case FINAL_RESPONSE:
                case RESPONSE_CHUNK:
                case VOID_RESPONSE:
                case ERROR_RESPONSE:
                case CONNECT:
                default:
                    throw new IllegalArgumentException("Unexpected message type: " + previewType);
            }
        } catch (Throwable err) {
            handleReceiveError(networkMessage, err);
        }
    }

    /**
     * Processes a request inline on the calling thread.
     *
     * <p>Delegates to {@code receiveRequestAsync} with a zero receive timestamp, which disables the expiration
     * check.</p>
     *
     * @param requestBase Decoded request.
     * @param messagingWorker Worker that is bound to the request.
     */
    private void receiveRequestSync(MessagingProtocol.RequestBase<T> requestBase, MessagingWorker messagingWorker) {
        final long noReceiveTimestamp = 0L;

        receiveRequestAsync(requestBase, messagingWorker, noReceiveTimestamp);
    }

    /**
     * Processes a decoded request: applies the server-side interceptors, invokes the receiver and, for void
     * requests, sends the void response.
     *
     * <p>Requests that are already expired are silently dropped. The completion interceptor is invoked exactly
     * once after the receiver returns or fails. Runtime failures are logged and reported to the remote side as
     * an error response.</p>
     *
     * @param requestBase Decoded request.
     * @param messagingWorker Worker that is bound to the request.
     * @param j Receive timestamp as produced by {@code System.nanoTime()}, or {@code 0} to skip the expiration
     * check.
     */
    private void receiveRequestAsync(MessagingProtocol.RequestBase<T> requestBase, MessagingWorker messagingWorker, long j) {
        if (isExpired(requestBase, j)) {
            return;
        }

        try {
            requestBase.prepareReceive(messagingWorker, this);

            this.interceptors.serverReceive(requestBase);

            try {
                this.receiver.receive(requestBase);
            } finally {
                // Must run once and only once, no matter how the receiver terminated.
                this.interceptors.serverReceiveComplete(requestBase);
            }

            if (requestBase.isVoid()) {
                replyVoid(requestBase);
            }
        } catch (Error | RuntimeException e) {
            if (this.log.isErrorEnabled()) {
                this.log.error("Got an unexpected runtime error during message processing [from={}, message={}]", requestBase.from(), requestBase, e);
            }

            replyError(requestBase.requestId(), e);
        }
    }

    /**
     * Processes a notification inline on the calling thread.
     *
     * <p>Delegates to {@code receiveNotificationAsync} with a zero receive timestamp, which disables the
     * expiration check.</p>
     *
     * @param notification Decoded notification.
     */
    private void receiveNotificationSync(MessagingProtocol.Notification<T> notification) {
        final long noReceiveTimestamp = 0L;

        receiveNotificationAsync(notification, noReceiveTimestamp);
    }

    /**
     * Processes a decoded notification: applies the server-side interceptors and invokes the receiver.
     *
     * <p>Notifications that are already expired are silently dropped. The completion interceptor is invoked
     * exactly once after the receiver returns or fails. Runtime failures are only logged since notifications
     * have no response.</p>
     *
     * @param notification Decoded notification.
     * @param j Receive timestamp as produced by {@code System.nanoTime()}, or {@code 0} to skip the expiration
     * check.
     */
    private void receiveNotificationAsync(MessagingProtocol.Notification<T> notification, long j) {
        if (isExpired(notification, j)) {
            return;
        }

        try {
            notification.prepareReceive(this);

            this.interceptors.serverReceive(notification);

            try {
                this.receiver.receive(notification);
            } finally {
                // Must run once and only once, no matter how the receiver terminated.
                this.interceptors.serverReceiveComplete(notification);
            }
        } catch (Error | RuntimeException e) {
            if (this.log.isErrorEnabled()) {
                this.log.error("Got an unexpected runtime error during message processing [from={}, message={}]", notification.from(), notification, e);
            }
        }
    }

    /**
     * Handles a failure that happened while receiving or processing an incoming message.
     *
     * <ul>
     * <li>Request payload decoding failure: logged and reported to the remote side as an error response.</li>
     * <li>Notification payload decoding failure: logged only.</li>
     * <li>Anything else: logged and the connection is disconnected.</li>
     * </ul>
     *
     * @param networkMessage Message that was being processed.
     * @param th Failure.
     */
    private void handleReceiveError(NetworkMessage<MessagingProtocol> networkMessage, Throwable th) {
        ClusterAddress from = endpoint().remoteAddress();

        if (th instanceof RequestPayloadDecodeException) {
            RequestPayloadDecodeException decodeError = (RequestPayloadDecodeException) th;
            Throwable rootCause = decodeError.getCause();

            if (this.log.isErrorEnabled()) {
                this.log.error("Failed to decode message [from={}]", from, rootCause);
            }

            // The request id is known, so the sender can be told about the failure.
            replyError(decodeError.requestId(), rootCause);
        } else if (th instanceof NotificationPayloadDecodeException) {
            this.log.error("Failed to decode message [from={}]", from, th);
        } else {
            this.log.error("Got an error during message processing [from={}, message={}]", from, networkMessage, th);

            disconnect();
        }
    }

    /**
     * Notifies the callback (if any) that a response was sent successfully.
     *
     * <p>The notification runs on the worker when it is asynchronous and on the calling thread otherwise.</p>
     *
     * @param messagingWorker Worker of the original request.
     * @param t Payload of the response that was sent.
     * @param sendCallback Callback to notify (may be {@code null}).
     */
    private void notifyResponseSendSuccess(MessagingWorker messagingWorker, T t, SendCallback sendCallback) {
        if (sendCallback == null) {
            return;
        }

        if (!messagingWorker.isAsync()) {
            doNotifyOnResponseSendSuccess(t, sendCallback);

            return;
        }

        messagingWorker.execute(() -> doNotifyOnResponseSendSuccess(t, sendCallback));
    }

    /**
     * Completes the callback without an error; failures thrown by the callback itself are logged and
     * suppressed.
     *
     * @param t Payload of the response that was sent (used for logging only).
     * @param sendCallback Callback to notify.
     */
    private void doNotifyOnResponseSendSuccess(T t, SendCallback sendCallback) {
        try {
            sendCallback.onComplete(null);
        } catch (Error | RuntimeException callbackError) {
            this.log.error("Failed to notify on successful send of a response message [to={}, message={}]", remoteAddress(), t, callbackError);
        }
    }

    /**
     * Notifies the callback (if any) that sending of a response failed.
     *
     * <p>Codec failures are additionally logged. The notification runs on the worker when it is asynchronous
     * and on the calling thread otherwise.</p>
     *
     * @param messagingWorker Worker of the original request.
     * @param t Payload of the response that failed to be sent.
     * @param th Send failure.
     * @param sendCallback Callback to notify (may be {@code null}).
     */
    private void notifyResponseSendFailure(MessagingWorker messagingWorker, T t, Throwable th, SendCallback sendCallback) {
        boolean codecFailure = th instanceof CodecException;

        if (codecFailure && this.log.isErrorEnabled()) {
            this.log.error("Failed to send message [to={}, message={}]", remoteAddress(), t, th);
        }

        if (sendCallback == null) {
            return;
        }

        if (!messagingWorker.isAsync()) {
            doNotifyOnResponseSendFailure(t, th, sendCallback);

            return;
        }

        messagingWorker.execute(() -> doNotifyOnResponseSendFailure(t, th, sendCallback));
    }

    /**
     * Completes the callback with the wrapped send failure; failures thrown by the callback itself are logged
     * and suppressed.
     *
     * @param t Payload of the response that failed to be sent (used for logging only).
     * @param th Send failure.
     * @param sendCallback Callback to notify.
     */
    private void doNotifyOnResponseSendFailure(T t, Throwable th, SendCallback sendCallback) {
        try {
            sendCallback.onComplete(wrapError(th));
        } catch (Error | RuntimeException callbackError) {
            if (!this.log.isErrorEnabled()) {
                return;
            }

            this.log.error("Failed to notify on response sending failure [to={}, failure={}, message={}]", remoteAddress(), th.toString(), t, callbackError);
        }
    }

    /**
     * Checks whether a request has been waiting longer than its timeout.
     *
     * @param requestBase Request whose {@code timeout()} (in milliseconds) is checked.
     * @param j Receive timestamp as produced by {@code System.nanoTime()}, or {@code 0} if the expiration check
     * must be skipped.
     *
     * @return {@code true} if a timestamp is present and at least the timeout has elapsed since then.
     */
    private static boolean isExpired(MessagingProtocol.RequestBase<?> requestBase, long j) {
        // Only zero means "no timestamp": System.nanoTime() has an arbitrary origin and may be negative,
        // so a sign check would wrongly disable expiration for such timestamps.
        if (j == 0L) {
            return false;
        }

        long elapsedNanos = System.nanoTime() - j;

        return elapsedNanos >= TimeUnit.MILLISECONDS.toNanos(requestBase.timeout());
    }

    /**
     * Checks whether a notification has been waiting longer than its timeout.
     *
     * @param notification Notification whose {@code timeout()} (in milliseconds) is checked.
     * @param j Receive timestamp as produced by {@code System.nanoTime()}, or {@code 0} if the expiration check
     * must be skipped.
     *
     * @return {@code true} if a timestamp is present and at least the timeout has elapsed since then.
     */
    private static boolean isExpired(MessagingProtocol.Notification<?> notification, long j) {
        // Only zero means "no timestamp": System.nanoTime() has an arbitrary origin and may be negative,
        // so a sign check would wrongly disable expiration for such timestamps.
        if (j == 0L) {
            return false;
        }

        long elapsedNanos = System.nanoTime() - j;

        return elapsedNanos >= TimeUnit.MILLISECONDS.toNanos(notification.timeout());
    }

    /**
     * Produces the receive timestamp for an incoming message.
     *
     * @param networkMessage Incoming message.
     *
     * @return Current {@code System.nanoTime()} if the message preview reports a timeout; {@code 0} otherwise.
     *
     * @throws IOException Propagated from the message preview.
     */
    private static long receivedAtNanos(NetworkMessage<MessagingProtocol> networkMessage) throws IOException {
        boolean hasTimeout = MessagingProtocolCodec.previewHasTimeout(networkMessage);

        return hasTimeout ? System.nanoTime() : 0L;
    }
}
