/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.yarn.batch.partition;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.StepExecutionSplitter;
import org.springframework.yarn.am.YarnAppmaster;
import org.springframework.yarn.am.container.ContainerRequestHint;
import org.springframework.yarn.batch.am.BatchYarnAppmaster;
import org.springframework.yarn.batch.listener.PartitionedStepExecutionStateListener;
import org.springframework.yarn.listener.AppmasterStateListener;

public abstract class AbstractPartitionHandler
implements PartitionHandler {
    private static final Log log = LogFactory.getLog(AbstractPartitionHandler.class);
    private BatchYarnAppmaster batchAppmaster;
    private String stepName = "remoteStep";
    private String keySplitLocations = "splitLocations";

    public AbstractPartitionHandler() {
    }

    public AbstractPartitionHandler(BatchYarnAppmaster batchAppmaster) {
        this.batchAppmaster = batchAppmaster;
    }

    public final Collection<StepExecution> handle(StepExecutionSplitter stepSplitter, StepExecution stepExecution) throws Exception {
        log.info((Object)("Handling stepExecution=[" + stepExecution + "] with jobParameters=[" + stepExecution.getJobParameters() + "]"));
        Set<StepExecution> split = this.createSplits(stepSplitter, stepExecution);
        log.info((Object)("Created " + split.size() + " splits for stepName=" + this.stepName));
        Map<StepExecution, ContainerRequestHint> resourceRequests = this.createRequestData(split);
        log.info((Object)("Resource request map size is " + resourceRequests.size()));
        if (log.isDebugEnabled()) {
            for (Map.Entry<StepExecution, ContainerRequestHint> entry : resourceRequests.entrySet()) {
                log.debug((Object)("Entry stepExecution=[" + entry.getKey() + "] requestData=[" + entry.getValue() + "]"));
            }
        }
        this.batchAppmaster.addStepSplits(stepExecution, this.stepName, split, resourceRequests);
        this.waitCompleteState(stepExecution);
        ArrayList<StepExecution> result = new ArrayList<StepExecution>(this.batchAppmaster.getStepExecutions());
        result.addAll(this.batchAppmaster.getStepExecutions());
        log.info((Object)"Listing statuses of remote executions");
        for (StepExecution execution : result) {
            log.info((Object)("Remote stepExecution=[" + execution + "]"));
        }
        return result;
    }

    public void setBatchAppmaster(BatchYarnAppmaster batchAppmaster) {
        this.batchAppmaster = batchAppmaster;
    }

    public void setYarnAppmaster(YarnAppmaster yarnAppmaster) {
        if (yarnAppmaster instanceof BatchYarnAppmaster) {
            this.setBatchAppmaster((BatchYarnAppmaster)yarnAppmaster);
        }
    }

    public void setKeySplitLocations(String keySplitLocations) {
        this.keySplitLocations = keySplitLocations;
    }

    public String getKeySplitLocations() {
        return this.keySplitLocations;
    }

    public String getStepName() {
        return this.stepName;
    }

    public void setStepName(String stepName) {
        this.stepName = stepName;
    }

    protected abstract Set<StepExecution> createSplits(StepExecutionSplitter var1, StepExecution var2) throws Exception;

    protected Map<StepExecution, ContainerRequestHint> createRequestData(Set<StepExecution> stepExecutions) throws Exception {
        return new HashMap<StepExecution, ContainerRequestHint>();
    }

    protected void waitCompleteState(final StepExecution masterStepExecution) {
        final CountDownLatch latch = new CountDownLatch(1);
        this.batchAppmaster.addAppmasterStateListener(new AppmasterStateListener(){

            public void state(AppmasterStateListener.AppmasterState state) {
                log.info((Object)("AppmasterStateListener state=[" + state + "]"));
                if (state == AppmasterStateListener.AppmasterState.COMPLETED || state == AppmasterStateListener.AppmasterState.FAILED) {
                    latch.countDown();
                }
            }
        });
        this.batchAppmaster.addPartitionedStepExecutionStateListener(new PartitionedStepExecutionStateListener(){

            @Override
            public void state(PartitionedStepExecutionStateListener.PartitionedStepExecutionState state, StepExecution stepExecution) {
                log.info((Object)("PartitionedStepExecutionStateListener state=[" + (Object)((Object)state) + "] stepExecution=[" + stepExecution + "] masterStepExecution=[" + masterStepExecution + "]"));
                if (state == PartitionedStepExecutionStateListener.PartitionedStepExecutionState.COMPLETED && masterStepExecution.equals((Object)stepExecution)) {
                    log.info((Object)("Got complete state for stepExecution=[" + stepExecution + "]"));
                    latch.countDown();
                }
            }
        });
        try {
            latch.await();
        }
        catch (Exception e) {
            log.warn((Object)"Latch wait interrupted, we may not be finished!");
        }
        log.info((Object)"Waiting latch complete");
    }
}

