package sttp.tapir.server.netty.sync.internal.ws;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import java.io.IOException;
import java.io.Serializable;
import java.util.concurrent.Semaphore;
import org.reactivestreams.Processor;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ox.channels.ChannelClosedException;
import ox.channels.ChannelClosedException$Error$;
import ox.flow.Flow;
import ox.flow.Flow$;
import ox.fork$package$;
import ox.unsupervised$package$;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.ModuleSerializationProxy;
import sttp.tapir.DecodeResult;
import sttp.tapir.WebSocketBodyOutput;
import sttp.tapir.model.WebSocketFrameDecodeFailure;
import sttp.tapir.server.netty.internal.ws.WebSocketFrameConverters$;
import sttp.tapir.server.netty.sync.OxStreams;
import sttp.tapir.server.netty.sync.internal.ox.OxDispatcher;
import sttp.tapir.server.netty.sync.internal.reactivestreams.OxProcessor;
import sttp.ws.WebSocketFrame;

/* compiled from: OxSourceWebSocketProcessor.scala */
/* loaded from: input_file:sttp/tapir/server/netty/sync/internal/ws/OxSourceWebSocketProcessor$.class */
/**
 * Module singleton (decompiled from a Scala {@code object}) that builds the reactive-streams
 * {@link Processor} used for a web socket connection: incoming Netty frames are converted,
 * decoded, passed through the user-supplied {@code Flow} pipeline, and the pipeline's responses
 * are encoded back into Netty frames.
 */
public final class OxSourceWebSocketProcessor$ implements Serializable {
    /** The single instance of this module. */
    public static final OxSourceWebSocketProcessor$ MODULE$ = new OxSourceWebSocketProcessor$();
    /** Logger shared with the anonymous subscriber wrapper; the mangled name is part of the compiled interface. */
    public static final Logger sttp$tapir$server$netty$sync$internal$ws$OxSourceWebSocketProcessor$$$logger = LoggerFactory.getLogger(MODULE$.getClass().getName());
    /** How long the monitor waits after a client close frame before reporting that the outgoing flow is still running. */
    private static final FiniteDuration outgoingCloseAfterCloseTimeout = new package.DurationInt(package$.MODULE$.DurationInt(1)).second();

    private OxSourceWebSocketProcessor$() {
    }

    /** Serialization hook: the module is replaced by a {@link ModuleSerializationProxy} for this class. */
    private Object writeReplace() {
        return new ModuleSerializationProxy(OxSourceWebSocketProcessor$.class);
    }

    /**
     * Creates the processor for one web socket connection.
     *
     * <p>The incoming side is processed in this order: Netty frame to sttp frame conversion (the
     * Netty frame is released afterwards), {@link #takeUntilCloseFrame}, {@link #optionallyConcatenateFrames},
     * request decoding. The decoded requests are handed to {@code function1}; its result is wrapped
     * by {@link #monitorOutgoingClosedAfterClientClose} and each response is encoded and converted
     * back to a Netty frame.
     *
     * @param oxDispatcher          dispatcher passed through to {@link OxProcessor}
     * @param function1             user pipeline transforming the request flow into the response flow
     * @param webSocketBodyOutput   supplies the request/response codecs and the
     *                              {@code concatenateFragmentedFrames} / {@code decodeCloseRequests} flags
     * @param channelHandlerContext Netty context used to write close frames when the stream terminates
     * @return a processor from incoming Netty frames to outgoing Netty frames
     */
    public <REQ, RESP> Processor<WebSocketFrame, WebSocketFrame> apply(OxDispatcher oxDispatcher, Function1<Flow<REQ>, Flow<RESP>> function1, WebSocketBodyOutput<Function1<Flow<REQ>, Flow<RESP>>, REQ, RESP, ?, OxStreams> webSocketBodyOutput, ChannelHandlerContext channelHandlerContext) {
        return new OxProcessor(oxDispatcher, flow -> {
            // Released by takeUntilCloseFrame when the client's close frame arrives; awaited by the monitor fork.
            Semaphore semaphore = new Semaphore(0);
            return monitorOutgoingClosedAfterClientClose(
                semaphore,
                (Flow) function1.apply(
                    optionallyConcatenateFrames(
                        webSocketBodyOutput.concatenateFragmentedFrames(),
                        takeUntilCloseFrame(
                            webSocketBodyOutput.decodeCloseRequests(),
                            semaphore,
                            flow.map(webSocketFrame -> {
                                try {
                                    return WebSocketFrameConverters$.MODULE$.nettyFrameToFrame(webSocketFrame);
                                } finally {
                                    // Release even when the conversion throws, so the frame is never left unreleased.
                                    webSocketFrame.release();
                                }
                            })
                        )
                    ).map(webSocketFrame2 -> {
                        return decodeFrame$1(webSocketBodyOutput, webSocketFrame2);
                    })
                )
            ).map(obj -> {
                return WebSocketFrameConverters$.MODULE$.frameToNettyFrame((sttp.ws.WebSocketFrame) webSocketBodyOutput.responses().encode(obj));
            });
        }, subscriber -> {
            return wrapSubscriberWithNettyCallback$1(channelHandlerContext, subscriber);
        });
    }

    /**
     * When {@code z} is true, runs the flow through a stateful mapping using
     * {@code WebSocketFrameConverters.accumulateFrameState} (starting from an empty state) and keeps
     * only the elements accepted by the {@code $$anon$2} partial function; otherwise returns the
     * flow unchanged.
     *
     * @param z    whether fragmented frames should be concatenated
     * @param flow incoming sttp frames
     */
    private Flow<sttp.ws.WebSocketFrame> optionallyConcatenateFrames(boolean z, Flow<sttp.ws.WebSocketFrame> flow) {
        return z ? flow.mapStateful(this::optionallyConcatenateFrames$$anonfun$1, WebSocketFrameConverters$.MODULE$.accumulateFrameState(), flow.mapStateful$default$3(this::optionallyConcatenateFrames$$anonfun$2)).collect(new OxSourceWebSocketProcessor$$anon$2()) : flow;
    }

    /**
     * Ends the incoming flow at the first close frame, releasing {@code semaphore} when it is seen.
     *
     * @param z         forwarded as the second argument of {@code takeWhile}
     *                  (presumably "include the first failing element", i.e. pass the close frame
     *                  downstream — confirm against the ox Flow API)
     * @param semaphore signalled once when a close frame is received
     * @param flow      incoming sttp frames
     */
    private Flow<sttp.ws.WebSocketFrame> takeUntilCloseFrame(boolean z, Semaphore semaphore, Flow<sttp.ws.WebSocketFrame> flow) {
        return flow.takeWhile(webSocketFrame -> {
            if (!(webSocketFrame instanceof WebSocketFrame.Close)) {
                return true;
            }
            semaphore.release();
            return false;
        }, z);
    }

    /**
     * Wraps {@code flow} so that, while it runs, a background fork waits for the client-close
     * signal and logs an error if it is still running after the timeout.
     *
     * <p>NOTE(review): the fork is started inside an unsupervised scope that also runs the flow;
     * presumably the fork is cancelled when that scope ends, so the error is only logged when the
     * outgoing flow outlives the timeout — confirm against the ox documentation.
     *
     * @param semaphore released by {@link #takeUntilCloseFrame} when a close frame arrives
     * @param flow      the outgoing flow produced by the user pipeline
     */
    private <T> Flow<T> monitorOutgoingClosedAfterClientClose(Semaphore semaphore, Flow<T> flow) {
        return Flow$.MODULE$.usingEmit(flowEmit -> {
            unsupervised$package$.MODULE$.unsupervised(oxUnsupervised -> {
                fork$package$.MODULE$.forkUnsupervised(() -> {
                    monitorOutgoingClosedAfterClientClose$$anonfun$1$$anonfun$1$$anonfun$1(semaphore);
                    return BoxedUnit.UNIT;
                }, oxUnsupervised);
                flow.runToEmit(flowEmit);
            });
        });
    }

    /**
     * Decodes a single incoming frame with the request codec.
     *
     * @return the decoded value
     * @throws WebSocketFrameDecodeFailure if the codec reports a failure
     * @throws MatchError                  if the result is neither a failure nor a value
     */
    private final Object decodeFrame$1(WebSocketBodyOutput webSocketBodyOutput, sttp.ws.WebSocketFrame webSocketFrame) {
        DecodeResult.Failure decode = webSocketBodyOutput.requests().decode(webSocketFrame);
        if (decode instanceof DecodeResult.Failure) {
            throw new WebSocketFrameDecodeFailure(webSocketFrame, decode);
        }
        if (decode instanceof DecodeResult.Value) {
            return ((DecodeResult.Value) decode).v();
        }
        throw new MatchError(decode);
    }

    /**
     * Wraps {@code subscriber} so that a web socket close frame is written to the channel before a
     * terminal signal ({@code onError} / {@code onComplete}) is forwarded. {@code onSubscribe} and
     * {@code onNext} are passed through unchanged.
     */
    private final Subscriber wrapSubscriberWithNettyCallback$1(final ChannelHandlerContext channelHandlerContext, final Subscriber subscriber) {
        return new Subscriber<B>(subscriber, channelHandlerContext) { // from class: sttp.tapir.server.netty.sync.internal.ws.OxSourceWebSocketProcessor$$anon$1
            private final Subscriber sub$1;
            private final ChannelHandlerContext ctx$1;

            {
                this.sub$1 = subscriber;
                this.ctx$1 = channelHandlerContext;
            }

            public void onSubscribe(Subscription subscription) {
                this.sub$1.onSubscribe(subscription);
            }

            public void onNext(Object obj) {
                this.sub$1.onNext(obj);
            }

            /**
             * Logs the failure, writes an INTERNAL_SERVER_ERROR close frame and forwards the error.
             * A channel-closed error caused by an {@link IOException} is logged at info level with
             * the I/O cause; every other error is logged at error level. The close frame and the
             * downstream {@code onError} are emitted exactly once in both cases.
             */
            public void onError(Throwable th) {
                Throwable ioCause = null;
                if (th instanceof ChannelClosedException.Error) {
                    Throwable _1 = ChannelClosedException$Error$.MODULE$.unapply((ChannelClosedException.Error) th)._1();
                    if (_1 instanceof IOException) {
                        ioCause = _1;
                    }
                }
                if (ioCause != null) {
                    OxSourceWebSocketProcessor$.sttp$tapir$server$netty$sync$internal$ws$OxSourceWebSocketProcessor$$$logger.info("Web Socket channel closed abnormally", ioCause);
                } else {
                    OxSourceWebSocketProcessor$.sttp$tapir$server$netty$sync$internal$ws$OxSourceWebSocketProcessor$$$logger.error("Web Socket channel closed abnormally", th);
                }
                // Terminal handling happens once: a second onError would break the subscriber contract.
                this.ctx$1.writeAndFlush(new CloseWebSocketFrame(WebSocketCloseStatus.INTERNAL_SERVER_ERROR, "Internal Server Error"));
                this.sub$1.onError(th);
            }

            /** Writes a NORMAL_CLOSURE close frame, then forwards completion. */
            public void onComplete() {
                this.ctx$1.writeAndFlush(new CloseWebSocketFrame(WebSocketCloseStatus.NORMAL_CLOSURE, "normal closure"));
                this.sub$1.onComplete();
            }
        };
    }

    /** Initial state for frame concatenation: no accumulated frame. */
    private final Option initializeState$1$1() {
        return None$.MODULE$;
    }

    /** Synthetic supplier of the initial concatenation state (first {@code mapStateful} argument). */
    private final Option optionallyConcatenateFrames$$anonfun$1() {
        return initializeState$1$1();
    }

    /** Synthetic supplier of the initial concatenation state (used for the {@code mapStateful} default argument). */
    private final Option optionallyConcatenateFrames$$anonfun$2() {
        return initializeState$1$1();
    }

    /**
     * Body of the monitor fork: blocks until the close-frame permit is available, waits for
     * {@code outgoingCloseAfterCloseTimeout}, then logs an error telling the user to complete the
     * outgoing flow.
     *
     * <p>NOTE(review): {@code acquire} and {@code sleep} can be interrupted and the interruption is
     * not caught here; presumably that is how the fork is stopped when the outgoing flow finishes
     * in time — confirm.
     */
    private static final void monitorOutgoingClosedAfterClientClose$$anonfun$1$$anonfun$1$$anonfun$1(Semaphore semaphore) {
        semaphore.acquire();
        Thread.sleep(outgoingCloseAfterCloseTimeout.toMillis());
        sttp$tapir$server$netty$sync$internal$ws$OxSourceWebSocketProcessor$$$logger.error(
            "WebSocket outgoing messages flow either not drained, or not closed, "
                + outgoingCloseAfterCloseTimeout
                + " after receiving a close frame from the client! "
                + "Make sure to complete the outgoing flow in your pipeline, once the incoming "
                + "flow is done!");
    }
}
