package ch.streamly.chronicle.flux.replay;

import ch.streamly.domain.ReplayValue;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.LongStream;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

/* loaded from: input_file:ch/streamly/chronicle/flux/replay/ReplayInLoop.class */
/**
 * Transformer that replays a source {@link Flux} in an endless loop.
 *
 * <p>Each iteration re-subscribes to the source flux after waiting for the configured delay.
 * Every emitted item is wrapped in a {@link ReplayValue}: the first item of each iteration is
 * wrapped with {@link ReplayValue#newLoopRestartValue}, all subsequent items of that iteration
 * with {@link ReplayValue#newValue}.
 *
 * @param <T> type of the values emitted by the source flux
 */
public class ReplayInLoop<T> implements Function<Flux<T>, Publisher<ReplayValue<T>>> {
    /** Delay applied before each (re-)subscription to the source flux. */
    private final Duration delayBeforeRestart;

    /**
     * @param duration delay to wait before each subscription to the source flux
     */
    public ReplayInLoop(Duration duration) {
        this.delayBeforeRestart = duration;
    }

    /**
     * Builds a publisher that concatenates on-demand copies of the given flux, one after another.
     *
     * @param flux the source flux to replay
     * @return a publisher of the source values wrapped as {@link ReplayValue}s
     */
    @Override // java.util.function.Function
    public Publisher<ReplayValue<T>> apply(Flux<T> flux) {
        // The explicit element type is required: without a target type, Flux.create would be
        // inferred as Flux<Object> and could not be passed to Flux.concat.
        Flux<Flux<ReplayValue<T>>> iterations = Flux.create(sink ->
                // Emit one wrapped copy of the source per requested element. The stream index is
                // unused and must not reuse the name of the enclosing lambda parameter.
                sink.onRequest(requested ->
                        LongStream.range(0L, requested).forEach(ignored -> sink.next(wrapValues(flux)))));
        // limitRate(1) caps the demand sent to the generator so that copies of the source
        // are not created far ahead of their consumption by concat.
        return Flux.concat(iterations.limitRate(1));
    }

    /**
     * Creates one loop iteration: the source flux with a delayed subscription, each value wrapped
     * as a {@link ReplayValue}. A fresh flag is created per iteration so that its first value is
     * marked as the loop restart.
     */
    private Flux<ReplayValue<T>> wrapValues(Flux<T> flux) {
        return flux.delaySubscription(this.delayBeforeRestart).map(wrapAsReplayValue(new AtomicBoolean(false)));
    }

    /**
     * Returns a mapper that wraps the first value it sees as a loop-restart value and every later
     * value as a plain value.
     *
     * @param atomicBoolean flag recording whether the first value has already been seen; it is
     *                      set to {@code true} on the first call of the returned function
     */
    private Function<T, ReplayValue<T>> wrapAsReplayValue(AtomicBoolean atomicBoolean) {
        return value -> {
            boolean alreadyStarted = atomicBoolean.getAndSet(true);
            return alreadyStarted ? ReplayValue.newValue(value) : ReplayValue.newLoopRestartValue(value);
        };
    }
}
