package com.spotify.styx.api;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.Iterables;
import com.google.common.collect.Streams;
import com.google.common.io.Closer;
import com.spotify.apollo.Client;
import com.spotify.apollo.Request;
import com.spotify.apollo.RequestContext;
import com.spotify.apollo.Response;
import com.spotify.apollo.Status;
import com.spotify.apollo.StatusType;
import com.spotify.apollo.entity.EntityMiddleware;
import com.spotify.apollo.entity.JacksonEntityCodec;
import com.spotify.apollo.route.AsyncHandler;
import com.spotify.apollo.route.Middleware;
import com.spotify.apollo.route.Route;
import com.spotify.futures.CompletableFutures;
import com.spotify.styx.api.Api;
import com.spotify.styx.api.RunStateDataPayload;
import com.spotify.styx.model.Backfill;
import com.spotify.styx.model.BackfillBuilder;
import com.spotify.styx.model.BackfillInput;
import com.spotify.styx.model.EditableBackfillInput;
import com.spotify.styx.model.Event;
import com.spotify.styx.model.Schedule;
import com.spotify.styx.model.Workflow;
import com.spotify.styx.model.WorkflowId;
import com.spotify.styx.model.WorkflowInstance;
import com.spotify.styx.serialization.Json;
import com.spotify.styx.state.RunState;
import com.spotify.styx.state.StateData;
import com.spotify.styx.storage.Storage;
import com.spotify.styx.util.CloserUtil;
import com.spotify.styx.util.ParameterUtil;
import com.spotify.styx.util.RandomGenerator;
import com.spotify.styx.util.ReplayEvents;
import com.spotify.styx.util.ResourceNotFoundException;
import com.spotify.styx.util.TimeUtil;
import com.spotify.styx.util.WorkflowValidator;
import java.io.Closeable;
import java.io.IOException;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import okio.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/spotify/styx/api/BackfillResource.class */
public final class BackfillResource implements Closeable {
    // Named after this class so that log lines are attributed to the backfill resource.
    private static final Logger log = LoggerFactory.getLogger(BackfillResource.class);

    /** Path under which all backfill routes are registered. */
    static final String BASE = "/backfills";
    /** Path appended to the scheduler service base URL when building scheduler API URLs. */
    private static final String SCHEDULER_BASE_PATH = "/api/v0";
    /** State name reported when neither an active nor a replayed run state is found for an instance. */
    private static final String UNKNOWN = "UNKNOWN";
    /** State name reported for instances in the range starting at the backfill's next trigger. */
    private static final String WAITING = "WAITING";
    /** Parallelism of the pool used to compute backfill statuses. */
    private static final int CONCURRENCY = 64;

    private final Storage storage;
    private final String schedulerServiceBaseUrl;
    private final WorkflowValidator workflowValidator;
    // Must be declared before forkJoinPool: the pool is registered with this closer while fields initialize.
    private final Closer closer = Closer.create();
    private final ForkJoinPool forkJoinPool = (ForkJoinPool) CloserUtil.register(this.closer, new ForkJoinPool(CONCURRENCY), "backfill-resource");

    /**
     * Creates the backfill resource.
     *
     * @param str base URL of the scheduler service; scheduler API URLs are built from it
     * @param storage storage used to read and write backfills, workflows and run states
     * @param workflowValidator validator applied to a workflow before a backfill is created for it
     * @throws NullPointerException if any argument is {@code null}
     */
    public BackfillResource(String str, Storage storage, WorkflowValidator workflowValidator) {
        // Every null check names its argument so a failure points at the offending parameter.
        this.schedulerServiceBaseUrl = Objects.requireNonNull(str, "schedulerServiceBaseUrl");
        this.storage = Objects.requireNonNull(storage, "storage");
        this.workflowValidator = Objects.requireNonNull(workflowValidator, "workflowValidator");
    }

    /**
     * Builds the HTTP routes served by this resource, all prefixed for API version V3.
     *
     * <p>The GET, POST and PUT routes are synchronous entity handlers adapted with
     * {@code Middleware::syncToAsync}; the DELETE route is registered as an asynchronous handler.
     *
     * @return a stream with the entity routes followed by the halt (DELETE) route
     */
    public Stream<Route<AsyncHandler<Response<ByteString>>>> routes() {
        final EntityMiddleware em =
            EntityMiddleware.forCodec(JacksonEntityCodec.forMapper(Json.OBJECT_MAPPER));

        final List<Route<AsyncHandler<Response<ByteString>>>> entityRoutes = Stream.of(
                Route.with(
                    em.serializerDirect(BackfillsPayload.class),
                    "GET", BASE,
                    this::getBackfills),
                Route.with(
                    em.response(BackfillInput.class, Backfill.class),
                    "POST", BASE,
                    rc -> this::postBackfill),
                Route.with(
                    em.serializerResponse(BackfillPayload.class),
                    "GET", BASE + "/<bid>",
                    rc -> getBackfill(rc, rc.pathArgs().get("bid"))),
                Route.with(
                    em.response(EditableBackfillInput.class, Backfill.class),
                    "PUT", BASE + "/<bid>",
                    rc -> payload -> updateBackfill(rc.pathArgs().get("bid"), payload)))
            .map(route -> route.withMiddleware(Middleware::syncToAsync))
            .collect(Collectors.toList());

        final List<Route<AsyncHandler<Response<ByteString>>>> haltRoutes =
            Collections.singletonList(Route.async(
                "DELETE", BASE + "/<bid>",
                rc -> haltBackfill(rc.pathArgs().get("bid"), rc)));

        return Streams.concat(
            Api.prefixRoutes(entityRoutes, Api.Version.V3),
            Api.prefixRoutes(haltRoutes, Api.Version.V3));
    }

    /**
     * Closes everything registered with this resource's {@code Closer}, which includes the
     * fork-join pool registered at construction time.
     *
     * @throws IOException if closing a registered resource fails
     */
    @Override
    public void close() throws IOException {
        closer.close();
    }

    /**
     * Lists backfills, optionally filtered by the {@code component} and/or {@code workflow} query
     * parameters.
     *
     * <p>{@code showAll} (default {@code "false"}) is forwarded to the storage query and
     * {@code status} (default {@code "false"}) controls whether run statuses are attached to each
     * backfill. One task per backfill is submitted to the fork-join pool and all of them are
     * submitted before the first one is joined.
     *
     * @param requestContext the incoming request
     * @return the payload wrapping one entry per matching backfill
     * @throws RuntimeException wrapping an {@link IOException} thrown by the storage
     */
    private BackfillsPayload getBackfills(RequestContext requestContext) {
        final Request request = requestContext.request();
        final Optional<String> component = request.parameter("component");
        final Optional<String> workflow = request.parameter("workflow");
        final boolean includeStatuses = "true".equals(request.parameter("status").orElse("false"));
        final boolean showAll = "true".equals(request.parameter("showAll").orElse("false"));

        try {
            final List<Backfill> backfills;
            if (component.isPresent() && workflow.isPresent()) {
                backfills = storage.backfillsForWorkflowId(
                    showAll, WorkflowId.create(component.get(), workflow.get()));
            } else if (component.isPresent()) {
                backfills = storage.backfillsForComponent(showAll, component.get());
            } else if (workflow.isPresent()) {
                backfills = storage.backfillsForWorkflow(showAll, workflow.get());
            } else {
                backfills = storage.backfills(showAll);
            }

            // Collect the submitted tasks first so every task is queued before any join blocks.
            final List<BackfillPayload> payloads = backfills.stream()
                .map(backfill -> forkJoinPool.submit(() -> toBackfillPayload(backfill, includeStatuses)))
                .collect(Collectors.toList())
                .stream()
                .map(task -> task.join())
                .collect(Collectors.toList());
            return BackfillsPayload.create(payloads);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** Builds the payload for one backfill, attaching its run statuses only when requested. */
    private BackfillPayload toBackfillPayload(Backfill backfill, boolean includeStatuses) {
        if (!includeStatuses) {
            return BackfillPayload.create(backfill, Optional.empty());
        }
        return BackfillPayload.create(
            backfill, Optional.of(RunStateDataPayload.create(retrieveBackfillStatuses(backfill))));
    }

    /**
     * Returns a single backfill, with its run statuses unless the {@code status} query parameter
     * is set to something other than {@code "true"} (it defaults to {@code "true"}).
     *
     * @param requestContext the incoming request
     * @param str the backfill id taken from the request path
     * @return 200 with the payload, 404 if the backfill does not exist, or 500 if the storage
     *     read fails with an {@link IOException}
     */
    private Response<BackfillPayload> getBackfill(RequestContext requestContext, String str) {
        final boolean includeStatuses =
            "true".equals(requestContext.request().parameter("status").orElse("true"));
        try {
            final Optional<Backfill> found = storage.backfill(str);
            if (!found.isPresent()) {
                return Response.forStatus(Status.NOT_FOUND);
            }
            final Backfill backfill = found.get();

            final Optional<RunStateDataPayload> statuses;
            if (includeStatuses) {
                statuses = Optional.of(RunStateDataPayload.create(retrieveBackfillStatuses(backfill)));
            } else {
                statuses = Optional.empty();
            }
            return Response.forPayload(BackfillPayload.create(backfill, statuses));
        } catch (IOException e) {
            final String message = String.format("Couldn't read backfill %s. ", str);
            log.warn(message, e);
            return Response.forStatus(
                Status.INTERNAL_SERVER_ERROR.withReasonPhrase("Error in internal storage"));
        }
    }

    /**
     * Builds a scheduler API URL from the configured base URL, the scheduler base path and the
     * given path segments joined with {@code "/"}.
     *
     * @param charSequenceArr path segments to append after the scheduler base path
     * @return the assembled URL
     */
    private String schedulerApiUrl(CharSequence... charSequenceArr) {
        final String path = String.join("/", charSequenceArr);
        return schedulerServiceBaseUrl + SCHEDULER_BASE_PATH + "/" + path;
    }

    /**
     * Halts a backfill: stores it with {@code halted} set to {@code true}, then asks the scheduler
     * to halt its currently active instances.
     *
     * @param str the backfill id taken from the request path
     * @param requestContext the incoming request; its request-scoped client is used for the
     *     scheduler calls
     * @return a stage completing with 404 if the backfill does not exist, 500 if the storage
     *     fails, or otherwise the outcome of halting the active instances
     */
    private CompletionStage<Response<ByteString>> haltBackfill(String str, RequestContext requestContext) {
        try {
            final Optional<Backfill> found = storage.backfill(str);
            if (!found.isPresent()) {
                return CompletableFuture.completedFuture(
                    Response.forStatus(Status.NOT_FOUND.withReasonPhrase("backfill not found")));
            }
            final Backfill backfill = found.get();
            // Persist the halted flag before touching the running instances.
            storage.storeBackfill(backfill.builder().halted(true).build());
            return haltActiveBackfillInstances(backfill, requestContext.requestScopedClient());
        } catch (IOException e) {
            // Log with the stack trace: the response only carries the exception message.
            log.warn("Couldn't halt backfill {}", str, e);
            return CompletableFuture.completedFuture(
                Response.forStatus(Status.INTERNAL_SERVER_ERROR
                    .withReasonPhrase("could not halt backfill: " + e.getMessage())));
        }
    }

    /**
     * Sends a halt request for every instance of the backfill that {@code isActiveState} reports
     * as active, and folds the individual outcomes into a single response.
     *
     * @param backfill the backfill whose instances should be halted
     * @param client client used to call the scheduler
     * @return a stage completing with 200 if every halt succeeded, or 500 if any halt reported
     *     failure or completed exceptionally
     */
    private CompletionStage<Response<ByteString>> haltActiveBackfillInstances(Backfill backfill, Client client) {
        final List<CompletionStage<Boolean>> haltRequests = retrieveBackfillStatuses(backfill).stream()
            .filter(BackfillResource::isActiveState)
            .map(RunStateDataPayload.RunStateData::workflowInstance)
            .map(instance -> haltActiveBackfillInstance(instance, client))
            .collect(Collectors.toList());

        return CompletableFutures.allAsList(haltRequests).handle((results, failure) -> {
            // Check the failure first: results is null when the combined future failed.
            if (failure != null || results.contains(Boolean.FALSE)) {
                return Response.forStatus(Status.INTERNAL_SERVER_ERROR.withReasonPhrase(
                    "some active instances cannot be halted, however no new ones will be triggered"));
            }
            return Response.ok();
        });
    }

    /**
     * Posts a halt event for one workflow instance to the scheduler's {@code events} endpoint.
     *
     * @param workflowInstance the instance to halt
     * @param client client used to call the scheduler
     * @return a stage completing with {@code true} if the scheduler answered with a status in the
     *     successful family, {@code false} otherwise or if the event could not be serialized
     */
    private CompletionStage<Boolean> haltActiveBackfillInstance(WorkflowInstance workflowInstance, Client client) {
        try {
            final ByteString payload = Json.serialize(Event.halt(workflowInstance));
            final Request haltRequest = Request.forUri(schedulerApiUrl("events"), "POST").withPayload(payload);
            return client.send(haltRequest)
                .thenApply(response -> response.status().family().equals(StatusType.Family.SUCCESSFUL));
        } catch (JsonProcessingException e) {
            // Still reported as a failed halt, but no longer dropped silently.
            log.warn("Failed to serialize halt event for {}", workflowInstance, e);
            return CompletableFuture.completedFuture(false);
        }
    }

    /**
     * Tells whether a reported run state belongs to an instance that is still active.
     *
     * <p>The {@code UNKNOWN} and {@code WAITING} placeholders produced by this resource are never
     * active. They are handled before {@code RunState.State.valueOf} is called (presumably because
     * they are not {@code RunState.State} constants — confirm against the enum).
     *
     * @param runStateData the status entry to inspect
     * @return {@code true} if the state is a non-terminal {@code RunState.State}
     * @throws IllegalArgumentException if the state name is neither a placeholder nor a
     *     {@code RunState.State} constant
     */
    private static boolean isActiveState(RunStateDataPayload.RunStateData runStateData) {
        final String state = runStateData.state();
        switch (state) {
            case UNKNOWN:
            case WAITING:
                return false;
            default:
                return !RunState.State.valueOf(state).isTerminal();
        }
    }

    /**
     * Validates a backfill request and, if it is acceptable, stores a new backfill.
     *
     * <p>Checks, in order: the workflow exists, it has a docker image, it passes the workflow
     * validator, {@code start} is before {@code end}, both are aligned with the workflow schedule,
     * and none of the instances in the range is currently active.
     *
     * @param backfillInput the requested backfill
     * @return 200 with the stored backfill, 404 if the workflow is unknown, 400 if validation
     *     fails, or 409 if some partitions in the range are already active
     * @throws RuntimeException unchecked exceptions propagate as they are; a checked exception
     *     (e.g. an {@link java.io.IOException} from the storage) is wrapped exactly once
     */
    private Response<Backfill> postBackfill(BackfillInput backfillInput) {
        final WorkflowId workflowId = WorkflowId.create(backfillInput.component(), backfillInput.workflow());
        try {
            final Set<WorkflowInstance> activeInstances =
                storage.readActiveStates(backfillInput.component()).keySet();
            final Optional<Workflow> workflowOpt = storage.workflow(workflowId);
            if (!workflowOpt.isPresent()) {
                return Response.forStatus(Status.NOT_FOUND.withReasonPhrase("workflow not found"));
            }
            final Workflow workflow = workflowOpt.get();
            final Schedule schedule = workflow.configuration().schedule();

            if (!workflow.configuration().dockerImage().isPresent()) {
                return Response.forStatus(
                    Status.BAD_REQUEST.withReasonPhrase("Workflow is missing docker image"));
            }
            final List<String> errors = workflowValidator.validateWorkflow(workflow);
            if (!errors.isEmpty()) {
                return Response.forStatus(Status.BAD_REQUEST.withReasonPhrase(
                    "Invalid workflow configuration: " + String.join(", ", errors)));
            }
            if (!backfillInput.start().isBefore(backfillInput.end())) {
                return Response.forStatus(
                    Status.BAD_REQUEST.withReasonPhrase("start must be before end"));
            }
            if (!TimeUtil.isAligned(backfillInput.start(), schedule)) {
                return Response.forStatus(
                    Status.BAD_REQUEST.withReasonPhrase("start parameter not aligned with schedule"));
            }
            if (!TimeUtil.isAligned(backfillInput.end(), schedule)) {
                return Response.forStatus(
                    Status.BAD_REQUEST.withReasonPhrase("end parameter not aligned with schedule"));
            }

            final List<Instant> instants =
                TimeUtil.instantsInRange(backfillInput.start(), backfillInput.end(), schedule);
            final List<WorkflowInstance> alreadyActive = instants.stream()
                .map(instant -> WorkflowInstance.create(workflowId, ParameterUtil.toParameter(schedule, instant)))
                .filter(activeInstances::contains)
                .collect(Collectors.toList());
            if (!alreadyActive.isEmpty()) {
                final String partitions = alreadyActive.stream()
                    .map(WorkflowInstance::parameter)
                    .collect(Collectors.joining(", "));
                return Response.forStatus(Status.CONFLICT.withReasonPhrase(
                    "these partitions are already active: " + partitions));
            }

            // The id is generated only after all validation has passed.
            final Backfill backfill = Backfill.newBuilder()
                .id(RandomGenerator.DEFAULT.generateUniqueId("backfill"))
                .allTriggered(false)
                .workflowId(workflowId)
                .concurrency(backfillInput.concurrency())
                .start(backfillInput.start())
                .end(backfillInput.end())
                .schedule(schedule)
                // A reversed backfill starts from the last instant of the range.
                .nextTrigger(backfillInput.reverse() ? Iterables.getLast(instants) : backfillInput.start())
                .description(backfillInput.description())
                .reverse(backfillInput.reverse())
                .triggerParameters(backfillInput.triggerParameters())
                .halted(false)
                .build();

            storage.storeBackfill(backfill);
            return Response.forPayload(backfill);
        } catch (RuntimeException e) {
            // Already unchecked: rethrow as is so the original exception type stays visible.
            throw e;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Updates the editable fields (concurrency and description) of a stored backfill inside a
     * storage transaction. Fields absent from the input are left unchanged.
     *
     * @param str the backfill id taken from the request path
     * @param editableBackfillInput the requested changes; its id must equal {@code str}
     * @return 200 with the stored backfill, 400 if the ids differ, 404 if the backfill does not
     *     exist, or 500 if the storage fails with an {@link IOException}
     */
    private Response<Backfill> updateBackfill(String str, EditableBackfillInput editableBackfillInput) {
        if (!editableBackfillInput.id().equals(str)) {
            return Response.forStatus(
                Status.BAD_REQUEST.withReasonPhrase("ID of payload does not match ID in uri."));
        }
        try {
            final Backfill updated = storage.runInTransaction(tx -> {
                final Optional<Backfill> existing = tx.backfill(str);
                if (!existing.isPresent()) {
                    throw new ResourceNotFoundException(String.format("Backfill %s not found.", str));
                }
                final BackfillBuilder builder = existing.get().builder();
                editableBackfillInput.concurrency().ifPresent(builder::concurrency);
                editableBackfillInput.description().ifPresent(builder::description);
                return tx.store(builder.build());
            });
            return Response.forStatus(Status.OK).withPayload(updated);
        } catch (IOException e) {
            // Log with the stack trace: the response carries no detail about the failure.
            log.warn("Failed to store backfill {}", str, e);
            return Response.forStatus(
                Status.INTERNAL_SERVER_ERROR.withReasonPhrase("Failed to store backfill."));
        } catch (ResourceNotFoundException e) {
            return Response.forStatus(Status.NOT_FOUND.withReasonPhrase(e.getMessage()));
        }
    }

    /**
     * Computes one status entry per instant of the backfill.
     *
     * <p>The range is split at the backfill's next trigger. For a forward backfill the instants
     * from {@code start} up to {@code nextTrigger} are looked up through {@code getRunStateData}
     * (one fork-join task per instant) and the instants from {@code nextTrigger} to {@code end}
     * are reported as {@code WAITING}. For a reversed backfill the looked-up range runs from the
     * instant after {@code nextTrigger} to {@code end}, and the waiting range from {@code start}
     * to the instant after {@code nextTrigger}. The result lists the waiting entries first for a
     * reversed backfill and last otherwise.
     *
     * @param backfill the backfill to report on
     * @return the status entries for the backfill's instants
     * @throws RuntimeException wrapping an {@link IOException} thrown by the storage
     */
    private List<RunStateDataPayload.RunStateData> retrieveBackfillStatuses(Backfill backfill) {
        try {
            final Map<WorkflowInstance, RunState> activeStates =
                storage.readActiveStatesByTriggerId(backfill.id());

            final List<Instant> processedInstants;
            if (backfill.reverse()) {
                processedInstants = TimeUtil.instantsInRange(
                    TimeUtil.nextInstant(backfill.nextTrigger(), backfill.schedule()),
                    backfill.end(),
                    backfill.schedule());
            } else {
                processedInstants = TimeUtil.instantsInRange(
                    backfill.start(), backfill.nextTrigger(), backfill.schedule());
            }
            // Queue every lookup before joining so they can run concurrently on the pool.
            final List<RunStateDataPayload.RunStateData> processed = processedInstants.stream()
                .map(instant -> forkJoinPool.submit(() -> getRunStateData(backfill, activeStates, instant)))
                .collect(Collectors.toList())
                .stream()
                .map(task -> task.join())
                .collect(Collectors.toList());

            final List<Instant> waitingInstants;
            if (backfill.reverse()) {
                waitingInstants = TimeUtil.instantsInRange(
                    backfill.start(),
                    TimeUtil.nextInstant(backfill.nextTrigger(), backfill.schedule()),
                    backfill.schedule());
            } else {
                waitingInstants = TimeUtil.instantsInRange(
                    backfill.nextTrigger(), backfill.end(), backfill.schedule());
            }
            final List<RunStateDataPayload.RunStateData> waiting = waitingInstants.stream()
                .map(instant -> RunStateDataPayload.RunStateData.create(
                    WorkflowInstance.create(
                        backfill.workflowId(), ParameterUtil.toParameter(backfill.schedule(), instant)),
                    WAITING,
                    StateData.zero()))
                .collect(Collectors.toList());

            final Stream<RunStateDataPayload.RunStateData> ordered = backfill.reverse()
                ? Stream.concat(waiting.stream(), processed.stream())
                : Stream.concat(processed.stream(), waiting.stream());
            return ordered.collect(Collectors.toList());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Resolves the status entry for one instant of a backfill.
     *
     * <p>The state from {@code map} is used when it contains the instance; otherwise the state
     * returned by {@code ReplayEvents.getBackfillRunState} is used; if that is absent too, an
     * {@code UNKNOWN} entry with zero state data is returned.
     *
     * @param backfill the backfill the instant belongs to
     * @param map run states keyed by workflow instance
     * @param instant the instant to resolve
     * @return the status entry for the corresponding workflow instance
     */
    private RunStateDataPayload.RunStateData getRunStateData(Backfill backfill, Map<WorkflowInstance, RunState> map, Instant instant) {
        final WorkflowInstance instance = WorkflowInstance.create(
            backfill.workflowId(), ParameterUtil.toParameter(backfill.schedule(), instant));

        if (map.containsKey(instance)) {
            return toRunStateData(map.get(instance));
        }

        return ReplayEvents.getBackfillRunState(instance, storage, backfill.id())
            .map(BackfillResource::toRunStateData)
            .orElseGet(() -> RunStateDataPayload.RunStateData.create(instance, UNKNOWN, StateData.zero()));
    }

    /** Converts a run state into its payload representation. */
    private static RunStateDataPayload.RunStateData toRunStateData(RunState runState) {
        return RunStateDataPayload.RunStateData.create(
            runState.workflowInstance(), runState.state().name(), runState.data());
    }
}
