package io.debezium.pipeline.signal.channels;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Field;
import io.debezium.converters.spi.CloudEventsMaker;
import io.debezium.pipeline.signal.SignalRecord;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/pipeline/signal/channels/FileSignalChannel.class */
public class FileSignalChannel implements SignalChannelReader {
    public static final String CONFIGURATION_FIELD_PREFIX_STRING = "signal.";
    public static final String CHANNEL_NAME = "file";
    ObjectMapper mapper = new ObjectMapper();
    private File signalFile;
    public static final Field SIGNAL_FILE = Field.create("signal.file").withDisplayName("Signal file name").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.HIGH).withDescription("The name of the file for the signals to the connector").withValidation(Field::isRequired);
    private static final Logger LOGGER = LoggerFactory.getLogger(FileSignalChannel.class);

    @Override // io.debezium.pipeline.signal.channels.SignalChannelReader
    public String name() {
        return CHANNEL_NAME;
    }

    @Override // io.debezium.pipeline.signal.channels.SignalChannelReader
    public void init(CommonConnectorConfig commonConnectorConfig) {
        this.signalFile = new File(commonConnectorConfig.getConfig().subset("signal.", false).edit().withDefault(SIGNAL_FILE, "file-signals.txt").build().getString(SIGNAL_FILE));
        LOGGER.info("Reading '{}' file for signals", this.signalFile.getAbsolutePath());
    }

    @Override // io.debezium.pipeline.signal.channels.SignalChannelReader
    public void reset(Object obj) {
        try {
            FileChannel open = FileChannel.open(Paths.get(this.signalFile.getPath(), new String[0]), StandardOpenOption.WRITE);
            try {
                open.truncate(0L);
                if (open != null) {
                    open.close();
                }
            } finally {
            }
        } catch (IOException e) {
            LOGGER.error("Unable to truncate file '{}'", this.signalFile.getAbsolutePath());
        }
    }

    @Override // io.debezium.pipeline.signal.channels.SignalChannelReader
    public List<SignalRecord> read() {
        ArrayList arrayList = new ArrayList();
        if (!this.signalFile.exists() || this.signalFile.isDirectory()) {
            LOGGER.trace("Signal file not found '{}'", this.signalFile.getAbsolutePath());
            return arrayList;
        }
        try {
            List<String> readAllLines = Files.readAllLines(this.signalFile.toPath());
            if (!readAllLines.isEmpty()) {
                new FileWriter(this.signalFile, false).close();
            }
            Iterator<String> it = readAllLines.iterator();
            while (it.hasNext()) {
                String next = it.next();
                if (next == null || next.isBlank()) {
                    LOGGER.debug("Ignoring empty signal line: `{}`", next);
                    it.remove();
                } else {
                    try {
                        SignalRecord readSignalString = readSignalString(next);
                        arrayList.add(readSignalString);
                        LOGGER.info("Processing signal: {}, {}, {}, {}", new Object[]{readSignalString.getId(), readSignalString.getType(), readSignalString.getData(), readSignalString.getAdditionalData()});
                    } catch (Exception e) {
                        LOGGER.warn("Skipped signal due to an error '{}'", next, e);
                    }
                    it.remove();
                }
            }
            return arrayList;
        } catch (Exception e2) {
            throw new DebeziumException("Failed to read signal file " + this.signalFile.getAbsolutePath(), e2);
        }
    }

    private SignalRecord readSignalString(String str) throws JsonProcessingException {
        LOGGER.trace("Processing signal line: {}", str);
        JsonNode readTree = this.mapper.readTree(str);
        Map of = readTree.has("additionalData") ? (Map) this.mapper.convertValue(readTree.get("additionalData"), new TypeReference<Map<String, Object>>() { // from class: io.debezium.pipeline.signal.channels.FileSignalChannel.1
        }) : Map.of();
        Long.valueOf(readTree.has(KafkaSignalChannel.CHANNEL_OFFSET) ? readTree.get(KafkaSignalChannel.CHANNEL_OFFSET).asLong(0L) : 0L);
        return new SignalRecord(readTree.get("id").asText(), readTree.get("type").asText(), readTree.get(CloudEventsMaker.FieldName.DATA).toString(), of);
    }

    @Override // io.debezium.pipeline.signal.channels.SignalChannelReader
    public void close() {
    }
}
