package io.buybrain.hamq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import io.buybrain.util.Result;
import io.buybrain.util.function.ThrowingConsumer;
import java.beans.ConstructorProperties;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/buybrain/hamq/Channel.class */
public class Channel {

    @NonNull
    private final Connection connection;

    @NonNull
    private final Retryer retryer;
    private BackendChannel channel;
    private PrefetchSpec prefetchSpec;
    private static final Logger log = LoggerFactory.getLogger(Channel.class);
    private static final AtomicInteger tagCounter = new AtomicInteger();
    private final List<ExchangeSpec> exchanges = new ArrayList();
    private final List<QueueSpec> queues = new ArrayList();
    private final List<BindSpec> binds = new ArrayList();
    private final Map<String, ConsumeSpec> consumers = new HashMap();
    private final Lock getChannelLock = new ReentrantLock();

    public synchronized void exchangeDeclare(@NonNull ExchangeSpec exchangeSpec) {
        if (exchangeSpec == null) {
            throw new NullPointerException("spec");
        }
        doExchangeDeclare(exchangeSpec);
        this.exchanges.add(exchangeSpec);
    }

    private void doExchangeDeclare(@NonNull ExchangeSpec exchangeSpec) {
        if (exchangeSpec == null) {
            throw new NullPointerException("spec");
        }
        perform(backendChannel -> {
            backendChannel.exchangeDeclare(exchangeSpec.getName(), exchangeSpec.getType(), exchangeSpec.isDurable(), exchangeSpec.isAutoDelete(), exchangeSpec.isInternal(), exchangeSpec.getArgs());
        }, exchangeSpec);
    }

    public synchronized void queueDeclare(@NonNull QueueSpec queueSpec) {
        if (queueSpec == null) {
            throw new NullPointerException("spec");
        }
        doQueueDeclare(queueSpec);
        this.queues.add(queueSpec);
    }

    private void doQueueDeclare(@NonNull QueueSpec queueSpec) {
        if (queueSpec == null) {
            throw new NullPointerException("spec");
        }
        perform(backendChannel -> {
            backendChannel.queueDeclare(queueSpec.getName(), queueSpec.isDurable(), queueSpec.isExclusive(), queueSpec.isAutoDelete(), queueSpec.getArgs());
        }, queueSpec);
    }

    public synchronized void queueBind(@NonNull BindSpec bindSpec) {
        if (bindSpec == null) {
            throw new NullPointerException("spec");
        }
        doQueueBind(bindSpec);
        this.binds.add(bindSpec);
    }

    private void doQueueBind(@NonNull BindSpec bindSpec) {
        if (bindSpec == null) {
            throw new NullPointerException("spec");
        }
        perform(backendChannel -> {
            backendChannel.queueBind(bindSpec.getQueue(), bindSpec.getExchange(), bindSpec.getRoutingKey(), bindSpec.getArgs());
        }, bindSpec);
    }

    public void prefetch(@NonNull PrefetchSpec prefetchSpec) {
        if (prefetchSpec == null) {
            throw new NullPointerException("spec");
        }
        doPrefetch(prefetchSpec);
        this.prefetchSpec = prefetchSpec;
    }

    private void doPrefetch(PrefetchSpec prefetchSpec) {
        perform(backendChannel -> {
            backendChannel.basicQos(prefetchSpec.getAmount());
        }, prefetchSpec);
    }

    public void publish(@NonNull PublishSpec publishSpec) {
        if (publishSpec == null) {
            throw new NullPointerException("spec");
        }
        AMQP.BasicProperties.Builder deliveryMode = new AMQP.BasicProperties.Builder().deliveryMode(Integer.valueOf(publishSpec.isDurable() ? 2 : 1));
        if (!publishSpec.getHeaders().isEmpty()) {
            deliveryMode.headers(publishSpec.getHeaders());
        }
        perform(backendChannel -> {
            backendChannel.basicPublish(publishSpec.getExchange(), publishSpec.getRoutingKey(), publishSpec.isMandatory(), deliveryMode.build(), publishSpec.getBody());
        }, publishSpec);
    }

    public void consume(@NonNull ConsumeSpec consumeSpec) {
        if (consumeSpec == null) {
            throw new NullPointerException("spec");
        }
        String str = "consumer-" + tagCounter.getAndIncrement();
        doConsume(str, consumeSpec);
        this.consumers.put(str, consumeSpec);
    }

    private void doConsume(@NonNull String str, @NonNull ConsumeSpec consumeSpec) {
        if (str == null) {
            throw new NullPointerException("consumerTag");
        }
        if (consumeSpec == null) {
            throw new NullPointerException("spec");
        }
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        perform(backendChannel -> {
            backendChannel.basicConsume(consumeSpec.getQueue(), str, consumeSpec.isNoLocal(), consumeSpec.isExclusive(), consumeSpec.getArgs(), new Consumer() { // from class: io.buybrain.hamq.Channel.1
                public void handleConsumeOk(String str2) {
                }

                public void handleCancelOk(String str2) {
                }

                public void handleCancel(String str2) throws IOException {
                }

                public void handleShutdownSignal(String str2, ShutdownSignalException shutdownSignalException) {
                    atomicBoolean.set(true);
                    Channel.this.reset(backendChannel);
                }

                public void handleRecoverOk(String str2) {
                }

                public void handleDelivery(String str2, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) {
                    if (atomicBoolean.get()) {
                        throw new RuntimeException("Consumer is closed");
                    }
                    ConsumeSpec consumeSpec2 = consumeSpec;
                    BackendChannel backendChannel = backendChannel;
                    Result trying = Result.trying(() -> {
                        consumeSpec2.getCallback().accept(new Delivery(backendChannel, envelope, basicProperties, bArr));
                    });
                    BackendChannel backendChannel2 = backendChannel;
                    AtomicBoolean atomicBoolean2 = atomicBoolean;
                    trying.orElse(th -> {
                        Result.trying(() -> {
                            backendChannel2.basicCancel(str2);
                        });
                        Channel.log.warn("Error while (n)acking delivery, will retry consuming", th);
                        atomicBoolean2.set(true);
                        Channel.this.reset(backendChannel2);
                    });
                }
            });
        }, consumeSpec);
    }

    private void perform(ThrowingConsumer<BackendChannel> throwingConsumer, OperationSpec operationSpec) {
        this.retryer.performWithRetry(() -> {
            throwingConsumer.accept(activeChannel());
        }, getRetryPolicy(operationSpec).withErrorHandler(th -> {
            if (Retryer.shouldReconnectToRecover(th)) {
                reset(null);
            }
        }));
    }

    private RetryPolicy getRetryPolicy(OperationSpec operationSpec) {
        return operationSpec.getRetryPolicy() != null ? operationSpec.getRetryPolicy() : this.connection.getRetryPolicy();
    }

    private BackendChannel activeChannel() {
        this.getChannelLock.lock();
        if (this.channel == null) {
            this.channel = (BackendChannel) this.retryer.performWithRetry(() -> {
                return this.connection.activeConnection().newChannel();
            }, new RetryPolicy().withRetryAll(true));
        }
        this.getChannelLock.unlock();
        return this.channel;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void reset(BackendChannel backendChannel) {
        if (this.channel == null || backendChannel == null || this.channel == backendChannel) {
            if (this.channel != null) {
                this.consumers.keySet().forEach(str -> {
                    Result.trying(() -> {
                        this.channel.basicCancel(str);
                    });
                });
                BackendChannel backendChannel2 = this.channel;
                backendChannel2.getClass();
                Result.trying(backendChannel2::close);
            }
            this.connection.reset();
            this.channel = null;
            this.exchanges.forEach(this::doExchangeDeclare);
            this.queues.forEach(this::doQueueDeclare);
            this.binds.forEach(this::doQueueBind);
            if (this.prefetchSpec != null) {
                doPrefetch(this.prefetchSpec);
            }
            this.consumers.forEach(this::doConsume);
        }
    }

    @ConstructorProperties({"connection", "retryer"})
    public Channel(@NonNull Connection connection, @NonNull Retryer retryer) {
        if (connection == null) {
            throw new NullPointerException("connection");
        }
        if (retryer == null) {
            throw new NullPointerException("retryer");
        }
        this.connection = connection;
        this.retryer = retryer;
    }
}
