package io.hekate.messaging.internal;

import io.hekate.cluster.ClusterNode;
import io.hekate.core.internal.util.ArgAssert;
import io.hekate.messaging.loadbalance.UnknownRouteException;
import io.hekate.messaging.operation.Aggregate;
import io.hekate.messaging.operation.AggregateFuture;
import io.hekate.messaging.operation.AggregateRepeatCondition;
import io.hekate.messaging.operation.AggregateRetryConfigurer;
import io.hekate.messaging.operation.AggregateRetryPolicy;
import io.hekate.messaging.retry.RetryBackoffPolicy;
import io.hekate.messaging.retry.RetryCallback;
import io.hekate.messaging.retry.RetryCondition;
import io.hekate.messaging.retry.RetryErrorPredicate;
import io.hekate.messaging.retry.RetryResponsePredicate;
import java.util.List;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:io/hekate/messaging/internal/AggregateOperationBuilder.class */
class AggregateOperationBuilder<T> extends MessageOperationBuilder<T> implements Aggregate<T>, AggregateRetryPolicy<T> {
    private Object affinity;
    private long timeout;
    private RetryErrorPredicate retryErr;
    private RetryResponsePredicate<T> retryResp;
    private RetryCondition retryCondition;
    private RetryBackoffPolicy retryBackoff;
    private RetryCallback retryCallback;
    private int maxAttempts;
    private AggregateRepeatCondition<T> repeat;

    public AggregateOperationBuilder(T t, MessagingGatewayContext<T> messagingGatewayContext, MessageOperationOpts<T> messageOperationOpts) {
        super(t, messagingGatewayContext, messageOperationOpts);
        this.timeout = messagingGatewayContext.messagingTimeout();
    }

    @Override // io.hekate.messaging.operation.Aggregate
    public Aggregate<T> withAffinity(Object obj) {
        this.affinity = obj;
        return this;
    }

    @Override // io.hekate.messaging.operation.Aggregate
    public Aggregate<T> withTimeout(long j, TimeUnit timeUnit) {
        this.timeout = timeUnit.toMillis(j);
        return this;
    }

    @Override // io.hekate.messaging.operation.Aggregate
    public Aggregate<T> withRetry(AggregateRetryConfigurer<T> aggregateRetryConfigurer) {
        ArgAssert.notNull(aggregateRetryConfigurer, "Retry policy");
        this.retryErr = RetryErrorPredicate.acceptAll();
        aggregateRetryConfigurer.configure(this);
        return this;
    }

    @Override // io.hekate.messaging.operation.Aggregate
    public Aggregate<T> withRepeat(AggregateRepeatCondition<T> aggregateRepeatCondition) {
        ArgAssert.notNull(aggregateRepeatCondition, "Repeat condition");
        this.repeat = aggregateRepeatCondition;
        return this;
    }

    @Override // io.hekate.messaging.operation.Aggregate
    public AggregateFuture<T> submit() {
        AggregateFuture<T> aggregateFuture = new AggregateFuture<>();
        doSubmit(message(), this.affinity, this.timeout, this.maxAttempts, this.retryErr, this.retryResp, this.retryCondition, this.retryBackoff, this.retryCallback, this.repeat, gateway(), opts(), aggregateFuture);
        return aggregateFuture;
    }

    @Override // io.hekate.messaging.retry.RetryPolicy
    public AggregateRetryPolicy<T> withBackoff(RetryBackoffPolicy retryBackoffPolicy) {
        ArgAssert.notNull(retryBackoffPolicy, "Backoff policy");
        this.retryBackoff = retryBackoffPolicy;
        return this;
    }

    @Override // io.hekate.messaging.retry.RetryPolicy
    public AggregateRetryPolicy<T> whileTrue(RetryCondition retryCondition) {
        this.retryCondition = retryCondition;
        return this;
    }

    @Override // io.hekate.messaging.retry.RetryPolicy
    public AggregateRetryPolicy<T> whileError(RetryErrorPredicate retryErrorPredicate) {
        this.retryErr = retryErrorPredicate;
        return this;
    }

    @Override // io.hekate.messaging.retry.RetryResponseSupport
    public AggregateRetryPolicy<T> whileResponse(RetryResponsePredicate<T> retryResponsePredicate) {
        this.retryResp = retryResponsePredicate;
        return this;
    }

    @Override // io.hekate.messaging.retry.RetryPolicy
    public AggregateRetryPolicy<T> onRetry(RetryCallback retryCallback) {
        this.retryCallback = retryCallback;
        return this;
    }

    @Override // io.hekate.messaging.retry.RetryPolicy
    public AggregateRetryPolicy<T> maxAttempts(int i) {
        this.maxAttempts = i;
        return this;
    }

    private static <T> void doSubmit(T t, Object obj, long j, int i, RetryErrorPredicate retryErrorPredicate, RetryResponsePredicate<T> retryResponsePredicate, RetryCondition retryCondition, RetryBackoffPolicy retryBackoffPolicy, RetryCallback retryCallback, AggregateRepeatCondition<T> aggregateRepeatCondition, MessagingGatewayContext<T> messagingGatewayContext, MessageOperationOpts<T> messageOperationOpts, AggregateFuture<T> aggregateFuture) {
        List<ClusterNode> nodesForBroadcast = BroadcastOperationBuilder.nodesForBroadcast(obj, messageOperationOpts);
        if (nodesForBroadcast.isEmpty()) {
            aggregateFuture.complete(new EmptyAggregateResult(t));
        } else {
            AggregateContext aggregateContext = new AggregateContext(t, nodesForBroadcast, aggregateFuture);
            nodesForBroadcast.forEach(clusterNode -> {
                AggregateOperation aggregateOperation = new AggregateOperation(t, obj, j, i, retryErrorPredicate, retryResponsePredicate, retryCondition, retryBackoffPolicy, retryCallback, messagingGatewayContext, messageOperationOpts, clusterNode);
                messagingGatewayContext.submit(aggregateOperation);
                aggregateOperation.future().whenComplete((response, th) -> {
                    if (th == null ? aggregateContext.onReplySuccess(clusterNode, response) : th instanceof UnknownRouteException ? aggregateContext.forgetNode(clusterNode) : aggregateContext.onReplyFailure(clusterNode, th)) {
                        if (aggregateRepeatCondition == null || messagingGatewayContext.isClosed() || aggregateContext.isTimedOut() || !aggregateRepeatCondition.shouldRepeat(aggregateContext)) {
                            aggregateContext.complete();
                        } else {
                            doSubmit(t, obj, j, i, retryErrorPredicate, retryResponsePredicate, retryCondition, retryBackoffPolicy, retryCallback, aggregateRepeatCondition, messagingGatewayContext, messageOperationOpts, aggregateFuture);
                        }
                    }
                });
            });
        }
    }
}
