/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.source.coordinator;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkAlignmentParams;
import org.apache.flink.api.connector.source.DynamicFilteringInfo;
import org.apache.flink.api.connector.source.DynamicParallelismInference;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SupportsHandleExecutionAttemptSourceEvent;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.operators.coordination.CoordinatorStore;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext;
import org.apache.flink.runtime.source.event.IsProcessingBacklogEvent;
import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
import org.apache.flink.runtime.source.event.ReportedWatermarkEvent;
import org.apache.flink.runtime.source.event.RequestSplitEvent;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.apache.flink.runtime.source.event.WatermarkAlignmentEvent;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.PriorityComparator;
import org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueueElement;
import org.apache.flink.runtime.state.heap.HeapPriorityQueue;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TemporaryClassLoaderContext;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
implements OperatorCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger(SourceCoordinator.class);
    private final WatermarkAggregator<Integer> combinedWatermark = new WatermarkAggregator();
    private final WatermarkAlignmentParams watermarkAlignmentParams;
    private final String operatorName;
    private final Source<?, SplitT, EnumChkT> source;
    private final SimpleVersionedSerializer<EnumChkT> enumCheckpointSerializer;
    private final SourceCoordinatorContext<SplitT> context;
    private final CoordinatorStore coordinatorStore;
    private SplitEnumerator<SplitT, EnumChkT> enumerator;
    private boolean started;
    @Nullable
    private final String coordinatorListeningID;

    public SourceCoordinator(String operatorName, Source<?, SplitT, EnumChkT> source, SourceCoordinatorContext<SplitT> context, CoordinatorStore coordinatorStore) {
        this(operatorName, source, context, coordinatorStore, WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED, null);
    }

    public SourceCoordinator(String operatorName, Source<?, SplitT, EnumChkT> source, SourceCoordinatorContext<SplitT> context, CoordinatorStore coordinatorStore, WatermarkAlignmentParams watermarkAlignmentParams, @Nullable String coordinatorListeningID) {
        this.operatorName = operatorName;
        this.source = source;
        this.enumCheckpointSerializer = source.getEnumeratorCheckpointSerializer();
        this.context = context;
        this.coordinatorStore = coordinatorStore;
        this.watermarkAlignmentParams = watermarkAlignmentParams;
        this.coordinatorListeningID = coordinatorListeningID;
        if (watermarkAlignmentParams.isEnabled() && context.isConcurrentExecutionAttemptsSupported()) {
            throw new IllegalArgumentException("Watermark alignment is not supported in concurrent execution attempts scenario (e.g. if speculative execution is enabled)");
        }
    }

    @VisibleForTesting
    void announceCombinedWatermark() {
        long maxAllowedWatermark;
        Preconditions.checkState(this.watermarkAlignmentParams != WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED);
        Watermark globalCombinedWatermark = this.coordinatorStore.apply(this.watermarkAlignmentParams.getWatermarkGroup(), value -> {
            WatermarkAggregator aggregator = (WatermarkAggregator)value;
            return new Watermark(aggregator.getAggregatedWatermark().getTimestamp());
        });
        try {
            maxAllowedWatermark = Math.addExact(globalCombinedWatermark.getTimestamp(), this.watermarkAlignmentParams.getMaxAllowedWatermarkDrift());
        }
        catch (ArithmeticException e) {
            maxAllowedWatermark = Watermark.MAX_WATERMARK.getTimestamp();
        }
        Set<Integer> subTaskIds = this.combinedWatermark.keySet();
        LOG.info("Distributing maxAllowedWatermark={} of group={} to subTaskIds={} for source {}.", new Object[]{maxAllowedWatermark, this.watermarkAlignmentParams.getWatermarkGroup(), subTaskIds, this.operatorName});
        for (Integer subtaskId : subTaskIds) {
            this.context.sendEventToSourceOperatorIfTaskReady(subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark));
        }
    }

    @Override
    public void start() throws Exception {
        LOG.info("Starting split enumerator for source {}.", (Object)this.operatorName);
        this.started = true;
        if (this.enumerator == null) {
            ClassLoader userCodeClassLoader = this.context.getCoordinatorContext().getUserCodeClassloader();
            try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(userCodeClassLoader);){
                this.enumerator = this.source.createEnumerator(this.context);
            }
            catch (Throwable t) {
                ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
                LOG.error("Failed to create Source Enumerator for source {}", (Object)this.operatorName, (Object)t);
                this.context.failJob(t);
                return;
            }
        }
        this.runInEventLoop(() -> this.enumerator.start(), "starting the SplitEnumerator.", new Object[0]);
        if (this.coordinatorListeningID != null) {
            this.coordinatorStore.compute(this.coordinatorListeningID, (key, oldValue) -> {
                if (oldValue == null || oldValue instanceof OperatorCoordinator) {
                    return this;
                }
                Preconditions.checkState(oldValue instanceof OperatorEvent, "The existing value for " + this.coordinatorStore + "is expected to be an operator event, but it is in fact " + oldValue);
                LOG.info("Handling event {} received before the source coordinator with ID {} is registered", oldValue, (Object)this.coordinatorListeningID);
                this.handleEventFromOperator(0, 0, (OperatorEvent)oldValue);
                return null;
            });
        }
        if (this.watermarkAlignmentParams.isEnabled()) {
            LOG.info("Starting schedule the period announceCombinedWatermark task");
            this.coordinatorStore.putIfAbsent(this.watermarkAlignmentParams.getWatermarkGroup(), new WatermarkAggregator());
            this.context.schedulePeriodTask(this::announceCombinedWatermark, this.watermarkAlignmentParams.getUpdateInterval(), this.watermarkAlignmentParams.getUpdateInterval(), TimeUnit.MILLISECONDS);
        }
    }

    @Override
    public void close() throws Exception {
        LOG.info("Closing SourceCoordinator for source {}.", (Object)this.operatorName);
        if (this.started) {
            IOUtils.closeQuietly(this.enumerator);
        }
        IOUtils.closeQuietly(this.context);
        LOG.info("Source coordinator for source {} closed.", (Object)this.operatorName);
    }

    @Override
    public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) {
        this.runInEventLoop(() -> {
            if (event instanceof RequestSplitEvent) {
                this.handleRequestSplitEvent(subtask, attemptNumber, (RequestSplitEvent)event);
            } else if (event instanceof SourceEventWrapper) {
                this.handleSourceEvent(subtask, attemptNumber, ((SourceEventWrapper)event).getSourceEvent());
            } else if (event instanceof ReaderRegistrationEvent) {
                this.handleReaderRegistrationEvent(subtask, attemptNumber, (ReaderRegistrationEvent)event);
            } else if (event instanceof ReportedWatermarkEvent) {
                this.handleReportedWatermark(subtask, new Watermark(((ReportedWatermarkEvent)event).getWatermark()));
            } else {
                throw new FlinkException("Unrecognized Operator Event: " + event);
            }
        }, "handling operator event %s from subtask %d (#%d)", event, subtask, attemptNumber);
    }

    @Override
    public void executionAttemptFailed(int subtaskId, int attemptNumber, @Nullable Throwable reason) {
        this.runInEventLoop(() -> {
            LOG.info("Removing registered reader after failure for subtask {} (#{}) of source {}.", new Object[]{subtaskId, attemptNumber, this.operatorName});
            this.context.unregisterSourceReader(subtaskId, attemptNumber);
            this.context.attemptFailed(subtaskId, attemptNumber);
        }, "handling subtask %d (#%d) failure", subtaskId, attemptNumber);
    }

    @Override
    public void subtaskReset(int subtaskId, long checkpointId) {
        this.runInEventLoop(() -> {
            LOG.info("Recovering subtask {} to checkpoint {} for source {} to checkpoint.", new Object[]{subtaskId, checkpointId, this.operatorName});
            this.context.subtaskReset(subtaskId);
            List<SplitT> splitsToAddBack = this.context.getAndRemoveUncheckpointedAssignment(subtaskId, checkpointId);
            LOG.debug("Adding splits back to the split enumerator of source {}: {}", (Object)this.operatorName, splitsToAddBack);
            this.enumerator.addSplitsBack(splitsToAddBack, subtaskId);
        }, "handling subtask %d recovery to checkpoint %d", subtaskId, checkpointId);
    }

    @Override
    public void executionAttemptReady(int subtask, int attemptNumber, OperatorCoordinator.SubtaskGateway gateway) {
        Preconditions.checkArgument(subtask == gateway.getSubtask());
        Preconditions.checkArgument(attemptNumber == gateway.getExecution().getAttemptNumber());
        this.runInEventLoop(() -> this.context.attemptReady(gateway), "making event gateway to subtask %d (#%d) available", subtask, attemptNumber);
    }

    @Override
    public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
        this.runInEventLoop(() -> {
            LOG.debug("Taking a state snapshot on operator {} for checkpoint {}", (Object)this.operatorName, (Object)checkpointId);
            try {
                this.context.onCheckpoint(checkpointId);
                result.complete(this.toBytes(checkpointId));
            }
            catch (Throwable e) {
                ExceptionUtils.rethrowIfFatalErrorOrOOM(e);
                result.completeExceptionally(new CompletionException(String.format("Failed to checkpoint SplitEnumerator for source %s", this.operatorName), e));
            }
        }, "taking checkpoint %d", checkpointId);
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        this.runInEventLoop(() -> {
            LOG.info("Marking checkpoint {} as completed for source {}.", (Object)checkpointId, (Object)this.operatorName);
            this.context.onCheckpointComplete(checkpointId);
            this.enumerator.notifyCheckpointComplete(checkpointId);
        }, "notifying the enumerator of completion of checkpoint %d", checkpointId);
    }

    @Override
    public void notifyCheckpointAborted(long checkpointId) {
        this.runInEventLoop(() -> {
            LOG.info("Marking checkpoint {} as aborted for source {}.", (Object)checkpointId, (Object)this.operatorName);
            this.enumerator.notifyCheckpointAborted(checkpointId);
        }, "calling notifyCheckpointAborted()", new Object[0]);
    }

    @Override
    public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception {
        Preconditions.checkState(!this.started, "The coordinator can only be reset if it was not yet started");
        assert (this.enumerator == null);
        if (checkpointData == null) {
            return;
        }
        LOG.info("Restoring SplitEnumerator of source {} from checkpoint.", (Object)this.operatorName);
        ClassLoader userCodeClassLoader = this.context.getCoordinatorContext().getUserCodeClassloader();
        try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(userCodeClassLoader);){
            EnumChkT enumeratorCheckpoint = this.deserializeCheckpoint(checkpointData);
            this.enumerator = this.source.restoreEnumerator(this.context, enumeratorCheckpoint);
        }
    }

    private void runInEventLoop(ThrowingRunnable<Throwable> action, String actionName, Object ... actionNameFormatParameters) {
        this.ensureStarted();
        if (this.enumerator == null) {
            return;
        }
        this.context.runInCoordinatorThread(() -> {
            try {
                action.run();
            }
            catch (Throwable t) {
                ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
                String actionString = String.format(actionName, actionNameFormatParameters);
                LOG.error("Uncaught exception in the SplitEnumerator for Source {} while {}. Triggering job failover.", new Object[]{this.operatorName, actionString, t});
                this.context.failJob(t);
            }
        });
    }

    @VisibleForTesting
    SplitEnumerator<SplitT, EnumChkT> getEnumerator() {
        return this.enumerator;
    }

    @VisibleForTesting
    SourceCoordinatorContext<SplitT> getContext() {
        return this.context;
    }

    private byte[] toBytes(long checkpointId) throws Exception {
        return SourceCoordinator.writeCheckpointBytes(this.enumerator.snapshotState(checkpointId), this.enumCheckpointSerializer);
    }

    /*
     * Exception decompiling
     */
    static <EnumChkT> byte[] writeCheckpointBytes(EnumChkT enumeratorCheckpoint, SimpleVersionedSerializer<EnumChkT> enumeratorCheckpointSerializer) throws Exception {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    /*
     * Exception decompiling
     */
    private EnumChkT deserializeCheckpoint(byte[] bytes) throws Exception {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    private void handleRequestSplitEvent(int subtask, int attemptNumber, RequestSplitEvent event) {
        LOG.info("Source {} received split request from parallel task {} (#{})", new Object[]{this.operatorName, subtask, attemptNumber});
        if (!this.context.hasNoMoreSplits(subtask)) {
            this.enumerator.handleSplitRequest(subtask, event.hostName());
        }
    }

    private void handleSourceEvent(int subtask, int attemptNumber, SourceEvent event) {
        LOG.debug("Source {} received custom event from parallel task {} (#{}): {}", new Object[]{this.operatorName, subtask, attemptNumber, event});
        if (this.context.isConcurrentExecutionAttemptsSupported()) {
            Preconditions.checkState(this.enumerator instanceof SupportsHandleExecutionAttemptSourceEvent, "The split enumerator %s must implement SupportsHandleExecutionAttemptSourceEvent to be used in concurrent execution attempts scenario (e.g. if speculative execution is enabled).", this.enumerator.getClass().getCanonicalName());
            ((SupportsHandleExecutionAttemptSourceEvent)((Object)this.enumerator)).handleSourceEvent(subtask, attemptNumber, event);
        } else {
            this.enumerator.handleSourceEvent(subtask, event);
        }
    }

    private void handleReaderRegistrationEvent(int subtask, int attemptNumber, ReaderRegistrationEvent event) {
        Preconditions.checkArgument(subtask == event.subtaskId());
        LOG.info("Source {} registering reader for parallel task {} (#{}) @ {}", new Object[]{this.operatorName, subtask, attemptNumber, event.location()});
        boolean subtaskReaderExisted = this.context.registeredReadersOfAttempts().containsKey(subtask);
        this.context.registerSourceReader(subtask, attemptNumber, event.location());
        if (!subtaskReaderExisted) {
            this.enumerator.addReader(event.subtaskId());
            Boolean isBacklog = this.context.isBacklog().getAsBoolean();
            if (isBacklog != null) {
                this.context.sendEventToSourceOperatorIfTaskReady(subtask, new IsProcessingBacklogEvent(isBacklog));
            }
        }
    }

    private void handleReportedWatermark(int subtask, Watermark watermark) throws FlinkException {
        if (this.context.isConcurrentExecutionAttemptsSupported()) {
            throw new FlinkException("ReportedWatermarkEvent is not supported in concurrent execution attempts scenario (e.g. if speculative execution is enabled)");
        }
        LOG.debug("New reported watermark={} from subTaskId={} of source {}.", new Object[]{watermark, subtask, this.operatorName});
        Preconditions.checkState(this.watermarkAlignmentParams.isEnabled());
        this.combinedWatermark.aggregate(subtask, watermark).ifPresent(newCombinedWatermark -> this.coordinatorStore.computeIfPresent(this.watermarkAlignmentParams.getWatermarkGroup(), (key, oldValue) -> {
            WatermarkAggregator watermarkAggregator = (WatermarkAggregator)oldValue;
            watermarkAggregator.aggregate(this.operatorName, (Watermark)newCombinedWatermark);
            return watermarkAggregator;
        }));
    }

    private void ensureStarted() {
        if (!this.started) {
            throw new IllegalStateException("The coordinator has not started yet.");
        }
    }

    private Optional<DynamicFilteringInfo> getSourceDynamicFilteringInfo() {
        SourceEvent sourceEvent;
        Object event;
        if (this.coordinatorListeningID != null && this.coordinatorStore.containsKey(this.coordinatorListeningID) && (event = this.coordinatorStore.get(this.coordinatorListeningID)) instanceof SourceEventWrapper && (sourceEvent = ((SourceEventWrapper)event).getSourceEvent()) instanceof DynamicFilteringInfo) {
            return Optional.of((DynamicFilteringInfo)((Object)sourceEvent));
        }
        return Optional.empty();
    }

    public CompletableFuture<Integer> inferSourceParallelismAsync(final int parallelismInferenceUpperBound, final long dataVolumePerTask) {
        return this.context.supplyAsync(() -> {
            if (!(this.source instanceof DynamicParallelismInference)) {
                return -1;
            }
            DynamicParallelismInference parallelismInference = (DynamicParallelismInference)((Object)this.source);
            try {
                return parallelismInference.inferParallelism(new DynamicParallelismInference.Context(){

                    @Override
                    public Optional<DynamicFilteringInfo> getDynamicFilteringInfo() {
                        return SourceCoordinator.this.getSourceDynamicFilteringInfo();
                    }

                    @Override
                    public int getParallelismInferenceUpperBound() {
                        return parallelismInferenceUpperBound;
                    }

                    @Override
                    public long getDataVolumePerTask() {
                        return dataVolumePerTask;
                    }
                });
            }
            catch (Throwable e) {
                LOG.error("Unexpected error occurred when dynamically inferring source parallelism.", e);
                return -1;
            }
        }).thenApply(future -> (Integer)future);
    }

    static class WatermarkAggregator<T> {
        private final Map<T, WatermarkElement> watermarks = new HashMap<T, WatermarkElement>();
        private final HeapPriorityQueue<WatermarkElement> orderedWatermarks = new HeapPriorityQueue(PriorityComparator.forPriorityComparableObjects(), 10);
        private static final Watermark DEFAULT_WATERMARK = new Watermark(Long.MIN_VALUE);

        WatermarkAggregator() {
        }

        public Optional<Watermark> aggregate(T key, Watermark watermark) {
            Watermark oldAggregatedWatermark = this.getAggregatedWatermark();
            WatermarkElement watermarkElement = new WatermarkElement(watermark);
            WatermarkElement oldWatermarkElement = this.watermarks.put(key, watermarkElement);
            if (oldWatermarkElement != null) {
                this.orderedWatermarks.remove(oldWatermarkElement);
            }
            this.orderedWatermarks.add(watermarkElement);
            Watermark newAggregatedWatermark = this.getAggregatedWatermark();
            if (newAggregatedWatermark.equals(oldAggregatedWatermark)) {
                return Optional.empty();
            }
            return Optional.of(newAggregatedWatermark);
        }

        public Set<T> keySet() {
            return this.watermarks.keySet();
        }

        public Watermark getAggregatedWatermark() {
            WatermarkElement aggregatedWatermarkElement = (WatermarkElement)this.orderedWatermarks.peek();
            return aggregatedWatermarkElement == null ? DEFAULT_WATERMARK : aggregatedWatermarkElement.watermark;
        }
    }

    public static class WatermarkElement
    extends AbstractHeapPriorityQueueElement
    implements PriorityComparable<WatermarkElement> {
        private final Watermark watermark;

        public WatermarkElement(Watermark watermark) {
            this.watermark = watermark;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o instanceof WatermarkElement) {
                return this.watermark.equals(((WatermarkElement)o).watermark);
            }
            return false;
        }

        public int hashCode() {
            return this.watermark.hashCode();
        }

        @Override
        public int comparePriorityTo(@Nonnull WatermarkElement other) {
            return Long.compare(this.watermark.getTimestamp(), other.watermark.getTimestamp());
        }
    }
}

