/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.cli;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.rvesse.airline.annotations.Arguments;
import com.github.rvesse.airline.annotations.Command;
import com.github.rvesse.airline.annotations.Option;
import com.github.rvesse.airline.annotations.restrictions.Required;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.multibindings.MapBinder;
import com.google.inject.name.Named;
import com.google.inject.name.Names;
import io.netty.util.SuppressForbidden;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.lang.annotation.Annotation;
import java.nio.charset.Charset;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.druid.cli.GuiceRunnable;
import org.apache.druid.cli.QueryJettyServerInitializer;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.curator.ZkEnablementConfig;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.error.DruidException;
import org.apache.druid.guice.Binders;
import org.apache.druid.guice.CacheModule;
import org.apache.druid.guice.DruidProcessingModule;
import org.apache.druid.guice.IndexingServiceFirehoseModule;
import org.apache.druid.guice.IndexingServiceInputSourceModule;
import org.apache.druid.guice.IndexingServiceTaskLogsModule;
import org.apache.druid.guice.IndexingServiceTuningConfigModule;
import org.apache.druid.guice.Jerseys;
import org.apache.druid.guice.JoinableFactoryModule;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.ManageLifecycleServer;
import org.apache.druid.guice.PolyBind;
import org.apache.druid.guice.QueryRunnerFactoryModule;
import org.apache.druid.guice.QueryableModule;
import org.apache.druid.guice.QueryablePeonModule;
import org.apache.druid.guice.SegmentWranglerModule;
import org.apache.druid.guice.ServerTypeConfig;
import org.apache.druid.guice.annotations.AttemptId;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.annotations.Parent;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskReportFileWriter;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory;
import org.apache.druid.indexing.common.actions.RemoteTaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.common.actions.TaskAuditLogConfig;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.batch.parallel.DeepStorageShuffleClient;
import org.apache.druid.indexing.common.task.batch.parallel.HttpShuffleClient;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClientProvider;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClientProviderImpl;
import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.worker.executor.ExecutorLifecycle;
import org.apache.druid.indexing.worker.executor.ExecutorLifecycleConfig;
import org.apache.druid.indexing.worker.shuffle.DeepStorageIntermediaryDataManager;
import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
import org.apache.druid.indexing.worker.shuffle.LocalIntermediaryDataManager;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.input.InputSourceModule;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.lookup.LookupModule;
import org.apache.druid.segment.handoff.CoordinatorBasedSegmentHandoffNotifierConfig;
import org.apache.druid.segment.handoff.CoordinatorBasedSegmentHandoffNotifierFactory;
import org.apache.druid.segment.handoff.SegmentHandoffNotifierFactory;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.loading.DataSegmentArchiver;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.DataSegmentMover;
import org.apache.druid.segment.loading.OmniDataSegmentArchiver;
import org.apache.druid.segment.loading.OmniDataSegmentKiller;
import org.apache.druid.segment.loading.OmniDataSegmentMover;
import org.apache.druid.segment.loading.StorageLocation;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.appenderator.PeonAppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.ResponseContextConfig;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordination.ZkCoordinator;
import org.apache.druid.server.http.HistoricalResource;
import org.apache.druid.server.http.SegmentListerResource;
import org.apache.druid.server.initialization.jetty.ChatHandlerServerModule;
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
import org.apache.druid.tasklogs.TaskPayloadManager;
import org.eclipse.jetty.server.Server;

@Command(name="peon", description="Runs a Peon, this is an individual forked \"task\" used as part of the indexing service. This should rarely, if ever, be used directly. See https://druid.apache.org/docs/latest/design/peons.html for a description")
public class CliPeon
extends GuiceRunnable {
    @Required
    @Arguments(description="taskDirPath attemptId")
    public List<String> taskAndStatusFile;
    private String taskDirPath;
    private String attemptId;
    @Option(name={"--nodeType"}, title={"nodeType"}, description="Set the node type to expose on ZK")
    public String serverType = "indexer-executor";
    private boolean isZkEnabled = true;
    @Option(name={"--loadBroadcastSegments"}, title={"loadBroadcastSegments"}, description="Enable loading of broadcast segments")
    public String loadBroadcastSegments = "false";
    @Option(name={"--taskId"}, title={"taskId"}, description="TaskId for fetching task.json remotely")
    public String taskId = "";
    private static final Logger log = new Logger(CliPeon.class);
    private Properties properties;

    public CliPeon() {
        super(log);
    }

    @Inject
    public void configure(Properties properties) {
        this.properties = properties;
        this.isZkEnabled = ZkEnablementConfig.isEnabled((Properties)properties);
    }

    @Override
    protected List<? extends Module> getModules() {
        return ImmutableList.of((Object)new DruidProcessingModule(), (Object)new QueryableModule(), (Object)new QueryRunnerFactoryModule(), (Object)new SegmentWranglerModule(), (Object)new JoinableFactoryModule(), (Object)new IndexingServiceTaskLogsModule(), (Object)new Module(){

            @SuppressForbidden(reason="System#out, System#err")
            public void configure(Binder binder) {
                CliPeon.this.taskDirPath = CliPeon.this.taskAndStatusFile.get(0);
                CliPeon.this.attemptId = CliPeon.this.taskAndStatusFile.get(1);
                String serverViewType = (String)CliPeon.this.properties.getOrDefault((Object)"druid.serverview.type", "http");
                if (Boolean.parseBoolean(CliPeon.this.properties.getProperty("druid.centralizedDatasourceSchema.enabled")) && !serverViewType.equals("http")) {
                    throw DruidException.forPersona((DruidException.Persona)DruidException.Persona.ADMIN).ofCategory(DruidException.Category.UNSUPPORTED).build(StringUtils.format((String)"CentralizedDatasourceSchema feature is incompatible with config %1$s=%2$s. Please consider switching to http based segment discovery (set %1$s=%3$s) or disable the feature (set %4$s=false).", (Object[])new Object[]{"druid.serverview.type", serverViewType, "http", "druid.centralizedDatasourceSchema.enabled"}), new Object[0]);
                }
                binder.bindConstant().annotatedWith((Annotation)Names.named((String)"serviceName")).to("druid/peon");
                binder.bindConstant().annotatedWith((Annotation)Names.named((String)"servicePort")).to(0);
                binder.bindConstant().annotatedWith((Annotation)Names.named((String)"tlsServicePort")).to(-1);
                binder.bind(ResponseContextConfig.class).toInstance((Object)ResponseContextConfig.newConfig((boolean)true));
                binder.bindConstant().annotatedWith(AttemptId.class).to(CliPeon.this.attemptId);
                JsonConfigProvider.bind((Binder)binder, (String)"druid.centralizedDatasourceSchema", CentralizedDatasourceSchemaConfig.class);
                JsonConfigProvider.bind((Binder)binder, (String)"druid.task.executor", DruidNode.class, Parent.class);
                CliPeon.bindRowIngestionMeters(binder);
                CliPeon.bindChatHandler(binder);
                CliPeon.configureIntermediaryData(binder);
                CliPeon.bindTaskConfigAndClients(binder);
                CliPeon.bindPeonDataSegmentHandlers(binder);
                binder.bind(ExecutorLifecycle.class).in(ManageLifecycle.class);
                LifecycleModule.register((Binder)binder, ExecutorLifecycle.class);
                ExecutorLifecycleConfig executorLifecycleConfig = new ExecutorLifecycleConfig().setTaskFile(Paths.get(CliPeon.this.taskDirPath, "task.json").toFile()).setStatusFile(Paths.get(CliPeon.this.taskDirPath, "attempt", CliPeon.this.attemptId, "status.json").toFile());
                if (CliPeon.this.properties.getProperty("druid.indexer.runner.type", "").contains("k8s")) {
                    log.info("Running peon in k8s mode", new Object[0]);
                    executorLifecycleConfig.setParentStreamDefined(false);
                }
                binder.bind(ExecutorLifecycleConfig.class).toInstance((Object)executorLifecycleConfig);
                binder.bind(TaskReportFileWriter.class).toInstance((Object)new SingleFileTaskReportFileWriter(Paths.get(CliPeon.this.taskDirPath, "attempt", CliPeon.this.attemptId, "report.json").toFile()));
                binder.bind(TaskRunner.class).to(SingleTaskBackgroundRunner.class);
                binder.bind(QuerySegmentWalker.class).to(SingleTaskBackgroundRunner.class);
                binder.bind(SingleTaskBackgroundRunner.class).in(ManageLifecycleServer.class);
                CliPeon.bindRealtimeCache(binder);
                CliPeon.bindCoordinatorHandoffNotifer(binder);
                binder.bind(AppenderatorsManager.class).to(PeonAppenderatorsManager.class).in(LazySingleton.class);
                binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class);
                Jerseys.addResource((Binder)binder, SegmentListerResource.class);
                binder.bind(ServerTypeConfig.class).toInstance((Object)new ServerTypeConfig(ServerType.fromString((String)CliPeon.this.serverType)));
                LifecycleModule.register((Binder)binder, Server.class);
                if ("true".equals(CliPeon.this.loadBroadcastSegments)) {
                    binder.install((Module)new BroadcastSegmentLoadingModule());
                }
            }

            @Provides
            @LazySingleton
            @Named(value="heartbeat")
            public Supplier<Map<String, Object>> heartbeatDimensions(Task task) {
                return Suppliers.ofInstance((Object)ImmutableMap.of((Object)"taskId", (Object)task.getId(), (Object)"dataSource", (Object)task.getDataSource(), (Object)"taskType", (Object)task.getType(), (Object)"groupId", (Object)task.getGroupId()));
            }

            @Provides
            @LazySingleton
            public Task readTask(@Json ObjectMapper mapper, ExecutorLifecycleConfig config, TaskPayloadManager taskPayloadManager) {
                try {
                    if (!config.getTaskFile().exists() || config.getTaskFile().length() == 0L) {
                        log.info("Task file not found, trying to pull task payload from deep storage", new Object[0]);
                        String task = IOUtils.toString((InputStream)((InputStream)taskPayloadManager.streamTaskPayload(CliPeon.this.taskId).get()), (Charset)Charset.defaultCharset());
                        FileUtils.write((File)config.getTaskFile(), (CharSequence)task, (Charset)Charset.defaultCharset());
                    }
                    return (Task)mapper.readValue(config.getTaskFile(), Task.class);
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }

            @Provides
            @LazySingleton
            @Named(value="druidDataSource")
            public String getDataSourceFromTask(Task task) {
                return task.getDataSource();
            }

            @Provides
            @LazySingleton
            @Named(value="druidTaskId")
            public String getTaskIDFromTask(Task task) {
                return task.getId();
            }
        }, (Object)new QueryablePeonModule(), (Object)new IndexingServiceFirehoseModule(), (Object)new IndexingServiceInputSourceModule(), (Object)new IndexingServiceTuningConfigModule(), (Object)new InputSourceModule(), (Object[])new Module[]{new ChatHandlerServerModule(this.properties), new LookupModule()});
    }

    @Override
    @SuppressForbidden(reason="System#out, System#err")
    public void run() {
        try {
            Injector injector = this.makeInjector((Set<NodeRole>)ImmutableSet.of((Object)NodeRole.PEON));
            try {
                Lifecycle lifecycle = this.initLifecycle(injector);
                Thread hook = new Thread(() -> {
                    log.info("Running shutdown hook", new Object[0]);
                    lifecycle.stop();
                });
                Runtime.getRuntime().addShutdownHook(hook);
                ((ExecutorLifecycle)injector.getInstance(ExecutorLifecycle.class)).join();
                Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
                for (Thread thread : threadSet) {
                    if (thread.isDaemon() || thread == Thread.currentThread()) continue;
                    log.info("Thread [%s] is non daemon.", new Object[]{thread});
                }
                lifecycle.stop();
                try {
                    Runtime.getRuntime().removeShutdownHook(hook);
                }
                catch (IllegalStateException e) {
                    System.err.println("Cannot remove shutdown hook, already shutting down!");
                }
            }
            catch (Throwable t) {
                System.err.println("Error!");
                System.err.println(Throwables.getStackTraceAsString((Throwable)t));
                System.exit(1);
            }
            System.out.println("Finished peon task");
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    static void bindRowIngestionMeters(Binder binder) {
        PolyBind.createChoice((Binder)binder, (String)"druid.indexer.task.rowIngestionMeters.type", (Key)Key.get(RowIngestionMetersFactory.class), (Key)Key.get(DropwizardRowIngestionMetersFactory.class));
        MapBinder rowIngestionMetersHandlerProviderBinder = PolyBind.optionBinder((Binder)binder, (Key)Key.get(RowIngestionMetersFactory.class));
        rowIngestionMetersHandlerProviderBinder.addBinding((Object)"dropwizard").to(DropwizardRowIngestionMetersFactory.class).in(LazySingleton.class);
        binder.bind(DropwizardRowIngestionMetersFactory.class).in(LazySingleton.class);
    }

    static void bindChatHandler(Binder binder) {
        PolyBind.createChoice((Binder)binder, (String)"druid.indexer.task.chathandler.type", (Key)Key.get(ChatHandlerProvider.class), (Key)Key.get(ServiceAnnouncingChatHandlerProvider.class));
        MapBinder handlerProviderBinder = PolyBind.optionBinder((Binder)binder, (Key)Key.get(ChatHandlerProvider.class));
        handlerProviderBinder.addBinding((Object)"announce").to(ServiceAnnouncingChatHandlerProvider.class).in(LazySingleton.class);
        handlerProviderBinder.addBinding((Object)"noop").to(NoopChatHandlerProvider.class).in(LazySingleton.class);
        binder.bind(ServiceAnnouncingChatHandlerProvider.class).in(LazySingleton.class);
        binder.bind(NoopChatHandlerProvider.class).in(LazySingleton.class);
    }

    static void bindPeonDataSegmentHandlers(Binder binder) {
        Binders.dataSegmentKillerBinder((Binder)binder);
        binder.bind(DataSegmentKiller.class).to(OmniDataSegmentKiller.class).in(LazySingleton.class);
        Binders.dataSegmentMoverBinder((Binder)binder);
        binder.bind(DataSegmentMover.class).to(OmniDataSegmentMover.class).in(LazySingleton.class);
        Binders.dataSegmentArchiverBinder((Binder)binder);
        binder.bind(DataSegmentArchiver.class).to(OmniDataSegmentArchiver.class).in(LazySingleton.class);
    }

    private static void configureTaskActionClient(Binder binder) {
        PolyBind.createChoice((Binder)binder, (String)"druid.peon.mode", (Key)Key.get(TaskActionClientFactory.class), (Key)Key.get(RemoteTaskActionClientFactory.class));
        MapBinder taskActionBinder = PolyBind.optionBinder((Binder)binder, (Key)Key.get(TaskActionClientFactory.class));
        taskActionBinder.addBinding((Object)"local").to(LocalTaskActionClientFactory.class).in(LazySingleton.class);
        JsonConfigProvider.bind((Binder)binder, (String)"druid.indexer.storage", TaskStorageConfig.class);
        binder.bind(TaskStorage.class).to(HeapMemoryTaskStorage.class).in(LazySingleton.class);
        binder.bind(TaskActionToolbox.class).in(LazySingleton.class);
        binder.bind(IndexerMetadataStorageCoordinator.class).to(IndexerSQLMetadataStorageCoordinator.class).in(LazySingleton.class);
        taskActionBinder.addBinding((Object)"remote").to(RemoteTaskActionClientFactory.class).in(LazySingleton.class);
        binder.bind(NodeRole.class).annotatedWith(Self.class).toInstance((Object)NodeRole.PEON);
    }

    static void bindTaskConfigAndClients(Binder binder) {
        binder.bind(TaskToolboxFactory.class).in(LazySingleton.class);
        JsonConfigProvider.bind((Binder)binder, (String)"druid.indexer.task", TaskConfig.class);
        JsonConfigProvider.bind((Binder)binder, (String)"druid.indexer.auditlog", TaskAuditLogConfig.class);
        JsonConfigProvider.bind((Binder)binder, (String)"druid.peon.taskActionClient.retry", RetryPolicyConfig.class);
        CliPeon.configureTaskActionClient(binder);
        binder.bind(ParallelIndexSupervisorTaskClientProvider.class).to(ParallelIndexSupervisorTaskClientProviderImpl.class).in(LazySingleton.class);
        binder.bind(RetryPolicyFactory.class).in(LazySingleton.class);
    }

    static void bindRealtimeCache(Binder binder) {
        JsonConfigProvider.bind((Binder)binder, (String)"druid.realtime.cache", CacheConfig.class);
        binder.install((Module)new CacheModule());
    }

    static void bindCoordinatorHandoffNotifer(Binder binder) {
        JsonConfigProvider.bind((Binder)binder, (String)"druid.segment.handoff", CoordinatorBasedSegmentHandoffNotifierConfig.class);
        binder.bind(SegmentHandoffNotifierFactory.class).to(CoordinatorBasedSegmentHandoffNotifierFactory.class).in(LazySingleton.class);
    }

    static void configureIntermediaryData(Binder binder) {
        PolyBind.createChoice((Binder)binder, (String)"druid.processing.intermediaryData.storage.type", (Key)Key.get(IntermediaryDataManager.class), (Key)Key.get(LocalIntermediaryDataManager.class));
        MapBinder intermediaryDataManagerBiddy = PolyBind.optionBinder((Binder)binder, (Key)Key.get(IntermediaryDataManager.class));
        intermediaryDataManagerBiddy.addBinding((Object)"local").to(LocalIntermediaryDataManager.class).in(LazySingleton.class);
        intermediaryDataManagerBiddy.addBinding((Object)"deepstore").to(DeepStorageIntermediaryDataManager.class).in(LazySingleton.class);
        PolyBind.createChoice((Binder)binder, (String)"druid.processing.intermediaryData.storage.type", (Key)Key.get(ShuffleClient.class), (Key)Key.get(HttpShuffleClient.class));
        MapBinder shuffleClientBiddy = PolyBind.optionBinder((Binder)binder, (Key)Key.get(ShuffleClient.class));
        shuffleClientBiddy.addBinding((Object)"local").to(HttpShuffleClient.class).in(LazySingleton.class);
        shuffleClientBiddy.addBinding((Object)"deepstore").to(DeepStorageShuffleClient.class).in(LazySingleton.class);
    }

    public class BroadcastSegmentLoadingModule
    implements Module {
        public void configure(Binder binder) {
            binder.bind(SegmentManager.class).in(LazySingleton.class);
            binder.bind(ZkCoordinator.class).in(ManageLifecycle.class);
            Jerseys.addResource((Binder)binder, HistoricalResource.class);
            if (CliPeon.this.isZkEnabled) {
                LifecycleModule.register((Binder)binder, ZkCoordinator.class);
            }
        }

        @Provides
        @LazySingleton
        public List<StorageLocation> getCliPeonStorageLocations(TaskConfig config) {
            File broadcastStorage = new File(new File(CliPeon.this.taskDirPath, "broadcast"), "segments");
            return ImmutableList.of((Object)new StorageLocation(broadcastStorage, config.getTmpStorageBytesPerTask(), null));
        }
    }
}

