/*
 * Decompiled with CFR 0.152.
 */
package com.dremio.jdbc.shaded.com.dremio.sabot.rpc.user;

import com.dremio.jdbc.shaded.com.dremio.common.exceptions.UserException;
import com.dremio.jdbc.shaded.com.dremio.common.exceptions.UserRemoteException;
import com.dremio.jdbc.shaded.com.dremio.common.utils.protos.QueryIdHelper;
import com.dremio.jdbc.shaded.com.dremio.exec.proto.UserBitShared;
import com.dremio.jdbc.shaded.com.dremio.exec.rpc.BaseRpcOutcomeListener;
import com.dremio.jdbc.shaded.com.dremio.exec.rpc.BasicClientWithConnection;
import com.dremio.jdbc.shaded.com.dremio.exec.rpc.ConnectionThrottle;
import com.dremio.jdbc.shaded.com.dremio.exec.rpc.RpcBus;
import com.dremio.jdbc.shaded.com.dremio.exec.rpc.RpcConnectionHandler;
import com.dremio.jdbc.shaded.com.dremio.exec.rpc.RpcException;
import com.dremio.jdbc.shaded.com.dremio.exec.rpc.RpcOutcomeListener;
import com.dremio.jdbc.shaded.com.dremio.sabot.rpc.user.QueryDataBatch;
import com.dremio.jdbc.shaded.com.dremio.sabot.rpc.user.UserResultsListener;
import com.dremio.jdbc.shaded.com.google.common.base.Throwables;
import com.dremio.jdbc.shaded.com.google.common.collect.Maps;
import com.dremio.jdbc.shaded.com.google.common.collect.Queues;
import com.dremio.jdbc.shaded.io.netty.buffer.ByteBuf;
import com.dremio.jdbc.shaded.io.netty.buffer.NettyArrowBuf;
import com.dremio.jdbc.shaded.io.netty.util.concurrent.Future;
import com.dremio.jdbc.shaded.io.netty.util.concurrent.GenericFutureListener;
import com.dremio.jdbc.shaded.org.apache.arrow.memory.ArrowBuf;
import com.dremio.jdbc.shaded.org.slf4j.Logger;
import com.dremio.jdbc.shaded.org.slf4j.LoggerFactory;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

public class QueryResultHandler {
    private static final Logger logger = LoggerFactory.getLogger(QueryResultHandler.class);
    private final ConcurrentMap<UserBitShared.QueryId, UserResultsListener> queryIdToResultsListenersMap = Maps.newConcurrentMap();

    /**
     * Creates the RPC outcome listener to hand to the query-submission call.
     *
     * @param resultsListener user-facing listener that the returned wrapper notifies
     * @return a new {@link SubmissionListener} bound to this handler and wrapping {@code resultsListener}
     */
    public RpcOutcomeListener<UserBitShared.QueryId> getWrappedListener(UserResultsListener resultsListener) {
        final SubmissionListener submissionListener = new SubmissionListener(resultsListener);
        return submissionListener;
    }

    /**
     * Decorates a connection handler with a {@link ChannelClosedHandler}.
     *
     * @param handler the handler that both connection callbacks are ultimately delegated to
     * @return a new {@link ChannelClosedHandler} bound to this handler and wrapping {@code handler}
     */
    public RpcConnectionHandler<BasicClientWithConnection.ServerConnection> getWrappedConnectionHandler(RpcConnectionHandler<BasicClientWithConnection.ServerConnection> handler) {
        final ChannelClosedHandler closeAwareHandler = new ChannelClosedHandler(handler);
        return closeAwareHandler;
    }

    /**
     * Handles a query-result (status) message and notifies the listener registered for its query.
     *
     * <p>The body is decoded into a {@code UserBitShared.QueryResult}; the listener for its query id is
     * looked up, or a buffering placeholder is created when none is registered yet. Then:
     * <ul>
     *   <li>{@code FAILED}: the listener is failed with the first error carried by the result, or with
     *       a system error when the result carries no error details;</li>
     *   <li>{@code CANCELED} / {@code COMPLETED}: the listener is told the query completed;</li>
     *   <li>any other state: logged and otherwise ignored.</li>
     * </ul>
     * After a terminal result the listener is removed from the registry, unless it is a
     * {@link BufferingResultsListener} that has no output listener attached yet.
     *
     * @param pBody serialized {@code UserBitShared.QueryResult} message
     * @throws RpcException declared for the RPC layer; presumably raised by {@code RpcBus.get} when the
     *         body cannot be decoded (confirm against {@code RpcBus})
     */
    public void resultArrived(byte[] pBody) throws RpcException {
        UserBitShared.QueryResult queryResult = RpcBus.get(pBody, UserBitShared.QueryResult.parser());
        UserBitShared.QueryId queryId = queryResult.getQueryId();
        UserBitShared.QueryResult.QueryState queryState = queryResult.getQueryState();
        if (logger.isDebugEnabled()) {
            logger.debug("resultArrived: queryState: {}, queryId = {}", queryState, QueryIdHelper.getQueryId(queryId));
        }
        assert (queryResult.hasQueryState()) : "received query result without QueryState";

        boolean isFailureResult = UserBitShared.QueryResult.QueryState.FAILED == queryState;
        boolean isTerminalResult;
        switch (queryState) {
            case FAILED:
            case CANCELED:
            case COMPLETED: {
                isTerminalResult = true;
                break;
            }
            default: {
                logger.error("Unexpected/unhandled QueryState {} (for query {})", queryState, queryId);
                isTerminalResult = false;
            }
        }
        assert (isFailureResult || queryResult.getErrorCount() == 0) : "Error count for the query batch is non-zero but QueryState != FAILED";

        UserResultsListener resultsListener = this.newUserResultsListener(queryId);
        try {
            if (isFailureResult) {
                if (queryResult.getErrorCount() > 0) {
                    resultsListener.submissionFailed(UserRemoteException.create(queryResult.getError(0)));
                } else {
                    // A FAILED result without error details would otherwise blow up in getError(0);
                    // fail the listener with an explicit explanation instead.
                    resultsListener.submissionFailed(UserException.systemError(new IllegalStateException("Query failed but the result carried no error details")).build(logger));
                }
            } else if (isTerminalResult) {
                resultsListener.queryCompleted(queryState);
            } else {
                logger.warn("queryState {} was ignored", queryState);
            }
        }
        catch (Throwable t2) {
            // Make sure the listener learns about the failure, then rethrow unchecked throwables.
            resultsListener.submissionFailed(UserException.systemError(t2).build(logger));
            Throwables.propagateIfPossible(t2);
        }
        finally {
            // Keep a buffering placeholder registered until an output listener has been attached to it,
            // so that its buffered terminal state can still be handed over.
            if (isTerminalResult && (!(resultsListener instanceof BufferingResultsListener) || ((BufferingResultsListener)resultsListener).output != null)) {
                this.queryIdToResultsListenersMap.remove(queryId, resultsListener);
            }
        }
    }

    /**
     * Handles an incoming data batch and forwards it to the listener registered for its query.
     *
     * <p>If the listener throws an {@link Exception} while consuming the batch, the batch is released
     * and the listener is failed with a system error wrapping that exception.
     *
     * @param throttle connection throttle passed through to the listener
     * @param pBody serialized {@code UserBitShared.QueryData} header
     * @param dBody data buffer accompanying the header, or {@code null}; a non-null buffer is cast to
     *        {@code NettyArrowBuf}
     * @throws RpcException declared for the RPC layer; presumably raised by {@code RpcBus.get} when the
     *         header cannot be decoded (confirm against {@code RpcBus})
     */
    public void batchArrived(ConnectionThrottle throttle, byte[] pBody, ByteBuf dBody) throws RpcException {
        final UserBitShared.QueryData header = RpcBus.get(pBody, UserBitShared.QueryData.parser());

        ArrowBuf payload = null;
        if (dBody != null) {
            payload = ((NettyArrowBuf) dBody).arrowBuf();
        }
        final QueryDataBatch incoming = new QueryDataBatch(header, payload);
        final UserBitShared.QueryId id = header.getQueryId();

        if (logger.isDebugEnabled()) {
            logger.debug("batchArrived: queryId = {}", (Object) QueryIdHelper.getQueryId(id));
        }
        logger.trace("batchArrived: batch = {}", (Object) incoming);

        final UserResultsListener target = this.newUserResultsListener(id);
        try {
            target.dataArrived(incoming, throttle);
        } catch (Exception failure) {
            // The listener did not take the batch: release it here, then report the failure.
            incoming.release();
            target.submissionFailed(UserException.systemError(failure).build(logger));
        }
    }

    /**
     * Returns the listener registered for {@code queryId}, registering a fresh
     * {@link BufferingResultsListener} when none is present.
     *
     * @param queryId id of the query whose listener is wanted
     * @return the registered listener; if another thread registers one between the lookup and the
     *         insertion attempt, that thread's listener is returned instead of the new placeholder
     */
    private UserResultsListener newUserResultsListener(UserBitShared.QueryId queryId) {
        final UserResultsListener existing = this.queryIdToResultsListenersMap.get(queryId);
        logger.trace("For QueryId [{}], retrieved results listener {}", (Object) queryId, (Object) existing);
        if (existing != null) {
            return existing;
        }

        final BufferingResultsListener placeholder = new BufferingResultsListener();
        final UserResultsListener winner = this.queryIdToResultsListenersMap.putIfAbsent(queryId, placeholder);
        if (winner != null) {
            return winner;
        }
        return placeholder;
    }

    /**
     * RPC outcome listener for a query submission that relays the outcome to a user results listener.
     *
     * <p>Only the first outcome ({@link #failed}, {@link #success} or {@link #interrupted}) is acted
     * upon; any later one is logged and dropped.
     */
    private class SubmissionListener
    extends BaseRpcOutcomeListener<UserBitShared.QueryId> {
        private final UserResultsListener resultsListener;
        /** Set to {@code true} by the first outcome that arrives. */
        private final AtomicBoolean isTerminal = new AtomicBoolean(false);

        public SubmissionListener(UserResultsListener resultsListener) {
            this.resultsListener = resultsListener;
        }

        /**
         * Claims the right to process the submission outcome.
         *
         * @return {@code true} for the first caller; {@code false} (after logging a warning) for
         *         every subsequent caller
         */
        private boolean claimFirstOutcome() {
            if (this.isTerminal.compareAndSet(false, true)) {
                return true;
            }
            logger.warn("Received multiple responses to run query request.");
            return false;
        }

        /**
         * Reports a failed submission to the user listener as a system error.
         *
         * @param ex the RPC failure
         */
        @Override
        public void failed(RpcException ex) {
            if (!this.claimFirstOutcome()) {
                return;
            }
            this.resultsListener.submissionFailed(UserException.systemError(ex).addContext("Query submission to SabotNode failed.").build(logger));
        }

        /**
         * Announces the query id to the user listener and registers that listener for the query.
         *
         * <p>If a {@link BufferingResultsListener} is already registered for the id, everything it
         * buffered is transferred to the user listener. When the transfer included a terminal event
         * the registry entry is removed; otherwise the user listener takes the placeholder's place.
         *
         * @param queryId id assigned to the submitted query
         * @param buf unused
         * @throws IllegalStateException if a non-buffering listener is already registered, or if the
         *         placeholder can no longer be swapped for the user listener
         */
        @Override
        public void success(UserBitShared.QueryId queryId, ByteBuf buf) {
            if (!this.claimFirstOutcome()) {
                return;
            }
            this.resultsListener.queryIdArrived(queryId);
            if (logger.isDebugEnabled()) {
                logger.debug("Received QueryId {} successfully. Adding results listener {}.", (Object)QueryIdHelper.getQueryId(queryId), (Object)this.resultsListener);
            }

            final UserResultsListener oldListener = QueryResultHandler.this.queryIdToResultsListenersMap.putIfAbsent(queryId, this.resultsListener);
            if (oldListener == null) {
                // Nothing was registered yet: the user listener is now in place.
                return;
            }

            logger.debug("Unable to place user results listener, buffering listener was already in place.");
            if (!(oldListener instanceof BufferingResultsListener)) {
                throw new IllegalStateException("Trying to replace a non-buffering User Results listener.");
            }

            final boolean all = ((BufferingResultsListener)oldListener).transferTo(this.resultsListener);
            if (all) {
                // A terminal event was part of the transfer; nothing more is expected for this query.
                QueryResultHandler.this.queryIdToResultsListenersMap.remove(queryId);
            } else if (!QueryResultHandler.this.queryIdToResultsListenersMap.replace(queryId, oldListener, this.resultsListener)) {
                throw new IllegalStateException("Results listener registered for query " + QueryIdHelper.getQueryId(queryId) + " changed while buffered results were being transferred.");
            }
        }

        /**
         * Reports an interrupted wait to the user listener as a system error.
         *
         * @param ex the interruption
         */
        @Override
        public void interrupted(InterruptedException ex) {
            if (!this.claimFirstOutcome()) {
                return;
            }
            this.resultsListener.submissionFailed(UserException.systemError(ex).addContext("The client had been asked to wait as the SabotNode is potentially being over-utilized. But the client was interrupted while waiting.").build(logger));
        }
    }

    /**
     * Connection handler decorator that, once a connection is established, arranges for every
     * registered results listener to be failed when that connection's channel closes. Both callbacks
     * are delegated to the wrapped handler.
     */
    private class ChannelClosedHandler
    implements RpcConnectionHandler<BasicClientWithConnection.ServerConnection> {
        private final RpcConnectionHandler<BasicClientWithConnection.ServerConnection> parentHandler;

        public ChannelClosedHandler(RpcConnectionHandler<BasicClientWithConnection.ServerConnection> parentHandler) {
            this.parentHandler = parentHandler;
        }

        /**
         * Installs a close listener on the connection's channel, then delegates to the wrapped handler.
         *
         * @param connection the newly established connection
         */
        @Override
        public void connectionSucceeded(final BasicClientWithConnection.ServerConnection connection) {
            connection.getChannel().closeFuture().addListener(new GenericFutureListener<Future<? super Void>>(){

                @Override
                public void operationComplete(Future<? super Void> future) throws Exception {
                    for (UserResultsListener listener : QueryResultHandler.this.queryIdToResultsListenersMap.values()) {
                        try {
                            listener.submissionFailed(UserException.connectionError().message("Connection %s closed unexpectedly. SabotNode down?", connection.getName()).build(logger));
                        } catch (RuntimeException e) {
                            // One misbehaving listener must not prevent the remaining listeners
                            // from learning that the connection is gone.
                            logger.warn("Results listener {} threw while being notified that connection {} closed.", listener, connection.getName(), e);
                            continue;
                        }
                        if (listener instanceof BufferingResultsListener) {
                            logger.warn("Buffering listener failed before results were transferred to the actual listener.");
                        }
                    }
                }
            });
            this.parentHandler.connectionSucceeded(connection);
        }

        /**
         * Delegates the failure to the wrapped handler unchanged.
         *
         * @param type kind of connection failure
         * @param t2 cause of the failure
         */
        @Override
        public void connectionFailed(RpcConnectionHandler.FailureType type, Throwable t2) {
            this.parentHandler.connectionFailed(type, t2);
        }
    }

    /**
     * Placeholder listener that buffers batches and the terminal outcome of a query until an output
     * listener is attached through {@link #transferTo}; after that, events are forwarded to the
     * output listener as they arrive.
     *
     * <p>All state changes happen while holding this object's monitor, so an event is either seen by
     * {@link #transferTo} (and replayed by it) or forwarded directly afterwards, never both.
     */
    private static final class BufferingResultsListener
    implements UserResultsListener {
        /** Batches received before an output listener was attached. */
        private final ConcurrentLinkedQueue<QueryDataBatch> results = Queues.newConcurrentLinkedQueue();
        /** Failure received before an output listener was attached, if any. */
        private volatile UserException ex;
        /** Terminal state once {@link #queryCompleted} or {@link #submissionFailed} has been called. */
        private volatile UserBitShared.QueryResult.QueryState queryState;
        /** Listener attached by {@link #transferTo}; {@code null} while still buffering. */
        private volatile UserResultsListener output;
        /** Throttle passed with the most recent batch; used when replaying buffered batches. */
        private volatile ConnectionThrottle throttle;

        private BufferingResultsListener() {
        }

        /**
         * Attaches {@code l} as the output listener and replays everything buffered so far to it:
         * first the batches, then the buffered failure or terminal state if one was recorded.
         *
         * @param l listener that receives the buffered and all subsequent events
         * @return {@code true} if a terminal event (failure or completion) was replayed,
         *         {@code false} if the query is still in progress
         */
        public boolean transferTo(UserResultsListener l) {
            synchronized (this) {
                this.output = l;
                for (QueryDataBatch r : this.results) {
                    l.dataArrived(r, this.throttle);
                }
                if (this.ex != null) {
                    l.submissionFailed(this.ex);
                    return true;
                }
                if (this.queryState != null) {
                    l.queryCompleted(this.queryState);
                    return true;
                }
                return false;
            }
        }

        /**
         * Records the terminal state and forwards it when an output listener is attached.
         *
         * @param state terminal state of the query
         */
        @Override
        public void queryCompleted(UserBitShared.QueryResult.QueryState state) {
            synchronized (this) {
                // Written under the lock so a concurrent transferTo cannot observe the state,
                // replay it, and then have it forwarded a second time from here.
                assert (this.queryState == null);
                this.queryState = state;
                if (this.output != null) {
                    this.output.queryCompleted(state);
                }
            }
        }

        /**
         * Buffers the batch while no output listener is attached; forwards it otherwise.
         *
         * @param result the received batch
         * @param throttle throttle accompanying the batch
         */
        @Override
        public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
            synchronized (this) {
                this.throttle = throttle;
                if (this.output == null) {
                    this.results.add(result);
                } else {
                    this.output.dataArrived(result, throttle);
                }
            }
        }

        /**
         * Marks the query as failed; buffers the failure while no output listener is attached and
         * forwards it otherwise.
         *
         * @param ex the failure to report
         */
        @Override
        public void submissionFailed(UserException ex) {
            synchronized (this) {
                // State and exception are published together under the lock; otherwise a concurrent
                // transferTo could see FAILED without the exception and report a plain completion.
                assert (this.queryState == null);
                this.queryState = UserBitShared.QueryResult.QueryState.FAILED;
                if (this.output == null) {
                    this.ex = ex;
                } else {
                    this.output.submissionFailed(ex);
                }
            }
        }

        /** Ignored: a placeholder has no use for the query id. */
        @Override
        public void queryIdArrived(UserBitShared.QueryId queryId) {
        }
    }
}

