package org.mule.functional.functional;

import com.eaio.uuid.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.mule.runtime.api.el.ValidationResult;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.lifecycle.Startable;
import org.mule.runtime.core.api.Event;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.construct.FlowConstructAware;
import org.mule.runtime.core.api.expression.InvalidExpressionException;
import org.mule.runtime.core.api.lifecycle.LifecycleUtils;
import org.mule.runtime.core.api.processor.InterceptingMessageProcessor;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.exception.MessagingException;
import org.reactivestreams.Publisher;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;

/* loaded from: input_file:org/mule/functional/functional/ResponseAssertionMessageProcessor.class */
public class ResponseAssertionMessageProcessor extends AssertionMessageProcessor implements InterceptingMessageProcessor, FlowConstructAware, Startable {
    private static final ThreadLocal<String> taskTokenInThread = new ThreadLocal<>();
    private Processor next;
    private String requestTaskToken;
    private String responseTaskToken;
    private CountDownLatch responseLatch;
    protected String responseExpression = "#[true]";
    private int responseCount = 1;
    private boolean responseSameTask = true;
    private int responseInvocationCount = 0;
    private boolean responseResult = true;

    @Override // org.mule.functional.functional.AssertionMessageProcessor
    public void start() throws InitialisationException {
        super.start();
        ValidationResult validate = this.expressionManager.validate(this.responseExpression);
        if (!validate.isSuccess()) {
            throw new InvalidExpressionException(this.expression, (String) validate.errorMessage().orElse("Invalid expression"));
        }
        this.responseLatch = new CountDownLatch(this.responseCount);
        FlowAssert.addAssertion(this.flowConstruct.getName(), this);
    }

    @Override // org.mule.functional.functional.AssertionMessageProcessor
    public Event process(Event event) throws MuleException {
        if (event == null) {
            return null;
        }
        return processResponse(processNext(processRequest(event)));
    }

    public Publisher<Event> apply(Publisher<Event> publisher) {
        return Flux.from(Flux.from(publisher).map(event -> {
            try {
                return processRequest(event);
            } catch (MuleException e) {
                throw Exceptions.propagate(new MessagingException(event, e));
            }
        }).transform(this.next)).map(event2 -> {
            try {
                return processResponse(event2);
            } catch (MuleException e) {
                throw Exceptions.propagate(new MessagingException(event2, e));
            }
        });
    }

    private Event processRequest(Event event) throws MuleException {
        if (taskTokenInThread.get() != null) {
            this.requestTaskToken = taskTokenInThread.get();
        } else {
            this.requestTaskToken = generateTaskToken();
            taskTokenInThread.set(this.requestTaskToken);
        }
        return super.process(event);
    }

    private Event processResponse(Event event) throws MuleException {
        if (event == null) {
            return event;
        }
        if (taskTokenInThread.get() != null) {
            this.responseTaskToken = taskTokenInThread.get();
        } else {
            this.responseTaskToken = generateTaskToken();
        }
        this.responseResult = this.responseResult && this.expressionManager.evaluateBoolean(this.responseExpression, event, this.flowConstruct, false, true);
        increaseResponseCount();
        this.responseLatch.countDown();
        return event;
    }

    protected String generateTaskToken() {
        return Thread.currentThread().getName() + " - " + new UUID().toString();
    }

    private Event processNext(Event event) throws MuleException {
        return event != null ? this.next.process(event) : event;
    }

    @Override // org.mule.functional.functional.AssertionMessageProcessor
    public void verify() throws InterruptedException {
        super.verify();
        if (responseCountFailOrNullEvent().booleanValue()) {
            Assert.fail(failureMessagePrefix() + "No response message received or if responseCount attribute was set then it was no matched.");
            return;
        }
        if (responseExpressionFailed().booleanValue()) {
            Assert.fail(failureMessagePrefix() + "Response expression " + this.expression + " evaluated false.");
            return;
        }
        if (this.responseCount > 0 && this.responseSameTask) {
            Assert.assertThat(failureMessagePrefix() + "Response thread was not same as request thread", this.responseTaskToken, CoreMatchers.is(this.requestTaskToken));
        } else {
            if (this.responseCount <= 0 || this.responseSameTask) {
                return;
            }
            Assert.assertThat(failureMessagePrefix() + "Response thread was same as request thread", this.responseTaskToken, CoreMatchers.not(CoreMatchers.is(this.requestTaskToken)));
        }
    }

    public Boolean responseCountFailOrNullEvent() throws InterruptedException {
        return Boolean.valueOf(!isResponseProcessesCountCorrect());
    }

    public Boolean responseExpressionFailed() {
        return Boolean.valueOf(!this.responseResult);
    }

    public void setListener(Processor processor) {
        this.next = processor;
    }

    private void increaseResponseCount() {
        this.responseInvocationCount++;
    }

    public void setResponseExpression(String str) {
        this.responseExpression = str;
    }

    public void setResponseCount(int i) {
        this.responseCount = i;
    }

    public void setResponseSameTask(boolean z) {
        this.responseSameTask = z;
    }

    private synchronized boolean isResponseProcessesCountCorrect() throws InterruptedException {
        return this.needToMatchCount ? this.responseCount == this.responseInvocationCount : this.responseLatch.await(this.timeout, TimeUnit.MILLISECONDS);
    }

    @Override // org.mule.functional.functional.AssertionMessageProcessor
    public void setFlowConstruct(FlowConstruct flowConstruct) {
        super.setFlowConstruct(flowConstruct);
        LifecycleUtils.setFlowConstructIfNeeded(this.next, flowConstruct);
    }
}
