/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.hudi.streaming;

import java.io.Serializable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.AvroConversionUtils$;
import org.apache.hudi.DataSourceReadOptions$;
import org.apache.hudi.IncrementalRelation;
import org.apache.hudi.MergeOnReadIncrementalRelation;
import org.apache.hudi.MergeOnReadIncrementalRelation$;
import org.apache.hudi.SparkAdapterSupport;
import org.apache.hudi.cdc.CDCRelation$;
import org.apache.hudi.client.utils.SparkRowSerDe;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.TablePathUtils;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
import org.apache.spark.internal.Logging;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.execution.streaming.Source;
import org.apache.spark.sql.hudi.SparkAdapter;
import org.apache.spark.sql.hudi.streaming.HoodieEarliestOffsetRangeLimit$;
import org.apache.spark.sql.hudi.streaming.HoodieLatestOffsetRangeLimit$;
import org.apache.spark.sql.hudi.streaming.HoodieMetadataLog;
import org.apache.spark.sql.hudi.streaming.HoodieOffsetRangeLimit;
import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset;
import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset$;
import org.apache.spark.sql.hudi.streaming.HoodieSpecifiedOffsetRangeLimit;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import scala.Array$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.GenTraversableOnce;
import scala.collection.Seq;
import scala.collection.immutable.Map;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;

@ScalaSignature(bytes="\u0006\u0001\u0005=g\u0001B\f\u0019\u0001\u0015B\u0001B\u0012\u0001\u0003\u0002\u0003\u0006Ia\u0012\u0005\t\u0017\u0002\u0011\t\u0011)A\u0005\u0019\"Aq\u000b\u0001B\u0001B\u0003%\u0001\f\u0003\u0005b\u0001\t\u0005\t\u0015!\u0003c\u0011!)\u0007A!A!\u0002\u00131\u0007\"\u00026\u0001\t\u0003Y\u0007b\u0002:\u0001\u0005\u0004%Ia\u001d\u0005\b\u0003\u000b\u0001\u0001\u0015!\u0003u\u0011)\ty\u0001\u0001EC\u0002\u0013%\u0011\u0011\u0003\u0005\u000b\u00033\u0001\u0001R1A\u0005\n\u0005m\u0001BCA\u0017\u0001!\u0015\r\u0011\"\u0003\u00020!I\u0011Q\b\u0001C\u0002\u0013%\u0011q\b\u0005\t\u0003\u000f\u0002\u0001\u0015!\u0003\u0002B!I\u0011\u0011\n\u0001C\u0002\u0013%\u00111\n\u0005\t\u0003o\u0002\u0001\u0015!\u0003\u0002N!Q\u0011\u0011\u0010\u0001\t\u0006\u0004%I!a\u001f\t\u000f\u0005\u0015\u0005\u0001\"\u0011\u0002\b\"9\u0011\u0011\u0012\u0001\u0005\n\u0005-\u0005bBAH\u0001\u0011\u0005\u0013\u0011\u0013\u0005\b\u00037\u0003A\u0011IAO\u0011\u001d\ti\f\u0001C\u0005\u0003\u007fCq!!2\u0001\t\u0003\n9M\u0001\nI_>$\u0017.Z*ue\u0016\fWnU8ve\u000e,'BA\r\u001b\u0003%\u0019HO]3b[&twM\u0003\u0002\u001c9\u0005!\u0001.\u001e3j\u0015\tib$A\u0002tc2T!a\b\u0011\u0002\u000bM\u0004\u0018M]6\u000b\u0005\u0005\u0012\u0013AB1qC\u000eDWMC\u0001$\u0003\ry'oZ\u0002\u0001'\u0019\u0001aEL\u001b<\u0003B\u0011q\u0005L\u0007\u0002Q)\u0011\u0011FK\u0001\u0005Y\u0006twMC\u0001,\u0003\u0011Q\u0017M^1\n\u00055B#AB(cU\u0016\u001cG\u000f\u0005\u00020g5\t\u0001G\u0003\u0002\u001ac)\u0011!\u0007H\u0001\nKb,7-\u001e;j_:L!\u0001\u000e\u0019\u0003\rM{WO]2f!\t1\u0014(D\u00018\u0015\tAd$\u0001\u0005j]R,'O\\1m\u0013\tQtGA\u0004M_\u001e<\u0017N\\4\u0011\u0005qzT\"A\u001f\u000b\u0003y\nQa]2bY\u0006L!\u0001Q\u001f\u0003\u0019M+'/[1mSj\f'\r\\3\u0011\u0005\t#U\"A\"\u000b\u0005m\u0001\u0013BA#D\u0005M\u0019\u0006/\u0019:l\u0003\u0012\f\u0007\u000f^3s'V\u0004\bo\u001c:u\u0003)\u0019\u0018\u000f\\\"p]R,\u0007\u0010\u001e\t\u0003\u0011&k\u0011\u0001H\u0005\u0003\u0015r\u0011!bU)M\u0007>tG/\u001a=u\u00031iW\r
^1eCR\f\u0007+\u0019;i!\tiEK\u0004\u0002O%B\u0011q*P\u0007\u0002!*\u0011\u0011\u000bJ\u0001\u0007yI|w\u000e\u001e \n\u0005Mk\u0014A\u0002)sK\u0012,g-\u0003\u0002V-\n11\u000b\u001e:j]\u001eT!aU\u001f\u0002\u0019M\u001c\u0007.Z7b\u001fB$\u0018n\u001c8\u0011\u0007qJ6,\u0003\u0002[{\t1q\n\u001d;j_:\u0004\"\u0001X0\u000e\u0003uS!A\u0018\u000f\u0002\u000bQL\b/Z:\n\u0005\u0001l&AC*ueV\u001cG\u000fV=qK\u0006Q\u0001/\u0019:b[\u0016$XM]:\u0011\t5\u001bG\nT\u0005\u0003IZ\u00131!T1q\u0003AygMZ:fiJ\u000bgnZ3MS6LG\u000f\u0005\u0002hQ6\t\u0001$\u0003\u0002j1\t1\u0002j\\8eS\u0016|eMZ:fiJ\u000bgnZ3MS6LG/\u0001\u0004=S:LGO\u0010\u000b\u0007Y6tw\u000e]9\u0011\u0005\u001d\u0004\u0001\"\u0002$\u0007\u0001\u00049\u0005\"B&\u0007\u0001\u0004a\u0005\"B,\u0007\u0001\u0004A\u0006\"B1\u0007\u0001\u0004\u0011\u0007\"B3\u0007\u0001\u00041\u0017aC:u_J\fw-Z\"p]\u001a,\u0012\u0001\u001e\t\u0004kbTX\"\u0001<\u000b\u0005]\u001c\u0015aB:u_J\fw-Z\u0005\u0003sZ\u0014Ac\u0015;pe\u0006<WmQ8oM&<WO]1uS>t\u0007cA>\u0002\u00025\tAP\u0003\u0002~}\u0006!1m\u001c8g\u0015\ty\b%\u0001\u0004iC\u0012|w\u000e]\u0005\u0004\u0003\u0007a(!D\"p]\u001aLw-\u001e:bi&|g.\u0001\u0007ti>\u0014\u0018mZ3D_:4\u0007\u0005K\u0002\t\u0003\u0013\u00012\u0001PA\u0006\u0013\r\ti!\u0010\u0002\niJ\fgn]5f]R\f\u0011\u0002^1cY\u0016\u0004\u0016\r\u001e5\u0016\u0005\u0005M\u0001cA;\u0002\u0016%\u0019\u0011q\u0003<\u0003\u0017M#xN]1hKB\u000bG\u000f[\u0001\u000b[\u0016$\u0018m\u00117jK:$XCAA\u000f!\u0011\ty\"!\u000b\u000e\u0005\u0005\u0005\"\u0002BA\u0012\u0003K\tQ\u0001^1cY\u0016T1!a\nD\u0003\u0019\u0019w.\\7p]&!\u00111FA\u0011\u0005UAun\u001c3jKR\u000b'\r\\3NKR\f7\t\\5f]R\f\u0011\u0002^1cY\u0016$\u0016\u0010]3\u0016\u0005\u0005E\u0002\u0003BA\u001a\u0003si!!!\u000e\u000b\t\u0005]\u0012QE\u0001\u0006[>$W\r\\\u0005\u0005\u0003w\t)DA\bI_>$\u0017.\u001a+bE2,G+\u001f9f\u0003)I7o\u0011#D#V,'/_\u000b\u0003\u0003\u0003\u00022\u0001PA\"\u0013\r\t)%\u0010\u0002\b\u0005>|G.Z1o\u0003-I7o\u0011#D#V,'/\u001f\u0011\u0002)!|G\u000e\\8x\u0007>lW.\u001b;IC:$
G.\u001b8h+\t\ti\u0005\u0005\u0003\u0002P\u0005Ed\u0002BA)\u0003WrA!a\u0015\u0002h9!\u0011QKA3\u001d\u0011\t9&a\u0019\u000f\t\u0005e\u0013\u0011\r\b\u0005\u00037\nyFD\u0002P\u0003;J\u0011aI\u0005\u0003C\tJ!a\u0007\u0011\n\u0007\u0005\u001d2)\u0003\u0003\u0002$\u0005\u0015\u0012\u0002BA5\u0003C\t\u0001\u0002^5nK2Lg.Z\u0005\u0005\u0003[\ny'A\u0007US6,G.\u001b8f+RLGn\u001d\u0006\u0005\u0003S\n\t#\u0003\u0003\u0002t\u0005U$\u0001\u0006%pY2|woQ8n[&$\b*\u00198eY&twM\u0003\u0003\u0002n\u0005=\u0014!\u00065pY2|woQ8n[&$\b*\u00198eY&tw\rI\u0001\u000fS:LG/[1m\u001f\u001a47/\u001a;t+\t\ti\bE\u0002h\u0003\u007fJ1!!!\u0019\u0005IAun\u001c3jKN{WO]2f\u001f\u001a47/\u001a;)\u0007A\tI!\u0001\u0004tG\",W.Y\u000b\u00027\u0006yq-\u001a;MCR,7\u000f^(gMN,G/\u0006\u0002\u0002\u000eB!A(WA?\u0003%9W\r^(gMN,G/\u0006\u0002\u0002\u0014B!A(WAK!\ry\u0013qS\u0005\u0004\u00033\u0003$AB(gMN,G/\u0001\u0005hKR\u0014\u0015\r^2i)\u0019\ty*!.\u0002:B!\u0011\u0011UAX\u001d\u0011\t\u0019+a+\u000f\t\u0005\u0015\u0016\u0011\u0016\b\u0005\u00033\n9+\u0003\u0002 A%\u0011QDH\u0005\u0004\u0003[c\u0012a\u00029bG.\fw-Z\u0005\u0005\u0003c\u000b\u0019LA\u0005ECR\fgI]1nK*\u0019\u0011Q\u0016\u000f\t\u000f\u0005]F\u00031\u0001\u0002\u0014\u0006)1\u000f^1si\"9\u00111\u0018\u000bA\u0002\u0005U\u0015aA3oI\u0006y1\u000f^1si\u000e{W.\\5u)&lW\rF\u0002M\u0003\u0003Dq!a1\u0016\u0001\u0004\ti(A\u0006ti\u0006\u0014Ho\u00144gg\u0016$\u0018\u0001B:u_B$\"!!3\u0011\u0007q\nY-C\u0002\u0002Nv\u0012A!\u00168ji\u0002")
/**
 * Structured Streaming {@link Source} that reads a Hudi table between two
 * {@link HoodieSourceOffset}s.
 *
 * <p>This file is a decompiled rendering of a Scala class: the {@code *$lzycompute}
 * methods together with the {@code bitmap$0} / {@code bitmap$trans$0} flags are the
 * compiled form of Scala {@code lazy val}s, and the {@code Logging} / {@code Source}
 * forwarders are the trait methods mixed into this class.
 *
 * <p>Depending on the table configuration and the supplied read options,
 * {@link #getBatch} builds the batch from a CDC relation, an
 * {@link IncrementalRelation} (copy-on-write tables) or a
 * {@link MergeOnReadIncrementalRelation} (merge-on-read tables).
 */
public class HoodieStreamSource
implements Source,
Logging,
scala.Serializable,
SparkAdapterSupport {
    // Lazily initialised state; guarded by the bits of bitmap$0 (1 = tablePath,
    // 2 = metaClient, 4 = tableType, 8 = sparkAdapter).
    private StoragePath tablePath;
    private HoodieTableMetaClient metaClient;
    private HoodieTableType tableType;
    // Lazily initialised and transient; guarded by bitmap$trans$0.
    private transient HoodieSourceOffset initialOffsets;
    private final SQLContext sqlContext;
    private final String metadataPath;
    private final Option<StructType> schemaOption;
    private final Map<String, String> parameters;
    private final HoodieOffsetRangeLimit offsetRangeLimit;
    private final transient StorageConfiguration<Configuration> storageConf;
    private final boolean isCDCQuery;
    private final TimelineUtils.HollowCommitHandling hollowCommitHandling;
    private SparkAdapter sparkAdapter;
    private transient Logger org$apache$spark$internal$Logging$$log_;
    private volatile byte bitmap$0;
    private volatile transient boolean bitmap$trans$0;

    // ------------------------------------------------------------------
    // Forwarders to the default implementations of the Logging trait.
    // ------------------------------------------------------------------

    public String logName() {
        return Logging.logName$((Logging)this);
    }

    public Logger log() {
        return Logging.log$((Logging)this);
    }

    public void logInfo(Function0<String> msg) {
        Logging.logInfo$((Logging)this, msg);
    }

    public void logDebug(Function0<String> msg) {
        Logging.logDebug$((Logging)this, msg);
    }

    public void logTrace(Function0<String> msg) {
        Logging.logTrace$((Logging)this, msg);
    }

    public void logWarning(Function0<String> msg) {
        Logging.logWarning$((Logging)this, msg);
    }

    public void logError(Function0<String> msg) {
        Logging.logError$((Logging)this, msg);
    }

    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$((Logging)this, msg, (Throwable)throwable);
    }

    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$((Logging)this, msg, (Throwable)throwable);
    }

    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$((Logging)this, msg, (Throwable)throwable);
    }

    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$((Logging)this, msg, (Throwable)throwable);
    }

    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$((Logging)this, msg, (Throwable)throwable);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$((Logging)this);
    }

    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter);
    }

    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$((Logging)this);
    }

    public void initializeForcefully(boolean isInterpreter, boolean silent) {
        Logging.initializeForcefully$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    // ------------------------------------------------------------------
    // Forwarders to the default implementations of the Source trait.
    // ------------------------------------------------------------------

    public void commit(org.apache.spark.sql.execution.streaming.Offset end) {
        Source.commit$((Source)this, (org.apache.spark.sql.execution.streaming.Offset)end);
    }

    public Offset initialOffset() {
        return Source.initialOffset$((Source)this);
    }

    public Offset deserializeOffset(String json) {
        return Source.deserializeOffset$((Source)this, (String)json);
    }

    public void commit(Offset end) {
        Source.commit$((Source)this, (Offset)end);
    }

    /** Synchronized initializer for {@link #sparkAdapter()}; sets bit 8 of {@code bitmap$0} once done. */
    private SparkAdapter sparkAdapter$lzycompute() {
        synchronized (this) {
            if ((byte)(this.bitmap$0 & 8) == 0) {
                this.sparkAdapter = SparkAdapterSupport.sparkAdapter$(this);
                this.bitmap$0 = (byte)(this.bitmap$0 | 8);
            }
        }
        return this.sparkAdapter;
    }

    /** Returns the adapter supplied by {@link SparkAdapterSupport}, initialising it on first use. */
    @Override
    public SparkAdapter sparkAdapter() {
        if ((byte)(this.bitmap$0 & 8) == 0) {
            return this.sparkAdapter$lzycompute();
        }
        return this.sparkAdapter;
    }

    /** Accessor for the logger slot required by the {@code Logging} trait. */
    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    /** Mutator for the logger slot required by the {@code Logging} trait. */
    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        this.org$apache$spark$internal$Logging$$log_ = x$1;
    }

    private StorageConfiguration<Configuration> storageConf() {
        return this.storageConf;
    }

    /**
     * Synchronized initializer for {@link #tablePath()}.
     *
     * <p>Reads the {@code "path"} entry of the source parameters and resolves it to
     * the table path via {@link TablePathUtils#getTablePath}.
     *
     * @throws IllegalArgumentException if the {@code "path"} option is absent
     */
    private StoragePath tablePath$lzycompute() {
        synchronized (this) {
            if ((byte)(this.bitmap$0 & 1) == 0) {
                // Fail fast on a missing option. Previously the error text itself was
                // used as the path, which only failed later with an unrelated message.
                Option<String> configuredPath = this.parameters.get("path");
                if (configuredPath.isEmpty()) {
                    throw new IllegalArgumentException("Missing 'path' option");
                }
                StoragePath path = new StoragePath(configuredPath.get());
                HoodieHadoopStorage fs = new HoodieHadoopStorage(path, this.storageConf());
                this.tablePath = TablePathUtils.getTablePath(fs, path).get();
                this.bitmap$0 = (byte)(this.bitmap$0 | 1);
            }
        }
        return this.tablePath;
    }

    /** Returns the resolved table path, computing it on first use. */
    private StoragePath tablePath() {
        if ((byte)(this.bitmap$0 & 1) == 0) {
            return this.tablePath$lzycompute();
        }
        return this.tablePath;
    }

    /** Synchronized initializer for {@link #metaClient()}; builds the client from a fresh copy of the storage configuration. */
    private HoodieTableMetaClient metaClient$lzycompute() {
        synchronized (this) {
            if ((byte)(this.bitmap$0 & 2) == 0) {
                this.metaClient = HoodieTableMetaClient.builder()
                        .setConf(this.storageConf().newInstance())
                        .setBasePath(this.tablePath().toString())
                        .build();
                this.bitmap$0 = (byte)(this.bitmap$0 | 2);
            }
        }
        return this.metaClient;
    }

    /** Returns the table meta client, building it on first use. */
    private HoodieTableMetaClient metaClient() {
        if ((byte)(this.bitmap$0 & 2) == 0) {
            return this.metaClient$lzycompute();
        }
        return this.metaClient;
    }

    /** Synchronized initializer for {@link #tableType()}. */
    private HoodieTableType tableType$lzycompute() {
        synchronized (this) {
            if ((byte)(this.bitmap$0 & 4) == 0) {
                this.tableType = this.metaClient().getTableType();
                this.bitmap$0 = (byte)(this.bitmap$0 | 4);
            }
        }
        return this.tableType;
    }

    /** Returns the table type reported by the meta client, reading it on first use. */
    private HoodieTableType tableType() {
        if ((byte)(this.bitmap$0 & 4) == 0) {
            return this.tableType$lzycompute();
        }
        return this.tableType;
    }

    private boolean isCDCQuery() {
        return this.isCDCQuery;
    }

    private TimelineUtils.HollowCommitHandling hollowCommitHandling() {
        return this.hollowCommitHandling;
    }

    /**
     * Synchronized initializer for {@link #initialOffsets()}.
     *
     * <p>Uses the offset stored under batch id 0 in the {@link HoodieMetadataLog} at
     * {@code metadataPath} when one exists; otherwise derives one from the
     * configured offset range limit and stores it.
     */
    private HoodieSourceOffset initialOffsets$lzycompute() {
        synchronized (this) {
            if (!this.bitmap$trans$0) {
                HoodieMetadataLog metadataLog = new HoodieMetadataLog(this.sqlContext.sparkSession(), this.metadataPath);
                this.initialOffsets = (HoodieSourceOffset)((Object)metadataLog.get(0L).getOrElse((Function0 & Serializable & scala.Serializable)() -> this.resolveAndPersistInitialOffset(metadataLog)));
                this.bitmap$trans$0 = true;
            }
        }
        return this.initialOffsets;
    }

    /**
     * Derives the initial offset from {@code offsetRangeLimit}, records it in
     * {@code metadataLog} under batch id 0 and logs it.
     *
     * @throws MatchError if the range limit is none of the three known kinds
     */
    private HoodieSourceOffset resolveAndPersistInitialOffset(HoodieMetadataLog metadataLog) {
        final HoodieSourceOffset offset;
        HoodieOffsetRangeLimit limit = this.offsetRangeLimit;
        if (HoodieEarliestOffsetRangeLimit$.MODULE$.equals(limit)) {
            offset = HoodieSourceOffset$.MODULE$.INIT_OFFSET();
        } else if (HoodieLatestOffsetRangeLimit$.MODULE$.equals(limit)) {
            // Fall back to INIT_OFFSET when no latest offset is available.
            offset = (HoodieSourceOffset)((Object)((Object)this.getLatestOffset().getOrElse((Function0 & Serializable & scala.Serializable)() -> HoodieSourceOffset$.MODULE$.INIT_OFFSET())));
        } else if (limit instanceof HoodieSpecifiedOffsetRangeLimit) {
            String instantTime = ((HoodieSpecifiedOffsetRangeLimit)limit).instantTime();
            offset = new HoodieSourceOffset(instantTime);
        } else {
            throw new MatchError((Object)limit);
        }
        metadataLog.add(0L, offset);
        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "The initial offset is " + offset);
        return offset;
    }

    /** Returns the initial offset, resolving it on first use. */
    private HoodieSourceOffset initialOffsets() {
        if (!this.bitmap$trans$0) {
            return this.initialOffsets$lzycompute();
        }
        return this.initialOffsets;
    }

    /**
     * Returns the schema of the rows produced by this source: the full CDC schema
     * for CDC queries, otherwise the user-supplied schema if present, otherwise the
     * table's Avro schema converted to a Spark {@link StructType}.
     */
    public StructType schema() {
        if (this.isCDCQuery()) {
            return CDCRelation$.MODULE$.FULL_CDC_SPARK_SCHEMA();
        }
        return (StructType)this.schemaOption.getOrElse((Function0 & Serializable & scala.Serializable)() -> {
            TableSchemaResolver schemaUtil = new TableSchemaResolver(this.metaClient());
            return AvroConversionUtils$.MODULE$.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema());
        });
    }

    /**
     * Reloads the active timeline and returns an offset for its latest completed
     * instant after hollow-commit handling, or {@code None} when that timeline is
     * empty.
     */
    private Option<HoodieSourceOffset> getLatestOffset() {
        this.metaClient().reloadActiveTimeline();
        HoodieTimeline filteredTimeline = TimelineUtils.handleHollowCommitIfNeeded(this.metaClient().getActiveTimeline().filterCompletedInstants(), this.metaClient(), this.hollowCommitHandling());
        if (filteredTimeline.empty()) {
            return None$.MODULE$;
        }
        String timestamp;
        if (this.hollowCommitHandling() == TimelineUtils.HollowCommitHandling.USE_TRANSITION_TIME) {
            // Last instant when ordered by state transition time.
            timestamp = filteredTimeline.getInstantsOrderedByStateTransitionTime()
                    .skip(filteredTimeline.countInstants() - 1)
                    .findFirst()
                    .get()
                    .getStateTransitionTime();
        } else {
            timestamp = filteredTimeline.lastInstant().get().getTimestamp();
        }
        return new Some((Object)new HoodieSourceOffset(timestamp));
    }

    /** Returns the latest available offset; see {@link #getLatestOffset()}. */
    public Option<org.apache.spark.sql.execution.streaming.Offset> getOffset() {
        return this.getLatestOffset();
    }

    /**
     * Builds the streaming DataFrame for the range from {@code start2} to {@code end}.
     *
     * @param start2 start offset of the batch; when empty, the initial offset is used
     * @param end    end offset of the batch
     * @return an empty DataFrame when start and end are equal, otherwise the rows of
     *         the CDC or incremental relation for that range
     * @throws IllegalArgumentException if the table type is neither copy-on-write
     *                                  nor merge-on-read
     */
    public Dataset<Row> getBatch(Option<org.apache.spark.sql.execution.streaming.Offset> start2, org.apache.spark.sql.execution.streaming.Offset end) {
        HoodieSourceOffset startOffset = (HoodieSourceOffset)((Object)start2.map((Function1 & Serializable & scala.Serializable)offset -> HoodieSourceOffset$.MODULE$.apply((org.apache.spark.sql.execution.streaming.Offset)offset)).getOrElse((Function0 & Serializable & scala.Serializable)() -> this.initialOffsets()));
        HoodieSourceOffset endOffset = HoodieSourceOffset$.MODULE$.apply(end);

        // Nothing new to read.
        if (java.util.Objects.equals(startOffset, endOffset)) {
            return this.sqlContext.internalCreateDataFrame(this.sqlContext.sparkContext().emptyRDD(ClassTag$.MODULE$.apply(InternalRow.class)).setName("empty"), this.schema(), true);
        }

        if (this.isCDCQuery()) {
            Map cdcOptions = (Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{
                new Tuple2<>(DataSourceReadOptions$.MODULE$.BEGIN_INSTANTTIME().key(), this.startCommitTime(startOffset)),
                new Tuple2<>(DataSourceReadOptions$.MODULE$.END_INSTANTTIME().key(), endOffset.commitTime())}));
            RDD<InternalRow> cdcRows = CDCRelation$.MODULE$.getCDCRelation(this.sqlContext, this.metaClient(), (Map<String, String>)cdcOptions).buildScan0(HoodieCDCUtils.CDC_COLUMNS, (Filter[])Array$.MODULE$.empty(ClassTag$.MODULE$.apply(Filter.class)));
            return this.sqlContext.sparkSession().internalCreateDataFrame(cdcRows, CDCRelation$.MODULE$.FULL_CDC_SPARK_SCHEMA(), true);
        }

        // Incremental read: user parameters overridden by the range for this batch.
        Map incParams = this.parameters.$plus$plus((GenTraversableOnce)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{
            new Tuple2<>(DataSourceReadOptions$.MODULE$.QUERY_TYPE().key(), DataSourceReadOptions$.MODULE$.QUERY_TYPE_INCREMENTAL_OPT_VAL()),
            new Tuple2<>(DataSourceReadOptions$.MODULE$.BEGIN_INSTANTTIME().key(), this.startCommitTime(startOffset)),
            new Tuple2<>(DataSourceReadOptions$.MODULE$.END_INSTANTTIME().key(), endOffset.commitTime()),
            new Tuple2<>(DataSourceReadOptions$.MODULE$.INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT().key(), this.hollowCommitHandling().name())})));

        HoodieTableType currentTableType = this.tableType();
        // Resolved once per batch: schema() may hit the table's schema resolver on
        // every call when no user schema was supplied.
        StructType batchSchema;
        RDD<Row> rows;
        if (((Object)((Object)HoodieTableType.COPY_ON_WRITE)).equals((Object)currentTableType)) {
            batchSchema = this.schema();
            SparkRowSerDe serDe = this.sparkAdapter().createSparkRowSerDe(batchSchema);
            rows = new IncrementalRelation(this.sqlContext, (Map<String, String>)incParams, (Option<StructType>)new Some((Object)batchSchema), this.metaClient()).buildScan().map((Function1 & Serializable & scala.Serializable)row -> serDe.serializeRow((Row)row), ClassTag$.MODULE$.apply(InternalRow.class));
        } else if (((Object)((Object)HoodieTableType.MERGE_ON_READ)).equals((Object)currentTableType)) {
            batchSchema = this.schema();
            String[] requiredColumns = batchSchema.fieldNames();
            rows = new MergeOnReadIncrementalRelation(this.sqlContext, (Map<String, String>)incParams, this.metaClient(), (Option<StructType>)new Some((Object)batchSchema), MergeOnReadIncrementalRelation$.MODULE$.$lessinit$greater$default$5()).buildScan(requiredColumns, (Filter[])Array$.MODULE$.empty(ClassTag$.MODULE$.apply(Filter.class)));
        } else {
            throw new IllegalArgumentException("UnSupport tableType: " + this.tableType());
        }
        return this.sqlContext.internalCreateDataFrame((RDD)rows, batchSchema, true);
    }

    /**
     * Returns the commit time carried by {@code startOffset}.
     *
     * @throws IllegalStateException if {@code startOffset} is {@code null}
     */
    private String startCommitTime(HoodieSourceOffset startOffset) {
        // The INIT_OFFSET case and the general case both yield the offset's own
        // commit time, so only the null case needs separate handling.
        if (startOffset == null) {
            throw new IllegalStateException("UnKnow offset type.");
        }
        return startOffset.commitTime();
    }

    /** No-op: this source holds nothing that needs releasing in {@code stop()}. */
    public void stop() {
    }

    /**
     * Creates the source.
     *
     * @param sqlContext       SQL context used to build DataFrames and the Hadoop configuration
     * @param metadataPath     location of the {@link HoodieMetadataLog} holding the initial offset
     * @param schemaOption     optional user-supplied schema; see {@link #schema()}
     * @param parameters       source options; {@code "path"} is read when the table path is resolved
     * @param offsetRangeLimit policy used to pick the initial offset when none is stored
     */
    public HoodieStreamSource(SQLContext sqlContext, String metadataPath, Option<StructType> schemaOption, Map<String, String> parameters, HoodieOffsetRangeLimit offsetRangeLimit) {
        this.sqlContext = sqlContext;
        this.metadataPath = metadataPath;
        this.schemaOption = schemaOption;
        this.parameters = parameters;
        this.offsetRangeLimit = offsetRangeLimit;
        Source.$init$((Source)this);
        Logging.$init$((Logging)this);
        SparkAdapterSupport.$init$(this);
        // storageConf must be assigned before metaClient() is first touched below.
        this.storageConf = HadoopFSUtils.getStorageConf(sqlContext.sparkSession().sessionState().newHadoopConf());
        // CDC mode requires CDC on the table plus an incremental query in CDC format.
        this.isCDCQuery = CDCRelation$.MODULE$.isCDCEnabled(this.metaClient())
                && parameters.get(DataSourceReadOptions$.MODULE$.QUERY_TYPE().key()).contains((Object)DataSourceReadOptions$.MODULE$.QUERY_TYPE_INCREMENTAL_OPT_VAL())
                && parameters.get(DataSourceReadOptions$.MODULE$.INCREMENTAL_FORMAT().key()).contains((Object)DataSourceReadOptions$.MODULE$.INCREMENTAL_FORMAT_CDC_VAL());
        // Defaults to BLOCK when the option is not set.
        this.hollowCommitHandling = (TimelineUtils.HollowCommitHandling)((Object)parameters.get(DataSourceReadOptions$.MODULE$.INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT().key()).map((Function1 & Serializable & scala.Serializable)name -> TimelineUtils.HollowCommitHandling.valueOf((String)name)).getOrElse((Function0 & Serializable & scala.Serializable)() -> TimelineUtils.HollowCommitHandling.BLOCK));
    }
}

