/*
 * Decompiled with CFR 0.152.
 */
package alluxio.master.table.transform;

import alluxio.client.job.JobMasterClient;
import alluxio.collections.Pair;
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.exception.ExceptionMessage;
import alluxio.exception.status.NotFoundException;
import alluxio.exception.status.UnavailableException;
import alluxio.heartbeat.FixedIntervalSupplier;
import alluxio.heartbeat.HeartbeatExecutor;
import alluxio.heartbeat.HeartbeatThread;
import alluxio.job.JobConfig;
import alluxio.job.wire.JobInfo;
import alluxio.job.wire.Status;
import alluxio.job.workflow.composite.CompositeConfig;
import alluxio.master.journal.DelegatingJournaled;
import alluxio.master.journal.JournalContext;
import alluxio.master.journal.Journaled;
import alluxio.master.journal.checkpoint.CheckpointName;
import alluxio.master.table.AlluxioCatalog;
import alluxio.master.table.transform.TransformJobInfo;
import alluxio.proto.journal.Journal;
import alluxio.proto.journal.Table;
import alluxio.resource.CloseableIterator;
import alluxio.security.user.UserState;
import alluxio.table.common.Layout;
import alluxio.table.common.transform.TransformDefinition;
import alluxio.table.common.transform.TransformPlan;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TransformManager
implements DelegatingJournaled {
    private static final Logger LOG = LoggerFactory.getLogger(TransformManager.class);
    private static final long INVALID_JOB_ID = -1L;
    private final ThrowingSupplier<JournalContext, UnavailableException> mCreateJournalContext;
    private final AlluxioCatalog mCatalog;
    private final JobMasterClient mJobMasterClient;
    private final Cache<Long, TransformJobInfo> mJobHistory = CacheBuilder.newBuilder().expireAfterWrite(Configuration.getMs((PropertyKey)PropertyKey.TABLE_TRANSFORM_MANAGER_JOB_HISTORY_RETENTION_TIME), TimeUnit.MILLISECONDS).build();
    private final State mState = new State();

    public TransformManager(ThrowingSupplier<JournalContext, UnavailableException> createJournalContext, AlluxioCatalog catalog, JobMasterClient jobMasterClient) {
        this.mCreateJournalContext = createJournalContext;
        this.mCatalog = catalog;
        this.mJobMasterClient = jobMasterClient;
    }

    public void start(ExecutorService executorService, UserState userState) {
        executorService.submit((Runnable)new HeartbeatThread("Master Table Transformation Monitor", (HeartbeatExecutor)new JobMonitor(), () -> new FixedIntervalSupplier(Configuration.getMs((PropertyKey)PropertyKey.TABLE_TRANSFORM_MANAGER_JOB_MONITOR_INTERVAL)), Configuration.global(), userState));
    }

    public long execute(String dbName, String tableName, TransformDefinition definition) throws IOException {
        long jobId;
        List<TransformPlan> plans = this.mCatalog.getTransformPlan(dbName, tableName, definition);
        if (plans.isEmpty()) {
            throw new IOException(ExceptionMessage.TABLE_ALREADY_TRANSFORMED.getMessage(new Object[]{dbName, tableName, definition.getDefinition()}));
        }
        Pair dbTable = new Pair((Object)dbName, (Object)tableName);
        Long existingJobId = this.mState.acquireJobPermit((Pair<String, String>)dbTable);
        if (existingJobId != null) {
            if (existingJobId == -1L) {
                throw new IOException("A concurrent transformation request is going to be executed");
            }
            throw new IOException(ExceptionMessage.TABLE_BEING_TRANSFORMED.getMessage(new Object[]{existingJobId.toString(), tableName, dbName}));
        }
        ArrayList<CompositeConfig> concurrentJobs = new ArrayList<CompositeConfig>(plans.size());
        for (TransformPlan plan : plans) {
            concurrentJobs.add(new CompositeConfig(plan.getJobConfigs(), Boolean.valueOf(true)));
        }
        CompositeConfig transformJob = new CompositeConfig(concurrentJobs, Boolean.valueOf(false));
        try {
            jobId = this.mJobMasterClient.run((JobConfig)transformJob);
        }
        catch (IOException e) {
            this.mState.releaseJobPermit((Pair<String, String>)dbTable);
            String error = String.format("Fails to start job to transform table %s in database %s", tableName, dbName);
            LOG.error(error, (Throwable)e);
            throw new IOException(error, e);
        }
        HashMap<String, Layout> transformedLayouts = new HashMap<String, Layout>(plans.size());
        for (TransformPlan plan : plans) {
            transformedLayouts.put(plan.getBaseLayout().getSpec(), plan.getTransformedLayout());
        }
        Table.AddTransformJobInfoEntry journalEntry = Table.AddTransformJobInfoEntry.newBuilder().setDbName(dbName).setTableName(tableName).setDefinition(definition.getDefinition()).setJobId(jobId).putAllTransformedLayouts(Maps.transformValues(transformedLayouts, Layout::toProto)).build();
        try (JournalContext journalContext = this.mCreateJournalContext.apply();){
            this.applyAndJournal((Supplier)journalContext, Journal.JournalEntry.newBuilder().setAddTransformJobInfo(journalEntry).build());
        }
        return jobId;
    }

    public Optional<TransformJobInfo> getTransformJobInfo(long jobId) {
        TransformJobInfo job = this.mState.getRunningJob(jobId);
        if (job == null) {
            job = (TransformJobInfo)this.mJobHistory.getIfPresent((Object)jobId);
        }
        return job == null ? Optional.empty() : Optional.of(job);
    }

    public List<TransformJobInfo> getAllTransformJobInfo() {
        ArrayList jobs = Lists.newArrayList(this.mJobHistory.asMap().values());
        jobs.addAll(this.mState.getRunningJobs());
        jobs.sort(Comparator.comparing(TransformJobInfo::getJobId));
        return jobs;
    }

    public Journaled getDelegate() {
        return this.mState;
    }

    @FunctionalInterface
    public static interface ThrowingSupplier<R, E extends Throwable> {
        public R apply() throws E;
    }

    private final class State
    implements Journaled {
        private final ConcurrentHashMap<Pair<String, String>, Long> mRunningJobIds = new ConcurrentHashMap();
        private final ConcurrentHashMap<Long, TransformJobInfo> mRunningJobs = new ConcurrentHashMap();

        private State() {
        }

        public Collection<TransformJobInfo> getRunningJobs() {
            return this.mRunningJobs.values();
        }

        public TransformJobInfo getRunningJob(long jobId) {
            return this.mRunningJobs.get(jobId);
        }

        public Long acquireJobPermit(Pair<String, String> dbTable) {
            return this.mRunningJobIds.putIfAbsent(dbTable, -1L);
        }

        public void releaseJobPermit(Pair<String, String> dbTable) {
            this.mRunningJobIds.remove(dbTable);
        }

        public boolean processJournalEntry(Journal.JournalEntry entry) {
            if (entry.hasAddTransformJobInfo()) {
                this.applyAddTransformJobInfoEntry(entry.getAddTransformJobInfo());
            } else if (entry.hasRemoveTransformJobInfo()) {
                this.applyRemoveTransformJobInfoEntry(entry.getRemoveTransformJobInfo());
            } else {
                return false;
            }
            return true;
        }

        private void applyAddTransformJobInfoEntry(Table.AddTransformJobInfoEntry entry) {
            Map layouts = entry.getTransformedLayoutsMap();
            Map transformedLayouts = Maps.transformValues((Map)layouts, layout -> TransformManager.this.mCatalog.getLayoutRegistry().create(layout));
            TransformJobInfo job = new TransformJobInfo(entry.getDbName(), entry.getTableName(), entry.getDefinition(), entry.getJobId(), transformedLayouts);
            this.mRunningJobIds.put(job.getDbTable(), job.getJobId());
            this.mRunningJobs.put(job.getJobId(), job);
        }

        private void applyRemoveTransformJobInfoEntry(Table.RemoveTransformJobInfoEntry entry) {
            Pair dbTable = new Pair((Object)entry.getDbName(), (Object)entry.getTableName());
            long jobId = this.mRunningJobIds.get(dbTable);
            this.mRunningJobs.remove(jobId);
            this.mRunningJobIds.remove(dbTable);
        }

        public void resetState() {
            this.mRunningJobs.clear();
            this.mRunningJobIds.clear();
            TransformManager.this.mJobHistory.invalidateAll();
            TransformManager.this.mJobHistory.cleanUp();
        }

        public CloseableIterator<Journal.JournalEntry> getJournalEntryIterator() {
            return CloseableIterator.noopCloseable((Iterator)Iterators.transform(this.mRunningJobs.values().iterator(), job -> {
                Table.AddTransformJobInfoEntry journal = Table.AddTransformJobInfoEntry.newBuilder().setDbName(job.getDb()).setTableName(job.getTable()).setDefinition(job.getDefinition()).setJobId(job.getJobId()).putAllTransformedLayouts(Maps.transformValues(job.getTransformedLayouts(), Layout::toProto)).build();
                return Journal.JournalEntry.newBuilder().setAddTransformJobInfo(journal).build();
            }));
        }

        public CheckpointName getCheckpointName() {
            return CheckpointName.TABLE_MASTER_TRANSFORM_MANAGER;
        }
    }

    private final class JobMonitor
    implements HeartbeatExecutor {
        private JobMonitor() {
        }

        private void onFinish(TransformJobInfo job) {
            TransformManager.this.mJobHistory.put((Object)job.getJobId(), (Object)job);
            Table.RemoveTransformJobInfoEntry journalEntry = Table.RemoveTransformJobInfoEntry.newBuilder().setDbName(job.getDb()).setTableName(job.getTable()).build();
            try (JournalContext journalContext = TransformManager.this.mCreateJournalContext.apply();){
                TransformManager.this.applyAndJournal((Supplier)journalContext, Journal.JournalEntry.newBuilder().setRemoveTransformJobInfo(journalEntry).build());
            }
            catch (UnavailableException e) {
                LOG.error("Failed to create journal for RemoveTransformJobInfo for database {} table {}", (Object)job.getDb(), (Object)job.getTable());
            }
        }

        private void handleJobError(TransformJobInfo job, Status status, String error) {
            job.setJobStatus(status);
            job.setJobErrorMessage(error);
            this.onFinish(job);
        }

        private void handleJobSuccess(TransformJobInfo job) {
            try (JournalContext journalContext = TransformManager.this.mCreateJournalContext.apply();){
                TransformManager.this.mCatalog.completeTransformTable(journalContext, job.getDb(), job.getTable(), job.getDefinition(), job.getTransformedLayouts());
                job.setJobStatus(Status.COMPLETED);
            }
            catch (IOException e) {
                String error = String.format("Failed to update partition layouts for database %s table %s", job.getDb(), job.getTable());
                LOG.error(error);
                job.setJobStatus(Status.FAILED);
                job.setJobErrorMessage(error);
            }
            this.onFinish(job);
        }

        public void heartbeat(long timeLimitMs) throws InterruptedException {
            for (TransformJobInfo job : TransformManager.this.mState.getRunningJobs()) {
                if (Thread.currentThread().isInterrupted()) {
                    throw new InterruptedException("TransformManager's heartbeat was interrupted");
                }
                long jobId = job.getJobId();
                try {
                    LOG.debug("Polling for status of transformation job {}", (Object)jobId);
                    JobInfo jobInfo = TransformManager.this.mJobMasterClient.getJobStatus(jobId);
                    switch (jobInfo.getStatus()) {
                        case FAILED: 
                        case CANCELED: {
                            LOG.warn("Transformation job {} for database {} table {} {}: {}", new Object[]{jobId, job.getDb(), job.getTable(), jobInfo.getStatus() == Status.FAILED ? "failed" : "canceled", jobInfo.getErrorMessage()});
                            this.handleJobError(job, jobInfo.getStatus(), jobInfo.getErrorMessage());
                            break;
                        }
                        case COMPLETED: {
                            LOG.info("Transformation job {} for database {} table {} succeeds", new Object[]{jobId, job.getDb(), job.getTable()});
                            this.handleJobSuccess(job);
                            break;
                        }
                        case RUNNING: 
                        case CREATED: {
                            break;
                        }
                        default: {
                            throw new IllegalStateException("Unrecognized job status: " + jobInfo.getStatus());
                        }
                    }
                }
                catch (NotFoundException e) {
                    String error = ExceptionMessage.TRANSFORM_JOB_ID_NOT_FOUND_IN_JOB_SERVICE.getMessage(new Object[]{jobId, job.getDb(), job.getTable(), e.getMessage()});
                    LOG.warn(error);
                    this.handleJobError(job, Status.FAILED, error);
                }
                catch (IOException e) {
                    LOG.error("Failed to get status for job (id={})", (Object)jobId, (Object)e);
                }
            }
        }

        public void close() {
        }
    }
}

