/*
 * Decompiled with CFR 0.152.
 */
package com.dremio.jdbc.shaded.com.dremio.exec.rpc;

import com.dremio.jdbc.shaded.com.dremio.common.exceptions.ErrorHelper;
import com.dremio.jdbc.shaded.com.dremio.common.exceptions.OutOfMemoryOrResourceExceptionContext;
import com.dremio.jdbc.shaded.com.dremio.common.exceptions.UserException;
import com.dremio.jdbc.shaded.com.dremio.common.memory.MemoryDebugInfo;
import com.dremio.jdbc.shaded.com.dremio.exec.proto.GeneralRPCProtos;
import com.dremio.jdbc.shaded.com.dremio.exec.rpc.InboundRpcMessage;
import com.dremio.jdbc.shaded.com.dremio.exec.rpc.OutboundRpcMessage;
import com.dremio.jdbc.shaded.com.dremio.exec.rpc.RpcEncoder;
import com.dremio.jdbc.shaded.com.google.protobuf.CodedInputStream;
import com.dremio.jdbc.shaded.com.google.protobuf.MessageLite;
import com.dremio.jdbc.shaded.io.netty.buffer.ByteBuf;
import com.dremio.jdbc.shaded.io.netty.buffer.ByteBufInputStream;
import com.dremio.jdbc.shaded.io.netty.buffer.NettyArrowBuf;
import com.dremio.jdbc.shaded.io.netty.channel.ChannelFuture;
import com.dremio.jdbc.shaded.io.netty.channel.ChannelFutureListener;
import com.dremio.jdbc.shaded.io.netty.channel.ChannelHandlerContext;
import com.dremio.jdbc.shaded.io.netty.handler.codec.ByteToMessageDecoder;
import com.dremio.jdbc.shaded.io.netty.handler.codec.CorruptedFrameException;
import com.dremio.jdbc.shaded.org.apache.arrow.memory.BufferAllocator;
import com.dremio.jdbc.shaded.org.apache.arrow.memory.OutOfMemoryException;
import com.dremio.jdbc.shaded.org.slf4j.Logger;
import com.dremio.jdbc.shaded.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class MessageDecoder
extends ByteToMessageDecoder {
    private static final Logger logger = LoggerFactory.getLogger(MessageDecoder.class);
    private BufferAllocator allocator;
    private final AtomicLong messageCounter = new AtomicLong();

    public MessageDecoder(BufferAllocator allocator) {
        this.setCumulator(COMPOSITE_CUMULATOR);
        this.allocator = allocator;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int length = MessageDecoder.decodeLengthFromMessage(ctx, in);
        if (length == -1) {
            return;
        }
        ByteBuf frame = in.slice(in.readerIndex(), length);
        try {
            InboundRpcMessage message = this.decodeMessage(ctx, frame, length);
            if (message != null) {
                out.add(message);
            }
        }
        finally {
            in.skipBytes(length);
        }
    }

    public static int decodeLengthFromMessage(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        int length = 0;
        if (!ctx.channel().isOpen()) {
            if (in.readableBytes() > 0) {
                logger.info("Channel is closed, discarding remaining {} byte(s) in buffer.", (Object)in.readableBytes());
            }
            in.skipBytes(in.readableBytes());
            return -1;
        }
        in.markReaderIndex();
        byte[] buf = new byte[5];
        for (int i = 0; i < buf.length; ++i) {
            if (!in.isReadable()) {
                in.resetReaderIndex();
                return -1;
            }
            buf[i] = in.readByte();
            if (buf[i] < 0) continue;
            length = CodedInputStream.newInstance(buf, 0, i + 1).readRawVarint32();
            if (length < 0) {
                throw new CorruptedFrameException("negative length: " + length);
            }
            if (length == 0) {
                throw new CorruptedFrameException("Received a message of length 0.");
            }
            if (in.readableBytes() >= length) break;
            in.resetReaderIndex();
            return -1;
        }
        return length;
    }

    private InboundRpcMessage decodeMessage(ChannelHandlerContext ctx, ByteBuf frame, int length) throws Exception {
        ByteBufInputStream is = new ByteBufInputStream(frame, length);
        this.checkTag(is, RpcEncoder.HEADER_TAG);
        GeneralRPCProtos.RpcHeader header = GeneralRPCProtos.RpcHeader.parseDelimitedFrom(is);
        this.checkTag(is, RpcEncoder.PROTOBUF_BODY_TAG);
        int pBodyLength = MessageDecoder.readRawVarint32(is);
        byte[] pBody = new byte[pBodyLength];
        frame.readBytes(pBody);
        NettyArrowBuf dBody = null;
        if (frame.readableBytes() > 0) {
            this.checkTag(is, RpcEncoder.RAW_BODY_TAG);
            int dBodyLength = MessageDecoder.readRawVarint32(is);
            if (frame.readableBytes() != dBodyLength) {
                throw new CorruptedFrameException(String.format("Expected to receive a raw body of %d bytes but received a buffer with %d bytes.", dBodyLength, frame.readableBytes()));
            }
            try {
                dBody = NettyArrowBuf.unwrapBuffer(this.allocator.buffer(dBodyLength));
                ((ByteBuf)dBody).writeBytes(frame.nioBuffer(frame.readerIndex(), dBodyLength));
            }
            catch (OutOfMemoryException | OutOfMemoryError e) {
                if (ErrorHelper.isDirectMemoryException(e)) {
                    this.sendOutOfMemory(e, ctx, header.getCoordinationId());
                    return null;
                }
                throw e;
            }
        }
        InboundRpcMessage m4 = new InboundRpcMessage(header.getMode(), header.getRpcType(), header.getCoordinationId(), pBody, dBody);
        return m4;
    }

    private void sendOutOfMemory(Throwable e, ChannelHandlerContext ctx, int coordinationId) {
        UserException uex;
        String oomDetails = "Out of memory while receiving data.";
        if (ErrorHelper.isDirectMemoryException(e)) {
            if (e instanceof OutOfMemoryException) {
                oomDetails = String.format("Out of memory while receiving data. %s ", MemoryDebugInfo.getDetailsOnAllocationFailure((OutOfMemoryException)e, this.allocator));
            }
            uex = UserException.memoryError(e).setAdditionalExceptionContext(new OutOfMemoryOrResourceExceptionContext(OutOfMemoryOrResourceExceptionContext.MemoryType.DIRECT_MEMORY, oomDetails)).build(logger);
        } else {
            uex = ErrorHelper.isJavaHeapOutOfMemory(e) ? UserException.memoryError(e).setAdditionalExceptionContext(new OutOfMemoryOrResourceExceptionContext(OutOfMemoryOrResourceExceptionContext.MemoryType.HEAP_MEMORY, oomDetails)).build(logger) : UserException.resourceError(e).message(oomDetails).build(logger);
        }
        OutboundRpcMessage outMessage = new OutboundRpcMessage(GeneralRPCProtos.RpcMode.RESPONSE_FAILURE, 0, coordinationId, (MessageLite)uex.getOrCreatePBError(false), new ByteBuf[0]);
        ChannelFuture future = ctx.writeAndFlush(outMessage);
        future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
    }

    private void checkTag(ByteBufInputStream is, int expectedTag) throws IOException {
        int actualTag = MessageDecoder.readRawVarint32(is);
        if (actualTag != expectedTag) {
            throw new CorruptedFrameException(String.format("Expected to read a tag of %d but actually received a value of %d.  Happened after reading %d message.", expectedTag, actualTag, this.messageCounter.get()));
        }
    }

    public static int readRawVarint32(ByteBufInputStream is) throws IOException {
        byte tmp = is.readByte();
        if (tmp >= 0) {
            return tmp;
        }
        int result = tmp & 0x7F;
        tmp = is.readByte();
        if (tmp >= 0) {
            result |= tmp << 7;
        } else {
            result |= (tmp & 0x7F) << 7;
            tmp = is.readByte();
            if (tmp >= 0) {
                result |= tmp << 14;
            } else {
                result |= (tmp & 0x7F) << 14;
                tmp = is.readByte();
                if (tmp >= 0) {
                    result |= tmp << 21;
                } else {
                    result |= (tmp & 0x7F) << 21;
                    tmp = is.readByte();
                    result |= tmp << 28;
                    if (tmp < 0) {
                        for (int i = 0; i < 5; ++i) {
                            if (is.readByte() < 0) continue;
                            return result;
                        }
                        throw new CorruptedFrameException("Encountered a malformed varint.");
                    }
                }
            }
        }
        return result;
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelReadComplete();
    }
}

