/*
 * Decompiled with CFR 0.152.
 */
package sttp.tapir.server.netty.sync.internal.ws;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import java.io.IOException;
import java.io.Serializable;
import java.util.concurrent.Semaphore;
import org.reactivestreams.Processor;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ox.UnsupervisedFork;
import ox.channels.ChannelClosedException;
import ox.flow.Flow;
import ox.flow.Flow$;
import ox.fork$package$;
import ox.unsupervised$package$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.PartialFunction;
import scala.Some;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.ModuleSerializationProxy;
import scala.runtime.function.JProcedure1;
import sttp.tapir.DecodeResult;
import sttp.tapir.WebSocketBodyOutput;
import sttp.tapir.model.WebSocketFrameDecodeFailure;
import sttp.tapir.server.netty.internal.ws.WebSocketFrameConverters$;
import sttp.tapir.server.netty.sync.OxStreams;
import sttp.tapir.server.netty.sync.internal.ox.OxDispatcher;
import sttp.tapir.server.netty.sync.internal.reactivestreams.OxProcessor;
import sttp.ws.WebSocketFrame;

/**
 * Singleton (Scala {@code object}) that builds a Reactive Streams {@link Processor} turning incoming
 * Netty WebSocket frames into outgoing Netty WebSocket frames by running them through a
 * user-supplied Ox {@code Flow} pipeline.
 *
 * <p>This file is decompiler output; identifiers containing {@code $} are compiler-generated and
 * are kept unchanged so that the other members of this class that reference them keep working.
 */
public final class OxSourceWebSocketProcessor$
implements Serializable {
    /** Logger for this class; public only because generated inner classes/lambdas access it. */
    public static final Logger sttp$tapir$server$netty$sync$internal$ws$OxSourceWebSocketProcessor$$$logger;
    /** How long the monitor waits after a client close frame before logging an error (1 second). */
    private static final FiniteDuration outgoingCloseAfterCloseTimeout;
    /** The single instance of this class. */
    public static final OxSourceWebSocketProcessor$ MODULE$;

    private OxSourceWebSocketProcessor$() {
    }

    static {
        MODULE$ = new OxSourceWebSocketProcessor$();
        sttp$tapir$server$netty$sync$internal$ws$OxSourceWebSocketProcessor$$$logger = LoggerFactory.getLogger((String)MODULE$.getClass().getName());
        outgoingCloseAfterCloseTimeout = new package.DurationInt(package$.MODULE$.DurationInt(1)).second();
    }

    /** Java serialization hook: serializes a proxy for this class instead of the instance itself. */
    private Object writeReplace() {
        return new ModuleSerializationProxy(OxSourceWebSocketProcessor$.class);
    }

    /**
     * Builds the frame-to-frame processor.
     *
     * <p>The per-connection pipeline: converts each Netty frame to an sttp frame (releasing the
     * Netty frame), cuts the flow at the first close frame, optionally concatenates fragmented
     * frames, decodes each frame to {@code REQ}, applies {@code processingPipe}, wraps the result
     * with the "not closed after client close" monitor, and encodes each {@code RESP} back to a
     * Netty frame.
     *
     * @param oxDispatcher   passed through to the {@link OxProcessor}
     * @param processingPipe user pipeline from decoded requests to responses
     * @param o              WebSocket body description providing codecs and flags
     * @param ctx            Netty channel context used to write the closing frame
     * @return a processor from incoming to outgoing Netty WebSocket frames
     */
    public <REQ, RESP> Processor<io.netty.handler.codec.http.websocketx.WebSocketFrame, io.netty.handler.codec.http.websocketx.WebSocketFrame> apply(OxDispatcher oxDispatcher, Function1<Flow<REQ>, Flow<RESP>> processingPipe, WebSocketBodyOutput<Function1<Flow<REQ>, Flow<RESP>>, REQ, RESP, ?, OxStreams> o, ChannelHandlerContext ctx) {
        Function1 & Serializable frame2FramePipe = (Function1 & Serializable)incoming -> {
            // Released once by takeUntilCloseFrame when the client's close frame is seen.
            Semaphore closeSignal = new Semaphore(0);
            Flow flow = incoming.map((Function1 & Serializable)f -> {
                try {
                    return WebSocketFrameConverters$.MODULE$.nettyFrameToFrame(f);
                } finally {
                    // Release in finally so the reference-counted Netty frame is not leaked
                    // when the conversion throws.
                    f.release();
                }
            });
            Flow<WebSocketFrame> flow2 = this.takeUntilCloseFrame(o.decodeCloseRequests(), closeSignal, (Flow<WebSocketFrame>)flow);
            Flow flow3 = (Flow)processingPipe.apply((Object)this.optionallyConcatenateFrames(o.concatenateFragmentedFrames(), flow2).map((Function1 & Serializable)f -> this.decodeFrame$1(o, (WebSocketFrame)f)));
            return this.monitorOutgoingClosedAfterClientClose(closeSignal, flow3).map((Function1 & Serializable)r -> WebSocketFrameConverters$.MODULE$.frameToNettyFrame((WebSocketFrame)o.responses().encode(r)));
        };
        return new OxProcessor<io.netty.handler.codec.http.websocketx.WebSocketFrame, io.netty.handler.codec.http.websocketx.WebSocketFrame>(oxDispatcher, frame2FramePipe, (Function1 & Serializable)sub -> this.wrapSubscriberWithNettyCallback$1(ctx, (Subscriber)sub));
    }

    /**
     * When {@code doConcatenate} is set, runs the flow through a stateful mapping driven by
     * {@code WebSocketFrameConverters.accumulateFrameState} (initial state {@code None}) and keeps
     * only the {@code Some(frame)} results, unwrapped. Otherwise returns {@code f} untouched.
     */
    private Flow<WebSocketFrame> optionallyConcatenateFrames(boolean doConcatenate, Flow<WebSocketFrame> f) {
        if (doConcatenate) {
            // NOTE(review): the anonymous class below is a decompiler rendering of a Scala partial
            // function literal; its real supertype is not visible here, hence the cast.
            return f.mapStateful(this::optionallyConcatenateFrames$$anonfun$1, WebSocketFrameConverters$.MODULE$.accumulateFrameState(), f.mapStateful$default$3(this::optionallyConcatenateFrames$$anonfun$2)).collect((PartialFunction)new Serializable(){

                /** Defined only for {@code Some} holding a non-null frame. */
                public final boolean isDefinedAt(Option x) {
                    return x instanceof Some && (WebSocketFrame)((Some)x).value() != null;
                }

                /** Unwraps {@code Some(frame)}; delegates every other input to {@code function1}. */
                public final Object applyOrElse(Option x, Function1 function1) {
                    if (x instanceof Some) {
                        WebSocketFrame webSocketFrame = (WebSocketFrame)((Some)x).value();
                        if (webSocketFrame != null) {
                            return webSocketFrame;
                        }
                    }
                    return function1.apply((Object)x);
                }
            });
        }
        return f;
    }

    /**
     * Passes frames through until the first {@code WebSocketFrame.Close}; on that frame the
     * {@code closeSignal} semaphore is released and the predicate turns false.
     *
     * @param passAlongCloseFrame forwarded as the second argument of {@code takeWhile} —
     *                            presumably decides whether the close frame itself is still
     *                            emitted downstream; verify against the Ox {@code Flow} API
     */
    private Flow<WebSocketFrame> takeUntilCloseFrame(boolean passAlongCloseFrame, Semaphore closeSignal, Flow<WebSocketFrame> f) {
        return f.takeWhile((Function1 & Serializable)x$1 -> {
            if (x$1 instanceof WebSocketFrame.Close) {
                closeSignal.release();
                return false;
            }
            return true;
        }, passAlongCloseFrame);
    }

    /**
     * Wraps {@code outgoing} in a flow that, inside an unsupervised scope, first starts a
     * background fork running the close monitor and then emits everything from {@code outgoing}.
     */
    private <T> Flow<T> monitorOutgoingClosedAfterClientClose(Semaphore closeSignal, Flow<T> outgoing) {
        return Flow$.MODULE$.usingEmit((Function1)(JProcedure1 & Serializable)emit -> unsupervised$package$.MODULE$.unsupervised((Function1)(JProcedure1 & Serializable)evidence$1 -> {
            // The fork handle is intentionally not kept; nothing joins on it here.
            fork$package$.MODULE$.forkUnsupervised((Function0 & Serializable)() -> {
                OxSourceWebSocketProcessor$.monitorOutgoingClosedAfterClientClose$$anonfun$1$$anonfun$1$$anonfun$1(closeSignal);
                return BoxedUnit.UNIT;
            }, evidence$1);
            outgoing.runToEmit(emit);
        }));
    }

    /**
     * Decodes a single frame with the request codec of {@code o$1}.
     *
     * @return the decoded value on {@code DecodeResult.Value}
     * @throws WebSocketFrameDecodeFailure on {@code DecodeResult.Failure}
     * @throws MatchError for any other decode result
     */
    private final Object decodeFrame$1(WebSocketBodyOutput o$1, WebSocketFrame f) {
        DecodeResult decodeResult = o$1.requests().decode((Object)f);
        if (decodeResult instanceof DecodeResult.Failure) {
            throw new WebSocketFrameDecodeFailure(f, (DecodeResult.Failure)decodeResult);
        }
        if (decodeResult instanceof DecodeResult.Value) {
            return ((DecodeResult.Value)decodeResult).v();
        }
        throw new MatchError((Object)decodeResult);
    }

    /**
     * Decorates {@code sub} so that a close frame is written to the channel before terminal
     * signals are forwarded: {@code INTERNAL_SERVER_ERROR} on error, {@code NORMAL_CLOSURE} on
     * completion. {@code onSubscribe} and {@code onNext} are forwarded unchanged.
     */
    private final Subscriber wrapSubscriberWithNettyCallback$1(ChannelHandlerContext ctx$2, Subscriber sub) {
        return new Subscriber<Object>(){

            @Override
            public void onSubscribe(Subscription s) {
                sub.onSubscribe(s);
            }

            @Override
            public void onNext(Object t) {
                sub.onNext(t);
            }

            @Override
            public void onError(Throwable t) {
                // Unwrap the cause only for Ox channel-closed errors; anything else has no cause here.
                Throwable cause = t instanceof ChannelClosedException.Error
                    ? ChannelClosedException.Error$.MODULE$.unapply((ChannelClosedException.Error)t)._1()
                    : null;
                if (cause instanceof IOException) {
                    // I/O failures behind a closed channel are logged at info level with the cause only.
                    OxSourceWebSocketProcessor$.sttp$tapir$server$netty$sync$internal$ws$OxSourceWebSocketProcessor$$$logger.info("Web Socket channel closed abnormally", cause);
                } else {
                    OxSourceWebSocketProcessor$.sttp$tapir$server$netty$sync$internal$ws$OxSourceWebSocketProcessor$$$logger.error("Web Socket channel closed abnormally", t);
                }
                // The returned ChannelFuture is deliberately ignored (not awaited).
                ctx$2.writeAndFlush((Object)new CloseWebSocketFrame(WebSocketCloseStatus.INTERNAL_SERVER_ERROR, "Internal Server Error"));
                sub.onError(t);
            }

            @Override
            public void onComplete() {
                // The returned ChannelFuture is deliberately ignored (not awaited).
                ctx$2.writeAndFlush((Object)new CloseWebSocketFrame(WebSocketCloseStatus.NORMAL_CLOSURE, "normal closure"));
                sub.onComplete();
            }
        };
    }

    /** Initial state for the frame-concatenation stateful mapping: {@code None}. */
    private final Option initializeState$1$1() {
        return None$.MODULE$;
    }

    /** Generated thunk supplying the initial state to {@code mapStateful}. */
    private final Option optionallyConcatenateFrames$$anonfun$1() {
        return this.initializeState$1$1();
    }

    /** Generated thunk supplying the initial state to {@code mapStateful$default$3}. */
    private final Option optionallyConcatenateFrames$$anonfun$2() {
        return this.initializeState$1$1();
    }

    /**
     * Body of the monitor fork: blocks until the close signal is released, sleeps for
     * {@code outgoingCloseAfterCloseTimeout}, then logs an error about the outgoing flow.
     * Both blocking calls can throw {@code InterruptedException}, which is not caught here.
     */
    private static final void monitorOutgoingClosedAfterClientClose$$anonfun$1$$anonfun$1$$anonfun$1(Semaphore closeSignal$4) {
        closeSignal$4.acquire();
        Thread.sleep(outgoingCloseAfterCloseTimeout.toMillis());
        String message = "WebSocket outgoing messages flow either not drained, or not closed, "
            + outgoingCloseAfterCloseTimeout
            + " after receiving a close frame from the client! "
            + "Make sure to complete the outgoing flow in your pipeline, once the incoming "
            + "flow is done!";
        sttp$tapir$server$netty$sync$internal$ws$OxSourceWebSocketProcessor$$$logger.error(message);
    }
}

