package com.netflix.conductor.dao.mysql;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Uninterruptibles;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.core.execution.ApplicationException;
import com.netflix.conductor.dao.QueueDAO;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.sql.DataSource;

@Singleton
/* loaded from: input_file:com/netflix/conductor/dao/mysql/MySQLQueueDAO.class */
public class MySQLQueueDAO extends MySQLBaseDAO implements QueueDAO {
    private static final Long UNACK_SCHEDULE_MS = 60000L;

    @Inject
    public MySQLQueueDAO(ObjectMapper objectMapper, DataSource dataSource) {
        super(objectMapper, dataSource);
        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::processAllUnacks, UNACK_SCHEDULE_MS.longValue(), UNACK_SCHEDULE_MS.longValue(), TimeUnit.MILLISECONDS);
        this.logger.debug(MySQLQueueDAO.class.getName() + " is ready to serve");
    }

    public void push(String str, String str2, long j) {
        push(str, str2, 0, j);
    }

    public void push(String str, String str2, int i, long j) {
        withTransaction(connection -> {
            pushMessage(connection, str, str2, null, Integer.valueOf(i), j);
        });
    }

    public void push(String str, List<Message> list) {
        withTransaction(connection -> {
            list.forEach(message -> {
                pushMessage(connection, str, message.getId(), message.getPayload(), Integer.valueOf(message.getPriority()), 0L);
            });
        });
    }

    public boolean pushIfNotExists(String str, String str2, long j) {
        return pushIfNotExists(str, str2, 0, j);
    }

    public boolean pushIfNotExists(String str, String str2, int i, long j) {
        return ((Boolean) getWithRetriedTransactions(connection -> {
            if (existsMessage(connection, str, str2)) {
                return false;
            }
            pushMessage(connection, str, str2, null, Integer.valueOf(i), j);
            return true;
        })).booleanValue();
    }

    public List<String> pop(String str, int i, int i2) {
        List list = (List) getWithTransactionWithOutErrorPropagation(connection -> {
            return popMessages(connection, str, i, i2);
        });
        return list == null ? new ArrayList() : (List) list.stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList());
    }

    public List<Message> pollMessages(String str, int i, int i2) {
        List<Message> list = (List) getWithTransactionWithOutErrorPropagation(connection -> {
            return popMessages(connection, str, i, i2);
        });
        return list == null ? new ArrayList() : list;
    }

    public void remove(String str, String str2) {
        withTransaction(connection -> {
            removeMessage(connection, str, str2);
        });
    }

    public int getSize(String str) {
        return ((Integer) queryWithTransaction("SELECT COUNT(*) FROM queue_message WHERE queue_name = ?", query -> {
            return Integer.valueOf(Long.valueOf(query.addParameter(str).executeCount()).intValue());
        })).intValue();
    }

    public boolean ack(String str, String str2) {
        return ((Boolean) getWithRetriedTransactions(connection -> {
            return Boolean.valueOf(removeMessage(connection, str, str2));
        })).booleanValue();
    }

    public boolean setUnackTimeout(String str, String str2, long j) {
        long j2 = j / 1000;
        return ((Integer) queryWithTransaction("UPDATE queue_message SET offset_time_seconds = ?, deliver_on = TIMESTAMPADD(SECOND, ?, CURRENT_TIMESTAMP) WHERE queue_name = ? AND message_id = ?", query -> {
            return Integer.valueOf(query.addParameter(j2).addParameter(j2).addParameter(str).addParameter(str2).executeUpdate());
        })).intValue() == 1;
    }

    public void flush(String str) {
        executeWithTransaction("DELETE FROM queue_message WHERE queue_name = ?", query -> {
            query.addParameter(str).executeDelete();
        });
    }

    public Map<String, Long> queuesDetail() {
        return (Map) queryWithTransaction("SELECT queue_name, (SELECT count(*) FROM queue_message WHERE popped = false AND queue_name = q.queue_name) AS size FROM queue q", query -> {
            return (Map) query.executeAndFetch(resultSet -> {
                HashMap newHashMap = Maps.newHashMap();
                while (resultSet.next()) {
                    newHashMap.put(resultSet.getString("queue_name"), Long.valueOf(resultSet.getLong("size")));
                }
                return newHashMap;
            });
        });
    }

    public Map<String, Map<String, Map<String, Long>>> queuesDetailVerbose() {
        return (Map) queryWithTransaction("SELECT queue_name, \n       (SELECT count(*) FROM queue_message WHERE popped = false AND queue_name = q.queue_name) AS size,\n       (SELECT count(*) FROM queue_message WHERE popped = true AND queue_name = q.queue_name) AS uacked \nFROM queue q", query -> {
            return (Map) query.executeAndFetch(resultSet -> {
                HashMap newHashMap = Maps.newHashMap();
                while (resultSet.next()) {
                    newHashMap.put(resultSet.getString("queue_name"), ImmutableMap.of("a", ImmutableMap.of("size", Long.valueOf(resultSet.getLong("size")), "uacked", Long.valueOf(resultSet.getLong("uacked")))));
                }
                return newHashMap;
            });
        });
    }

    public void processAllUnacks() {
        this.logger.trace("processAllUnacks started");
        executeWithTransaction("UPDATE queue_message SET popped = false WHERE popped = true AND TIMESTAMPADD(SECOND,60,CURRENT_TIMESTAMP) > deliver_on", (v0) -> {
            v0.executeUpdate();
        });
    }

    public void processUnacks(String str) {
        executeWithTransaction("UPDATE queue_message SET popped = false WHERE queue_name = ? AND popped = true AND TIMESTAMPADD(SECOND,60,CURRENT_TIMESTAMP)  > deliver_on", query -> {
            query.addParameter(str).executeUpdate();
        });
    }

    public boolean resetOffsetTime(String str, String str2) {
        long j = 0;
        return ((Boolean) queryWithTransaction("UPDATE queue_message SET offset_time_seconds = ?, deliver_on = TIMESTAMPADD(SECOND,?,CURRENT_TIMESTAMP) \nWHERE queue_name = ? AND message_id = ?", query -> {
            return Boolean.valueOf(query.addParameter(j).addParameter(j).addParameter(str).addParameter(str2).executeUpdate() == 1);
        })).booleanValue();
    }

    private boolean existsMessage(Connection connection, String str, String str2) {
        return ((Boolean) query(connection, "SELECT EXISTS(SELECT 1 FROM queue_message WHERE queue_name = ? AND message_id = ?)", query -> {
            return Boolean.valueOf(query.addParameter(str).addParameter(str2).exists());
        })).booleanValue();
    }

    private void pushMessage(Connection connection, String str, String str2, String str3, Integer num, long j) {
        createQueueIfNotExists(connection, str);
        if (((Integer) query(connection, "UPDATE queue_message SET payload=?, deliver_on=TIMESTAMPADD(SECOND,?,CURRENT_TIMESTAMP) WHERE queue_name = ? AND message_id = ?", query -> {
            return Integer.valueOf(query.addParameter(str3).addParameter(j).addParameter(str).addParameter(str2).executeUpdate());
        })).intValue() == 0) {
            execute(connection, "INSERT INTO queue_message (deliver_on, queue_name, message_id, priority, offset_time_seconds, payload) VALUES (TIMESTAMPADD(SECOND,?,CURRENT_TIMESTAMP), ?, ?,?,?,?) ON DUPLICATE KEY UPDATE payload=VALUES(payload), deliver_on=VALUES(deliver_on)", query2 -> {
                query2.addParameter(j).addParameter(str).addParameter(str2).addParameter(num.intValue()).addParameter(j).addParameter(str3).executeUpdate();
            });
        }
    }

    private boolean removeMessage(Connection connection, String str, String str2) {
        return ((Boolean) query(connection, "DELETE FROM queue_message WHERE queue_name = ? AND message_id = ?", query -> {
            return Boolean.valueOf(query.addParameter(str).addParameter(str2).executeDelete());
        })).booleanValue();
    }

    private List<Message> peekMessages(Connection connection, String str, int i) {
        return i < 1 ? Collections.emptyList() : (List) query(connection, "SELECT message_id, priority, payload FROM queue_message use index(combo_queue_message) WHERE queue_name = ? AND popped = false AND deliver_on <= TIMESTAMPADD(MICROSECOND, 1000, CURRENT_TIMESTAMP) ORDER BY priority DESC, deliver_on, created_on LIMIT ?", query -> {
            return (List) query.addParameter(str).addParameter(i).executeAndFetch(resultSet -> {
                ArrayList arrayList = new ArrayList();
                while (resultSet.next()) {
                    Message message = new Message();
                    message.setId(resultSet.getString("message_id"));
                    message.setPriority(resultSet.getInt("priority"));
                    message.setPayload(resultSet.getString("payload"));
                    arrayList.add(message);
                }
                return arrayList;
            });
        });
    }

    private List<Message> popMessages(Connection connection, String str, int i, int i2) {
        List<Message> list;
        long currentTimeMillis = System.currentTimeMillis();
        List<Message> peekMessages = peekMessages(connection, str, i);
        while (true) {
            list = peekMessages;
            if (list.size() >= i || System.currentTimeMillis() - currentTimeMillis >= i2) {
                break;
            }
            Uninterruptibles.sleepUninterruptibly(200L, TimeUnit.MILLISECONDS);
            peekMessages = peekMessages(connection, str, i);
        }
        if (list.isEmpty()) {
            return list;
        }
        List list2 = (List) list.stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList());
        int intValue = ((Integer) query(connection, String.format("UPDATE queue_message SET popped = true WHERE queue_name = ? AND message_id IN (%s) AND popped = false", Query.generateInBindings(list.size())), query -> {
            return Integer.valueOf(query.addParameter(str).addParameters(list2).executeUpdate());
        })).intValue();
        if (intValue != list.size()) {
            throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, String.format("Could not pop all messages for given ids: %s (%d messages were popped)", list2, Integer.valueOf(intValue)));
        }
        return list;
    }

    private void createQueueIfNotExists(Connection connection, String str) {
        this.logger.trace("Creating new queue '{}'", str);
        if (((Boolean) query(connection, "SELECT EXISTS(SELECT 1 FROM queue WHERE queue_name = ?)", query -> {
            return Boolean.valueOf(query.addParameter(str).exists());
        })).booleanValue()) {
            return;
        }
        execute(connection, "INSERT IGNORE INTO queue (queue_name) VALUES (?)", query2 -> {
            query2.addParameter(str).executeUpdate();
        });
    }
}
