package io.debezium.server.pravega;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.server.BaseChangeConsumer;
import io.pravega.client.ClientConfig;
import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.stream.EventStreamWriter;
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.client.stream.Transaction;
import io.pravega.client.stream.TransactionalEventStreamWriter;
import io.pravega.client.stream.TxnFailedException;
import io.pravega.client.stream.impl.ByteArraySerializer;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.Dependent;
import javax.inject.Named;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Debezium Server sink that forwards change events to Pravega streams.
 *
 * <p>Configuration (all under {@code debezium.sink.pravega.}):
 * <ul>
 *   <li>{@code controller.uri} - Pravega controller URI, default {@code tcp://localhost:9090};</li>
 *   <li>{@code scope} - scope the client factory is bound to (required, read at start-up);</li>
 *   <li>{@code transaction} - when {@code true} each batch is written through per-stream
 *       transactions, otherwise through plain event writers; default {@code false}.</li>
 * </ul>
 *
 * <p>A fresh sink (and therefore fresh writers) is created for every batch and closed when the
 * batch is done.
 * NOTE(review): re-creating writers per batch looks expensive; caching them across batches would
 * need a lifecycle change and is intentionally not attempted here.
 */
@Dependent
@Named("pravega")
public class PravegaChangeConsumer extends BaseChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(PravegaChangeConsumer.class);
    private static final String PROP_PREFIX = "debezium.sink.pravega.";
    private static final String PROP_CONTROLLER = "debezium.sink.pravega.controller.uri";
    private static final String PROP_SCOPE = "debezium.sink.pravega.scope";
    private static final String PROP_TXN = "debezium.sink.pravega.transaction";

    /** Pravega controller endpoint, injected from configuration. */
    @ConfigProperty(name = PROP_CONTROLLER, defaultValue = "tcp://localhost:9090")
    URI controllerUri;

    /** Pravega scope, resolved in {@link #constructor()}. */
    private String scope;

    /** Whether batches are written transactionally, injected from configuration. */
    @ConfigProperty(name = PROP_TXN, defaultValue = "false")
    boolean txn;

    private ClientConfig clientConfig;
    private EventStreamClientFactory factory;
    private EventWriterConfig writerConfig;

    /**
     * Non-transactional sink: one {@link EventStreamWriter} per destination stream, created lazily.
     */
    class PravegaSinkImpl implements PravegaSink {
        /** Writers keyed by mapped stream name. */
        private final Map<String, EventStreamWriter<byte[]>> writers = new HashMap<>();

        PravegaSinkImpl() {
        }

        /**
         * Writes every event of the batch, waits for the writes to be flushed, and only then
         * acknowledges the records to the engine.
         *
         * @param list            events of the batch, in order
         * @param recordCommitter committer used to acknowledge records and the batch
         * @throws InterruptedException if acknowledging a record or the batch is interrupted
         */
        @Override
        public void handleBatch(List<ChangeEvent<Object, Object>> list, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> recordCommitter) throws InterruptedException {
            for (ChangeEvent<Object, Object> changeEvent : list) {
                String streamName = PravegaChangeConsumer.this.streamNameMapper.map(changeEvent.destination());
                EventStreamWriter<byte[]> writer = this.writers.computeIfAbsent(streamName, this::createWriter);
                if (changeEvent.key() != null) {
                    // A non-null key is used as the routing key.
                    writer.writeEvent(PravegaChangeConsumer.this.getString(changeEvent.key()), PravegaChangeConsumer.this.getBytes(changeEvent.value()));
                } else {
                    writer.writeEvent(PravegaChangeConsumer.this.getBytes(changeEvent.value()));
                }
            }
            // writeEvent() is asynchronous (it hands back a future); flush so that no record is
            // acknowledged to the engine before its event has actually been written out.
            this.writers.values().forEach(EventStreamWriter::flush);
            for (ChangeEvent<Object, Object> changeEvent : list) {
                recordCommitter.markProcessed(changeEvent);
            }
            recordCommitter.markBatchFinished();
        }

        /** Creates an event writer for the given stream using the shared factory and writer config. */
        private EventStreamWriter<byte[]> createWriter(String str) {
            PravegaChangeConsumer.LOGGER.debug("Creating writer for stream {}", str);
            return PravegaChangeConsumer.this.factory.createEventWriter(str, new ByteArraySerializer(), PravegaChangeConsumer.this.writerConfig);
        }

        /** Closes every writer created by this sink. */
        @Override // java.lang.AutoCloseable
        public void close() throws Exception {
            PravegaChangeConsumer.LOGGER.debug("Closing {} writer(s)", this.writers.size());
            this.writers.values().forEach(EventStreamWriter::close);
        }
    }

    /**
     * Transactional sink: one {@link TransactionalEventStreamWriter} and one open
     * {@link Transaction} per destination stream for the duration of a batch.
     */
    class PravegaTxnSinkImpl implements PravegaSink {
        /** Transactional writers keyed by mapped stream name. */
        private final Map<String, TransactionalEventStreamWriter<byte[]>> writers = new HashMap<>();
        /** Transactions of the batch in flight, keyed by mapped stream name. */
        private final Map<String, Transaction<byte[]>> txns = new HashMap<>();

        PravegaTxnSinkImpl() {
        }

        /**
         * Writes every event into the transaction of its stream, commits all transactions, and
         * only then acknowledges the records to the engine.
         *
         * <p>Transactions are committed one stream after another; a failure part-way leaves the
         * remaining ones in {@link #txns} so that {@link #close()} can abort them.
         *
         * @param list            events of the batch, in order
         * @param recordCommitter committer used to acknowledge records and the batch
         * @throws InterruptedException if acknowledging a record or the batch is interrupted
         * @throws RuntimeException     wrapping {@link TxnFailedException} when a write or commit fails
         */
        @Override
        public void handleBatch(List<ChangeEvent<Object, Object>> list, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> recordCommitter) throws InterruptedException {
            for (ChangeEvent<Object, Object> changeEvent : list) {
                String streamName = PravegaChangeConsumer.this.streamNameMapper.map(changeEvent.destination());
                Transaction<byte[]> transaction = this.txns.computeIfAbsent(streamName, this::createTxn);
                try {
                    if (changeEvent.key() != null) {
                        // A non-null key is used as the routing key.
                        transaction.writeEvent(PravegaChangeConsumer.this.getString(changeEvent.key()), PravegaChangeConsumer.this.getBytes(changeEvent.value()));
                    } else {
                        transaction.writeEvent(PravegaChangeConsumer.this.getBytes(changeEvent.value()));
                    }
                } catch (TxnFailedException e) {
                    throw new RuntimeException(e);
                }
            }
            for (Transaction<byte[]> transaction : this.txns.values()) {
                try {
                    transaction.commit();
                } catch (TxnFailedException e) {
                    throw new RuntimeException(e);
                }
            }
            // Everything is committed: nothing is left for close() to abort.
            this.txns.clear();
            // Acknowledge only after the commits, so a failed commit never leaves records marked
            // as processed while their events were not published.
            for (ChangeEvent<Object, Object> changeEvent : list) {
                recordCommitter.markProcessed(changeEvent);
            }
            recordCommitter.markBatchFinished();
        }

        /** Begins a transaction on the given stream, creating the stream's writer on first use. */
        private Transaction<byte[]> createTxn(String str) {
            TransactionalEventStreamWriter<byte[]> writer = this.writers.computeIfAbsent(str, this::createWriter);
            PravegaChangeConsumer.LOGGER.debug("Creating transaction for stream {}", str);
            return writer.beginTxn();
        }

        /** Creates a transactional writer for the given stream using the shared factory and writer config. */
        private TransactionalEventStreamWriter<byte[]> createWriter(String str) {
            PravegaChangeConsumer.LOGGER.debug("Creating writer for stream {}", str);
            return PravegaChangeConsumer.this.factory.createTransactionalEventWriter(str, new ByteArraySerializer(), PravegaChangeConsumer.this.writerConfig);
        }

        /**
         * Aborts any transaction a failed batch left open, then closes every writer.
         * Writers are closed even if aborting fails.
         */
        @Override // java.lang.AutoCloseable
        public void close() throws Exception {
            try {
                // Non-empty only when handleBatch() failed before all commits went through.
                this.txns.values().forEach(this::abortIfOpen);
                this.txns.clear();
            } finally {
                PravegaChangeConsumer.LOGGER.debug("Closing {} writer(s)", this.writers.size());
                this.writers.values().forEach(TransactionalEventStreamWriter::close);
            }
        }

        /**
         * Best-effort abort: transactions that are no longer open (already committed or aborted)
         * are skipped, and failures are logged rather than thrown so they cannot mask the
         * exception that caused the batch to fail.
         */
        private void abortIfOpen(Transaction<byte[]> transaction) {
            try {
                if (transaction.checkStatus() == Transaction.Status.OPEN) {
                    transaction.abort();
                }
            } catch (RuntimeException e) {
                PravegaChangeConsumer.LOGGER.warn("Failed to abort transaction {}", transaction.getTxnId(), e);
            }
        }
    }

    /**
     * Reads the scope from configuration and builds the client config, client factory and
     * writer config shared by all sinks.
     */
    @PostConstruct
    void constructor() {
        this.scope = ConfigProvider.getConfig().getValue(PROP_SCOPE, String.class);
        this.clientConfig = ClientConfig.builder().controllerURI(this.controllerUri).build();
        LOGGER.debug("Creating client factory for scope {} with controller {}", this.scope, this.controllerUri);
        this.factory = EventStreamClientFactory.withScope(this.scope, this.clientConfig);
        this.writerConfig = EventWriterConfig.builder().build();
    }

    /** Closes the client factory, if start-up got far enough to create it. */
    @PreDestroy
    void destructor() {
        LOGGER.debug("Closing client factory");
        if (this.factory != null) {
            this.factory.close();
        }
    }

    /**
     * Handles one batch with a freshly created sink (transactional or not, depending on
     * configuration) that is always closed afterwards, whether or not the batch succeeded.
     *
     * @param list            events of the batch, in order
     * @param recordCommitter committer used to acknowledge records and the batch
     * @throws InterruptedException if the sink was interrupted while acknowledging records
     * @throws RuntimeException     for any other failure; checked exceptions are wrapped
     */
    @Override
    public void handleBatch(List<ChangeEvent<Object, Object>> list, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> recordCommitter) throws InterruptedException {
        try (PravegaSink sink = this.txn ? new PravegaTxnSinkImpl() : new PravegaSinkImpl()) {
            sink.handleBatch(list, recordCommitter);
        } catch (InterruptedException | RuntimeException e) {
            // Already part of this method's contract: propagate without another wrapper.
            throw e;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
