/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.misc;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.execution.CheckpointType;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.test.junit5.InjectClusterClient;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.testutils.logging.LoggerAuditingExtension;
import org.apache.flink.util.ExceptionUtils;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.util.ReadOnlyStringMap;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

class JobIDLoggingITCase {
    private static final Logger logger = LoggerFactory.getLogger(JobIDLoggingITCase.class);
    @RegisterExtension
    public final LoggerAuditingExtension checkpointCoordinatorLogging = new LoggerAuditingExtension(CheckpointCoordinator.class, Level.DEBUG);
    @RegisterExtension
    public final LoggerAuditingExtension streamTaskLogging = new LoggerAuditingExtension(StreamTask.class, Level.DEBUG);
    @RegisterExtension
    public final LoggerAuditingExtension taskExecutorLogging = new LoggerAuditingExtension(TaskExecutor.class, Level.DEBUG);
    @RegisterExtension
    public final LoggerAuditingExtension taskLogging = new LoggerAuditingExtension(Task.class, Level.DEBUG);
    @RegisterExtension
    public final LoggerAuditingExtension executionGraphLogging = new LoggerAuditingExtension(ExecutionGraph.class, Level.DEBUG);
    @RegisterExtension
    public final LoggerAuditingExtension jobMasterLogging = new LoggerAuditingExtension(JobMaster.class, Level.DEBUG);
    @RegisterExtension
    public final LoggerAuditingExtension adaptiveSchedulerLogging = new LoggerAuditingExtension(AdaptiveScheduler.class, Level.DEBUG);
    @RegisterExtension
    public final LoggerAuditingExtension asyncCheckpointRunnableLogging = new LoggerAuditingExtension("org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable", Level.DEBUG);
    @RegisterExtension
    public static MiniClusterExtension miniClusterResource = new MiniClusterExtension(new MiniClusterResourceConfiguration.Builder().setConfiguration(JobIDLoggingITCase.getConfiguration()).setNumberTaskManagers(1).setNumberSlotsPerTaskManager(1).build());

    JobIDLoggingITCase() {
    }

    private static Configuration getConfiguration() {
        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.SCHEDULER, (Object)JobManagerOptions.SchedulerType.Adaptive);
        return configuration;
    }

    @Test
    void testJobIDLogging(@InjectClusterClient ClusterClient<?> clusterClient) throws Exception {
        JobID jobID = JobIDLoggingITCase.runJob(clusterClient);
        clusterClient.cancel(jobID).get();
        JobIDLoggingITCase.assertJobIDPresent(jobID, this.checkpointCoordinatorLogging, Arrays.asList("No checkpoint found during restore.", "Resetting the master hooks.", "Triggering checkpoint .*", "Received acknowledge message for checkpoint .*", "Completed checkpoint .*", "Checkpoint state: .*"), new String[0]);
        JobIDLoggingITCase.assertJobIDPresent(jobID, this.streamTaskLogging, Arrays.asList("State backend is set to .*", "Initializing Source: .*", "Invoking Source: .*", "Starting checkpoint .*", "Notify checkpoint \\d+ complete .*"), new String[0]);
        JobIDLoggingITCase.assertJobIDPresent(jobID, this.taskExecutorLogging, Arrays.asList("Received task .*", "Trigger checkpoint .*", "Confirm completed checkpoint .*"), "TaskManager received a checkpoint confirmation for unknown task.*", "TaskManager received an aborted checkpoint for unknown task.*", "Un-registering task.*", "Successful registration.*", "Establish JobManager connection.*", "Offer reserved slots.*", ".*ResourceManager.*", "Operator event.*", "Recovered slot allocation snapshots.*", ".*heartbeat.*", ".*leadership.*");
        JobIDLoggingITCase.assertJobIDPresent(jobID, this.taskLogging, Arrays.asList("Source: .* switched from CREATED to DEPLOYING.", "Source: .* switched from DEPLOYING to INITIALIZING.", "Source: .* switched from INITIALIZING to RUNNING."), new String[0]);
        JobIDLoggingITCase.assertJobIDPresent(jobID, this.executionGraphLogging, Arrays.asList("Created execution graph .*", "Deploying Source.*", "Job .* switched from state CREATED to RUNNING.", "Source: .* switched from CREATED to SCHEDULED.", "Source: .* switched from SCHEDULED to DEPLOYING.", "Source: .* switched from DEPLOYING to INITIALIZING.", "Source: .* switched from INITIALIZING to RUNNING."), new String[0]);
        JobIDLoggingITCase.assertJobIDPresent(jobID, this.adaptiveSchedulerLogging, Arrays.asList("Checkpoint storage is set to .*", "Running initialization on master for job .*", "Successfully created execution graph from job graph .*", "Successfully ran initialization on master.*"), "Registration at ResourceManager.*", "Registration with ResourceManager.*", "Resolved ResourceManager address.*");
        JobIDLoggingITCase.assertJobIDPresent(jobID, this.jobMasterLogging, Arrays.asList("Initializing job .*", "Starting execution of job .*", "Using restart back off time strategy .*"), "Registration at ResourceManager.*", "Registration with ResourceManager.*", "Resolved ResourceManager address.*");
        JobIDLoggingITCase.assertJobIDPresent(jobID, this.asyncCheckpointRunnableLogging, Arrays.asList(".* started executing asynchronous part of checkpoint .*", ".* finished asynchronous part of checkpoint .*"), new String[0]);
    }

    private static void assertJobIDPresent(JobID jobID, LoggerAuditingExtension ext, List<String> expPatterns, String ... ignPatterns) {
        ArrayList<LogEvent> eventsWithMissingJobId = new ArrayList<LogEvent>();
        ArrayList<LogEvent> eventsWithWrongJobId = new ArrayList<LogEvent>();
        ArrayList<LogEvent> ignoredEvents = new ArrayList<LogEvent>();
        List expectedPatterns = expPatterns.stream().map(Pattern::compile).collect(Collectors.toList());
        List<Pattern> ignorePatterns = Arrays.stream(ignPatterns).map(Pattern::compile).collect(Collectors.toList());
        for (LogEvent e : ext.getEvents()) {
            ReadOnlyStringMap context = e.getContextData();
            if (context.containsKey("flink-job-id")) {
                if (Objects.equals(context.getValue("flink-job-id"), jobID.toHexString())) {
                    expectedPatterns.removeIf(pattern -> pattern.matcher(e.getMessage().getFormattedMessage()).matches());
                    continue;
                }
                eventsWithWrongJobId.add(e);
                continue;
            }
            if (JobIDLoggingITCase.matchesAny(ignorePatterns, e.getMessage().getFormattedMessage())) {
                ignoredEvents.add(e);
                continue;
            }
            eventsWithMissingJobId.add(e);
        }
        logger.debug("checked events for {}:\n  {};\n  ignored: {},\n  wrong job id: {},\n  missing job id: {}", new Object[]{ext.getLoggerName(), ext.getEvents(), ignoredEvents, eventsWithWrongJobId, eventsWithMissingJobId});
        ((ListAssert)Assertions.assertThat(eventsWithWrongJobId).as("events with a wrong Job ID", new Object[0])).isEmpty();
        ((ListAssert)Assertions.assertThat(expectedPatterns).as("not all expected events logged by %s, logged:\n%s", new Object[]{ext.getLoggerName(), ext.getEvents()})).isEmpty();
        ((ListAssert)Assertions.assertThat(eventsWithMissingJobId).as("too many events without Job ID logged by %s", new Object[]{ext.getLoggerName()})).isEmpty();
    }

    private static boolean matchesAny(List<Pattern> patternStream, String message) {
        return patternStream.stream().anyMatch(p -> p.matcher(message).matches());
    }

    private static JobID runJob(ClusterClient<?> clusterClient) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).addSink((SinkFunction)new DiscardingSink());
        JobID jobId = (JobID)clusterClient.submitJob(env.getStreamGraph().getJobGraph()).get();
        Deadline deadline = Deadline.fromNow((Duration)Duration.ofMinutes(5L));
        while (deadline.hasTimeLeft() && ((Collection)clusterClient.listJobs().get()).stream().noneMatch(m -> m.getJobId().equals((Object)jobId) && m.getJobState().equals((Object)JobStatus.RUNNING))) {
            Thread.sleep(10L);
        }
        while (true) {
            try {
                clusterClient.triggerCheckpoint(jobId, CheckpointType.DEFAULT).get();
                clusterClient.triggerCheckpoint(jobId, CheckpointType.DEFAULT).get();
                return jobId;
            }
            catch (ExecutionException e) {
                if (ExceptionUtils.findThrowable((Throwable)e, CheckpointException.class).isPresent() && !deadline.isOverdue()) {
                    Thread.sleep(10L);
                    continue;
                }
                throw e;
            }
            break;
        }
    }
}

