package io.druid.indexing.common.task;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.hash.Hashing;
import com.metamx.common.ISE;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentListUsedAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import javax.annotation.Nullable;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.ReadableInstant;

/* loaded from: input_file:io/druid/indexing/common/task/MergeTaskBase.class */
public abstract class MergeTaskBase extends AbstractFixedIntervalTask {

    @JsonIgnore
    private final List<DataSegment> segments;
    private static final EmittingLogger log = new EmittingLogger(MergeTaskBase.class);

    /* JADX INFO: Access modifiers changed from: protected */
    public MergeTaskBase(String str, final String str2, List<DataSegment> list, Map<String, Object> map) {
        super(str != null ? str : String.format("merge_%s_%s", computeProcessingID(str2, list), new DateTime().toString()), str2, computeMergedInterval(list), map);
        Preconditions.checkArgument(list.size() > 0, "segments nonempty");
        Preconditions.checkArgument(Iterables.size(Iterables.filter(list, new Predicate<DataSegment>() { // from class: io.druid.indexing.common.task.MergeTaskBase.1
            public boolean apply(@Nullable DataSegment dataSegment) {
                return dataSegment == null || !dataSegment.getDataSource().equalsIgnoreCase(str2);
            }
        })) == 0, "segments in the wrong datasource");
        Preconditions.checkArgument(Iterables.size(Iterables.filter(list, new Predicate<DataSegment>() { // from class: io.druid.indexing.common.task.MergeTaskBase.2
            public boolean apply(@Nullable DataSegment dataSegment) {
                return dataSegment == null || !(dataSegment.getShardSpec() instanceof NoneShardSpec);
            }
        })) == 0, "segments without NoneShardSpec");
        this.segments = list;
    }

    @Override // io.druid.indexing.common.task.Task
    public TaskStatus run(TaskToolbox taskToolbox) throws Exception {
        TaskLock taskLock = (TaskLock) Iterables.getOnlyElement(getTaskLocks(taskToolbox));
        ServiceEmitter emitter = taskToolbox.getEmitter();
        ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
        DataSegment computeMergedSegment = computeMergedSegment(getDataSource(), taskLock.getVersion(), this.segments);
        File taskWorkDir = taskToolbox.getTaskWorkDir();
        try {
            long currentTimeMillis = System.currentTimeMillis();
            log.info("Starting merge of id[%s], segments: %s", new Object[]{getId(), Lists.transform(this.segments, new Function<DataSegment, String>() { // from class: io.druid.indexing.common.task.MergeTaskBase.3
                public String apply(DataSegment dataSegment) {
                    return dataSegment.getIdentifier();
                }
            })});
            File merge = merge(taskToolbox, taskToolbox.fetchSegments(this.segments), new File(taskWorkDir, "merged"));
            emitter.emit(builder.build("merger/numMerged", Integer.valueOf(this.segments.size())));
            emitter.emit(builder.build("merger/mergeTime", Long.valueOf(System.currentTimeMillis() - currentTimeMillis)));
            log.info("[%s] : Merged %d segments in %,d millis", new Object[]{computeMergedSegment.getDataSource(), Integer.valueOf(this.segments.size()), Long.valueOf(System.currentTimeMillis() - currentTimeMillis)});
            long currentTimeMillis2 = System.currentTimeMillis();
            DataSegment push = taskToolbox.getSegmentPusher().push(merge, computeMergedSegment);
            emitter.emit(builder.build("merger/uploadTime", Long.valueOf(System.currentTimeMillis() - currentTimeMillis2)));
            emitter.emit(builder.build("merger/mergeSize", Long.valueOf(push.getSize())));
            taskToolbox.publishSegments(ImmutableList.of(push));
            return TaskStatus.success(getId());
        } catch (Exception e) {
            log.makeAlert(e, "Exception merging[%s]", new Object[]{computeMergedSegment.getDataSource()}).addData("interval", computeMergedSegment.getInterval()).emit();
            return TaskStatus.failure(getId());
        }
    }

    @Override // io.druid.indexing.common.task.AbstractFixedIntervalTask, io.druid.indexing.common.task.Task
    public boolean isReady(TaskActionClient taskActionClient) throws Exception {
        if (!super.isReady(taskActionClient)) {
            return false;
        }
        Function<DataSegment, String> function = new Function<DataSegment, String>() { // from class: io.druid.indexing.common.task.MergeTaskBase.4
            public String apply(DataSegment dataSegment) {
                return dataSegment.getIdentifier();
            }
        };
        ImmutableSet copyOf = ImmutableSet.copyOf(Iterables.transform((Iterable) taskActionClient.submit(new SegmentListUsedAction(getDataSource(), getInterval(), null)), function));
        ImmutableSet copyOf2 = ImmutableSet.copyOf(Iterables.transform(this.segments, function));
        Sets.SetView difference = Sets.difference(copyOf, copyOf2);
        if (!difference.isEmpty()) {
            throw new ISE("Merge is invalid: current segment(s) are not in the requested set: %s", new Object[]{Joiner.on(", ").join(difference)});
        }
        Sets.SetView difference2 = Sets.difference(copyOf2, copyOf);
        if (difference2.isEmpty()) {
            return true;
        }
        throw new ISE("Merge is invalid: requested segment(s) are not in the current set: %s", new Object[]{Joiner.on(", ").join(difference2)});
    }

    protected abstract File merge(TaskToolbox taskToolbox, Map<DataSegment, File> map, File file) throws Exception;

    @JsonProperty
    public List<DataSegment> getSegments() {
        return this.segments;
    }

    @Override // io.druid.indexing.common.task.AbstractTask
    public String toString() {
        return Objects.toStringHelper(this).add("id", getId()).add("dataSource", getDataSource()).add("interval", getInterval()).add("segments", this.segments).toString();
    }

    private static String computeProcessingID(String str, List<DataSegment> list) {
        return String.format("%s_%s", str, Hashing.sha1().hashString(Joiner.on("_").join(Iterables.transform(Ordering.natural().sortedCopy(list), new Function<DataSegment, String>() { // from class: io.druid.indexing.common.task.MergeTaskBase.5
            public String apply(DataSegment dataSegment) {
                return String.format("%s_%s_%s_%s", dataSegment.getInterval().getStart(), dataSegment.getInterval().getEnd(), dataSegment.getVersion(), Integer.valueOf(dataSegment.getShardSpec().getPartitionNum()));
            }
        })), Charsets.UTF_8).toString());
    }

    private static Interval computeMergedInterval(List<DataSegment> list) {
        Preconditions.checkArgument(list.size() > 0, "segments.size() > 0");
        ReadableInstant readableInstant = null;
        ReadableInstant readableInstant2 = null;
        for (DataSegment dataSegment : list) {
            if (readableInstant == null || dataSegment.getInterval().getStart().isBefore(readableInstant)) {
                readableInstant = dataSegment.getInterval().getStart();
            }
            if (readableInstant2 == null || dataSegment.getInterval().getEnd().isAfter(readableInstant2)) {
                readableInstant2 = dataSegment.getInterval().getEnd();
            }
        }
        return new Interval(readableInstant, readableInstant2);
    }

    private static DataSegment computeMergedSegment(String str, String str2, List<DataSegment> list) {
        Interval computeMergedInterval = computeMergedInterval(list);
        TreeSet newTreeSet = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
        TreeSet newTreeSet2 = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
        for (DataSegment dataSegment : list) {
            newTreeSet.addAll(dataSegment.getDimensions());
            newTreeSet2.addAll(dataSegment.getMetrics());
        }
        return DataSegment.builder().dataSource(str).interval(computeMergedInterval).version(str2).binaryVersion(9).shardSpec(NoneShardSpec.instance()).dimensions(Lists.newArrayList(newTreeSet)).metrics(Lists.newArrayList(newTreeSet2)).build();
    }
}
