package org.mule.extension.db.internal.source;

import java.io.Serializable;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import org.mule.extension.db.api.param.ParameterizedStatementDefinition;
import org.mule.extension.db.api.param.QueryDefinition;
import org.mule.extension.db.api.param.QuerySettings;
import org.mule.extension.db.internal.DbConnector;
import org.mule.extension.db.internal.domain.connection.DbConnection;
import org.mule.extension.db.internal.domain.executor.SelectExecutor;
import org.mule.extension.db.internal.domain.query.Query;
import org.mule.extension.db.internal.domain.statement.QueryStatementFactory;
import org.mule.extension.db.internal.resolver.query.ParameterizedQueryResolver;
import org.mule.extension.db.internal.resolver.query.QueryResolver;
import org.mule.extension.db.internal.result.resultset.ListResultSetHandler;
import org.mule.extension.db.internal.result.row.NonStreamingInsensitiveMapRowHandler;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.extension.api.annotation.Alias;
import org.mule.runtime.extension.api.annotation.metadata.MetadataKeyId;
import org.mule.runtime.extension.api.annotation.metadata.MetadataScope;
import org.mule.runtime.extension.api.annotation.param.Config;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.DefaultEncoding;
import org.mule.runtime.extension.api.annotation.param.NullSafe;
import org.mule.runtime.extension.api.annotation.param.Optional;
import org.mule.runtime.extension.api.annotation.param.Parameter;
import org.mule.runtime.extension.api.annotation.param.ParameterGroup;
import org.mule.runtime.extension.api.annotation.param.display.DisplayName;
import org.mule.runtime.extension.api.annotation.param.display.Summary;
import org.mule.runtime.extension.api.annotation.values.OfValues;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.source.PollContext;
import org.mule.runtime.extension.api.runtime.source.PollingSource;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Summary("Triggers a message per each row in a table")
@MetadataScope(outputResolver = RowListenerMetadataResolver.class, keysResolver = TableKeyResolver.class)
@DisplayName("On Table Row")
@Alias("listener")
/* loaded from: input_file:org/mule/extension/db/internal/source/RowListener.class */
public class RowListener extends PollingSource<Map<String, Object>, Void> {
    private static final Logger LOGGER = LoggerFactory.getLogger(RowListener.class);
    public static final String WATERMARK_PARAM_NAME = "watermark";

    @DefaultEncoding
    String encoding;

    @Parameter
    @MetadataKeyId
    private String table;

    @Optional
    @OfValues(ColumnValueProvider.class)
    @Parameter
    @Summary("The name of the column used for watermark")
    private String watermarkColumn;

    @Optional
    @OfValues(ColumnValueProvider.class)
    @Parameter
    @Summary("The name of the column to consider as row ID")
    private String idColumn;

    @NullSafe
    @ParameterGroup(name = "Advanced")
    private QuerySettings settings;

    @Config
    private DbConnector config;

    @Connection
    private ConnectionProvider<DbConnection> connectionProvider;
    private final QueryResolver<ParameterizedStatementDefinition> queryResolver = new ParameterizedQueryResolver();
    private ItemHandler idHandler;
    private ItemHandler watermarkHandler;
    private Charset charset;

    @FunctionalInterface
    /* loaded from: input_file:org/mule/extension/db/internal/source/RowListener$ItemHandler.class */
    private interface ItemHandler extends BiConsumer<PollContext.PollItem<Map<String, Object>, Void>, Map<String, Object>> {
    }

    /* loaded from: input_file:org/mule/extension/db/internal/source/RowListener$NullItemHandler.class */
    private final class NullItemHandler implements ItemHandler {
        private NullItemHandler() {
        }

        @Override // java.util.function.BiConsumer
        public void accept(PollContext.PollItem<Map<String, Object>, Void> pollItem, Map<String, Object> map) {
        }
    }

    protected void doStart() throws MuleException {
        if (this.idColumn != null) {
            this.idHandler = (pollItem, map) -> {
                Object obj = map.get(this.idColumn);
                if (obj != null) {
                    pollItem.setId(obj.toString());
                } else if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug(String.format("A null ID value was obtained for row %s. Idempotency will not be enforced for this row", map));
                }
            };
        } else {
            this.idHandler = new NullItemHandler();
        }
        if (this.watermarkColumn != null) {
            this.watermarkHandler = (pollItem2, map2) -> {
                Object obj = map2.get(this.watermarkColumn);
                if (obj == null) {
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug(String.format("A null watermark value was obtained for row %s. Watermark value won't be updated for this row", map2));
                    }
                } else {
                    if (!(obj instanceof Serializable)) {
                        LOGGER.error(String.format("Watermark values need to be serializable, but a value of type %s was found instead for row %s", obj.getClass().getName(), map2));
                    }
                    pollItem2.setWatermark((Serializable) obj);
                }
            };
        } else {
            this.watermarkHandler = new NullItemHandler();
        }
    }

    protected void doStop() {
    }

    public void poll(PollContext<Map<String, Object>, Void> pollContext) {
        if (pollContext.isSourceStopping()) {
            return;
        }
        try {
            DbConnection dbConnection = (DbConnection) this.connectionProvider.connect();
            try {
                try {
                    QueryDefinition queryDefinition = new QueryDefinition();
                    StringBuilder append = new StringBuilder("SELECT * FROM ").append(this.table);
                    pollContext.getWatermark().ifPresent(serializable -> {
                        append.append(" WHERE ").append(this.watermarkColumn).append(" > :").append(WATERMARK_PARAM_NAME);
                        queryDefinition.addInputParameter(WATERMARK_PARAM_NAME, serializable);
                    });
                    queryDefinition.setSql(append.toString());
                    Query resolve = this.queryResolver.resolve(queryDefinition, this.config, dbConnection, null);
                    QueryStatementFactory queryStatementFactory = new QueryStatementFactory();
                    queryStatementFactory.setFetchSize(this.settings.getFetchSize() != null ? this.settings.getFetchSize().intValue() : 10);
                    queryStatementFactory.setQueryTimeout(new Long(this.settings.getQueryTimeoutUnit().toSeconds(this.settings.getQueryTimeout())).intValue());
                    ((List) new SelectExecutor(queryStatementFactory, new ListResultSetHandler(new NonStreamingInsensitiveMapRowHandler(dbConnection, Charset.forName(this.encoding)))).execute(dbConnection, resolve)).forEach(map -> {
                        pollContext.accept(pollItem -> {
                            this.idHandler.accept(pollItem, map);
                            this.watermarkHandler.accept(pollItem, map);
                            pollItem.setResult(Result.builder().output(map).build());
                        });
                    });
                    this.connectionProvider.disconnect(dbConnection);
                } catch (Throwable th) {
                    this.connectionProvider.disconnect(dbConnection);
                    throw th;
                }
            } catch (Exception e) {
                LOGGER.error(String.format("Failed to query table '%s' for new rows. %s", this.table, e.getMessage()), e);
                this.connectionProvider.disconnect(dbConnection);
            }
        } catch (Exception e2) {
            if (e2 instanceof ConnectionException) {
                pollContext.onConnectionException(e2);
            }
            LOGGER.error(String.format("Could not obtain connection while trying to poll table '%s'. %s", this.table, e2.getMessage()), e2);
        }
    }

    public void onRejectedItem(Result<Map<String, Object>, Void> result, SourceCallbackContext sourceCallbackContext) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Row has been rejected for processing: {}", result.getOutput());
        }
    }
}
