/*
 * Decompiled with CFR 0.152.
 */
package sttp.tapir.server.netty.sync.internal.reactivestreams;

import java.io.Serializable;
import org.reactivestreams.Processor;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ox.CancellableFork;
import ox.Ox;
import ox.OxUnsupervised;
import ox.channels.BufferCapacity;
import ox.channels.Channel;
import ox.channels.Channel$;
import ox.channels.ChannelClosed;
import ox.channels.Source;
import ox.flow.Flow;
import ox.flow.Flow$;
import ox.flow.FlowRunOps;
import ox.race$package$;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.concurrent.Await$;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.function.JProcedure1;
import scala.util.Left;
import scala.util.control.NonFatal$;
import sttp.tapir.server.netty.sync.internal.ox.OxDispatcher;
import sttp.tapir.server.netty.sync.internal.reactivestreams.ChannelSubscription;

public class OxProcessor<A, B>
implements Processor<A, B> {
    private final OxDispatcher oxDispatcher;
    private final Function1<Flow<A>, Flow<B>> pipeline;
    private final Function1<Subscriber<? super B>, Subscriber<? super B>> wrapSubscriber;
    private final Logger logger;
    private volatile Subscription requestsSubscription;
    private final Channel<A> channel;
    private final FiniteDuration pipelineCancelationTimeout;
    private volatile Future<CancellableFork<BoxedUnit>> pipelineForkFuture;

    public OxProcessor(OxDispatcher oxDispatcher, Function1<Flow<A>, Flow<B>> pipeline, Function1<Subscriber<? super B>, Subscriber<? super B>> wrapSubscriber) {
        this.oxDispatcher = oxDispatcher;
        this.pipeline = pipeline;
        this.wrapSubscriber = wrapSubscriber;
        this.logger = LoggerFactory.getLogger((String)this.getClass().getName());
        this.channel = Channel$.MODULE$.buffered(1);
        this.pipelineCancelationTimeout = new package.DurationInt(package$.MODULE$.DurationInt(5)).seconds();
    }

    public void onError(Throwable reason) {
        if (reason == null) {
            throw null;
        }
        Object object = this.channel.errorOrClosed(reason);
        this.cancelPipelineFork();
    }

    public void onNext(A a) {
        if (a == null) {
            throw new NullPointerException("Element cannot be null");
        }
        Object object = this.channel.sendOrClosed(a);
        BoxedUnit boxedUnit = BoxedUnit.UNIT;
        Object object2 = object;
        if (!(boxedUnit != null ? !boxedUnit.equals(object2) : object2 != null)) {
            return;
        }
        if (object instanceof ChannelClosed) {
            this.cancelSubscription();
            this.onError(new IllegalStateException("onNext called when the channel is closed"));
            return;
        }
        throw new MatchError(object);
    }

    public void onSubscribe(Subscription s) {
        if (s == null) {
            throw new NullPointerException("Subscription cannot be null");
        }
        if (this.requestsSubscription != null) {
            s.cancel();
            return;
        }
        this.requestsSubscription = s;
        s.request(1L);
    }

    public void onComplete() {
        Object object = this.channel.doneOrClosed();
        this.cancelPipelineFork();
    }

    public void subscribe(Subscriber<? super B> subscriber) {
        if (subscriber == null) {
            throw new NullPointerException("Subscriber cannot be null");
        }
        Subscriber wrappedSubscriber = (Subscriber)this.wrapSubscriber.apply(subscriber);
        this.pipelineForkFuture = this.oxDispatcher.runAsync((Function1<Ox, BoxedUnit>)(JProcedure1 & Serializable)evidence$1 -> {
            Source outgoingResponses = ((FlowRunOps)this.pipeline.apply((Object)Flow$.MODULE$.fromSource(this.channel).tap((Function1)(JProcedure1 & Serializable)_$3 -> this.requestsSubscription.request(1L)))).runToChannel((OxUnsupervised)evidence$1, BufferCapacity.package.BufferCapacity$.MODULE$.default());
            ChannelSubscription channelSubscription = new ChannelSubscription(wrappedSubscriber, outgoingResponses);
            subscriber.onSubscribe(channelSubscription);
            channelSubscription.runBlocking();
        }, (Function1<Throwable, BoxedUnit>)(JProcedure1 & Serializable)error -> {
            wrappedSubscriber.onError(error);
            this.onError((Throwable)error);
        });
    }

    private void cancelPipelineFork() {
        if (this.pipelineForkFuture != null) {
            try {
                CancellableFork pipelineFork = (CancellableFork)Await$.MODULE$.result(this.pipelineForkFuture, (Duration)this.pipelineCancelationTimeout);
                Future<CancellableFork<BoxedUnit>> future = this.oxDispatcher.runAsync((Function1<Ox, BoxedUnit>)(JProcedure1 & Serializable)evidence$2 -> {
                    Option option;
                    Throwable throwable;
                    Object object = race$package$.MODULE$.raceSuccess(this::cancelPipelineFork$$anonfun$1$$anonfun$1, () -> OxProcessor.cancelPipelineFork$$anonfun$1$$anonfun$2(pipelineFork));
                    if (object instanceof Left && (throwable = (Throwable)((Left)object).value()) != null && !(option = NonFatal$.MODULE$.unapply(throwable)).isEmpty()) {
                        Throwable throwable2;
                        Throwable e = throwable2 = (Throwable)option.get();
                        this.logger.error("Error when canceling pipeline fork", e);
                        return;
                    }
                }, (Function1<Throwable, BoxedUnit>)(JProcedure1 & Serializable)e -> this.logger.error("Error when canceling pipeline fork", e));
            }
            catch (Throwable throwable) {
                Option option;
                Throwable throwable2 = throwable;
                if (throwable2 != null && !(option = NonFatal$.MODULE$.unapply(throwable2)).isEmpty()) {
                    Throwable throwable3;
                    Throwable e2 = throwable3 = (Throwable)option.get();
                    this.logger.error("Error when waiting for pipeline fork to start", e2);
                }
                throw throwable;
            }
            return;
        }
    }

    private void cancelSubscription() {
        if (this.requestsSubscription != null) {
            try {
                this.requestsSubscription.cancel();
            }
            catch (Throwable t) {
                throw new IllegalStateException(new StringBuilder(78).append(this.requestsSubscription).append(" violated the Reactive Streams rule 3.15 by throwing an exception from cancel.").toString(), t);
            }
            return;
        }
    }

    private final Object cancelPipelineFork$$anonfun$1$$anonfun$1() {
        Thread.sleep(this.pipelineCancelationTimeout.toMillis());
        this.logger.error(new StringBuilder(54).append("Pipeline fork cancelation did not complete in time (").append(this.pipelineCancelationTimeout).append(").").toString());
        return BoxedUnit.UNIT;
    }

    private static final Object cancelPipelineFork$$anonfun$1$$anonfun$2(CancellableFork pipelineFork$2) {
        return pipelineFork$2.cancel();
    }
}

