package sttp.tapir.server.netty.sync.internal.reactivestreams;

import org.reactivestreams.Processor;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ox.CancellableFork;
import ox.channels.BufferCapacity$package$BufferCapacity$;
import ox.channels.Channel;
import ox.channels.Channel$;
import ox.channels.ChannelClosed;
import ox.flow.Flow;
import ox.flow.Flow$;
import ox.flow.FlowRunOps;
import ox.race$package$;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.concurrent.Await$;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.runtime.BoxedUnit;
import scala.util.Left;
import scala.util.control.NonFatal$;
import sttp.tapir.server.netty.sync.internal.ox.OxDispatcher;

/* compiled from: OxProcessor.scala */
/* loaded from: input_file:sttp/tapir/server/netty/sync/internal/reactivestreams/OxProcessor.class */
public class OxProcessor<A, B> implements Processor<A, B> {
    private final OxDispatcher oxDispatcher;
    private final Function1<Flow<A>, Flow<B>> pipeline;
    private final Function1<Subscriber<? super B>, Subscriber<? super B>> wrapSubscriber;
    private volatile Subscription requestsSubscription;
    private volatile Future<CancellableFork<BoxedUnit>> pipelineForkFuture;
    private final Logger logger = LoggerFactory.getLogger(getClass().getName());
    private final Channel<A> channel = Channel$.MODULE$.buffered(1);
    private final FiniteDuration pipelineCancelationTimeout = new package.DurationInt(package$.MODULE$.DurationInt(5)).seconds();

    public OxProcessor(OxDispatcher oxDispatcher, Function1<Flow<A>, Flow<B>> function1, Function1<Subscriber<? super B>, Subscriber<? super B>> function12) {
        this.oxDispatcher = oxDispatcher;
        this.pipeline = function1;
        this.wrapSubscriber = function12;
    }

    public void onError(Throwable th) {
        if (th == null) {
            throw null;
        }
        this.channel.errorOrClosed(th);
        cancelPipelineFork();
    }

    public void onNext(A a) {
        if (a == null) {
            throw new NullPointerException("Element cannot be null");
        }
        Object sendOrClosed = this.channel.sendOrClosed(a);
        BoxedUnit boxedUnit = BoxedUnit.UNIT;
        if (boxedUnit == null) {
            if (sendOrClosed == null) {
                return;
            }
        } else if (boxedUnit.equals(sendOrClosed)) {
            return;
        }
        if (!(sendOrClosed instanceof ChannelClosed)) {
            throw new MatchError(sendOrClosed);
        }
        cancelSubscription();
        onError(new IllegalStateException("onNext called when the channel is closed"));
    }

    public void onSubscribe(Subscription subscription) {
        if (subscription == null) {
            throw new NullPointerException("Subscription cannot be null");
        }
        if (this.requestsSubscription != null) {
            subscription.cancel();
        } else {
            this.requestsSubscription = subscription;
            subscription.request(1L);
        }
    }

    public void onComplete() {
        this.channel.doneOrClosed();
        cancelPipelineFork();
    }

    public void subscribe(Subscriber<? super B> subscriber) {
        if (subscriber == null) {
            throw new NullPointerException("Subscriber cannot be null");
        }
        Subscriber subscriber2 = (Subscriber) this.wrapSubscriber.apply(subscriber);
        this.pipelineForkFuture = this.oxDispatcher.runAsync(ox -> {
            ChannelSubscription channelSubscription = new ChannelSubscription(subscriber2, ((FlowRunOps) this.pipeline.apply(Flow$.MODULE$.fromSource(this.channel).tap(obj -> {
                this.requestsSubscription.request(1L);
            }))).runToChannel(ox, BufferCapacity$package$BufferCapacity$.MODULE$.default()));
            subscriber.onSubscribe(channelSubscription);
            channelSubscription.runBlocking();
        }, th -> {
            subscriber2.onError(th);
            onError(th);
        });
    }

    private void cancelPipelineFork() {
        if (this.pipelineForkFuture != null) {
            try {
                CancellableFork cancellableFork = (CancellableFork) Await$.MODULE$.result(this.pipelineForkFuture, this.pipelineCancelationTimeout);
                this.oxDispatcher.runAsync(ox -> {
                    Throwable th;
                    Object raceSuccess = race$package$.MODULE$.raceSuccess(this::cancelPipelineFork$$anonfun$1$$anonfun$1, () -> {
                        return cancelPipelineFork$$anonfun$1$$anonfun$2(r2);
                    });
                    if (!(raceSuccess instanceof Left) || (th = (Throwable) ((Left) raceSuccess).value()) == null) {
                        return;
                    }
                    Option unapply = NonFatal$.MODULE$.unapply(th);
                    if (unapply.isEmpty()) {
                        return;
                    }
                    this.logger.error("Error when canceling pipeline fork", (Throwable) unapply.get());
                }, th -> {
                    this.logger.error("Error when canceling pipeline fork", th);
                });
            } catch (Throwable th2) {
                if (th2 != null) {
                    Option unapply = NonFatal$.MODULE$.unapply(th2);
                    if (!unapply.isEmpty()) {
                        this.logger.error("Error when waiting for pipeline fork to start", (Throwable) unapply.get());
                        return;
                    }
                }
                throw th2;
            }
        }
    }

    /* JADX WARN: Unreachable blocks removed: 1, instructions: 1 */
    private void cancelSubscription() {
        if (this.requestsSubscription != null) {
            try {
                this.requestsSubscription.cancel();
            } catch (Throwable th) {
                throw new IllegalStateException(new StringBuilder(78).append(this.requestsSubscription).append(" violated the Reactive Streams rule 3.15 by throwing an exception from cancel.").toString(), th);
            }
        }
    }

    private final Object cancelPipelineFork$$anonfun$1$$anonfun$1() {
        Thread.sleep(this.pipelineCancelationTimeout.toMillis());
        this.logger.error(new StringBuilder(54).append("Pipeline fork cancelation did not complete in time (").append(this.pipelineCancelationTimeout).append(").").toString());
        return BoxedUnit.UNIT;
    }

    private static final Object cancelPipelineFork$$anonfun$1$$anonfun$2(CancellableFork cancellableFork) {
        return cancellableFork.cancel();
    }
}
