/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.segment.local.utils;

import java.io.IOException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.ws.rs.core.Response;
import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.common.exception.HttpErrorStatusException;
import org.apache.pinot.common.restlet.resources.StartReplaceSegmentsRequest;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.SimpleHttpResponse;
import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentNameGeneratorSpec;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.apache.pinot.spi.utils.retry.RetryPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsistentDataPushUtils {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsistentDataPushUtils.class);
    private static final FileUploadDownloadClient FILE_UPLOAD_DOWNLOAD_CLIENT = new FileUploadDownloadClient();
    private static final RetryPolicy DEFAULT_RETRY_POLICY = RetryPolicies.exponentialBackoffRetryPolicy((int)5, (long)10000L, (double)2.0);
    public static final String SEGMENT_NAME_POSTFIX = "segment.name.postfix";

    private ConsistentDataPushUtils() {
    }

    public static Map<URI, String> preUpload(SegmentGenerationJobSpec spec, List<String> segmentsTo) throws Exception {
        String rawTableName = spec.getTableSpec().getTableName();
        LOGGER.info("Start consistent push for table: " + rawTableName);
        Map<URI, List<String>> uriToExistingOfflineSegments = ConsistentDataPushUtils.getSegmentsToReplace(spec, rawTableName);
        LOGGER.info("Existing segments for table {}: " + uriToExistingOfflineSegments, (Object)rawTableName);
        LOGGER.info("New segments for table: {}: " + segmentsTo, (Object)rawTableName);
        return ConsistentDataPushUtils.startReplaceSegments(spec, uriToExistingOfflineSegments, segmentsTo);
    }

    public static void postUpload(SegmentGenerationJobSpec spec, Map<URI, String> uriToLineageEntryIdMap) throws Exception {
        String rawTableName = spec.getTableSpec().getTableName();
        if (uriToLineageEntryIdMap != null && !uriToLineageEntryIdMap.isEmpty()) {
            LOGGER.info("End consistent push for table: " + rawTableName);
            ConsistentDataPushUtils.endReplaceSegments(spec, uriToLineageEntryIdMap);
        }
    }

    public static Map<URI, URI> getStartReplaceSegmentUris(SegmentGenerationJobSpec spec, String rawTableName) {
        HashMap<URI, URI> baseUriToStartReplaceSegmentUriMap = new HashMap<URI, URI>();
        for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
            try {
                URI controllerURI = new URI(pinotClusterSpec.getControllerURI());
                baseUriToStartReplaceSegmentUriMap.put(controllerURI, FileUploadDownloadClient.getStartReplaceSegmentsURI((URI)controllerURI, (String)rawTableName, (String)TableType.OFFLINE.toString(), (boolean)true));
            }
            catch (URISyntaxException e) {
                throw new RuntimeException("Got invalid controller uri - '" + pinotClusterSpec.getControllerURI() + "'");
            }
        }
        return baseUriToStartReplaceSegmentUriMap;
    }

    public static Map<URI, String> startReplaceSegments(SegmentGenerationJobSpec spec, Map<URI, List<String>> uriToSegmentsFrom, List<String> segmentsTo) throws Exception {
        HashMap<URI, String> uriToLineageEntryIdMap = new HashMap<URI, String>();
        String rawTableName = spec.getTableSpec().getTableName();
        Map<URI, URI> segmentsUris = ConsistentDataPushUtils.getStartReplaceSegmentUris(spec, rawTableName);
        AuthProvider authProvider = AuthProviderUtils.makeAuthProvider((String)spec.getAuthToken());
        LOGGER.info("Start replace segment URIs: " + segmentsUris);
        for (Map.Entry<URI, URI> entry : segmentsUris.entrySet()) {
            URI controllerUri = entry.getKey();
            URI startSegmentUri = entry.getValue();
            List<String> segmentsFrom = uriToSegmentsFrom.get(controllerUri);
            StartReplaceSegmentsRequest startReplaceSegmentsRequest = new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo);
            DEFAULT_RETRY_POLICY.attempt(() -> {
                try {
                    SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT.startReplaceSegments(startSegmentUri, startReplaceSegmentsRequest, authProvider);
                    String responseString = response.getResponse();
                    LOGGER.info("Got response {}: {} while sending start replace segment request for table: {}, uploadURI: {}, request: {}", new Object[]{response.getStatusCode(), responseString, rawTableName, startSegmentUri, startReplaceSegmentsRequest});
                    String segmentLineageEntryId = JsonUtils.stringToJsonNode((String)responseString).get("segmentLineageEntryId").asText();
                    uriToLineageEntryIdMap.put(controllerUri, segmentLineageEntryId);
                    return true;
                }
                catch (SocketTimeoutException se) {
                    return false;
                }
                catch (HttpErrorStatusException e) {
                    if (e.getStatusCode() >= 500) {
                        return false;
                    }
                    if (e.getStatusCode() == Response.Status.NOT_FOUND.getStatusCode()) {
                        LOGGER.error("Table: {} not found when sending request: {}", (Object)rawTableName, (Object)startSegmentUri);
                    }
                    throw e;
                }
            });
        }
        return uriToLineageEntryIdMap;
    }

    public static void endReplaceSegments(SegmentGenerationJobSpec spec, Map<URI, String> uriToLineageEntryIdMap) throws Exception {
        AuthProvider authProvider = AuthProviderUtils.makeAuthProvider((String)spec.getAuthToken());
        String rawTableName = spec.getTableSpec().getTableName();
        for (URI controllerUri : uriToLineageEntryIdMap.keySet()) {
            String segmentLineageEntryId = uriToLineageEntryIdMap.get(controllerUri);
            URI uri = FileUploadDownloadClient.getEndReplaceSegmentsURI((URI)controllerUri, (String)rawTableName, (String)TableType.OFFLINE.toString(), (String)segmentLineageEntryId);
            DEFAULT_RETRY_POLICY.attempt(() -> {
                try {
                    SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT.endReplaceSegments(uri, 600000, null, authProvider);
                    LOGGER.info("Got response {}: {} while sending end replace segment request for table: {}, uploadURI: {}", new Object[]{response.getStatusCode(), response.getResponse(), rawTableName, uri});
                    return true;
                }
                catch (SocketTimeoutException se) {
                    return false;
                }
                catch (HttpErrorStatusException e) {
                    if (e.getStatusCode() >= 500) {
                        return false;
                    }
                    throw e;
                }
            });
        }
    }

    public static void handleUploadException(SegmentGenerationJobSpec spec, Map<URI, String> uriToLineageEntryIdMap, Exception exception) {
        if (uriToLineageEntryIdMap != null) {
            LOGGER.error("Exception when pushing segments. Marking segment lineage entry to 'REVERTED'.", (Throwable)exception);
            String rawTableName = spec.getTableSpec().getTableName();
            for (Map.Entry<URI, String> entry : uriToLineageEntryIdMap.entrySet()) {
                String segmentLineageEntryId = entry.getValue();
                try {
                    URI uri = FileUploadDownloadClient.getRevertReplaceSegmentsURI((URI)entry.getKey(), (String)rawTableName, (String)TableType.OFFLINE.name(), (String)segmentLineageEntryId, (boolean)true);
                    SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT.revertReplaceSegments(uri);
                    LOGGER.info("Got response {}: {} while sending revert replace segment request for table: {}, uploadURI: {}", new Object[]{response.getStatusCode(), response.getResponse(), rawTableName, entry.getKey()});
                }
                catch (IOException | URISyntaxException | HttpErrorStatusException e) {
                    LOGGER.error("Exception when sending revert replace segment request to controller: {} for table: {}", new Object[]{entry.getKey(), rawTableName, e});
                }
            }
        }
    }

    public static boolean consistentDataPushEnabled(TableConfig tableConfig) {
        boolean consistentDataPushEnabled = "REFRESH".equalsIgnoreCase(IngestionConfigUtils.getBatchSegmentIngestionType((TableConfig)tableConfig)) && IngestionConfigUtils.getBatchSegmentIngestionConsistentDataPushEnabled((TableConfig)tableConfig);
        LOGGER.info("Consistent data push is: {}", (Object)(consistentDataPushEnabled ? "enabled" : "disabled"));
        return consistentDataPushEnabled;
    }

    public static Map<URI, List<String>> getSegmentsToReplace(SegmentGenerationJobSpec spec, String rawTableName) throws Exception {
        HashMap<URI, List<String>> uriToOfflineSegments = new HashMap<URI, List<String>>();
        for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
            try {
                URI controllerURI = new URI(pinotClusterSpec.getControllerURI());
                Map segments = FILE_UPLOAD_DOWNLOAD_CLIENT.getSegments(controllerURI, rawTableName, TableType.OFFLINE, true);
                List offlineSegments = (List)segments.get(TableType.OFFLINE.toString());
                uriToOfflineSegments.put(controllerURI, offlineSegments);
            }
            catch (URISyntaxException e) {
                throw new RuntimeException("Got invalid controller uri - '" + pinotClusterSpec.getControllerURI() + "'");
            }
        }
        return uriToOfflineSegments;
    }

    public static void configureSegmentPostfix(SegmentGenerationJobSpec spec) {
        SegmentNameGeneratorSpec segmentNameGeneratorSpec = spec.getSegmentNameGeneratorSpec();
        if (segmentNameGeneratorSpec == null) {
            segmentNameGeneratorSpec = new SegmentNameGeneratorSpec();
        }
        String existingPostfix = (String)segmentNameGeneratorSpec.getConfigs().get(SEGMENT_NAME_POSTFIX);
        String currentTimeStamp = Long.toString(System.currentTimeMillis());
        String newSegmentPostfix = existingPostfix == null ? currentTimeStamp : String.join((CharSequence)"_", existingPostfix, currentTimeStamp);
        LOGGER.info("Since consistent data push is enabled, appending current timestamp: {} to segment name postfix", (Object)currentTimeStamp);
        LOGGER.info("Segment postfix is now configured as: {}", (Object)newSegmentPostfix);
        segmentNameGeneratorSpec.addConfig(SEGMENT_NAME_POSTFIX, newSegmentPostfix);
        spec.setSegmentNameGeneratorSpec(segmentNameGeneratorSpec);
    }
}

