/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.postgres.source.fetch;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresErrorHandler;
import io.debezium.connector.postgresql.PostgresEventDispatcher;
import io.debezium.connector.postgresql.PostgresObjectUtils;
import io.debezium.connector.postgresql.PostgresOffsetContext;
import io.debezium.connector.postgresql.PostgresPartition;
import io.debezium.connector.postgresql.PostgresSchema;
import io.debezium.connector.postgresql.PostgresTaskContext;
import io.debezium.connector.postgresql.PostgresTopicSelector;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.metrics.DefaultChangeEventSourceMetricsFactory;
import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.schema.TopicSelector;
import java.sql.SQLException;
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
import org.apache.flink.cdc.connectors.base.source.EmbeddedFlinkDatabaseHistory;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext;
import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
import org.apache.flink.cdc.connectors.postgres.source.handler.PostgresSchemaChangeEventHandler;
import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffset;
import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetFactory;
import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetUtils;
import org.apache.flink.cdc.connectors.postgres.source.utils.ChunkUtils;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Fetch-task context for the Postgres CDC source.
 *
 * <p>{@link #configure(SourceSplitBase)} builds the Debezium objects (connector config, JDBC and
 * replication connections, schema, offset context, event queue, dispatchers and snapshot
 * metrics) for a given split; the getters expose them afterwards. All of those fields are
 * {@code null} until {@code configure} has been called.
 */
public class PostgresSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
    private static final String CONNECTION_NAME = "postgres-fetch-task-connection";
    private static final Logger LOG = LoggerFactory.getLogger(PostgresSourceFetchTaskContext.class);

    private PostgresTaskContext taskContext;
    private ChangeEventQueue<DataChangeEvent> queue;
    private PostgresConnection jdbcConnection;
    private ReplicationConnection replicationConnection;
    private PostgresOffsetContext offsetContext;
    private PostgresPartition partition;
    private PostgresSchema schema;
    private ErrorHandler errorHandler;
    private JdbcSourceEventDispatcher<PostgresPartition> dispatcher;
    private PostgresEventDispatcher<TableId> postgresDispatcher;
    private EventMetadataProvider metadataProvider;
    private SnapshotChangeEventSourceMetrics<PostgresPartition> snapshotChangeEventSourceMetrics;
    private Snapshotter snapShotter;

    /**
     * Creates an unconfigured context.
     *
     * @param sourceConfig the source configuration, passed to the superclass
     * @param dataSourceDialect the Postgres dialect, passed to the superclass
     */
    public PostgresSourceFetchTaskContext(JdbcSourceConfig sourceConfig, PostgresDialect dataSourceDialect) {
        super(sourceConfig, dataSourceDialect);
    }

    /**
     * Returns the connector configuration held by the superclass, narrowed to
     * {@link PostgresConnectorConfig}.
     */
    @Override
    public PostgresConnectorConfig getDbzConnectorConfig() {
        return (PostgresConnectorConfig) super.getDbzConnectorConfig();
    }

    /**
     * Resolves the offset a split starts from: the factory's initial offset for a snapshot
     * split, otherwise the starting offset recorded in the stream split.
     */
    private PostgresOffsetContext loadStartingOffsetState(PostgresOffsetContext.Loader loader, SourceSplitBase sourceSplitBase) {
        Offset offset;
        if (sourceSplitBase.isSnapshotSplit()) {
            offset = new PostgresOffsetFactory().createInitialOffset();
        } else {
            offset = sourceSplitBase.asStreamSplit().getStartingOffset();
        }
        return PostgresOffsetUtils.getPostgresOffsetContext(loader, offset);
    }

    /**
     * Builds every Debezium object this context exposes for the given split.
     *
     * <p>For a {@link SnapshotSplit} the connector config is narrowed to the split's table, uses
     * the backfill slot name, drops the slot on stop and sets the heartbeat interval to 0. For
     * any other split the slot is kept on stop.
     *
     * @param sourceSplitBase the split that is about to be read
     * @throws RuntimeException if the Postgres schema cannot be initialized
     */
    @Override
    public void configure(SourceSplitBase sourceSplitBase) {
        LOG.debug("Configuring PostgresSourceFetchTaskContext for split: {}", sourceSplitBase);
        PostgresConnectorConfig dbzConfig = this.getDbzConnectorConfig();
        if (sourceSplitBase instanceof SnapshotSplit) {
            dbzConfig = new PostgresConnectorConfig(
                    dbzConfig.getConfig()
                            .edit()
                            .with("table.include.list", ((SnapshotSplit) sourceSplitBase).getTableId().toString())
                            .with(PostgresConnectorConfig.SLOT_NAME.name(), ((PostgresSourceConfig) this.sourceConfig).getSlotNameForBackfillTask())
                            .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP.name(), true)
                            .with(Heartbeat.HEARTBEAT_INTERVAL, 0)
                            .build());
        } else {
            dbzConfig = new PostgresConnectorConfig(
                    dbzConfig.getConfig()
                            .edit()
                            .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP.name(), false)
                            .build());
        }
        // The original message lacked the "{}" placeholder, so the configuration was never
        // printed. Passwords are masked because the properties include the JDBC credentials.
        LOG.info("PostgresConnectorConfig is {}", dbzConfig.getConfig().withMaskedPasswords().asProperties());
        this.setDbzConnectorConfig(dbzConfig);

        PostgresConnectorConfig.SnapshotMode snapshotMode = PostgresConnectorConfig.SnapshotMode.parse(
                dbzConfig.getConfig().getString(PostgresConnectorConfig.SNAPSHOT_MODE));
        this.snapShotter = snapshotMode.getSnapshotter(dbzConfig.getConfig());

        PostgresConnection.PostgresValueConverterBuilder valueConverterBuilder =
                PostgresObjectUtils.newPostgresValueConverterBuilder(dbzConfig);
        this.jdbcConnection = new PostgresConnection(dbzConfig.getJdbcConfig(), valueConverterBuilder, CONNECTION_NAME);

        TopicSelector<TableId> topicSelector = PostgresTopicSelector.create(dbzConfig);
        EmbeddedFlinkDatabaseHistory.registerHistory(
                this.sourceConfig.getDbzConfiguration().getString("database.history.instance.name"),
                sourceSplitBase.getTableSchemas().values());

        try {
            this.schema = PostgresObjectUtils.newSchema(
                    this.jdbcConnection,
                    dbzConfig,
                    this.jdbcConnection.getTypeRegistry(),
                    topicSelector,
                    valueConverterBuilder.build(this.jdbcConnection.getTypeRegistry()));
        } catch (SQLException e) {
            throw new RuntimeException("Failed to initialize PostgresSchema", e);
        }

        this.offsetContext = this.loadStartingOffsetState(new PostgresOffsetContext.Loader(dbzConfig), sourceSplitBase);
        this.partition = new PostgresPartition(dbzConfig.getLogicalName());
        this.taskContext = PostgresObjectUtils.newTaskContext(dbzConfig, this.schema, topicSelector);

        // The replication connection is created only once and kept across configure() calls.
        if (this.replicationConnection == null) {
            this.replicationConnection = PostgresObjectUtils.createReplicationConnection(
                    this.taskContext, this.jdbcConnection, this.snapShotter.shouldSnapshot(), dbzConfig);
        }

        this.queue = new ChangeEventQueue.Builder<DataChangeEvent>()
                .pollInterval(dbzConfig.getPollInterval())
                .maxBatchSize(dbzConfig.getMaxBatchSize())
                .maxQueueSize(dbzConfig.getMaxQueueSize())
                .maxQueueSizeInBytes(dbzConfig.getMaxQueueSizeInBytes())
                .loggingContextSupplier(() -> this.taskContext.configureLoggingContext("postgres-cdc-connector-task"))
                .build();
        this.errorHandler = new PostgresErrorHandler(this.getDbzConnectorConfig(), this.queue);
        this.metadataProvider = PostgresObjectUtils.newEventMetadataProvider();
        this.dispatcher = new JdbcSourceEventDispatcher<>(
                dbzConfig,
                topicSelector,
                this.schema,
                this.queue,
                dbzConfig.getTableFilters().dataCollectionFilter(),
                DataChangeEvent::new,
                this.metadataProvider,
                this.schemaNameAdjuster,
                new PostgresSchemaChangeEventHandler());
        this.postgresDispatcher = new PostgresEventDispatcher<>(
                dbzConfig,
                topicSelector,
                this.schema,
                this.queue,
                dbzConfig.getTableFilters().dataCollectionFilter(),
                DataChangeEvent::new,
                this.metadataProvider,
                this.schemaNameAdjuster);

        DefaultChangeEventSourceMetricsFactory metricsFactory = new DefaultChangeEventSourceMetricsFactory();
        this.snapshotChangeEventSourceMetrics =
                metricsFactory.getSnapshotMetrics(this.taskContext, this.queue, this.metadataProvider);
    }

    /** Returns the schema built by {@link #configure(SourceSplitBase)}. */
    @Override
    public PostgresSchema getDatabaseSchema() {
        return this.schema;
    }

    /**
     * Returns the row type of the split column of {@code table}, where the split column is
     * chosen by {@link ChunkUtils} from the table and the configured chunk key column.
     */
    @Override
    public RowType getSplitType(Table table) {
        Column splitColumn = ChunkUtils.getSplitColumn(table, this.sourceConfig.getChunkKeyColumn());
        return ChunkUtils.getSplitType(splitColumn);
    }

    /** Returns the error handler built by {@link #configure(SourceSplitBase)}. */
    @Override
    public ErrorHandler getErrorHandler() {
        return this.errorHandler;
    }

    /** Returns the JDBC source event dispatcher built by {@link #configure(SourceSplitBase)}. */
    @Override
    public JdbcSourceEventDispatcher<PostgresPartition> getDispatcher() {
        return this.dispatcher;
    }

    /** Returns the Debezium Postgres dispatcher built by {@link #configure(SourceSplitBase)}. */
    public PostgresEventDispatcher<TableId> getPostgresDispatcher() {
        return this.postgresDispatcher;
    }

    /** Returns the offset context loaded for the configured split. */
    @Override
    public PostgresOffsetContext getOffsetContext() {
        return this.offsetContext;
    }

    /** Returns the partition created from the connector's logical name. */
    @Override
    public PostgresPartition getPartition() {
        return this.partition;
    }

    /** Returns the change event queue built by {@link #configure(SourceSplitBase)}. */
    @Override
    public ChangeEventQueue<DataChangeEvent> getQueue() {
        return this.queue;
    }

    /** Returns the data collection filter of the current connector configuration. */
    @Override
    public Tables.TableFilter getTableFilter() {
        return this.getDbzConnectorConfig().getTableFilters().dataCollectionFilter();
    }

    /**
     * Builds a {@link TableId} from the {@code schema} and {@code table} fields of the record's
     * {@code source} struct. The catalog part is left {@code null}.
     */
    @Override
    public TableId getTableId(SourceRecord record) {
        Struct value = (Struct) record.value();
        Struct source = value.getStruct("source");
        String schemaName = source.getString("schema");
        String tableName = source.getString("table");
        return new TableId(null, schemaName, tableName);
    }

    /** Converts the given source record into a {@link PostgresOffset}. */
    @Override
    public Offset getStreamOffset(SourceRecord sourceRecord) {
        return PostgresOffset.of(sourceRecord);
    }

    /**
     * Closes the JDBC connection and then the replication connection.
     *
     * <p>Both closes are always attempted: a failure of the first no longer leaves the
     * replication connection open. The first failure is rethrown, with any later one attached
     * as a suppressed exception.
     *
     * @throws Exception if closing either connection fails
     */
    @Override
    public void close() throws Exception {
        Exception failure = null;
        if (this.jdbcConnection != null) {
            try {
                this.jdbcConnection.close();
            } catch (Exception e) {
                failure = e;
            }
        }
        if (this.replicationConnection != null) {
            try {
                this.replicationConnection.close();
            } catch (Exception e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /** Returns the JDBC connection opened by {@link #configure(SourceSplitBase)}. */
    public PostgresConnection getConnection() {
        return this.jdbcConnection;
    }

    /** Returns the task context built by {@link #configure(SourceSplitBase)}. */
    public PostgresTaskContext getTaskContext() {
        return this.taskContext;
    }

    /** Returns the replication connection created by {@link #configure(SourceSplitBase)}. */
    public ReplicationConnection getReplicationConnection() {
        return this.replicationConnection;
    }

    /** Returns the snapshot metrics built by {@link #configure(SourceSplitBase)}. */
    public SnapshotChangeEventSourceMetrics<PostgresPartition> getSnapshotChangeEventSourceMetrics() {
        return this.snapshotChangeEventSourceMetrics;
    }

    /** Returns the snapshotter derived from the configured snapshot mode. */
    public Snapshotter getSnapShotter() {
        return this.snapShotter;
    }

    /** Returns the slot name property from the source's Debezium properties. */
    public String getSlotName() {
        return this.sourceConfig.getDbzProperties().getProperty(PostgresConnectorConfig.SLOT_NAME.name());
    }

    /**
     * Returns the Postgres plugin name of the logical decoder parsed from the plugin name
     * property in the source's Debezium properties.
     */
    public String getPluginName() {
        return PostgresConnectorConfig.LogicalDecoder.parse(
                        this.sourceConfig.getDbzProperties().getProperty(PostgresConnectorConfig.PLUGIN_NAME.name()))
                .getPostgresPluginName();
    }
}

