package akka.persistence.typed.internal;

import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.Scheduler;
import akka.actor.typed.scaladsl.AskPattern$;
import akka.actor.typed.scaladsl.AskPattern$Askable$;
import akka.annotation.InternalApi;
import akka.persistence.query.EventEnvelope;
import akka.persistence.query.PersistenceQuery;
import akka.persistence.query.PersistenceQuery$;
import akka.persistence.query.scaladsl.EventsByPersistenceIdQuery;
import akka.persistence.typed.ReplicaId;
import akka.persistence.typed.ReplicationId;
import akka.persistence.typed.ReplicationId$;
import akka.persistence.typed.internal.EventSourcedBehaviorImpl;
import akka.persistence.typed.internal.InternalProtocol;
import akka.persistence.typed.internal.Running;
import akka.stream.RestartSettings$;
import akka.stream.SystemMaterializer$;
import akka.stream.scaladsl.Keep$;
import akka.stream.scaladsl.RestartSource$;
import akka.stream.scaladsl.Sink$;
import akka.stream.scaladsl.Source;
import akka.stream.scaladsl.Source$;
import akka.stream.typed.scaladsl.ActorFlow$;
import akka.util.OptionVal$;
import akka.util.OptionVal$Some$;
import akka.util.Timeout;
import akka.util.Timeout$;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.atomic.AtomicReference;
import scala.Some;
import scala.collection.immutable.Map;
import scala.concurrent.ExecutionContextExecutor;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: Running.scala */
/**
 * Singleton holder (the decompiled form of the Scala {@code Running} companion object) with
 * helpers used while an event sourced behavior is running: starting the per-replica
 * replication streams and formatting epoch-millisecond timestamps.
 *
 * <p>Internal API; use {@link #MODULE$} to reach the single instance.
 */
@InternalApi
/* loaded from: input_file:akka/persistence/typed/internal/Running$.class */
public final class Running$ {
    /** The single instance of this holder. */
    public static final Running$ MODULE$ = new Running$();
    /** Formatter shared by {@link #formatTimestamp(long)}; cached because it is immutable. */
    private static final DateTimeFormatter timestampFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
    /** Zone used when converting epoch milliseconds to a local date-time for formatting. */
    private static final ZoneId UTC = ZoneId.of("UTC");

    /**
     * Starts one replication stream per replica in {@code replicationSetup.allReplicas()},
     * skipping the replica returned by {@code replicationSetup.replicaId()}, and returns the
     * running state with a {@link ReplicationStreamControl} registered for each started stream.
     *
     * <p>Each stream is a restart-with-backoff source that first asks this actor for the
     * sequence number already seen from the replica, then reads that replica's events from the
     * following sequence number onwards, and forwards every event to this actor as a
     * {@code ReplicatedEventEnvelope} through an ask flow. The stream is run with an ignoring sink.
     *
     * @param behaviorSetup    setup of the behavior; supplies the actor context, logger and persistence id
     * @param runningState     state to start the fold from
     * @param replicationSetup replication configuration (replica ids, query plugin ids, replication context)
     * @return the state with {@code replicationControl} updated for every replica a stream was started for
     */
    public <C, E, S> Running.RunningState<S> startReplicationStream(BehaviorSetup<C, E, S> behaviorSetup, Running.RunningState<S> runningState, ReplicationSetup replicationSetup) {
        ActorSystem system = behaviorSetup.context().system();
        ActorRef self = behaviorSetup.context().self();
        PersistenceQuery persistenceQuery = PersistenceQuery$.MODULE$.apply(system);
        return (Running.RunningState) replicationSetup.allReplicas().foldLeft(runningState, (state, replicaId) -> {
            // Compare the replica being folded over with this node's own replica id. The local
            // must not reuse the name of the lambda parameter: doing so made the check compare
            // a value with itself, so no stream would ever be started.
            ReplicaId localReplicaId = replicationSetup.replicaId();
            if (replicaId != null ? replicaId.equals(localReplicaId) : localReplicaId == null) {
                // No stream towards ourselves.
                return state;
            }
            ReplicationId replicationId = ReplicationId$.MODULE$.apply(replicationSetup.replicationContext().replicationId().typeName(), replicationSetup.replicationContext().entityId(), replicaId);
            String queryPluginId = (String) replicationSetup.allReplicasAndQueryPlugins().apply(replicaId);
            EventsByPersistenceIdQuery readJournal = persistenceQuery.readJournalFor(queryPluginId);
            // NOTE(review): `package.DurationInt` is how the decompiler renders the Scala
            // duration DSL (30.seconds, 2.seconds, 10.seconds); left untouched here.
            Timeout askTimeout = Timeout$.MODULE$.durationToTimeout(new package.DurationInt(package$.MODULE$.DurationInt(30)).seconds());
            Scheduler scheduler = behaviorSetup.context().system().scheduler();
            ExecutionContextExecutor executionContext = behaviorSetup.context().system().executionContext();
            // Filled in when the inner source is materialized (see $anonfun$startReplicationStream$4);
            // read by the ReplicationStreamControl registered below.
            final AtomicReference controlRef = new AtomicReference();
            RestartSource$.MODULE$.withBackoff(RestartSettings$.MODULE$.apply(new package.DurationInt(package$.MODULE$.DurationInt(2)).seconds(), new package.DurationInt(package$.MODULE$.DurationInt(10)).seconds(), 0.2d), () -> {
                // On every (re)start, ask the actor which sequence number it has seen from this replica.
                return Source$.MODULE$.futureSource(AskPattern$Askable$.MODULE$.ask$extension(AskPattern$.MODULE$.Askable(behaviorSetup.context().self()), actorRef -> {
                    return new EventSourcedBehaviorImpl.GetSeenSequenceNr(replicaId, actorRef);
                }, askTimeout, scheduler).map(obj -> {
                    return $anonfun$startReplicationStream$4(readJournal, replicationId, replicaId, behaviorSetup, queryPluginId, controlRef, BoxesRunTime.unboxToLong(obj));
                }, executionContext));
            }).via(ActorFlow$.MODULE$.ask(self, (eventEnvelope, actorRef) -> {
                // The filter in $anonfun$startReplicationStream$5 has already rejected envelopes
                // without ReplicatedEventMetadata, so get() and the cast are expected to succeed.
                ReplicatedEventMetadata replicatedEventMetadata = (ReplicatedEventMetadata) eventEnvelope.eventMetadata().get();
                return new InternalProtocol.ReplicatedEventEnvelope(new ReplicatedEvent(eventEnvelope.event(), replicatedEventMetadata.originReplica(), replicatedEventMetadata.originSequenceNr(), replicatedEventMetadata.version()), actorRef);
            }, askTimeout).recoverWithRetries(1, new Running$$anonfun$1())).runWith(Sink$.MODULE$.ignore(), SystemMaterializer$.MODULE$.apply(system).materializer());
            // Copy the state, replacing only the replicationControl map (sixth copy argument).
            return state.copy(state.copy$default$1(), state.copy$default$2(), state.copy$default$3(), state.copy$default$4(), state.copy$default$5(), (Map) state.replicationControl().updated(replicaId, new ReplicationStreamControl(controlRef, behaviorSetup, replicaId) { // from class: akka.persistence.typed.internal.Running$$anon$1
                private final AtomicReference controlRef$1;
                private final BehaviorSetup setup$1;
                private final ReplicaId replicaId$1;

                /**
                 * Delegates to the control stored in the shared reference, or only logs at
                 * debug level when no control has been stored yet.
                 */
                @Override // akka.persistence.typed.internal.ReplicationStreamControl
                public void fastForward(long j) {
                    ReplicationStreamControl maybeControl = (ReplicationStreamControl) OptionVal$Some$.MODULE$.unapply((ReplicationStreamControl) OptionVal$.MODULE$.apply(this.controlRef$1.get()));
                    if (OptionVal$.MODULE$.isEmpty$extension(maybeControl)) {
                        if (this.setup$1.internalLogger().isDebugEnabled()) {
                            this.setup$1.internalLogger().debug("Ignoring fast forward replica [{}] to [{}], stream not started yet", this.replicaId$1, BoxesRunTime.boxToLong(j));
                        }
                        return;
                    }
                    ReplicationStreamControl control = (ReplicationStreamControl) OptionVal$.MODULE$.get$extension(maybeControl);
                    if (this.setup$1.internalLogger().isDebugEnabled()) {
                        this.setup$1.internalLogger().debug("Fast forward replica [{}] to [{}]", this.replicaId$1, BoxesRunTime.boxToLong(j));
                    }
                    control.fastForward(j);
                }

                {
                    this.controlRef$1 = controlRef;
                    this.setup$1 = behaviorSetup;
                    this.replicaId$1 = replicaId;
                }
            }));
        });
    }

    /** Returns the shared timestamp formatter. */
    private DateTimeFormatter timestampFormatter() {
        return timestampFormatter;
    }

    /** Returns the UTC zone id used for timestamp formatting. */
    private ZoneId UTC() {
        return UTC;
    }

    /**
     * Formats an epoch-millisecond timestamp as {@code yyyy-MM-dd HH:mm:ss.SSS} in UTC.
     *
     * @param j milliseconds since the epoch
     * @return the formatted timestamp
     */
    public String formatTimestamp(long j) {
        LocalDateTime utcDateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(j), UTC());
        return timestampFormatter().format(utcDateTime);
    }

    /**
     * Stream filter predicate: keeps an envelope only when its replication metadata names
     * {@code replicaId} as the origin replica.
     *
     * @param replicaId     replica the stream reads from
     * @param behaviorSetup used for the persistence id in the error message
     * @param str           query plugin id of the replica, used in the error message
     * @param eventEnvelope envelope under test
     * @return {@code true} when the origin replica of the event equals {@code replicaId}
     * @throws IllegalArgumentException when the envelope carries no {@code ReplicatedEventMetadata}
     */
    public static final /* synthetic */ boolean $anonfun$startReplicationStream$5(ReplicaId replicaId, BehaviorSetup behaviorSetup, String str, EventEnvelope eventEnvelope) {
        // Held as Object so the instanceof check below is a real type test rather than a
        // check against the variable's own declared type.
        Object metadata = eventEnvelope.eventMetadata();
        if (metadata instanceof Some) {
            Object value = ((Some) metadata).value();
            if (value instanceof ReplicatedEventMetadata) {
                ReplicaId originReplica = ((ReplicatedEventMetadata) value).originReplica();
                return originReplica != null ? originReplica.equals(replicaId) : replicaId == null;
            }
        }
        throw new IllegalArgumentException(
            "Replication stream from replica " + replicaId + " for " + behaviorSetup.persistenceId() + " contains event "
                + "(sequence nr " + eventEnvelope.sequenceNr() + ") without replication metadata. "
                + "Is the persistence id used by a regular event sourced actor there or the journal for that replica (" + str + ") "
                + "used that does not support Replicated Event Sourcing?");
    }

    /**
     * Builds the event source for one replica once the already-seen sequence number is known.
     *
     * <p>Reads events for the replication persistence id from {@code j + 1} up to
     * {@code Long.MAX_VALUE}, applies the origin-replica filter, passes them through a
     * {@code FastForwardingFilter}, and stores that filter's materialized control in
     * {@code atomicReference}.
     *
     * @param eventsByPersistenceIdQuery read journal of the replica
     * @param replicationId              id whose persistence id is queried
     * @param replicaId                  replica the events are read from
     * @param behaviorSetup              passed on to the filter for its error message
     * @param str                        query plugin id, passed on to the filter for its error message
     * @param atomicReference            receives the materialized {@code ReplicationStreamControl}
     * @param j                          sequence number already seen from the replica
     * @return the filtered source, with its materialized value mapped to {@code BoxedUnit.UNIT}
     */
    public static final /* synthetic */ Source $anonfun$startReplicationStream$4(EventsByPersistenceIdQuery eventsByPersistenceIdQuery, ReplicationId replicationId, ReplicaId replicaId, BehaviorSetup behaviorSetup, String str, AtomicReference atomicReference, long j) {
        return eventsByPersistenceIdQuery.eventsByPersistenceId(replicationId.persistenceId().id(), j + 1, Long.MAX_VALUE).filter(eventEnvelope -> {
            return BoxesRunTime.boxToBoolean($anonfun$startReplicationStream$5(replicaId, behaviorSetup, str, eventEnvelope));
        }).viaMat(new FastForwardingFilter(), Keep$.MODULE$.right()).mapMaterializedValue(replicationStreamControl -> {
            // Publish the control so fastForward calls can reach the running stream.
            atomicReference.set(replicationStreamControl);
            return BoxedUnit.UNIT;
        });
    }

    /** Not instantiable from outside; the only instance is {@link #MODULE$}. */
    private Running$() {
    }
}
