package com.kuliginstepan.mongration;

import com.kuliginstepan.mongration.annotation.Changelog;
import com.kuliginstepan.mongration.configuration.MongrationProperties;
import com.kuliginstepan.mongration.entity.ChangesetEntity;
import com.kuliginstepan.mongration.service.AbstractChangeSetService;
import com.kuliginstepan.mongration.service.IndexCreator;
import com.kuliginstepan.mongration.service.LockService;
import com.kuliginstepan.mongration.utils.ChangelogUtils;
import java.lang.reflect.Method;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationContext;
import org.springframework.context.event.EventListener;
import org.springframework.core.annotation.AnnotationAwareOrderComparator;
import org.springframework.util.StringUtils;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

/* loaded from: input_file:com/kuliginstepan/mongration/AbstractMongration.class */
public abstract class AbstractMongration {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(AbstractMongration.class);
    private final AbstractChangeSetService changesetService;
    private final IndexCreator indexCreator;
    private final LockService lockService;
    private final MongrationProperties properties;
    protected ApplicationContext context;

    @EventListener
    public void start(ApplicationReadyEvent applicationReadyEvent) {
        this.context = applicationReadyEvent.getApplicationContext();
        log.info("mongration started");
        findMigrationsForExecution().flatMap(list -> {
            return withLockAcquired(Mono.defer(() -> {
                return executeMigration(list);
            }));
        }).block();
        log.info("mongration finished its work");
    }

    protected Mono<Void> withLockAcquired(Mono<Void> mono) {
        Mono<Void> acquireLock = acquireLock();
        Mono onErrorResume = mono.onErrorResume(th -> {
            return this.lockService.releaseLock().onErrorResume(th -> {
                th.addSuppressed(th);
                return Mono.empty();
            }).then(Mono.error(th));
        });
        LockService lockService = this.lockService;
        Objects.requireNonNull(lockService);
        return acquireLock.then(onErrorResume.then(Mono.defer(lockService::releaseLock)));
    }

    protected Mono<Void> acquireLock() {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        LockService lockService = this.lockService;
        Objects.requireNonNull(lockService);
        return Mono.defer(lockService::acquireLock).retryWhen(flux -> {
            return flux.zipWith(Flux.range(1, this.properties.getRetryCount() + 1), this::handleFailedTry).flatMap(num -> {
                return Mono.delay(this.properties.getRetryDelay());
            }).doOnNext(l -> {
                log.warn("mongration retried {} time at {}", Integer.valueOf(atomicInteger.incrementAndGet()), LocalTime.now());
            });
        });
    }

    private Integer handleFailedTry(Throwable th, Integer num) {
        if (!(th instanceof MongrationException) || num.intValue() > this.properties.getRetryCount()) {
            throw Exceptions.propagate(th);
        }
        return num;
    }

    protected abstract Mono<Object> executeChangeSetMethod(Object obj, Method method);

    protected List<String> doValidateChangelog(Class<?> cls) {
        List<Method> findChangeSetMethods = ChangelogUtils.findChangeSetMethods(cls);
        long count = findChangeSetMethods.stream().map(ChangelogUtils::extractChangeset).map((v0) -> {
            return v0.order();
        }).distinct().count();
        long count2 = findChangeSetMethods.stream().map(method -> {
            Optional filter = Optional.of(method).map(ChangelogUtils::extractChangeset).map((v0) -> {
                return v0.id();
            }).filter(StringUtils::hasText);
            Objects.requireNonNull(method);
            return (String) filter.orElseGet(method::getName);
        }).distinct().count();
        ArrayList arrayList = new ArrayList();
        if (count != findChangeSetMethods.size()) {
            arrayList.add("Several change sets have same order");
        }
        if (count2 != findChangeSetMethods.size()) {
            arrayList.add("Several change sets have same id's");
        }
        return arrayList;
    }

    private Mono<Void> validateChangelog(Class<?> cls) {
        List<String> doValidateChangelog = doValidateChangelog(cls);
        return doValidateChangelog.isEmpty() ? Mono.empty() : Mono.error(() -> {
            return new MongrationException(String.join(",", doValidateChangelog));
        });
    }

    private Mono<List<Tuple2<Object, List<Method>>>> findMigrationsForExecution() {
        return Flux.fromIterable(this.context.getBeansWithAnnotation(Changelog.class).values()).flatMap(obj -> {
            return validateChangelog(ChangelogUtils.getChangelogClass(obj)).thenMany(Flux.fromIterable(ChangelogUtils.findChangeSetMethods(ChangelogUtils.getChangelogClass(obj)))).collectSortedList(Comparator.comparingInt(method -> {
                return ChangelogUtils.extractChangeset(method).order();
            })).map(list -> {
                return Tuples.of(obj, list);
            });
        }).sort((tuple2, tuple22) -> {
            return AnnotationAwareOrderComparator.INSTANCE.compare(tuple2.getT1(), tuple22.getT1());
        }).collectList();
    }

    private Mono<Void> executeMigration(List<Tuple2<Object, List<Method>>> list) {
        return this.indexCreator.createIndexes(ChangesetEntity.class).then(Mono.defer(() -> {
            log.info("started executing migrations");
            list.forEach(this::executeChangelogMigrations);
            return this.indexCreator.createIndexes();
        }));
    }

    private void executeChangelogMigrations(Tuple2<Object, List<Method>> tuple2) {
        ((List) tuple2.getT2()).forEach(method -> {
            Mono.just(tuple2.getT1()).filterWhen(obj -> {
                return this.changesetService.needExecuteChangeset(method, obj);
            }).flatMap(obj2 -> {
                return executeMigration(obj2, method);
            }).block();
        });
    }

    private Mono<Void> executeMigration(Object obj, Method method) {
        return this.changesetService.validateChangesetMethodSignature(method).then(Mono.defer(() -> {
            return executeChangeSetMethod(obj, method);
        })).then(Mono.defer(() -> {
            return this.changesetService.saveChangeset(method, obj);
        })).onErrorMap(th -> {
            return new MongrationException("Could't execute changeset: " + method.getName(), th);
        });
    }

    @Generated
    public AbstractMongration(AbstractChangeSetService abstractChangeSetService, IndexCreator indexCreator, LockService lockService, MongrationProperties mongrationProperties) {
        this.changesetService = abstractChangeSetService;
        this.indexCreator = indexCreator;
        this.lockService = lockService;
        this.properties = mongrationProperties;
    }
}
