/*
 * Decompiled with CFR 0.152.
 */
package com.marklogic.hub.legacy.flow.impl;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.marklogic.client.DatabaseClient;
import com.marklogic.client.FailedRequestException;
import com.marklogic.client.datamovement.Batcher;
import com.marklogic.client.datamovement.DataMovementManager;
import com.marklogic.client.datamovement.JobTicket;
import com.marklogic.client.datamovement.QueryBatcher;
import com.marklogic.client.datamovement.impl.JobTicketImpl;
import com.marklogic.client.extensions.ResourceManager;
import com.marklogic.client.extensions.ResourceServices;
import com.marklogic.client.io.Format;
import com.marklogic.client.io.StringHandle;
import com.marklogic.client.io.marker.AbstractReadHandle;
import com.marklogic.client.io.marker.AbstractWriteHandle;
import com.marklogic.client.util.RequestParameters;
import com.marklogic.hub.DatabaseKind;
import com.marklogic.hub.HubConfig;
import com.marklogic.hub.legacy.collector.DiskQueue;
import com.marklogic.hub.legacy.collector.LegacyCollector;
import com.marklogic.hub.legacy.flow.CodeFormat;
import com.marklogic.hub.legacy.flow.LegacyFlow;
import com.marklogic.hub.legacy.flow.LegacyFlowFinishedListener;
import com.marklogic.hub.legacy.flow.LegacyFlowItemCompleteListener;
import com.marklogic.hub.legacy.flow.LegacyFlowItemFailureListener;
import com.marklogic.hub.legacy.flow.LegacyFlowRunner;
import com.marklogic.hub.legacy.flow.LegacyFlowStatusListener;
import com.marklogic.hub.legacy.flow.RunFlowResponse;
import com.marklogic.hub.legacy.job.Job;
import com.marklogic.hub.legacy.job.JobStatus;
import com.marklogic.hub.legacy.job.LegacyJobManager;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;

/**
 * Default {@link LegacyFlowRunner} implementation.
 *
 * <p>A run first executes the flow's collector to obtain the identifiers to process, then feeds
 * those identifiers in batches to a {@link QueryBatcher}. Each batch is posted to the flow's
 * resource extension through a {@link FlowResource}. Progress, per-item results and completion
 * are reported to the registered listeners, and the {@link Job} document is saved as the run
 * moves through its states.
 *
 * <p>Instances are configured through the fluent {@code with*} / {@code on*} methods and are
 * mutable; the configuration methods themselves are not synchronized.
 */
public class LegacyFlowRunnerImpl
implements LegacyFlowRunner {
    private static final int DEFAULT_BATCH_SIZE = 100;
    private static final int DEFAULT_THREAD_COUNT = 4;
    /** Soft cap: once this many error messages have been collected, later ones are dropped. */
    private static final int MAX_ERROR_MESSAGES = 10;
    /** Shared mapper; it is never reconfigured after construction, so it can be reused. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private LegacyFlow flow;
    private int batchSize = DEFAULT_BATCH_SIZE;
    private int threadCount = DEFAULT_THREAD_COUNT;
    private DatabaseClient stagingClient;
    private String destinationDatabase;
    private Map<String, Object> options;
    private int previousPercentComplete;
    private boolean stopOnFailure = false;
    private final List<LegacyFlowItemCompleteListener> flowItemCompleteListeners = new ArrayList<>();
    private final List<LegacyFlowItemFailureListener> flowItemFailureListeners = new ArrayList<>();
    private final List<LegacyFlowStatusListener> flowStatusListeners = new ArrayList<>();
    private final List<LegacyFlowFinishedListener> flowFinishedListeners = new ArrayList<>();
    private HubConfig hubConfig;
    /** Thread that waits for the batcher to finish and finalizes the job; null until run() starts it. */
    private Thread runningThread = null;

    /**
     * Creates a runner that reads from a new staging client obtained from {@code hubConfig} and
     * targets the database the config reports for {@link DatabaseKind#FINAL}.
     *
     * @param hubConfig the hub configuration used to create clients and resolve database names
     */
    public LegacyFlowRunnerImpl(HubConfig hubConfig) {
        this.hubConfig = hubConfig;
        this.stagingClient = hubConfig.newStagingClient();
        this.destinationDatabase = hubConfig.getDbName(DatabaseKind.FINAL);
    }

    /** Leaves every field at its default; intended for subclasses. */
    protected LegacyFlowRunnerImpl() {
    }

    /** Sets the flow to run. */
    @Override
    public LegacyFlowRunner withFlow(LegacyFlow flow) {
        this.flow = flow;
        return this;
    }

    /** Sets the number of identifiers sent per batch (default {@value #DEFAULT_BATCH_SIZE}). */
    @Override
    public LegacyFlowRunner withBatchSize(int batchSize) {
        this.batchSize = batchSize;
        return this;
    }

    /** Sets the thread count used by the collector and the batcher (default {@value #DEFAULT_THREAD_COUNT}). */
    @Override
    public LegacyFlowRunner withThreadCount(int threadCount) {
        this.threadCount = threadCount;
        return this;
    }

    /** Replaces the client used to run the collector and the batcher. */
    @Override
    public LegacyFlowRunner withSourceClient(DatabaseClient stagingClient) {
        this.stagingClient = stagingClient;
        return this;
    }

    /** Sets the database name passed to the flow as {@code target-database}. */
    @Override
    public LegacyFlowRunner withDestinationDatabase(String destinationDatabase) {
        this.destinationDatabase = destinationDatabase;
        return this;
    }

    /** When {@code true}, the job is stopped as soon as a batch reports at least one error. */
    @Override
    public LegacyFlowRunner withStopOnFailure(boolean stopOnFailure) {
        this.stopOnFailure = stopOnFailure;
        return this;
    }

    /**
     * Sets the options passed to the collector and to every batch.
     *
     * <p>Note that {@link #run()} adds the {@code entity}, {@code flow} and {@code flowType}
     * keys to this very map, so the supplied map must be mutable.
     */
    @Override
    public LegacyFlowRunner withOptions(Map<String, Object> options) {
        this.options = options;
        return this;
    }

    /** Registers a listener notified for every item a batch reports as completed. */
    @Override
    public LegacyFlowRunner onItemComplete(LegacyFlowItemCompleteListener listener) {
        this.flowItemCompleteListeners.add(listener);
        return this;
    }

    /** Registers a listener notified for every item a batch reports as failed. */
    @Override
    public LegacyFlowRunner onItemFailed(LegacyFlowItemFailureListener listener) {
        this.flowItemFailureListeners.add(listener);
        return this;
    }

    /** Registers a listener notified of status / percent-complete changes. */
    @Override
    public LegacyFlowRunner onStatusChanged(LegacyFlowStatusListener listener) {
        this.flowStatusListeners.add(listener);
        return this;
    }

    /** Registers a listener notified once the batcher has completed. */
    @Override
    public LegacyFlowRunner onFinished(LegacyFlowFinishedListener listener) {
        this.flowFinishedListeners.add(listener);
        return this;
    }

    /**
     * Blocks until the finalizer thread started by {@link #run()} has finished. Returns
     * immediately when no run has been started.
     *
     * <p>If the calling thread is interrupted while waiting, the wait ends and the thread's
     * interrupt flag is restored so callers can still observe the interruption.
     */
    @Override
    public void awaitCompletion() {
        try {
            this.awaitCompletion(Long.MAX_VALUE, TimeUnit.DAYS);
        }
        catch (InterruptedException interrupted) {
            // This signature cannot propagate the exception; keep the interrupt visible instead.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Blocks until the finalizer thread started by {@link #run()} has finished or the timeout
     * elapses, whichever comes first. Returns immediately when no run has been started or when
     * {@code timeout} is not positive.
     *
     * @param timeout the maximum time to wait
     * @param unit the unit of {@code timeout}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override
    public void awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException {
        Thread thread = this.runningThread;
        if (thread == null || timeout <= 0L) {
            return;
        }
        // toMillis converts FROM the caller's unit TO milliseconds and saturates at Long.MAX_VALUE.
        long millis = unit.toMillis(timeout);
        // Thread.join(0) means "wait forever", so round a positive sub-millisecond timeout up.
        thread.join(Math.max(1L, millis));
    }

    /**
     * Starts the flow and returns without waiting for it to finish.
     *
     * <p>The collector runs synchronously on the calling thread. If it throws, the job is saved
     * as {@link JobStatus#FAILED} with the stack trace as its output and a ticket that was never
     * started is returned. Otherwise a {@link QueryBatcher} job is started and a background
     * thread waits for it, notifies the listeners and saves the final job state; use
     * {@link #awaitCompletion()} to wait for that thread.
     *
     * @return the ticket of the started batcher job, or a placeholder ticket carrying the job id
     *         when the collector failed
     */
    @Override
    @Deprecated
    public JobTicket run() {
        String jobId = UUID.randomUUID().toString();
        LegacyJobManager jobManager = LegacyJobManager.create(this.hubConfig.newJobDbClient());
        Job job = Job.withFlow(this.flow).withJobId(jobId);
        jobManager.saveJob(job);

        LegacyCollector collector = this.flow.getCollector();
        collector.setHubConfig(this.hubConfig);
        collector.setClient(this.stagingClient);

        AtomicLong successfulEvents = new AtomicLong(0L);
        AtomicLong failedEvents = new AtomicLong(0L);
        AtomicLong successfulBatches = new AtomicLong(0L);
        AtomicLong failedBatches = new AtomicLong(0L);

        if (this.options == null) {
            this.options = new HashMap<>();
        }
        this.options.put("entity", this.flow.getEntityName());
        this.options.put("flow", this.flow.getName());
        this.options.put("flowType", this.flow.getType().toString());

        this.flowStatusListeners.forEach(listener -> listener.onStatusChange(jobId, 0, "running collector"));
        jobManager.saveJob(job.withStatus(JobStatus.RUNNING_COLLECTOR));

        DiskQueue<String> uris;
        try {
            uris = collector.run(jobId, this.flow.getEntityName(), this.flow.getName(), this.threadCount, this.options);
        }
        catch (Exception e) {
            // The stack trace is captured into the job document, not printed to stderr.
            StringWriter trace = new StringWriter();
            e.printStackTrace(new PrintWriter(trace));
            job.setCounts(0L, 0L, 0L, 0L).withStatus(JobStatus.FAILED).withEndTime(new Date());
            job.withJobOutput(trace.toString());
            jobManager.saveJob(job);
            return new JobTicketImpl(jobId, JobTicket.JobType.QUERY_BATCHER);
        }

        this.flowStatusListeners.forEach(listener -> listener.onStatusChange(jobId, 0, "starting harmonization"));

        // Vector: messages are appended concurrently from the batcher's worker threads.
        List<String> errorMessages = new Vector<>();
        DataMovementManager dataMovementManager = this.stagingClient.newDataMovementManager();
        int uriCount = uris.size();
        double batchCount = Math.ceil((double)uriCount / (double)this.batchSize);
        // Written by this thread after startJob and read by worker threads, hence concurrent.
        Map<String, JobTicket> ticketWrapper = new ConcurrentHashMap<>();
        // One FlowResource per client handed out by the batcher.
        Map<DatabaseClient, FlowResource> flowResources = new ConcurrentHashMap<>();

        QueryBatcher queryBatcher = dataMovementManager.newQueryBatcher(uris.iterator())
            .withBatchSize(this.batchSize)
            .withThreadCount(this.threadCount)
            .withJobId(jobId)
            .onUrisReady(batch -> {
                try {
                    FlowResource flowResource = flowResources.computeIfAbsent(
                        batch.getClient(),
                        client -> new FlowResource(client, this.destinationDatabase, this.flow));

                    RunFlowResponse response = flowResource.run(jobId, batch.getItems(), this.options);
                    failedEvents.addAndGet(response.errorCount);
                    successfulEvents.addAndGet(response.totalCount - response.errorCount);

                    if (response.errors != null && errorMessages.size() < MAX_ERROR_MESSAGES) {
                        errorMessages.addAll(response.errors.stream().map(this::jsonToString).collect(Collectors.toList()));
                    }

                    // A batch counts as failed only when every one of its items failed.
                    if (response.errorCount < response.totalCount) {
                        successfulBatches.addAndGet(1L);
                    } else {
                        failedBatches.addAndGet(1L);
                    }

                    // Report progress only on a change that lands on a multiple of 5 percent.
                    int percentComplete = (int)((double)successfulBatches.get() / batchCount * 100.0);
                    if (percentComplete != this.previousPercentComplete && percentComplete % 5 == 0) {
                        this.previousPercentComplete = percentComplete;
                        this.flowStatusListeners.forEach(listener -> listener.onStatusChange(jobId, percentComplete, ""));
                    }

                    if (!this.flowItemCompleteListeners.isEmpty() && response.completedItems != null) {
                        response.completedItems.forEach(item ->
                            this.flowItemCompleteListeners.forEach(listener -> listener.processCompletion(jobId, item)));
                    }
                    if (!this.flowItemFailureListeners.isEmpty() && response.failedItems != null) {
                        response.failedItems.forEach(item ->
                            this.flowItemFailureListeners.forEach(listener -> listener.processFailure(jobId, item)));
                    }

                    if (this.stopOnFailure && response.errorCount > 0L) {
                        JobTicket ticket = ticketWrapper.get("jobTicket");
                        if (ticket != null) {
                            dataMovementManager.stopJob(ticket);
                        }
                    }
                }
                catch (Exception e) {
                    if (errorMessages.size() < MAX_ERROR_MESSAGES) {
                        errorMessages.add(e.toString());
                    }
                }
            })
            .onQueryFailure(failure -> {
                failedBatches.addAndGet(1L);
                failedEvents.addAndGet(this.batchSize);
            });

        JobTicket jobTicket = dataMovementManager.startJob(queryBatcher);
        ticketWrapper.put("jobTicket", jobTicket);
        jobManager.saveJob(job.withStatus(JobStatus.RUNNING_HARMONIZE));

        this.runningThread = new Thread(() -> {
            queryBatcher.awaitCompletion();
            this.flowStatusListeners.forEach(listener -> listener.onStatusChange(jobId, 100, ""));
            this.flowFinishedListeners.forEach(LegacyFlowFinishedListener::onFlowFinished);
            dataMovementManager.stopJob(queryBatcher);

            long failed = failedEvents.get();
            long succeeded = successfulEvents.get();
            JobStatus status = this.resolveFinalStatus(failed, succeeded, uriCount);
            job.setCounts(succeeded, failed, successfulBatches.get(), failedBatches.get())
                .withStatus(status)
                .withEndTime(new Date());
            if (!errorMessages.isEmpty()) {
                job.withJobOutput(errorMessages);
            }
            jobManager.saveJob(job);
        });
        this.runningThread.start();
        return jobTicket;
    }

    /**
     * Derives the final job status from the event counters. The checks are ordered; the first
     * one that matches wins.
     *
     * @param failed number of failed events
     * @param succeeded number of successful events
     * @param uriCount number of identifiers the collector returned
     * @return the status to store on the finished job
     */
    private JobStatus resolveFinalStatus(long failed, long succeeded, int uriCount) {
        if (failed > 0L && this.stopOnFailure) {
            return JobStatus.STOP_ON_ERROR;
        }
        // Not every collected identifier was accounted for, so the job ended early.
        if (failed + succeeded != (long)uriCount) {
            return JobStatus.CANCELED;
        }
        if (failed > 0L && succeeded > 0L) {
            return JobStatus.FINISHED_WITH_ERRORS;
        }
        if ((failed == 0L && succeeded > 0L) || uriCount == 0) {
            return JobStatus.FINISHED;
        }
        return JobStatus.FAILED;
    }

    /**
     * Serializes a JSON node to its string form.
     *
     * @param node the node to serialize
     * @return the JSON text
     * @throws RuntimeException wrapping the {@link JsonProcessingException} if serialization fails
     */
    private String jsonToString(JsonNode node) {
        try {
            return OBJECT_MAPPER.writeValueAsString(node);
        }
        catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Turns an exception raised while running a batch into a response when possible.
     *
     * <p>Only a {@link FailedRequestException} whose failed-request status contains
     * "Plugin error" (case-insensitive) is converted: its message is parsed as a
     * {@link RunFlowResponse}. Every other exception is rethrown wrapped in a
     * {@link RuntimeException}.
     *
     * @param e the exception raised by the flow request
     * @return the response parsed from the failed request's message
     * @throws RuntimeException if {@code e} is not a plugin error, or if its message cannot be
     *         parsed (the parse failure is the cause, {@code e} is attached as suppressed)
     */
    @Deprecated
    protected RunFlowResponse handleFlowRunnerException(Exception e) {
        if (e instanceof FailedRequestException) {
            FailedRequestException failure = (FailedRequestException)e;
            if (StringUtils.containsIgnoreCase(failure.getFailedRequest().getStatus(), "Plugin error")) {
                try {
                    return OBJECT_MAPPER.readValue(failure.getFailedRequest().getMessage(), RunFlowResponse.class);
                }
                catch (IOException ex) {
                    RuntimeException wrapped = new RuntimeException("Unexpected IO error while parsing exception from running flow; original exception: " + e.getMessage(), ex);
                    wrapped.addSuppressed(e);
                    throw wrapped;
                }
            }
        }
        throw new RuntimeException(e);
    }

    /**
     * Resource-extension client that posts a batch of identifiers to the flow.
     *
     * <p>Kept as an inner (non-static) class because failures are routed through the enclosing
     * runner's {@link LegacyFlowRunnerImpl#handleFlowRunnerException(Exception)}.
     */
    class FlowResource
    extends ResourceManager {
        private final DatabaseClient srcClient;
        private final String targetDatabase;
        private final LegacyFlow flow;

        /**
         * Binds this resource to {@code srcClient}, using the {@code mlSjsFlow} extension when
         * the flow's code format is {@link CodeFormat#JAVASCRIPT} and {@code mlFlow} otherwise.
         *
         * @param srcClient the client the requests are sent through
         * @param targetDatabase value sent as the {@code target-database} parameter
         * @param flow the flow whose entity and flow names are sent with each request
         */
        public FlowResource(DatabaseClient srcClient, String targetDatabase, LegacyFlow flow) {
            this.flow = flow;
            this.srcClient = srcClient;
            this.targetDatabase = targetDatabase;
            String resourceName = flow.getCodeFormat().equals(CodeFormat.JAVASCRIPT) ? "mlSjsFlow" : "mlFlow";
            this.srcClient.init(resourceName, this);
        }

        /** Runs the flow for {@code items} without options. */
        public RunFlowResponse run(String jobId, String[] items) {
            return this.run(jobId, items, null);
        }

        /**
         * Posts one batch to the flow extension and parses the response.
         *
         * @param jobId the job id sent as {@code job-id}
         * @param items the identifiers sent as {@code identifiers}
         * @param options serialized to JSON and sent as {@code options}; skipped when null
         * @return the parsed response; an empty {@link RunFlowResponse} when the service returns
         *         no result; or whatever the enclosing runner's
         *         {@code handleFlowRunnerException} yields when the request fails
         */
        public RunFlowResponse run(String jobId, String[] items, Map<String, Object> options) {
            RunFlowResponse resp = null;
            try {
                RequestParameters params = new RequestParameters();
                params.add("entity-name", this.flow.getEntityName());
                params.add("flow-name", this.flow.getName());
                params.put("job-id", jobId);
                params.put("identifiers", items);
                params.put("target-database", this.targetDatabase);
                if (options != null) {
                    params.put("options", OBJECT_MAPPER.writeValueAsString(options));
                }
                try (ResourceServices.ServiceResultIterator resultItr = this.getServices().post(params, (AbstractWriteHandle)new StringHandle("{}").withFormat(Format.JSON), new String[0]);){
                    if (resultItr == null || !resultItr.hasNext()) {
                        resp = new RunFlowResponse();
                    } else {
                        ResourceServices.ServiceResult res = resultItr.next();
                        String body = res.getContent(new StringHandle()).get();
                        resp = OBJECT_MAPPER.readValue(body, RunFlowResponse.class);
                    }
                }
            }
            catch (Exception e) {
                resp = LegacyFlowRunnerImpl.this.handleFlowRunnerException(e);
            }
            return resp;
        }
    }
}

