package io.debezium.pipeline;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.SnapshotRecord;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.data.Envelope;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.spi.ChangeEventCreator;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.SchemaChangeEventEmitter;
import io.debezium.schema.DataCollectionFilters;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.DataCollectionSchema;
import io.debezium.schema.DatabaseSchema;
import io.debezium.schema.HistorizedDatabaseSchema;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.schema.TopicSelector;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/pipeline/EventDispatcher.class */
public class EventDispatcher<T extends DataCollectionId> {
    private static final Logger LOGGER = LoggerFactory.getLogger(EventDispatcher.class);
    private final TopicSelector<T> topicSelector;
    private final DatabaseSchema<T> schema;
    private final HistorizedDatabaseSchema<T> historizedSchema;
    private final ChangeEventQueue<DataChangeEvent> queue;
    private final DataCollectionFilters.DataCollectionFilter<T> filter;
    private final ChangeEventCreator changeEventCreator;
    private final Heartbeat heartbeat;
    private DataChangeEventListener eventListener;
    private final boolean emitTombstonesOnDelete;
    private final InconsistentSchemaHandler<T> inconsistentSchemaHandler;
    private final CommonConnectorConfig connectorConfig;
    private final EventDispatcher<T>.StreamingChangeRecordReceiver streamingReceiver;

    /* loaded from: input_file:io/debezium/pipeline/EventDispatcher$BufferingSnapshotChangeRecordReceiver.class */
    private final class BufferingSnapshotChangeRecordReceiver implements SnapshotReceiver {
        private Supplier<DataChangeEvent> bufferedEvent;

        private BufferingSnapshotChangeRecordReceiver() {
        }

        @Override // io.debezium.pipeline.spi.ChangeRecordEmitter.Receiver
        public void changeRecord(DataCollectionSchema dataCollectionSchema, Envelope.Operation operation, Object obj, Struct struct, OffsetContext offsetContext) throws InterruptedException {
            Objects.requireNonNull(struct, "value must not be null");
            EventDispatcher.LOGGER.trace("Received change record for {} operation on key {}", operation, obj);
            if (this.bufferedEvent != null) {
                EventDispatcher.this.queue.enqueue(this.bufferedEvent.get());
            }
            Schema keySchema = dataCollectionSchema.keySchema();
            String str = EventDispatcher.this.topicSelector.topicNameFor(dataCollectionSchema.id());
            this.bufferedEvent = () -> {
                return EventDispatcher.this.changeEventCreator.createDataChangeEvent(new SourceRecord(offsetContext.getPartition(), offsetContext.getOffset(), str, (Integer) null, keySchema, obj, dataCollectionSchema.getEnvelopeSchema().schema(), struct));
            };
        }

        @Override // io.debezium.pipeline.EventDispatcher.SnapshotReceiver
        public void completeSnapshot() throws InterruptedException {
            if (this.bufferedEvent != null) {
                DataChangeEvent dataChangeEvent = this.bufferedEvent.get();
                Struct struct = (Struct) dataChangeEvent.getRecord().value();
                if (struct.schema().field("source") != null) {
                    Struct struct2 = struct.getStruct("source");
                    if (SnapshotRecord.fromSource(struct2) == SnapshotRecord.TRUE) {
                        SnapshotRecord.LAST.toSource(struct2);
                    }
                }
                EventDispatcher.this.queue.enqueue(dataChangeEvent);
                this.bufferedEvent = null;
            }
        }
    }

    @FunctionalInterface
    /* loaded from: input_file:io/debezium/pipeline/EventDispatcher$InconsistentSchemaHandler.class */
    public interface InconsistentSchemaHandler<T extends DataCollectionId> {
        Optional<DataCollectionSchema> handle(T t, ChangeRecordEmitter changeRecordEmitter);
    }

    /* loaded from: input_file:io/debezium/pipeline/EventDispatcher$SchemaChangeEventReceiver.class */
    private final class SchemaChangeEventReceiver implements SchemaChangeEventEmitter.Receiver {
        private SchemaChangeEventReceiver() {
        }

        @Override // io.debezium.pipeline.spi.SchemaChangeEventEmitter.Receiver
        public void schemaChangeEvent(SchemaChangeEvent schemaChangeEvent) throws InterruptedException {
            EventDispatcher.this.historizedSchema.applySchemaChange(schemaChangeEvent);
        }
    }

    /* loaded from: input_file:io/debezium/pipeline/EventDispatcher$SnapshotReceiver.class */
    public interface SnapshotReceiver extends ChangeRecordEmitter.Receiver {
        void completeSnapshot() throws InterruptedException;
    }

    /* loaded from: input_file:io/debezium/pipeline/EventDispatcher$StreamingChangeRecordReceiver.class */
    private final class StreamingChangeRecordReceiver implements ChangeRecordEmitter.Receiver {
        private StreamingChangeRecordReceiver() {
        }

        @Override // io.debezium.pipeline.spi.ChangeRecordEmitter.Receiver
        public void changeRecord(DataCollectionSchema dataCollectionSchema, Envelope.Operation operation, Object obj, Struct struct, OffsetContext offsetContext) throws InterruptedException {
            Objects.requireNonNull(struct, "value must not be null");
            EventDispatcher.LOGGER.trace("Received change record for {} operation on key {}", operation, obj);
            Schema keySchema = dataCollectionSchema.keySchema();
            SourceRecord sourceRecord = new SourceRecord(offsetContext.getPartition(), offsetContext.getOffset(), EventDispatcher.this.topicSelector.topicNameFor(dataCollectionSchema.id()), (Integer) null, keySchema, obj, dataCollectionSchema.getEnvelopeSchema().schema(), struct);
            EventDispatcher.this.queue.enqueue(EventDispatcher.this.changeEventCreator.createDataChangeEvent(sourceRecord));
            if (EventDispatcher.this.emitTombstonesOnDelete && operation == Envelope.Operation.DELETE) {
                EventDispatcher.this.queue.enqueue(EventDispatcher.this.changeEventCreator.createDataChangeEvent(sourceRecord.newRecord(sourceRecord.topic(), sourceRecord.kafkaPartition(), sourceRecord.keySchema(), sourceRecord.key(), (Schema) null, (Object) null, sourceRecord.timestamp())));
            }
        }
    }

    public EventDispatcher(CommonConnectorConfig commonConnectorConfig, TopicSelector<T> topicSelector, DatabaseSchema<T> databaseSchema, ChangeEventQueue<DataChangeEvent> changeEventQueue, DataCollectionFilters.DataCollectionFilter<T> dataCollectionFilter, ChangeEventCreator changeEventCreator) {
        this(commonConnectorConfig, topicSelector, databaseSchema, changeEventQueue, dataCollectionFilter, changeEventCreator, null);
    }

    public EventDispatcher(CommonConnectorConfig commonConnectorConfig, TopicSelector<T> topicSelector, DatabaseSchema<T> databaseSchema, ChangeEventQueue<DataChangeEvent> changeEventQueue, DataCollectionFilters.DataCollectionFilter<T> dataCollectionFilter, ChangeEventCreator changeEventCreator, InconsistentSchemaHandler<T> inconsistentSchemaHandler) {
        this.eventListener = DataChangeEventListener.NO_OP;
        this.connectorConfig = commonConnectorConfig;
        this.topicSelector = topicSelector;
        this.schema = databaseSchema;
        this.historizedSchema = databaseSchema instanceof HistorizedDatabaseSchema ? (HistorizedDatabaseSchema) databaseSchema : null;
        this.queue = changeEventQueue;
        this.filter = dataCollectionFilter;
        this.changeEventCreator = changeEventCreator;
        this.streamingReceiver = new StreamingChangeRecordReceiver();
        this.emitTombstonesOnDelete = commonConnectorConfig.isEmitTombstoneOnDelete();
        this.inconsistentSchemaHandler = inconsistentSchemaHandler != null ? inconsistentSchemaHandler : this::errorOnMissingSchema;
        this.heartbeat = Heartbeat.create(commonConnectorConfig.getConfig(), topicSelector.getHeartbeatTopic(), commonConnectorConfig.getLogicalName());
    }

    public void dispatchSnapshotEvent(T t, ChangeRecordEmitter changeRecordEmitter, final SnapshotReceiver snapshotReceiver) throws InterruptedException {
        final DataCollectionSchema schemaFor = this.schema.schemaFor(t);
        if (schemaFor == null) {
            errorOnMissingSchema(t, changeRecordEmitter);
        }
        changeRecordEmitter.emitChangeRecords(schemaFor, new ChangeRecordEmitter.Receiver() { // from class: io.debezium.pipeline.EventDispatcher.1
            @Override // io.debezium.pipeline.spi.ChangeRecordEmitter.Receiver
            public void changeRecord(DataCollectionSchema dataCollectionSchema, Envelope.Operation operation, Object obj, Struct struct, OffsetContext offsetContext) throws InterruptedException {
                EventDispatcher.this.eventListener.onEvent(schemaFor.id(), offsetContext, obj, struct);
                snapshotReceiver.changeRecord(schemaFor, operation, obj, struct, offsetContext);
            }
        });
    }

    public SnapshotReceiver getSnapshotChangeEventReceiver() {
        return new BufferingSnapshotChangeRecordReceiver();
    }

    public boolean dispatchDataChangeEvent(final T t, ChangeRecordEmitter changeRecordEmitter) throws InterruptedException {
        try {
            boolean z = false;
            if (this.filter.isIncluded(t)) {
                DataCollectionSchema schemaFor = this.schema.schemaFor(t);
                if (schemaFor == null) {
                    Optional<DataCollectionSchema> handle = this.inconsistentSchemaHandler.handle(t, changeRecordEmitter);
                    if (!handle.isPresent()) {
                        return false;
                    }
                    schemaFor = handle.get();
                }
                changeRecordEmitter.emitChangeRecords(schemaFor, new ChangeRecordEmitter.Receiver() { // from class: io.debezium.pipeline.EventDispatcher.2
                    @Override // io.debezium.pipeline.spi.ChangeRecordEmitter.Receiver
                    public void changeRecord(DataCollectionSchema dataCollectionSchema, Envelope.Operation operation, Object obj, Struct struct, OffsetContext offsetContext) throws InterruptedException {
                        EventDispatcher.this.eventListener.onEvent(t, offsetContext, obj, struct);
                        EventDispatcher.this.streamingReceiver.changeRecord(dataCollectionSchema, operation, obj, struct, offsetContext);
                    }
                });
                z = true;
            } else {
                LOGGER.trace("Filtered data change event for {}", t);
                this.eventListener.onFilteredEvent("source = " + t);
            }
            this.heartbeat.heartbeat(changeRecordEmitter.getOffset().getPartition(), changeRecordEmitter.getOffset().getOffset(), this::enqueueHeartbeat);
            return z;
        } catch (Exception e) {
            switch (this.connectorConfig.getEventProcessingFailureHandlingMode()) {
                case FAIL:
                    throw new ConnectException("Error while processing event at offset " + changeRecordEmitter.getOffset().getOffset(), e);
                case WARN:
                    LOGGER.warn("Error while processing event at offset {}", changeRecordEmitter.getOffset().getOffset());
                    return false;
                case SKIP:
                    LOGGER.debug("Error while processing event at offset {}", changeRecordEmitter.getOffset().getOffset());
                    return false;
                default:
                    return false;
            }
        }
    }

    public Optional<DataCollectionSchema> errorOnMissingSchema(T t, ChangeRecordEmitter changeRecordEmitter) {
        this.eventListener.onErroneousEvent("source = " + t);
        throw new IllegalArgumentException("No metadata registered for captured table " + t);
    }

    public Optional<DataCollectionSchema> ignoreMissingSchema(T t, ChangeRecordEmitter changeRecordEmitter) {
        return Optional.empty();
    }

    public void dispatchSchemaChangeEvent(T t, SchemaChangeEventEmitter schemaChangeEventEmitter) throws InterruptedException {
        if (this.filter.isIncluded(t)) {
            schemaChangeEventEmitter.emitSchemaChangeEvent(new SchemaChangeEventReceiver());
        } else {
            LOGGER.trace("Filtering schema change event for {}", t);
        }
    }

    public void alwaysDispatchHeartbeatEvent(OffsetContext offsetContext) throws InterruptedException {
        this.heartbeat.forcedBeat(offsetContext.getPartition(), offsetContext.getOffset(), this::enqueueHeartbeat);
    }

    public void dispatchHeartbeatEvent(OffsetContext offsetContext) throws InterruptedException {
        this.heartbeat.heartbeat(offsetContext.getPartition(), offsetContext.getOffset(), this::enqueueHeartbeat);
    }

    public boolean heartbeatsEnabled() {
        return this.heartbeat.isEnabled();
    }

    private void enqueueHeartbeat(SourceRecord sourceRecord) throws InterruptedException {
        this.queue.enqueue(new DataChangeEvent(sourceRecord));
    }

    public void setEventListener(DataChangeEventListener dataChangeEventListener) {
        this.eventListener = dataChangeEventListener;
    }
}
