package ch.streamly.chronicle.flux;

import ch.streamly.chronicle.flux.replay.ReplayFlux;
import java.io.File;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import net.openhft.chronicle.bytes.BytesIn;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycle;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.queue.impl.WireStore;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

/* loaded from: input_file:ch/streamly/chronicle/flux/AbstractChronicleStore.class */
public abstract class AbstractChronicleStore<I, O> implements FluxStore<I, O> {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractChronicleStore.class);
    protected final Function<byte[], I> deserializer;
    private final Function<I, byte[]> serializer;
    private final SingleChronicleQueue queue;
    private final RollCycle rollCycle;

    /* loaded from: input_file:ch/streamly/chronicle/flux/AbstractChronicleStore$AbstractChronicleStoreBuilder.class */
    public static abstract class AbstractChronicleStoreBuilder<B extends AbstractChronicleStoreBuilder<B, R, T>, R extends AbstractChronicleStore, T> {
        private String path;
        private Function<T, byte[]> serializer;
        private Function<byte[], T> deserializer;
        private RollCycle rollCycle = RollCycles.DAILY;

        public B path(String str) {
            this.path = str;
            return getThis();
        }

        protected abstract B getThis();

        public B serializer(Function<T, byte[]> function) {
            this.serializer = function;
            return getThis();
        }

        public B deserializer(Function<byte[], T> function) {
            this.deserializer = function;
            return getThis();
        }

        public B rollCycle(RollCycle rollCycle) {
            this.rollCycle = rollCycle;
            return getThis();
        }

        public abstract R build();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:ch/streamly/chronicle/flux/AbstractChronicleStore$ReaderType.class */
    public enum ReaderType {
        ALL,
        ONLY_HISTORY
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public <S extends AbstractChronicleStore<I, O>, B extends AbstractChronicleStoreBuilder<B, S, I>> AbstractChronicleStore(AbstractChronicleStoreBuilder<B, S, I> abstractChronicleStoreBuilder) {
        this.serializer = ((AbstractChronicleStoreBuilder) abstractChronicleStoreBuilder).serializer;
        this.deserializer = ((AbstractChronicleStoreBuilder) abstractChronicleStoreBuilder).deserializer;
        this.rollCycle = ((AbstractChronicleStoreBuilder) abstractChronicleStoreBuilder).rollCycle;
        this.queue = createQueue(((AbstractChronicleStoreBuilder) abstractChronicleStoreBuilder).path);
    }

    SingleChronicleQueue createQueue(String str) {
        return SingleChronicleQueueBuilder.binary(str).rollCycle(this.rollCycle).build();
    }

    void close() {
        this.queue.close();
    }

    @Override // ch.streamly.chronicle.flux.FluxStore
    public Disposable store(Publisher<I> publisher) {
        ExcerptAppender acquireAppender = this.queue.acquireAppender();
        return Flux.from(publisher).doOnError(th -> {
            LOGGER.error("Error received", th);
        }).subscribe(obj -> {
            storeValue(acquireAppender, obj);
        });
    }

    private void storeValue(ExcerptAppender excerptAppender, I i) {
        byte[] serializeValue = serializeValue(i);
        excerptAppender.writeBytes(bytesOut -> {
            bytesOut.writeInt(serializeValue.length).write(serializeValue);
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public byte[] serializeValue(I i) {
        return this.serializer.apply(i);
    }

    @Override // ch.streamly.chronicle.flux.FluxStore
    public void store(I i) {
        storeValue(this.queue.acquireAppender(), i);
    }

    @Override // ch.streamly.chronicle.flux.FluxStore
    public Flux<O> retrieveAll(boolean z) {
        return Flux.create(fluxSink -> {
            launchTailer(fluxSink, ReaderType.ALL, z);
        });
    }

    private void launchTailer(FluxSink<O> fluxSink, ReaderType readerType, boolean z) {
        launchTailer(fluxSink, this.queue.createTailer(), readerType, z);
    }

    private void launchTailer(FluxSink<O> fluxSink, ExcerptTailer excerptTailer, ReaderType readerType, boolean z) {
        Thread thread = new Thread(() -> {
            readTailer(excerptTailer, fluxSink, readerType, z);
        }, "ChronicleStoreRetrieve_" + excerptTailer.queue().file().getAbsolutePath());
        thread.setDaemon(true);
        thread.start();
    }

    private void readTailer(ExcerptTailer excerptTailer, FluxSink<O> fluxSink, ReaderType readerType, boolean z) {
        int i = 0;
        while (!fluxSink.isCancelled()) {
            try {
                if (fluxSink.requestedFromDownstream() <= 0) {
                    waitMillis(100L);
                } else if (!excerptTailer.readBytes(bytesIn -> {
                    fluxSink.next(deserializeValue(bytesIn));
                })) {
                    if (readerType == ReaderType.ONLY_HISTORY) {
                        fluxSink.complete();
                    } else {
                        waitMillis(10L);
                    }
                }
                int cycle = this.rollCycle.toCycle(excerptTailer.index());
                if (cycle != i) {
                    if (z) {
                        deleteFile(i);
                    }
                    i = cycle;
                }
            } catch (Exception e) {
                LOGGER.error("Error while tailing on queue {}", excerptTailer.queue().file().getAbsolutePath(), e);
                return;
            }
        }
    }

    protected abstract O deserializeValue(BytesIn bytesIn);

    private void waitMillis(long j) {
        try {
            TimeUnit.MILLISECONDS.sleep(j);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void deleteFile(int i) {
        WireStore storeForCycle = this.queue.storeForCycle(i, 0L, false);
        if (storeForCycle == null) {
            LOGGER.trace("wirestore is null for cycle {}", Integer.valueOf(i));
            return;
        }
        File file = storeForCycle.file();
        if (file != null) {
            deleteWireStore(file);
        } else {
            LOGGER.error("Could not find file for cycle {}", Integer.valueOf(i));
        }
    }

    private void deleteWireStore(File file) {
        try {
            logDeletionResult(file, file.delete());
        } catch (Exception e) {
            LOGGER.error("Could not delete file {}", file.getAbsolutePath(), e);
        }
    }

    private void logDeletionResult(File file, boolean z) {
        if (z) {
            LOGGER.trace("file {} deleted after read", file.getAbsolutePath());
        } else {
            LOGGER.error("Could not delete file {}", file.getAbsolutePath());
        }
    }

    @Override // ch.streamly.chronicle.flux.FluxStore
    public Flux<O> retrieveHistory() {
        return Flux.create(fluxSink -> {
            launchTailer(fluxSink, ReaderType.ONLY_HISTORY, false);
        });
    }

    @Override // ch.streamly.chronicle.flux.FluxStore
    public Flux<O> retrieveNewValues() {
        ExcerptTailer end = this.queue.createTailer().toEnd();
        return Flux.create(fluxSink -> {
            launchTailer(fluxSink, end, ReaderType.ALL, false);
        });
    }

    @Override // ch.streamly.chronicle.flux.FluxStore
    public ReplayFlux<O> replayHistory(Function<O, Long> function) {
        return new ReplayFlux<>(Flux.defer(this::retrieveHistory), function);
    }
}
