package org.mule.extension.sftp.internal.source;

import java.io.Closeable;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import org.mule.extension.file.common.api.matcher.NullFilePayloadPredicate;
import org.mule.extension.sftp.api.SftpFileAttributes;
import org.mule.extension.sftp.api.SftpFileMatcher;
import org.mule.extension.sftp.internal.SftpConnector;
import org.mule.extension.sftp.internal.connection.SftpFileSystem;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.i18n.I18nMessageFactory;
import org.mule.runtime.core.api.util.IOUtils;
import org.mule.runtime.extension.api.annotation.Alias;
import org.mule.runtime.extension.api.annotation.execution.OnError;
import org.mule.runtime.extension.api.annotation.execution.OnSuccess;
import org.mule.runtime.extension.api.annotation.execution.OnTerminate;
import org.mule.runtime.extension.api.annotation.param.Config;
import org.mule.runtime.extension.api.annotation.param.ConfigOverride;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.MediaType;
import org.mule.runtime.extension.api.annotation.param.Optional;
import org.mule.runtime.extension.api.annotation.param.Parameter;
import org.mule.runtime.extension.api.annotation.param.ParameterGroup;
import org.mule.runtime.extension.api.annotation.param.display.DisplayName;
import org.mule.runtime.extension.api.annotation.param.display.Summary;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.source.PollContext;
import org.mule.runtime.extension.api.runtime.source.PollingSource;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Summary("Triggers when a new file is created in a directory")
@MediaType(value = "*/*", strict = false)
@DisplayName("On New or Updated File")
@Alias("listener")
/* loaded from: input_file:org/mule/extension/sftp/internal/source/SftpDirectoryListener.class */
public class SftpDirectoryListener extends PollingSource<InputStream, SftpFileAttributes> {
    private static final Logger LOGGER = LoggerFactory.getLogger(SftpDirectoryListener.class);
    private static final String ATTRIBUTES_CONTEXT_VAR = "attributes";
    private static final String POST_PROCESSING_GROUP_NAME = "Post processing action";

    @Config
    private SftpConnector config;

    @Connection
    private ConnectionProvider<SftpFileSystem> fileSystemProvider;

    @Optional
    @Parameter
    private String directory;

    @Optional
    @Parameter
    @DisplayName("Matcher")
    @Alias("matcher")
    private SftpFileMatcher predicateBuilder;

    @ConfigOverride
    @Parameter
    @Summary("Wait time in milliseconds between size checks to determine if a file is ready to be read.")
    private Long timeBetweenSizeCheck;

    @ConfigOverride
    @Parameter
    @Summary("Time unit to be used in the wait time between size checks")
    private TimeUnit timeBetweenSizeCheckUnit;
    private Path directoryPath;
    private Predicate<SftpFileAttributes> matcher;

    @Optional(defaultValue = "true")
    @Parameter
    @Summary("Whether or not to also catch files created on sub directories")
    private boolean recursive = true;

    @Optional(defaultValue = "false")
    @Parameter
    private boolean watermarkEnabled = false;

    protected void doStart() {
        this.matcher = this.predicateBuilder != null ? this.predicateBuilder.build() : new NullFilePayloadPredicate();
        this.directoryPath = resolveRootPath();
    }

    @OnSuccess
    public void onSuccess(@ParameterGroup(name = "Post processing action") PostActionGroup postActionGroup, SourceCallbackContext sourceCallbackContext) {
        postAction(postActionGroup, sourceCallbackContext);
    }

    @OnError
    public void onError(@ParameterGroup(name = "Post processing action") PostActionGroup postActionGroup, SourceCallbackContext sourceCallbackContext) {
        if (postActionGroup.isApplyPostActionWhenFailed()) {
            postAction(postActionGroup, sourceCallbackContext);
        }
    }

    @OnTerminate
    public void onTerminate(SourceCallbackContext sourceCallbackContext) {
        SftpFileSystem sftpFileSystem = (SftpFileSystem) sourceCallbackContext.getConnection();
        if (sftpFileSystem != null) {
            this.fileSystemProvider.disconnect(sftpFileSystem);
        }
    }

    public void poll(PollContext<InputStream, SftpFileAttributes> pollContext) {
        if (pollContext.isSourceStopping()) {
            return;
        }
        try {
            SftpFileSystem openConnection = openConnection();
            try {
                try {
                    List<Result<InputStream, SftpFileAttributes>> list = openConnection.list(this.config, this.directoryPath.toString(), this.recursive, this.matcher, (Long) this.config.getTimeBetweenSizeCheckInMillis(this.timeBetweenSizeCheck, this.timeBetweenSizeCheckUnit).orElse(null));
                    if (list.isEmpty()) {
                        return;
                    }
                    for (Result<InputStream, SftpFileAttributes> result : list) {
                        if (pollContext.isSourceStopping()) {
                            this.fileSystemProvider.disconnect(openConnection);
                            return;
                        }
                        SftpFileAttributes sftpFileAttributes = (SftpFileAttributes) result.getAttributes().get();
                        if (!sftpFileAttributes.isDirectory()) {
                            if (this.matcher.test(sftpFileAttributes)) {
                                if (!processFile(result, pollContext)) {
                                    break;
                                }
                            } else if (LOGGER.isDebugEnabled()) {
                                LOGGER.debug("Skipping file '{}' because the matcher rejected it", sftpFileAttributes.getPath());
                            }
                        }
                    }
                    this.fileSystemProvider.disconnect(openConnection);
                } catch (Exception e) {
                    LOGGER.error(String.format("Found exception trying to poll directory '%s'. Will try again on the next poll. ", this.directoryPath.toString(), e.getMessage()), e);
                    this.fileSystemProvider.disconnect(openConnection);
                }
            } finally {
                this.fileSystemProvider.disconnect(openConnection);
            }
        } catch (Exception e2) {
            if (e2.getCause() instanceof ConnectionException) {
                pollContext.onConnectionException(e2.getCause());
            }
            LOGGER.error(String.format("Could not obtain connection while trying to poll directory '%s'. %s", this.directoryPath.toString(), e2.getMessage()));
        }
    }

    private SftpFileSystem openConnection() throws Exception {
        SftpFileSystem sftpFileSystem = (SftpFileSystem) this.fileSystemProvider.connect();
        try {
            sftpFileSystem.changeToBaseDir();
            return sftpFileSystem;
        } catch (Exception e) {
            this.fileSystemProvider.disconnect(sftpFileSystem);
            throw e;
        }
    }

    private boolean processFile(Result<InputStream, SftpFileAttributes> result, PollContext<InputStream, SftpFileAttributes> pollContext) {
        SftpFileAttributes sftpFileAttributes = (SftpFileAttributes) result.getAttributes().get();
        String path = sftpFileAttributes.getPath();
        return pollContext.accept(pollItem -> {
            SourceCallbackContext sourceCallbackContext = pollItem.getSourceCallbackContext();
            try {
                sourceCallbackContext.addVariable(ATTRIBUTES_CONTEXT_VAR, sftpFileAttributes);
                pollItem.setResult(result).setId(sftpFileAttributes.getPath());
                if (this.watermarkEnabled) {
                    pollItem.setWatermark(sftpFileAttributes.getTimestamp());
                }
            } catch (Throwable th) {
                LOGGER.error(String.format("Found file '%s' but found exception trying to dispatch it for processing. %s", path, th.getMessage()), th);
                if (0 != 0) {
                    this.fileSystemProvider.disconnect((Object) null);
                }
                if (0 != 0) {
                    onRejectedItem(null, sourceCallbackContext);
                }
                throw new MuleRuntimeException(th);
            }
        }) != PollContext.PollItemStatus.SOURCE_STOPPING;
    }

    public void onRejectedItem(Result<InputStream, SftpFileAttributes> result, SourceCallbackContext sourceCallbackContext) {
        IOUtils.closeQuietly((Closeable) result.getOutput());
    }

    private void postAction(PostActionGroup postActionGroup, SourceCallbackContext sourceCallbackContext) {
        sourceCallbackContext.getVariable(ATTRIBUTES_CONTEXT_VAR).ifPresent(sftpFileAttributes -> {
            try {
                SftpFileSystem sftpFileSystem = (SftpFileSystem) this.fileSystemProvider.connect();
                sftpFileSystem.changeToBaseDir();
                postActionGroup.apply(sftpFileSystem, sftpFileAttributes, this.config);
            } catch (ConnectionException e) {
                LOGGER.error("An error occurred while retrieving a connection to apply the post processing action to the file %s , it was neither moved nor deleted.", sftpFileAttributes.getPath());
            }
        });
    }

    protected void doStop() {
    }

    private Path resolveRootPath() {
        SftpFileSystem sftpFileSystem = null;
        try {
            try {
                sftpFileSystem = (SftpFileSystem) this.fileSystemProvider.connect();
                sftpFileSystem.changeToBaseDir();
                Path resolveRootPath = new OnNewFileCommand(sftpFileSystem).resolveRootPath(this.directory);
                if (sftpFileSystem != null) {
                    this.fileSystemProvider.disconnect(sftpFileSystem);
                }
                return resolveRootPath;
            } catch (Exception e) {
                throw new MuleRuntimeException(I18nMessageFactory.createStaticMessage(String.format("Could not resolve path to directory '%s'. %s", this.directory, e.getMessage())), e);
            }
        } catch (Throwable th) {
            if (sftpFileSystem != null) {
                this.fileSystemProvider.disconnect(sftpFileSystem);
            }
            throw th;
        }
    }
}
