/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint.channel;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import javax.annotation.Nonnull;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcher;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutor;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl;
import org.apache.flink.runtime.checkpoint.channel.TestException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.function.BiConsumerWithException;
import org.junit.Assert;
import org.junit.Test;

public class ChannelStateWriteRequestExecutorImplTest {
    private static final String TASK_NAME = "test task";

    /**
     * Runs the close-after-submit scenario with {@code ChannelStateWriteRequestExecutor::submit}
     * as the submitting call. The test passes only if an {@link IllegalStateException} is thrown.
     */
    @Test(expected = IllegalStateException.class)
    public void testCloseAfterSubmit() throws Exception {
        // The method reference is target-typed by the helper's parameter; no raw cast is needed.
        testCloseAfterSubmit(ChannelStateWriteRequestExecutor::submit);
    }

    /**
     * Runs the close-after-submit scenario with
     * {@code ChannelStateWriteRequestExecutor::submitPriority} as the submitting call. The test
     * passes only if an {@link IllegalStateException} is thrown.
     */
    @Test(expected = IllegalStateException.class)
    public void testCloseAfterSubmitPriority() throws Exception {
        // The method reference is target-typed by the helper's parameter; no raw cast is needed.
        testCloseAfterSubmit(ChannelStateWriteRequestExecutor::submitPriority);
    }

    /**
     * Runs the submit-failure scenario with {@code ChannelStateWriteRequestExecutor::submit} as
     * the submitting call.
     */
    @Test
    public void testSubmitFailure() throws Exception {
        // The method reference is target-typed by the helper's parameter; no raw cast is needed.
        testSubmitFailure(ChannelStateWriteRequestExecutor::submit);
    }

    /**
     * Runs the submit-failure scenario with
     * {@code ChannelStateWriteRequestExecutor::submitPriority} as the submitting call.
     */
    @Test
    public void testSubmitPriorityFailure() throws Exception {
        // The method reference is target-typed by the helper's parameter; no raw cast is needed.
        testSubmitFailure(ChannelStateWriteRequestExecutor::submitPriority);
    }

    /**
     * Submits one request through {@code requestFun} to an executor backed by a
     * {@link WorkerClosingDeque}, i.e. a queue that closes the executor from within its
     * {@code put}/{@code putFirst} methods.
     *
     * <p>Both public callers are annotated with {@code expected = IllegalStateException.class},
     * so the submission is expected to throw; the assertions at the end are only reached if it
     * does not.
     *
     * @param requestFun the submit variant under test, applied to the executor and the request
     * @throws Exception whatever {@code requestFun} throws
     */
    private void testCloseAfterSubmit(BiConsumerWithException<ChannelStateWriteRequestExecutor, ChannelStateWriteRequest, Exception> requestFun) throws Exception {
        WorkerClosingDeque closingDeque = new WorkerClosingDeque();
        ChannelStateWriteRequestExecutorImpl worker = new ChannelStateWriteRequestExecutorImpl(TASK_NAME, ChannelStateWriteRequestDispatcher.NO_OP, closingDeque);
        // The deque needs a back-reference so it can close the executor during insertion.
        closingDeque.setWorker(worker);
        TestWriteRequest request = new TestWriteRequest();
        // Arguments are passed with their static types; casting them to Object would not
        // type-check against the parameterised functional interface.
        requestFun.accept(worker, request);
        Assert.assertTrue(closingDeque.isEmpty());
        Assert.assertFalse(request.isCancelled());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Submits a request to an executor on which {@code start()} was never called and checks the
     * outcome.
     *
     * <p>The submission must throw an {@link IllegalStateException}. Whether or not it throws,
     * the request must have been cancelled and the queue must be empty afterwards; both are
     * checked in the {@code finally} block.
     *
     * @param submitAction the submit variant under test, applied to the executor and the request
     * @throws Exception whatever {@code submitAction} throws other than the expected
     *     {@link IllegalStateException}
     */
    private void testSubmitFailure(BiConsumerWithException<ChannelStateWriteRequestExecutor, ChannelStateWriteRequest, Exception> submitAction) throws Exception {
        TestWriteRequest request = new TestWriteRequest();
        BlockingDeque<ChannelStateWriteRequest> deque = new LinkedBlockingDeque<>();
        try {
            submitAction.accept(new ChannelStateWriteRequestExecutorImpl(TASK_NAME, ChannelStateWriteRequestDispatcher.NO_OP, deque), request);
        } catch (IllegalStateException expected) {
            // Expected outcome; the finally block still verifies the clean-up.
            return;
        } finally {
            Assert.assertTrue(request.isCancelled());
            Assert.assertTrue(deque.isEmpty());
        }
        // Reached only when the submission completed without throwing.
        Assert.fail("expected exception not thrown");
    }

    /**
     * Closes an executor that already has one request queued, then invokes {@code run()}
     * directly on the test thread.
     *
     * <p>Afterwards the dispatcher must have been failed ({@code isStopped()}), the queue must
     * be empty, and the queued request must have been cancelled.
     *
     * @throws IOException if closing the executor fails
     */
    @Test
    public void testCleanup() throws IOException {
        TestWriteRequest request = new TestWriteRequest();
        // Element type matches the one WorkerClosingDeque uses for the same constructor argument.
        BlockingDeque<ChannelStateWriteRequest> deque = new LinkedBlockingDeque<>();
        deque.add(request);
        TestRequestDispatcher requestProcessor = new TestRequestDispatcher();
        ChannelStateWriteRequestExecutorImpl worker = new ChannelStateWriteRequestExecutorImpl(TASK_NAME, requestProcessor, deque);
        // Close first, then run synchronously: run() sees an already-closed executor.
        worker.close();
        worker.run();
        Assert.assertTrue(requestProcessor.isStopped());
        Assert.assertTrue(deque.isEmpty());
        Assert.assertTrue(request.isCancelled());
    }

    /**
     * Interrupts the executor's thread before and after submitting a request and then waits
     * until the request has been taken from the queue.
     *
     * <p>The wait is bounded: if the queue is still non-empty after 30 seconds the test fails
     * with an explicit message instead of polling forever.
     *
     * @throws Exception if submitting, sleeping or closing the executor fails
     */
    @Test
    public void testIgnoresInterruptsWhileRunning() throws Exception {
        final long timeoutNanos = 30_000_000_000L; // 30 seconds
        TestRequestDispatcher requestProcessor = new TestRequestDispatcher();
        BlockingDeque<ChannelStateWriteRequest> deque = new LinkedBlockingDeque<>();
        try (ChannelStateWriteRequestExecutorImpl worker = new ChannelStateWriteRequestExecutorImpl(TASK_NAME, requestProcessor, deque)) {
            worker.start();
            worker.getThread().interrupt();
            worker.submit(new TestWriteRequest());
            worker.getThread().interrupt();
            final long startNanos = System.nanoTime();
            while (!deque.isEmpty()) {
                // Difference-based comparison is safe against nanoTime() wrap-around.
                if (System.nanoTime() - startNanos > timeoutNanos) {
                    Assert.fail("request was not taken from the queue within 30 seconds");
                }
                Thread.sleep(100L);
            }
        }
    }

    /**
     * Starts an executor and lets try-with-resources close it again; the test passes when
     * neither step throws.
     *
     * @throws IOException if closing the executor fails
     */
    @Test
    public void testCanBeClosed() throws IOException {
        TestRequestDispatcher dispatcher = new TestRequestDispatcher();
        try (ChannelStateWriteRequestExecutorImpl executor =
                new ChannelStateWriteRequestExecutorImpl(TASK_NAME, dispatcher)) {
            executor.start();
        }
    }

    /**
     * Runs an executor whose dispatcher throws a known {@code TestException} for the single
     * queued request, then checks that {@code close()} reports that failure.
     *
     * <p>The test passes only if {@code close()} throws an {@link IOException} whose cause chain
     * contains the very same exception instance. Any other {@link IOException} is rethrown, and
     * a normal return from {@code close()} fails the test.
     *
     * @throws IOException if {@code close()} fails for a reason other than the injected exception
     */
    @Test
    public void testRecordsException() throws IOException {
        final TestException testException = new TestException();
        TestRequestDispatcher throwingRequestProcessor = new TestRequestDispatcher() {

            @Override
            public void dispatch(ChannelStateWriteRequest request) {
                throw testException;
            }
        };
        // Element type matches the one WorkerClosingDeque uses for the same constructor argument.
        BlockingDeque<ChannelStateWriteRequest> deque = new LinkedBlockingDeque<>(Arrays.asList(new TestWriteRequest()));
        ChannelStateWriteRequestExecutorImpl worker = new ChannelStateWriteRequestExecutorImpl(TASK_NAME, throwingRequestProcessor, deque);
        // run() is invoked synchronously on the test thread, so it has finished before close().
        worker.run();
        try {
            worker.close();
        } catch (IOException e) {
            // Identity comparison: it must be the injected instance, not just the same type.
            if (ExceptionUtils.findThrowable(e, TestException.class).filter(found -> found == testException).isPresent()) {
                return;
            }
            throw e;
        }
        Assert.fail("exception not thrown");
    }

    /**
     * Dispatcher stub for the tests: {@code dispatch} does nothing and {@code fail} only records
     * that it was called.
     */
    private static class TestRequestDispatcher
            implements ChannelStateWriteRequestDispatcher {
        /** Becomes {@code true} on the first call to {@link #fail(Throwable)}. */
        private boolean stopped;

        private TestRequestDispatcher() {
        }

        /** Accepts the request and ignores it. */
        public void dispatch(ChannelStateWriteRequest request) {
            // intentionally empty
        }

        /** Records the call; the cause itself is not kept. */
        public void fail(Throwable cause) {
            stopped = true;
        }

        /** @return whether {@link #fail(Throwable)} has been called on this instance */
        public boolean isStopped() {
            return stopped;
        }
    }

    /**
     * Deque that closes the configured executor immediately after an element has been inserted
     * through {@code put} or {@code putFirst}.
     */
    private static class WorkerClosingDeque
            extends LinkedBlockingDeque<ChannelStateWriteRequest> {
        /** Executor to close after an insertion; must be assigned via {@link #setWorker}. */
        private ChannelStateWriteRequestExecutor worker;

        private WorkerClosingDeque() {
        }

        /**
         * Inserts the request and then closes the executor.
         *
         * <p>NOTE(review): this inserts at the head via {@code super.putFirst}, not at the tail
         * as {@code put} normally would. That looks like a copy of {@link #putFirst}; confirm
         * whether it is intentional.
         */
        @Override
        public void put(@Nonnull ChannelStateWriteRequest request) throws InterruptedException {
            super.putFirst(request);
            closeWorker();
        }

        /** Inserts the request at the head and then closes the executor. */
        @Override
        public void putFirst(@Nonnull ChannelStateWriteRequest request) throws InterruptedException {
            super.putFirst(request);
            closeWorker();
        }

        /** Sets the executor that the insert methods will close. */
        public void setWorker(ChannelStateWriteRequestExecutor worker) {
            this.worker = worker;
        }

        /** Closes the executor, handing any {@link IOException} to {@code ExceptionUtils.rethrow}. */
        private void closeWorker() {
            try {
                worker.close();
            } catch (IOException ioe) {
                ExceptionUtils.rethrow(ioe);
            }
        }
    }

    /** Write request stub that only remembers whether {@code cancel} has been called. */
    private static class TestWriteRequest
            implements ChannelStateWriteRequest {
        /** Starts out {@code false} (the field default) and is set by {@link #cancel(Throwable)}. */
        private boolean cancelled;

        private TestWriteRequest() {
        }

        /** @return always {@code 0} */
        public long getCheckpointId() {
            return 0L;
        }

        /** Marks this request as cancelled; the cause is ignored. */
        public void cancel(Throwable cause) {
            cancelled = true;
        }

        /** @return whether {@link #cancel(Throwable)} has been called */
        public boolean isCancelled() {
            return cancelled;
        }
    }
}

